Source code for opentelemetry.instrumentation.aiohttp_client

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

"""
The opentelemetry-instrumentation-aiohttp-client package allows tracing HTTP
requests made by the aiohttp client library.

Usage
-----
Explicitly instrumenting a single client session:

.. code:: python

    import asyncio
    import aiohttp
    from opentelemetry.instrumentation.aiohttp_client import create_trace_config
    import yarl

    def strip_query_params(url: yarl.URL) -> str:
        return str(url.with_query(None))

    async def get(url):
        async with aiohttp.ClientSession(trace_configs=[create_trace_config(
            # Remove all query params from the URL attribute on the span.
            url_filter=strip_query_params,
        )]) as session:
            async with session.get(url) as response:
                await response.text()

    asyncio.run(get("https://example.com"))

Instrumenting all client sessions:

.. code:: python

    import asyncio
    import aiohttp
    from opentelemetry.instrumentation.aiohttp_client import (
        AioHttpClientInstrumentor
    )

    # Enable instrumentation
    AioHttpClientInstrumentor().instrument()

    # Create a session and make an HTTP get request
    async def get(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                await response.text()

    asyncio.run(get("https://example.com"))

Configuration
-------------

Request/Response hooks
**********************

Utilize request/response hooks to execute custom logic to be performed before/after performing a request.

.. code-block:: python

   def request_hook(span: Span, params: aiohttp.TraceRequestStartParams):
      if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_request_hook", "some-value")

   def response_hook(span: Span, params: typing.Union[
                aiohttp.TraceRequestEndParams,
                aiohttp.TraceRequestExceptionParams,
            ]):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_response_hook", "some-value")

   AioHttpClientInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)

Exclude lists
*************
To exclude certain URLs from tracking, set the environment variable ``OTEL_PYTHON_AIOHTTP_CLIENT_EXCLUDED_URLS``
(or ``OTEL_PYTHON_EXCLUDED_URLS`` to cover all instrumentations) to a string of comma delimited regexes that match the
URLs.

For example,

::

    export OTEL_PYTHON_AIOHTTP_CLIENT_EXCLUDED_URLS="client/.*/info,healthcheck"

will exclude requests such as ``https://site/client/123/info`` and ``https://site/xyz/healthcheck``.

Capture HTTP request and response headers
*****************************************
You can configure the agent to capture specified HTTP headers as span attributes, according to the
`semantic conventions <https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-client-span>`_.

Request headers
***************
To capture HTTP request headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST`` to a comma delimited list of HTTP header names.

For example using the environment variable,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST="content-type,custom_request_header"

will extract ``content-type`` and ``custom_request_header`` from the request headers and add them as span attributes.

Request header names in aiohttp are case-insensitive. So, giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.

Regular expressions may also be used to match multiple headers that correspond to the given pattern.  For example:
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST="Accept.*,X-.*"

Would match all request headers that start with ``Accept`` and ``X-``.

To capture all request headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST`` to ``".*"``.
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST=".*"

The name of the added span attribute will follow the format ``http.request.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
single item list containing all the header values.

For example:
``http.request.header.custom_request_header = ["<value1>", "<value2>"]``

Response headers
****************
To capture HTTP response headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE`` to a comma delimited list of HTTP header names.

For example using the environment variable,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE="content-type,custom_response_header"

will extract ``content-type`` and ``custom_response_header`` from the response headers and add them as span attributes.

Response header names in aiohttp are case-insensitive. So, giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.

Regular expressions may also be used to match multiple headers that correspond to the given pattern.  For example:
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE="Content.*,X-.*"

Would match all response headers that start with ``Content`` and ``X-``.

To capture all response headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE`` to ``".*"``.
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE=".*"

The name of the added span attribute will follow the format ``http.response.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
list containing the header values.

For example:
``http.response.header.custom_response_header = ["<value1>", "<value2>"]``

Sanitizing headers
******************
In order to prevent storing sensitive data such as personally identifiable information (PII), session keys, passwords,
etc, set the environment variable ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS``
to a comma delimited list of HTTP header names to be sanitized.

Regexes may be used, and all header names will be matched in a case-insensitive manner.

For example using the environment variable,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS=".*session.*,set-cookie"

will replace the value of headers such as ``session-id`` and ``set-cookie`` with ``[REDACTED]`` in the span.

Note:
    The environment variable names used to capture HTTP headers are still experimental, and thus are subject to change.

API
---
"""

