Source code for opentelemetry.instrumentation.asgi

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
# pylint: disable=too-many-locals,too-many-lines

"""
The opentelemetry-instrumentation-asgi package provides an ASGI middleware that can be used
on any ASGI framework (such as Django-channels / Quart) to track request timing through OpenTelemetry.

Usage (Quart)
-------------

.. code-block:: python

    from quart import Quart
    from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware

    app = Quart(__name__)
    app.asgi_app = OpenTelemetryMiddleware(app.asgi_app)

    @app.route("/")
    async def hello():
        return "Hello!"

    if __name__ == "__main__":
        app.run(debug=True)


Usage (Django 3.0)
------------------

Modify the application's ``asgi.py`` file as shown below.

.. code-block:: python

    import os
    from django.core.asgi import get_asgi_application
    from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'asgi_example.settings')

    application = get_asgi_application()
    application = OpenTelemetryMiddleware(application)


Usage (Raw ASGI)
----------------

.. code-block:: python

    from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware

    app = ...  # An ASGI application.
    app = OpenTelemetryMiddleware(app)


Configuration
-------------

Request/Response hooks
**********************

This instrumentation supports request and response hooks. These are functions that get called
right after a span is created for a request and right before the span is finished for the response.

- The server request hook is passed a server span and ASGI scope object for every incoming request.
- The client request hook is called with the internal span and an ASGI scope when the method ``receive`` is called.
- The client response hook is called with the internal span and an ASGI event when the method ``send`` is called.

For example,

.. code-block:: python

    from opentelemetry.trace import Span
    from typing import Any
    from asgiref.typing import Scope, ASGIReceiveEvent, ASGISendEvent
    from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware

    async def application(scope: Scope, receive: ASGIReceiveEvent, send: ASGISendEvent):
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [
                [b'content-type', b'text/plain'],
            ],
        })

        await send({
            'type': 'http.response.body',
            'body': b'Hello, world!',
        })

    def server_request_hook(span: Span, scope: Scope):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_request_hook", "some-value")

    def client_request_hook(span: Span, scope: Scope, message: dict[str, Any]):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_client_request_hook", "some-value")

    def client_response_hook(span: Span, scope: Scope, message: dict[str, Any]):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_response_hook", "some-value")

    OpenTelemetryMiddleware(application, server_request_hook=server_request_hook, client_request_hook=client_request_hook, client_response_hook=client_response_hook)

Capture HTTP request and response headers
*****************************************
You can configure the agent to capture specified HTTP headers as span attributes, according to the
`semantic conventions <https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md#http-server-span>`_.

Request headers
***************
To capture HTTP request headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST`` to a comma delimited list of HTTP header names.

For example,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST="content-type,custom_request_header"

will extract ``content-type`` and ``custom_request_header`` from the request headers and add them as span attributes.

Request header names in ASGI are case-insensitive. So, giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.

Regular expressions may also be used to match multiple headers that correspond to the given pattern.  For example:
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST="Accept.*,X-.*"

Would match all request headers that start with ``Accept`` and ``X-``.

To capture all request headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST`` to ``".*"``.
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST=".*"

The name of the added span attribute will follow the format ``http.request.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
list containing the header values.

For example:
``http.request.header.custom_request_header = ["<value1>", "<value2>"]``

Response headers
****************
To capture HTTP response headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE`` to a comma delimited list of HTTP header names.

For example,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE="content-type,custom_response_header"

will extract ``content-type`` and ``custom_response_header`` from the response headers and add them as span attributes.

Response header names in ASGI are case-insensitive. So, giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.

Regular expressions may also be used to match multiple headers that correspond to the given pattern.  For example:
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE="Content.*,X-.*"

Would match all response headers that start with ``Content`` and ``X-``.

To capture all response headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE`` to ``".*"``.
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE=".*"

The name of the added span attribute will follow the format ``http.response.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
list containing the header values.

For example:
``http.response.header.custom_response_header = ["<value1>", "<value2>"]``

Sanitizing headers
******************
In order to prevent storing sensitive data such as personally identifiable information (PII), session keys, passwords,
etc, set the environment variable ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS``
to a comma delimited list of HTTP header names to be sanitized.  Regexes may be used, and all header names will be
matched in a case-insensitive manner.

For example,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS=".*session.*,set-cookie"

will replace the value of headers such as ``session-id`` and ``set-cookie`` with ``[REDACTED]`` in the span.

Note:
    The environment variable names used to capture HTTP headers are still experimental, and thus are subject to change.

API
---
"""

