# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `boto3sqs`_ to trace SQS applications.
.. _boto3sqs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html
Usage
-----
.. code-block:: python
import boto3
from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor
Boto3SQSInstrumentor().instrument()
---
"""
import logging
from typing import Any, Collection, Dict, Generator, List, Mapping, Optional
import boto3
import botocore.client
from wrapt import wrap_function_wrapper
from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.propagators.textmap import CarrierT, Getter, Setter
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Link, Span, SpanKind, Tracer, TracerProvider
from .package import _instruments
from .version import __version__
_logger = logging.getLogger(__name__)
_IS_SQS_INSTRUMENTED_ATTRIBUTE = "_otel_boto3sqs_instrumented"
[docs]class Boto3SQSGetter(Getter[CarrierT]):
[docs] def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
msg_attr = carrier.get(key)
if not isinstance(msg_attr, Mapping):
return None
value = msg_attr.get("StringValue")
if value is None:
return None
return [value]
[docs] def keys(self, carrier: CarrierT) -> List[str]:
return list(carrier.keys())
[docs]class Boto3SQSSetter(Setter[CarrierT]):
[docs] def set(self, carrier: CarrierT, key: str, value: str) -> None:
# This is a limitation defined by AWS for SQS MessageAttributes size
if len(carrier.items()) < 10:
carrier[key] = {
"StringValue": value,
"DataType": "String",
}
else:
_logger.warning(
"Boto3 SQS instrumentation: cannot set context propagation on SQS/SNS message due to maximum amount of "
"MessageAttributes"
)
boto3sqs_getter = Boto3SQSGetter()
boto3sqs_setter = Boto3SQSSetter()
# pylint: disable=attribute-defined-outside-init
[docs]class Boto3SQSInstrumentor(BaseInstrumentor):
received_messages_spans: Dict[str, Span] = {}
current_span_related_to_token: Span = None
current_context_token = None
[docs] class ContextableList(list):
"""
Since the classic way to process SQS messages is using a `for` loop, without a well defined scope like a
callback - we are doing something similar to the instrumentation of Kafka-python and instrumenting the
`__iter__` functions and the `__getitem__` functions to set the span context of the addressed message. Since
the return value from an `SQS.ReceiveMessage` returns a builtin list, we cannot wrap it and change all of the
calls for `list.__iter__` and `list.__getitem__` - therefore we use ContextableList. It is bound to the
received_messages_spans dict
"""
def __getitem__(self, key: int) -> Any:
retval = super(
Boto3SQSInstrumentor.ContextableList, self
).__getitem__(key)
if not isinstance(retval, dict):
return retval
receipt_handle = retval.get("ReceiptHandle")
if not receipt_handle:
return retval
started_span = Boto3SQSInstrumentor.received_messages_spans.get(
receipt_handle
)
if started_span is None:
return retval
if Boto3SQSInstrumentor.current_context_token:
context.detach(Boto3SQSInstrumentor.current_context_token)
Boto3SQSInstrumentor.current_context_token = context.attach(
trace.set_span_in_context(started_span)
)
Boto3SQSInstrumentor.current_span_related_to_token = started_span
return retval
def __iter__(self) -> Generator:
index = 0
while index < len(self):
yield self[index]
index = index + 1
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
@staticmethod
def _enrich_span(
span: Span,
queue_name: str,
queue_url: str,
conversation_id: Optional[str] = None,
operation: Optional[MessagingOperationValues] = None,
message_id: Optional[str] = None,
) -> None:
if not span.is_recording():
return
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "aws.sqs")
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_name)
span.set_attribute(
SpanAttributes.MESSAGING_DESTINATION_KIND,
MessagingDestinationKindValues.QUEUE.value,
)
span.set_attribute(SpanAttributes.MESSAGING_URL, queue_url)
if operation:
span.set_attribute(
SpanAttributes.MESSAGING_OPERATION, operation.value
)
if conversation_id:
span.set_attribute(
SpanAttributes.MESSAGING_CONVERSATION_ID, conversation_id
)
if message_id:
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message_id)
@staticmethod
def _safe_end_processing_span(receipt_handle: str) -> None:
started_span: Span = Boto3SQSInstrumentor.received_messages_spans.pop(
receipt_handle, None
)
if started_span:
if (
Boto3SQSInstrumentor.current_span_related_to_token
== started_span
):
context.detach(Boto3SQSInstrumentor.current_context_token)
Boto3SQSInstrumentor.current_context_token = None
started_span.end()
@staticmethod
def _extract_queue_name_from_url(queue_url: str) -> str:
# A Queue name cannot have the `/` char, therefore we can return the part after the last /
return queue_url.split("/")[-1]
def _create_processing_span(
self,
queue_name: str,
queue_url: str,
receipt_handle: str,
message: Dict[str, Any],
) -> None:
message_attributes = message.get("MessageAttributes", {})
links = []
ctx = propagate.extract(message_attributes, getter=boto3sqs_getter)
parent_span_ctx = trace.get_current_span(ctx).get_span_context()
if parent_span_ctx.is_valid:
links.append(Link(context=parent_span_ctx))
span = self._tracer.start_span(
name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER
)
with trace.use_span(span):
message_id = message.get("MessageId")
Boto3SQSInstrumentor.received_messages_spans[receipt_handle] = span
Boto3SQSInstrumentor._enrich_span(
span,
queue_name,
queue_url,
message_id=message_id,
operation=MessagingOperationValues.PROCESS,
)
def _wrap_send_message(self, sqs_class: type) -> None:
def send_wrapper(wrapped, instance, args, kwargs):
if not is_instrumentation_enabled():
return wrapped(*args, **kwargs)
queue_url = kwargs.get("QueueUrl")
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
# original exception
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
queue_url
)
with self._tracer.start_as_current_span(
name=f"{queue_name} send",
kind=SpanKind.PRODUCER,
end_on_exit=True,
) as span:
Boto3SQSInstrumentor._enrich_span(span, queue_name, queue_url)
attributes = kwargs.pop("MessageAttributes", {})
propagate.inject(attributes, setter=boto3sqs_setter)
retval = wrapped(*args, MessageAttributes=attributes, **kwargs)
message_id = retval.get("MessageId")
if message_id:
if span.is_recording():
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID, message_id
)
return retval
wrap_function_wrapper(sqs_class, "send_message", send_wrapper)
def _wrap_send_message_batch(self, sqs_class: type) -> None:
def send_batch_wrapper(wrapped, instance, args, kwargs):
queue_url = kwargs.get("QueueUrl")
entries = kwargs.get("Entries")
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
# original exception
if (
not is_instrumentation_enabled()
or not queue_url
or not entries
):
return wrapped(*args, **kwargs)
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
queue_url
)
ids_to_spans: Dict[str, Span] = {}
for entry in entries:
entry_id = entry["Id"]
span = self._tracer.start_span(
name=f"{queue_name} send", kind=SpanKind.PRODUCER
)
ids_to_spans[entry_id] = span
Boto3SQSInstrumentor._enrich_span(
span, queue_name, queue_url, conversation_id=entry_id
)
with trace.use_span(span):
if "MessageAttributes" not in entry:
entry["MessageAttributes"] = {}
propagate.inject(
entry["MessageAttributes"], setter=boto3sqs_setter
)
retval = wrapped(*args, **kwargs)
for successful_messages in retval["Successful"]:
message_identifier = successful_messages["Id"]
message_span = ids_to_spans.get(message_identifier)
if message_span:
if message_span.is_recording():
message_span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
successful_messages.get("MessageId"),
)
for span in ids_to_spans.values():
span.end()
return retval
wrap_function_wrapper(
sqs_class, "send_message_batch", send_batch_wrapper
)
def _wrap_receive_message(self, sqs_class: type) -> None:
def receive_message_wrapper(wrapped, instance, args, kwargs):
queue_url = kwargs.get("QueueUrl")
message_attribute_names = kwargs.pop("MessageAttributeNames", [])
message_attribute_names.extend(
propagate.get_global_textmap().fields
)
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
queue_url
)
with self._tracer.start_as_current_span(
name=f"{queue_name} receive",
end_on_exit=True,
kind=SpanKind.CONSUMER,
) as span:
Boto3SQSInstrumentor._enrich_span(
span,
queue_name,
queue_url,
operation=MessagingOperationValues.RECEIVE,
)
retval = wrapped(
*args,
MessageAttributeNames=message_attribute_names,
**kwargs,
)
messages = retval.get("Messages", [])
if not messages:
return retval
for message in messages:
receipt_handle = message.get("ReceiptHandle")
if not receipt_handle:
continue
Boto3SQSInstrumentor._safe_end_processing_span(
receipt_handle
)
self._create_processing_span(
queue_name, queue_url, receipt_handle, message
)
retval["Messages"] = Boto3SQSInstrumentor.ContextableList(
messages
)
return retval
wrap_function_wrapper(
sqs_class, "receive_message", receive_message_wrapper
)
@staticmethod
def _wrap_delete_message(sqs_class: type) -> None:
def delete_message_wrapper(wrapped, instance, args, kwargs):
receipt_handle = kwargs.get("ReceiptHandle")
if receipt_handle:
Boto3SQSInstrumentor._safe_end_processing_span(receipt_handle)
return wrapped(*args, **kwargs)
wrap_function_wrapper(
sqs_class, "delete_message", delete_message_wrapper
)
@staticmethod
def _wrap_delete_message_batch(sqs_class: type) -> None:
def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
entries = kwargs.get("Entries")
for entry in entries:
receipt_handle = entry.get("ReceiptHandle")
if receipt_handle:
Boto3SQSInstrumentor._safe_end_processing_span(
receipt_handle
)
return wrapped(*args, **kwargs)
wrap_function_wrapper(
sqs_class, "delete_message_batch", delete_message_wrapper_batch
)
def _wrap_client_creation(self) -> None:
"""
Since botocore creates classes on the fly using schemas, the SQS class is not necesraily created upon the call
of `instrument()`. Therefore we need to wrap the creation of the boto3 client, which triggers the creation of
the SQS client.
"""
def client_wrapper(wrapped, instance, args, kwargs):
retval = wrapped(*args, **kwargs)
self._decorate_sqs(type(retval))
return retval
wrap_function_wrapper(boto3, "client", client_wrapper)
def _decorate_sqs(self, sqs_class: type) -> None:
"""
Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base
class and is SQS to wrap.
"""
# We define SQS client as the only client that implements send_message_batch
if not hasattr(sqs_class, "send_message_batch"):
return
if getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False):
return
setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, True)
self._wrap_send_message(sqs_class)
self._wrap_send_message_batch(sqs_class)
self._wrap_receive_message(sqs_class)
self._wrap_delete_message(sqs_class)
self._wrap_delete_message_batch(sqs_class)
@staticmethod
def _un_decorate_sqs(sqs_class: type) -> None:
if not getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False):
return
unwrap(sqs_class, "send_message")
unwrap(sqs_class, "send_message_batch")
unwrap(sqs_class, "receive_message")
unwrap(sqs_class, "delete_message")
unwrap(sqs_class, "delete_message_batch")
setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False)
def _instrument(self, **kwargs: Dict[str, Any]) -> None:
self._tracer_provider: Optional[TracerProvider] = kwargs.get(
"tracer_provider"
)
self._tracer: Tracer = trace.get_tracer(
__name__,
__version__,
self._tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
self._wrap_client_creation()
for client_cls in botocore.client.BaseClient.__subclasses__():
self._decorate_sqs(client_cls)
def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
unwrap(boto3, "client")
for client_cls in botocore.client.BaseClient.__subclasses__():
self._un_decorate_sqs(client_cls)