from __future__ import annotations

import types
import typing
from timeit import default_timer
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Collection,
    TypedDict,
    Union,
    cast,
)
from urllib.parse import urlparse

import aiohttp
import wrapt
import yarl

from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.instrumentation._semconv import (
    HTTP_DURATION_HISTOGRAM_BUCKETS_NEW,
    HTTP_DURATION_HISTOGRAM_BUCKETS_OLD,
    _client_duration_attrs_new,
    _client_duration_attrs_old,
    _filter_semconv_duration_attrs,
    _get_schema_url,
    _OpenTelemetrySemanticConventionStability,
    _OpenTelemetryStabilitySignalType,
    _report_new,
    _report_old,
    _set_http_host_client,
    _set_http_method,
    _set_http_net_peer_name_client,
    _set_http_peer_port_client,
    _set_http_url,
    _set_status,
    _StabilityMode,
)
from opentelemetry.instrumentation.aiohttp_client.package import _instruments
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
    is_http_instrumentation_enabled,
    unwrap,
)
from opentelemetry.metrics import MeterProvider, get_meter
from opentelemetry.propagate import inject
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv.metrics import (
    MetricInstruments,  # type: ignore[reportDeprecated]
)
from opentelemetry.semconv.metrics.http_metrics import (
    HTTP_CLIENT_REQUEST_DURATION,
)
from opentelemetry.trace import Span, SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import (
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST,
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE,
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS,
    get_custom_header_attributes,
    get_custom_headers,
    get_excluded_urls,
    normalise_request_header_name,
    normalise_response_header_name,
    redact_url,
    sanitize_method,
)

if TYPE_CHECKING:
    from typing_extensions import Unpack

    UrlFilterT = typing.Optional[typing.Callable[[yarl.URL], str]]
    RequestHookT = typing.Optional[
        typing.Callable[[Span, aiohttp.TraceRequestStartParams], None]
    ]
    ResponseHookT = typing.Optional[
        typing.Callable[
            [
                Span,
                typing.Union[
                    aiohttp.TraceRequestEndParams,
                    aiohttp.TraceRequestExceptionParams,
                ],
            ],
            None,
        ]
    ]

    class ClientSessionInitKwargs(TypedDict, total=False):
        trace_configs: typing.Sequence[aiohttp.TraceConfig]

    class InstrumentKwargs(TypedDict, total=False):
        tracer_provider: trace.TracerProvider
        meter_provider: MeterProvider
        url_filter: UrlFilterT
        request_hook: RequestHookT
        response_hook: ResponseHookT
        trace_configs: typing.Sequence[aiohttp.TraceConfig]

    class UninstrumentKwargs(TypedDict, total=False):
        pass


def _get_span_name(method: str) -> str:
    method = sanitize_method(method.strip())
    if method == "_OTHER":
        method = "HTTP"
    return method


def _set_http_status_code_attribute(
    span: Span,
    status_code: int,
    metric_attributes: Union[dict[str, Any], None] = None,
    sem_conv_opt_in_mode: _StabilityMode = _StabilityMode.DEFAULT,
):
    status_code_str = str(status_code)
    try:
        status_code = int(status_code)
    except ValueError:
        status_code = -1
    if metric_attributes is None:
        metric_attributes = {}
    # When we have durations we should set metrics only once
    # Also the decision to include status code on a histogram should
    # not be dependent on tracing decisions.
    _set_status(
        span,
        metric_attributes,
        status_code,
        status_code_str,
        server_span=False,
        sem_conv_opt_in_mode=sem_conv_opt_in_mode,
    )


# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
[docs]def create_trace_config( url_filter: UrlFilterT = None, request_hook: RequestHookT = None, response_hook: ResponseHookT = None, tracer_provider: Union[TracerProvider, None] = None, meter_provider: Union[MeterProvider, None] = None, sem_conv_opt_in_mode: _StabilityMode = _StabilityMode.DEFAULT, captured_request_headers: typing.Optional[list[str]] = None, captured_response_headers: typing.Optional[list[str]] = None, sensitive_headers: typing.Optional[list[str]] = None, ) -> aiohttp.TraceConfig: """Create an aiohttp-compatible trace configuration. One span is created for the entire HTTP request, including initial TCP/TLS setup if the connection doesn't exist. By default the span name is set to the HTTP request method. Example usage: .. code:: python import aiohttp from opentelemetry.instrumentation.aiohttp_client import create_trace_config async with aiohttp.ClientSession(trace_configs=[create_trace_config()]) as session: async with session.get(url) as response: await response.text() :param url_filter: A callback to process the requested URL prior to adding it as a span attribute. This can be useful to remove sensitive data such as API keys or user personal information. :param Callable request_hook: Optional callback that can modify span name and request params. :param Callable response_hook: Optional callback that can modify span name and response params. :param tracer_provider: optional TracerProvider from which to get a Tracer :param meter_provider: optional Meter provider to use :param captured_request_headers: List of HTTP request header regexes to capture as span attributes. Header names matching these patterns will be added as span attributes with the format ``http.request.header.<header_name>``. :param captured_response_headers: List of HTTP response header regexes to capture as span attributes. Header names matching these patterns will be added as span attributes with the format ``http.response.header.<header_name>``. :param sensitive_headers: List of HTTP header regexes whose values should be sanitized (redacted) when captured. Header values matching these patterns will be replaced with ``[REDACTED]``. :return: An object suitable for use with :py:class:`aiohttp.ClientSession`. :rtype: :py:class:`aiohttp.TraceConfig` """ # `aiohttp.TraceRequestStartParams` resolves to `aiohttp.tracing.TraceRequestStartParams` # which doesn't exist in the aiohttp intersphinx inventory. # Explicitly specify the type for the `request_hook` and `response_hook` param and rtype to work # around this issue. schema_url = _get_schema_url(sem_conv_opt_in_mode) tracer = get_tracer( __name__, __version__, tracer_provider, schema_url=schema_url, ) meter = get_meter( __name__, __version__, meter_provider, schema_url, ) duration_histogram_old = None if _report_old(sem_conv_opt_in_mode): duration_histogram_old = meter.create_histogram( name=MetricInstruments.HTTP_CLIENT_DURATION, # type: ignore[reportDeprecated] unit="ms", description="measures the duration of the outbound HTTP request", explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_OLD, ) duration_histogram_new = None if _report_new(sem_conv_opt_in_mode): duration_histogram_new = meter.create_histogram( name=HTTP_CLIENT_REQUEST_DURATION, unit="s", description="Duration of HTTP client requests.", explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW, ) excluded_urls = get_excluded_urls("AIOHTTP_CLIENT") def _end_trace(trace_config_ctx: types.SimpleNamespace): elapsed_time = max(default_timer() - trace_config_ctx.start_time, 0) if trace_config_ctx.token: context_api.detach(trace_config_ctx.token) if trace_config_ctx.span: trace_config_ctx.span.end() if trace_config_ctx.duration_histogram_old is not None: duration_attrs_old = cast( dict[str, Any], _filter_semconv_duration_attrs( trace_config_ctx.metric_attributes, _client_duration_attrs_old, _client_duration_attrs_new, _StabilityMode.DEFAULT, ), ) trace_config_ctx.duration_histogram_old.record( max(round(elapsed_time * 1000), 0), attributes=duration_attrs_old, ) if trace_config_ctx.duration_histogram_new is not None: duration_attrs_new = cast( dict[str, Any], _filter_semconv_duration_attrs( trace_config_ctx.metric_attributes, _client_duration_attrs_old, _client_duration_attrs_new, _StabilityMode.HTTP, ), ) trace_config_ctx.duration_histogram_new.record( elapsed_time, attributes=duration_attrs_new ) async def on_request_start( _session: aiohttp.ClientSession, trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestStartParams, ): if ( not is_http_instrumentation_enabled() or trace_config_ctx.excluded_urls.url_disabled(str(params.url)) ): return trace_config_ctx.start_time = default_timer() method = params.method request_span_name = _get_span_name(method) request_url = ( redact_url( cast(Callable[[yarl.URL], str], trace_config_ctx.url_filter)( params.url ) ) if callable(trace_config_ctx.url_filter) else redact_url(str(params.url)) ) span_attributes: dict[str, Any] = {} _set_http_method( span_attributes, method, sanitize_method(method), sem_conv_opt_in_mode, ) _set_http_method( trace_config_ctx.metric_attributes, method, sanitize_method(method), sem_conv_opt_in_mode, ) _set_http_url(span_attributes, request_url, sem_conv_opt_in_mode) try: parsed_url = urlparse(request_url) if parsed_url.hostname: _set_http_host_client( trace_config_ctx.metric_attributes, parsed_url.hostname, sem_conv_opt_in_mode, ) _set_http_net_peer_name_client( trace_config_ctx.metric_attributes, parsed_url.hostname, sem_conv_opt_in_mode, ) if _report_new(sem_conv_opt_in_mode): _set_http_host_client( span_attributes, parsed_url.hostname, sem_conv_opt_in_mode, ) if parsed_url.port: _set_http_peer_port_client( trace_config_ctx.metric_attributes, parsed_url.port, sem_conv_opt_in_mode, ) if _report_new(sem_conv_opt_in_mode): _set_http_peer_port_client( span_attributes, parsed_url.port, sem_conv_opt_in_mode ) except ValueError: pass span_attributes.update( get_custom_header_attributes( { key: params.headers.getall(key) for key in params.headers.keys() }, captured_request_headers, sensitive_headers, normalise_request_header_name, ) ) trace_config_ctx.span = trace_config_ctx.tracer.start_span( request_span_name, kind=SpanKind.CLIENT, attributes=span_attributes ) if callable(request_hook): request_hook(trace_config_ctx.span, params) trace_config_ctx.token = context_api.attach( trace.set_span_in_context(trace_config_ctx.span) ) inject(params.headers) async def on_request_end( _session: aiohttp.ClientSession, trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestEndParams, ): if trace_config_ctx.span is None: return if callable(response_hook): response_hook(trace_config_ctx.span, params) _set_http_status_code_attribute( trace_config_ctx.span, params.response.status, trace_config_ctx.metric_attributes, sem_conv_opt_in_mode, ) trace_config_ctx.span.set_attributes( get_custom_header_attributes( { key: params.response.headers.getall(key) for key in params.response.headers.keys() }, captured_response_headers, sensitive_headers, normalise_response_header_name, ) ) _end_trace(trace_config_ctx) async def on_request_exception( _session: aiohttp.ClientSession, trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestExceptionParams, ): if trace_config_ctx.span is None: return if trace_config_ctx.span.is_recording() and params.exception: exc_type = type(params.exception).__qualname__ if _report_new(sem_conv_opt_in_mode): trace_config_ctx.span.set_attribute(ERROR_TYPE, exc_type) trace_config_ctx.metric_attributes[ERROR_TYPE] = exc_type trace_config_ctx.span.set_status( Status(StatusCode.ERROR, exc_type) ) trace_config_ctx.span.record_exception(params.exception) if callable(response_hook): response_hook(trace_config_ctx.span, params) _end_trace(trace_config_ctx) def _trace_config_ctx_factory(**kwargs: Any) -> types.SimpleNamespace: kwargs.setdefault("trace_request_ctx", {}) return types.SimpleNamespace( tracer=tracer, span=None, token=None, duration_histogram_old=duration_histogram_old, duration_histogram_new=duration_histogram_new, metric_attributes={}, url_filter=url_filter, excluded_urls=excluded_urls, start_time=0, **kwargs, ) trace_config = aiohttp.TraceConfig( trace_config_ctx_factory=cast( type[types.SimpleNamespace], _trace_config_ctx_factory ) ) trace_config.on_request_start.append(on_request_start) trace_config.on_request_end.append(on_request_end) trace_config.on_request_exception.append(on_request_exception) return trace_config
def _instrument( tracer_provider: Union[TracerProvider, None] = None, meter_provider: Union[MeterProvider, None] = None, url_filter: UrlFilterT = None, request_hook: RequestHookT = None, response_hook: ResponseHookT = None, trace_configs: typing.Optional[ typing.Sequence[aiohttp.TraceConfig] ] = None, sem_conv_opt_in_mode: _StabilityMode = _StabilityMode.DEFAULT, captured_request_headers: typing.Optional[list[str]] = None, captured_response_headers: typing.Optional[list[str]] = None, sensitive_headers: typing.Optional[list[str]] = None, ): """Enables tracing of all ClientSessions When a ClientSession gets created a TraceConfig is automatically added to the session's trace_configs. """ trace_configs = trace_configs or () # pylint:disable=unused-argument def instrumented_init( wrapped: Callable[..., None], _instance: aiohttp.ClientSession, args: tuple[Any, ...], kwargs: ClientSessionInitKwargs, ): client_trace_configs = list(kwargs.get("trace_configs") or []) client_trace_configs.extend(trace_configs) trace_config = create_trace_config( url_filter=url_filter, request_hook=request_hook, response_hook=response_hook, tracer_provider=tracer_provider, meter_provider=meter_provider, sem_conv_opt_in_mode=sem_conv_opt_in_mode, captured_request_headers=captured_request_headers, captured_response_headers=captured_response_headers, sensitive_headers=sensitive_headers, ) setattr(trace_config, "_is_instrumented_by_opentelemetry", True) client_trace_configs.append(trace_config) kwargs["trace_configs"] = client_trace_configs return wrapped(*args, **kwargs) wrapt.wrap_function_wrapper( # type: ignore[reportUnknownVariableType] aiohttp.ClientSession, "__init__", instrumented_init ) def _uninstrument(): """Disables instrumenting for all newly created ClientSessions""" unwrap(aiohttp.ClientSession, "__init__") def _uninstrument_session(client_session: aiohttp.ClientSession): """Disables instrumentation for the given ClientSession""" # pylint: disable=protected-access trace_configs = client_session._trace_configs client_session._trace_configs = [ trace_config for trace_config in trace_configs if not hasattr(trace_config, "_is_instrumented_by_opentelemetry") ]
[docs]class AioHttpClientInstrumentor(BaseInstrumentor): """An instrumentor for aiohttp client sessions See `BaseInstrumentor` """
[docs] def instrumentation_dependencies(self) -> Collection[str]: return _instruments
def _instrument(self, **kwargs: Unpack[InstrumentKwargs]): """Instruments aiohttp ClientSession Args: **kwargs: Optional arguments ``tracer_provider``: a TracerProvider, defaults to global ``meter_provider``: a MeterProvider, defaults to global ``url_filter``: A callback to process the requested URL prior to adding it as a span attribute. This can be useful to remove sensitive data such as API keys or user personal information. ``request_hook``: An optional callback that is invoked right after a span is created. ``response_hook``: An optional callback which is invoked right before the span is finished processing a response. ``trace_configs``: An optional list of aiohttp.TraceConfig items, allowing customize enrichment of spans based on aiohttp events (see specification: https://docs.aiohttp.org/en/stable/tracing_reference.html) """ _OpenTelemetrySemanticConventionStability._initialize() _sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( _OpenTelemetryStabilitySignalType.HTTP, ) _instrument( tracer_provider=kwargs.get("tracer_provider"), meter_provider=kwargs.get("meter_provider"), url_filter=kwargs.get("url_filter"), request_hook=kwargs.get("request_hook"), response_hook=kwargs.get("response_hook"), trace_configs=kwargs.get("trace_configs"), sem_conv_opt_in_mode=_sem_conv_opt_in_mode, captured_request_headers=get_custom_headers( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST ), captured_response_headers=get_custom_headers( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE ), sensitive_headers=get_custom_headers( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS ), ) def _uninstrument(self, **kwargs: Unpack[UninstrumentKwargs]): _uninstrument()
[docs] @staticmethod def uninstrument_session(client_session: aiohttp.ClientSession): """Disables instrumentation for the given session""" _uninstrument_session(client_session)