from __future__ import annotations

import typing
import urllib
from collections import defaultdict
from functools import wraps
from timeit import default_timer
from typing import Any, Awaitable, Callable, DefaultDict, Tuple

from asgiref.compatibility import guarantee_single_callable

from opentelemetry import context, trace
from opentelemetry.instrumentation._semconv import (
    HTTP_DURATION_HISTOGRAM_BUCKETS_NEW,
    _filter_semconv_active_request_count_attr,
    _filter_semconv_duration_attrs,
    _get_schema_url,
    _OpenTelemetrySemanticConventionStability,
    _OpenTelemetryStabilitySignalType,
    _report_new,
    _report_old,
    _server_active_requests_count_attrs_new,
    _server_active_requests_count_attrs_old,
    _server_duration_attrs_new,
    _server_duration_attrs_old,
    _set_http_flavor_version,
    _set_http_host_server,
    _set_http_method,
    _set_http_net_host_port,
    _set_http_peer_ip_server,
    _set_http_peer_port_server,
    _set_http_scheme,
    _set_http_target,
    _set_http_url,
    _set_http_user_agent,
    _set_status,
    _StabilityMode,
)
from opentelemetry.instrumentation.asgi.types import (
    ClientRequestHook,
    ClientResponseHook,
    ServerRequestHook,
)
from opentelemetry.instrumentation.asgi.version import __version__  # noqa
from opentelemetry.instrumentation.propagators import (
    get_global_response_propagator,
)
from opentelemetry.instrumentation.utils import (
    _start_internal_or_server_span,
    is_http_instrumentation_enabled,
)
from opentelemetry.metrics import get_meter
from opentelemetry.propagators.textmap import Getter, Setter
from opentelemetry.semconv._incubating.attributes.http_attributes import (
    HTTP_SERVER_NAME,
    HTTP_TARGET,
)
from opentelemetry.semconv._incubating.attributes.user_agent_attributes import (
    USER_AGENT_SYNTHETIC_TYPE,
)
from opentelemetry.semconv._incubating.metrics.http_metrics import (
    create_http_server_active_requests,
    create_http_server_request_body_size,
    create_http_server_response_body_size,
)
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.semconv.metrics.http_metrics import (
    HTTP_SERVER_REQUEST_DURATION,
)
from opentelemetry.trace import Span, set_span_in_context
from opentelemetry.util.http import (
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS,
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST,
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE,
    ExcludeList,
    SanitizeValue,
    _parse_url_query,
    detect_synthetic_user_agent,
    get_custom_headers,
    normalise_request_header_name,
    normalise_response_header_name,
    normalize_user_agent,
    parse_excluded_urls,
    redact_url,
    sanitize_method,
)


[docs]class ASGIGetter(Getter[dict]):
[docs] def get( self, carrier: dict, key: str ) -> typing.Optional[typing.List[str]]: """Getter implementation to retrieve a HTTP header value from the ASGI scope. Args: carrier: ASGI scope object key: header name in scope Returns: A list with a single string with the header value if it exists, else None. """ headers = carrier.get("headers") if not headers: return None # ASGI header keys are in lower case key = key.lower() decoded = [ _decode_header_item(_value) for (_key, _value) in headers if _decode_header_item(_key).lower() == key ] if not decoded: return None return decoded
[docs] def keys(self, carrier: dict) -> typing.List[str]: headers = carrier.get("headers") or [] return [_decode_header_item(_key) for (_key, _value) in headers]
asgi_getter = ASGIGetter()
[docs]class ASGISetter(Setter[dict]):
[docs] def set(self, carrier: dict, key: str, value: str) -> None: # pylint: disable=no-self-use """Sets response header values on an ASGI scope according to `the spec <https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event>`_. Args: carrier: ASGI scope object key: response header name to set value: response header value Returns: None """ headers = carrier.get("headers") if not headers: headers = [] carrier["headers"] = headers headers.append([key.lower().encode(), value.encode()])
asgi_setter = ASGISetter() # pylint: disable=too-many-branches
[docs]def collect_request_attributes( scope, sem_conv_opt_in_mode=_StabilityMode.DEFAULT ): """Collects HTTP request attributes from the ASGI scope and returns a dictionary to be used as span creation attributes.""" server_host, port, http_url = get_host_port_url_tuple(scope) query_string = scope.get("query_string") if query_string and http_url: if isinstance(query_string, bytes): query_string = query_string.decode("utf8") http_url += "?" + urllib.parse.unquote(query_string) result = {} scheme = scope.get("scheme") if scheme: _set_http_scheme(result, scheme, sem_conv_opt_in_mode) if server_host: _set_http_host_server(result, server_host, sem_conv_opt_in_mode) if port: _set_http_net_host_port(result, port, sem_conv_opt_in_mode) flavor = scope.get("http_version") if flavor: _set_http_flavor_version(result, flavor, sem_conv_opt_in_mode) path = scope.get("path") if path: _set_http_target( result, path, path, query_string, sem_conv_opt_in_mode ) if http_url: if _report_old(sem_conv_opt_in_mode): _set_http_url( result, redact_url(http_url), _StabilityMode.DEFAULT, ) http_method = scope.get("method", "") if http_method: _set_http_method( result, http_method, sanitize_method(http_method), sem_conv_opt_in_mode, ) http_host_value_list = asgi_getter.get(scope, "host") if http_host_value_list: if _report_old(sem_conv_opt_in_mode): result[HTTP_SERVER_NAME] = ",".join(http_host_value_list) http_user_agent = asgi_getter.get(scope, "user-agent") if http_user_agent: user_agent_raw = http_user_agent[0] user_agent_value = normalize_user_agent(user_agent_raw) if user_agent_value: _set_http_user_agent( result, user_agent_value, sem_conv_opt_in_mode ) # Check for synthetic user agent type synthetic_type = detect_synthetic_user_agent(user_agent_value) if synthetic_type: result[USER_AGENT_SYNTHETIC_TYPE] = synthetic_type if "client" in scope and scope["client"] is not None: _set_http_peer_ip_server( result, scope.get("client")[0], sem_conv_opt_in_mode ) _set_http_peer_port_server( result, scope.get("client")[1], sem_conv_opt_in_mode ) # remove None values result = {k: v for k, v in result.items() if v is not None} return result
[docs]def collect_custom_headers_attributes( scope_or_response_message: dict[str, Any], sanitize: SanitizeValue, header_regexes: list[str], normalize_names: Callable[[str], str], ) -> dict[str, list[str]]: """ Returns custom HTTP request or response headers to be added into SERVER span as span attributes. Refer to semantic conventions: - https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-spans.md#http-server-span """ headers: DefaultDict[str, list[str]] = defaultdict(list) raw_headers = scope_or_response_message.get("headers") if raw_headers: for key, value in raw_headers: # Decode headers before processing. headers[_decode_header_item(key)].append( _decode_header_item(value) ) return sanitize.sanitize_header_values( headers, header_regexes, normalize_names, )
[docs]def get_host_port_url_tuple(scope): """Returns (host, port, full_url) tuple.""" server = scope.get("server") or ["0.0.0.0", 80] port = server[1] host_header = asgi_getter.get(scope, "host") if host_header: host_value = host_header[0] # Ensure host_value is a string, not bytes if isinstance(host_value, bytes): host_value = _decode_header_item(host_value) url_host = host_value else: url_host = server[0] + (":" + str(port) if str(port) != "80" else "") server_host = server[0] + (":" + str(port) if str(port) != "80" else "") # using the scope path is enough, see: # - https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope (see: root_path and path) # - https://asgi.readthedocs.io/en/latest/specs/www.html#wsgi-compatibility (see: PATH_INFO) # PATH_INFO can be derived by stripping root_path from path # -> that means that the path should contain the root_path already, so prefixing it again is not necessary # - https://wsgi.readthedocs.io/en/latest/definitions.html#envvar-PATH_INFO full_path = scope.get("path", "") http_url = scope.get("scheme", "http") + "://" + url_host + full_path return server_host, port, http_url
[docs]def set_status_code( span, status_code, metric_attributes=None, sem_conv_opt_in_mode=_StabilityMode.DEFAULT, ): """Adds HTTP response attributes to span using the status_code argument.""" status_code_str = str(status_code) try: status_code = int(status_code) except ValueError: status_code = -1 if metric_attributes is None: metric_attributes = {} _set_status( span, metric_attributes, status_code, status_code_str, server_span=True, sem_conv_opt_in_mode=sem_conv_opt_in_mode, )
[docs]def get_default_span_details(scope: dict) -> Tuple[str, dict]: """ Default span name is the HTTP method and URL path, or just the method. https://github.com/open-telemetry/opentelemetry-specification/pull/3165 https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/http/#name Args: scope: the ASGI scope dictionary Returns: a tuple of the span name, and any attributes to attach to the span. """ path = scope.get("path", "").strip() method = sanitize_method(scope.get("method", "").strip()) if method == "_OTHER": method = "HTTP" if method and path: # http return f"{method} {path}", {} if path: # websocket return path, {} return method, {} # http with no path
def _collect_target_attribute( scope: typing.Dict[str, typing.Any], ) -> typing.Optional[str]: """ Returns the target path as defined by the Semantic Conventions. This value is suitable to use in metrics as it should replace concrete values with a parameterized name. Example: /api/users/{user_id} Refer to the specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md#parameterized-attributes Note: this function requires specific code for each framework, as there's no standard attribute to use. """ # FastAPI root_path = scope.get("root_path", "") route = scope.get("route") path_format = getattr(route, "path_format", None) if path_format: return f"{root_path}{path_format}" return None
[docs]class OpenTelemetryMiddleware: """The ASGI application middleware. This class is an ASGI middleware that starts and annotates spans for any requests it is invoked with. Args: app: The ASGI application callable to forward requests to. default_span_details: Callback which should return a string and a tuple, representing the desired default span name and a dictionary with any additional span attributes to set. Optional: Defaults to get_default_span_details. server_request_hook: Optional callback which is called with the server span and ASGI scope object for every incoming request. client_request_hook: Optional callback which is called with the internal span, and ASGI scope and event which are sent as dictionaries for when the method receive is called. client_response_hook: Optional callback which is called with the internal span, and ASGI scope and event which are sent as dictionaries for when the method send is called. tracer_provider: The optional tracer provider to use. If omitted the current globally configured one is used. meter_provider: The optional meter provider to use. If omitted the current globally configured one is used. exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace. """ # pylint: disable=too-many-branches,too-many-positional-arguments def __init__( self, app, excluded_urls: ExcludeList | str | None = None, default_span_details=None, server_request_hook: ServerRequestHook = None, client_request_hook: ClientRequestHook = None, client_response_hook: ClientResponseHook = None, tracer_provider=None, meter_provider=None, tracer=None, meter=None, http_capture_headers_server_request: list[str] | None = None, http_capture_headers_server_response: list[str] | None = None, http_capture_headers_sanitize_fields: list[str] | None = None, exclude_spans: list[typing.Literal["receive", "send"]] | None = None, ): # initialize semantic conventions opt-in if needed _OpenTelemetrySemanticConventionStability._initialize() sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( _OpenTelemetryStabilitySignalType.HTTP, ) self.app = guarantee_single_callable(app) self.tracer = ( trace.get_tracer( __name__, __version__, tracer_provider, schema_url=_get_schema_url(sem_conv_opt_in_mode), ) if tracer is None else tracer ) self.meter = ( get_meter( __name__, __version__, meter_provider, schema_url=_get_schema_url(sem_conv_opt_in_mode), ) if meter is None else meter ) self.duration_histogram_old = None if _report_old(sem_conv_opt_in_mode): self.duration_histogram_old = self.meter.create_histogram( name=MetricInstruments.HTTP_SERVER_DURATION, unit="ms", description="Measures the duration of inbound HTTP requests.", ) self.duration_histogram_new = None if _report_new(sem_conv_opt_in_mode): self.duration_histogram_new = self.meter.create_histogram( name=HTTP_SERVER_REQUEST_DURATION, description="Duration of HTTP server requests.", unit="s", explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW, ) self.server_response_size_histogram = None if _report_old(sem_conv_opt_in_mode): self.server_response_size_histogram = self.meter.create_histogram( name=MetricInstruments.HTTP_SERVER_RESPONSE_SIZE, unit="By", description="measures the size of HTTP response messages (compressed).", ) self.server_response_body_size_histogram = None if _report_new(sem_conv_opt_in_mode): self.server_response_body_size_histogram = ( create_http_server_response_body_size(self.meter) ) self.server_request_size_histogram = None if _report_old(sem_conv_opt_in_mode): self.server_request_size_histogram = self.meter.create_histogram( name=MetricInstruments.HTTP_SERVER_REQUEST_SIZE, unit="By", description="Measures the size of HTTP request messages (compressed).", ) self.server_request_body_size_histogram = None if _report_new(sem_conv_opt_in_mode): self.server_request_body_size_histogram = ( create_http_server_request_body_size(self.meter) ) self.active_requests_counter = create_http_server_active_requests( self.meter ) if isinstance(excluded_urls, str): excluded_urls = parse_excluded_urls(excluded_urls) self.excluded_urls = excluded_urls self.default_span_details = ( default_span_details or get_default_span_details ) def failsafe(func): if func is None: return None @wraps(func) def wrapper(span: Span, *args, **kwargs): try: func(span, *args, **kwargs) except Exception as exc: # pylint: disable=broad-exception-caught span.record_exception(exc) return wrapper self.server_request_hook = failsafe(server_request_hook) self.client_request_hook = failsafe(client_request_hook) self.client_response_hook = failsafe(client_response_hook) self.content_length_header = None self._sem_conv_opt_in_mode = sem_conv_opt_in_mode # Environment variables as constructor parameters self.http_capture_headers_server_request = ( http_capture_headers_server_request or ( get_custom_headers( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST ) ) or None ) self.http_capture_headers_server_response = ( http_capture_headers_server_response or ( get_custom_headers( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE ) ) or None ) self.http_capture_headers_sanitize_fields = SanitizeValue( http_capture_headers_sanitize_fields or ( get_custom_headers( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS ) ) or [] ) self.exclude_receive_span = ( "receive" in exclude_spans if exclude_spans else False ) self.exclude_send_span = ( "send" in exclude_spans if exclude_spans else False ) # pylint: disable=too-many-statements async def __call__( self, scope: typing.MutableMapping[str, Any], receive: Callable[[], Awaitable[typing.MutableMapping[str, Any]]], send: Callable[[typing.MutableMapping[str, Any]], Awaitable[None]], ) -> None: """The ASGI application Args: scope: An ASGI environment. receive: An awaitable callable yielding dictionaries send: An awaitable callable taking a single dictionary as argument. """ start = default_timer() if not is_http_instrumentation_enabled() or scope["type"] not in ( "http", "websocket", ): return await self.app(scope, receive, send) _, _, url = get_host_port_url_tuple(scope) if self.excluded_urls and self.excluded_urls.url_disabled(url): return await self.app(scope, receive, send) span_name, additional_attributes = self.default_span_details(scope) attributes = collect_request_attributes( scope, self._sem_conv_opt_in_mode ) attributes.update(additional_attributes) span, token = _start_internal_or_server_span( tracer=self.tracer, span_name=span_name, start_time=None, context_carrier=scope, context_getter=asgi_getter, attributes=attributes, ) active_requests_count_attrs = _parse_active_request_count_attrs( attributes, self._sem_conv_opt_in_mode, ) if scope["type"] == "http": self.active_requests_counter.add(1, active_requests_count_attrs) try: with trace.use_span(span, end_on_exit=False) as current_span: if current_span.is_recording(): for key, value in attributes.items(): current_span.set_attribute(key, value) if current_span.kind == trace.SpanKind.SERVER: custom_attributes = ( collect_custom_headers_attributes( scope, self.http_capture_headers_sanitize_fields, self.http_capture_headers_server_request, normalise_request_header_name, ) if self.http_capture_headers_server_request else {} ) if len(custom_attributes) > 0: current_span.set_attributes(custom_attributes) if callable(self.server_request_hook): self.server_request_hook(current_span, scope) otel_receive = self._get_otel_receive( span_name, scope, receive ) otel_send = self._get_otel_send( current_span, span_name, scope, send, attributes, ) await self.app(scope, otel_receive, otel_send) finally: if scope["type"] == "http": target = _collect_target_attribute(scope) if target: path, query = _parse_url_query(target) _set_http_target( attributes, target, path, query, self._sem_conv_opt_in_mode, ) duration_s = default_timer() - start duration_attrs_old = _parse_duration_attrs( attributes, _StabilityMode.DEFAULT ) if target: duration_attrs_old[HTTP_TARGET] = target duration_attrs_new = _parse_duration_attrs( attributes, _StabilityMode.HTTP ) span_ctx = set_span_in_context(span) if self.duration_histogram_old: self.duration_histogram_old.record( max(round(duration_s * 1000), 0), duration_attrs_old, context=span_ctx, ) if self.duration_histogram_new: self.duration_histogram_new.record( max(duration_s, 0), duration_attrs_new, context=span_ctx, ) self.active_requests_counter.add( -1, active_requests_count_attrs ) if self.content_length_header: if self.server_response_size_histogram: self.server_response_size_histogram.record( self.content_length_header, duration_attrs_old, context=span_ctx, ) if self.server_response_body_size_histogram: self.server_response_body_size_histogram.record( self.content_length_header, duration_attrs_new, context=span_ctx, ) request_size = asgi_getter.get(scope, "content-length") if request_size: try: request_size_amount = int(request_size[0]) except ValueError: pass else: if self.server_request_size_histogram: self.server_request_size_histogram.record( request_size_amount, duration_attrs_old, context=span_ctx, ) if self.server_request_body_size_histogram: self.server_request_body_size_histogram.record( request_size_amount, duration_attrs_new, context=span_ctx, ) if token: context.detach(token) if span.is_recording(): span.end() # pylint: enable=too-many-branches def _get_otel_receive(self, server_span_name, scope, receive): if self.exclude_receive_span: return receive @wraps(receive) async def otel_receive(): with self.tracer.start_as_current_span( " ".join((server_span_name, scope["type"], "receive")) ) as receive_span: message = await receive() if callable(self.client_request_hook): self.client_request_hook(receive_span, scope, message) if receive_span.is_recording(): if message["type"] == "websocket.receive": set_status_code( receive_span, 200, None, self._sem_conv_opt_in_mode, ) receive_span.set_attribute( "asgi.event.type", message["type"] ) return message return otel_receive def _set_send_span( self, server_span_name, scope, send, message, status_code, expecting_trailers, ): """Set send span attributes and status code.""" with self.tracer.start_as_current_span( " ".join((server_span_name, scope["type"], "send")) ) as send_span: if callable(self.client_response_hook): self.client_response_hook(send_span, scope, message) if send_span.is_recording(): if message["type"] == "http.response.start": expecting_trailers = message.get("trailers", False) send_span.set_attribute("asgi.event.type", message["type"]) if status_code: set_status_code( send_span, status_code, None, self._sem_conv_opt_in_mode, ) return expecting_trailers def _set_server_span( self, server_span, message, status_code, duration_attrs ): """Set server span attributes and status code.""" if ( server_span.is_recording() and server_span.kind == trace.SpanKind.SERVER and "headers" in message ): custom_response_attributes = ( collect_custom_headers_attributes( message, self.http_capture_headers_sanitize_fields, self.http_capture_headers_server_response, normalise_response_header_name, ) if self.http_capture_headers_server_response else {} ) if len(custom_response_attributes) > 0: server_span.set_attributes(custom_response_attributes) if status_code: set_status_code( server_span, status_code, duration_attrs, self._sem_conv_opt_in_mode, ) def _get_otel_send( self, server_span, server_span_name, scope, send, duration_attrs, ): expecting_trailers = False @wraps(send) async def otel_send(message: dict[str, Any]): nonlocal expecting_trailers status_code = None if message["type"] == "http.response.start": status_code = message["status"] elif message["type"] == "websocket.send": status_code = 200 if not self.exclude_send_span: expecting_trailers = self._set_send_span( server_span_name, scope, send, message, status_code, expecting_trailers, ) self._set_server_span( server_span, message, status_code, duration_attrs ) propagator = get_global_response_propagator() if propagator: propagator.inject( message, context=set_span_in_context( server_span, trace.context_api.Context() ), setter=asgi_setter, ) content_length = asgi_getter.get(message, "content-length") if content_length: try: self.content_length_header = int(content_length[0]) except ValueError: pass await send(message) # pylint: disable=too-many-boolean-expressions if ( not expecting_trailers and message["type"] == "http.response.body" and not message.get("more_body", False) ) or ( expecting_trailers and message["type"] == "http.response.trailers" and not message.get("more_trailers", False) ): server_span.end() return otel_send
def _parse_duration_attrs( req_attrs, sem_conv_opt_in_mode=_StabilityMode.DEFAULT ): return _filter_semconv_duration_attrs( req_attrs, _server_duration_attrs_old, _server_duration_attrs_new, sem_conv_opt_in_mode, ) def _parse_active_request_count_attrs( req_attrs, sem_conv_opt_in_mode=_StabilityMode.DEFAULT ): return _filter_semconv_active_request_count_attr( req_attrs, _server_active_requests_count_attrs_old, _server_active_requests_count_attrs_new, sem_conv_opt_in_mode, ) def _decode_header_item(value): try: return value.decode("utf-8") except ValueError: # ASGI header encoding specs, see: # - https://asgi.readthedocs.io/en/latest/specs/www.html#wsgi-encoding-differences (see: WSGI encoding differences) # - https://docs.python.org/3/library/codecs.html#text-encodings (see: Text Encodings) return value.decode("unicode_escape")