# Source code for opentelemetry.instrumentation.httpx

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
# pylint: disable=too-many-lines
"""
Usage
-----

Instrumenting all clients
*************************

When using the instrumentor, all clients will automatically trace requests.

.. code-block:: python

    import httpx
    import asyncio
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

    url = "https://example.com"
    HTTPXClientInstrumentor().instrument()

    with httpx.Client() as client:
        response = client.get(url)

    async def get(url):
        async with httpx.AsyncClient() as client:
            response = await client.get(url)

    asyncio.run(get(url))

Instrumenting single clients
****************************

If you only want to instrument requests for specific client instances, you can
use the `instrument_client` method.


.. code-block:: python

    import httpx
    import asyncio
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

    url = "https://example.com"

    with httpx.Client() as client:
        HTTPXClientInstrumentor.instrument_client(client)
        response = client.get(url)

    async def get(url):
        async with httpx.AsyncClient() as client:
            HTTPXClientInstrumentor.instrument_client(client)
            response = await client.get(url)

    asyncio.run(get(url))

Uninstrument
************

If you need to uninstrument clients, there are two options available.

.. code-block:: python

    import httpx
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

    HTTPXClientInstrumentor().instrument()
    client = httpx.Client()

    # Uninstrument a specific client
    HTTPXClientInstrumentor.uninstrument_client(client)

    # Uninstrument all clients
    HTTPXClientInstrumentor().uninstrument()


Using transports directly
*************************

If you don't want to use the instrumentor class, you can use the transport classes directly.


.. code-block:: python

    import httpx
    import asyncio
    from opentelemetry.instrumentation.httpx import (
        AsyncOpenTelemetryTransport,
        SyncOpenTelemetryTransport,
    )

    url = "https://example.com"
    transport = httpx.HTTPTransport()
    telemetry_transport = SyncOpenTelemetryTransport(transport)

    with httpx.Client(transport=telemetry_transport) as client:
        response = client.get(url)

    transport = httpx.AsyncHTTPTransport()
    telemetry_transport = AsyncOpenTelemetryTransport(transport)

    async def get(url):
        async with httpx.AsyncClient(transport=telemetry_transport) as client:
            response = await client.get(url)

    asyncio.run(get(url))

Request and response hooks
***************************

The instrumentation supports specifying request and response hooks. These are functions that get called back by the instrumentation right after a span is created for a request
and right before the span is finished while processing a response.

.. note::

    The request hook receives the raw arguments provided to the transport layer. The response hook receives the raw return values from the transport layer.

The hooks can be configured as follows:


.. code-block:: python

    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

    def request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    def response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    async def async_request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    async def async_response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    HTTPXClientInstrumentor().instrument(
        request_hook=request_hook,
        response_hook=response_hook,
        async_request_hook=async_request_hook,
        async_response_hook=async_response_hook
    )


Or if you are using the transport classes directly:


.. code-block:: python

    import httpx
    from opentelemetry.instrumentation.httpx import SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport

    def request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    def response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    async def async_request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    async def async_response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    transport = httpx.HTTPTransport()
    telemetry_transport = SyncOpenTelemetryTransport(
        transport,
        request_hook=request_hook,
        response_hook=response_hook
    )

    async_transport = httpx.AsyncHTTPTransport()
    async_telemetry_transport = AsyncOpenTelemetryTransport(
        async_transport,
        request_hook=async_request_hook,
        response_hook=async_response_hook
    )


Configuration
-------------

Exclude lists
*************
To exclude certain URLs from tracking, set the environment variable ``OTEL_PYTHON_HTTPX_EXCLUDED_URLS``
(or ``OTEL_PYTHON_EXCLUDED_URLS`` to cover all instrumentations) to a string of comma delimited regexes that match the
URLs.

For example,

::

    export OTEL_PYTHON_HTTPX_EXCLUDED_URLS="client/.*/info,healthcheck"

will exclude requests such as ``https://site/client/123/info`` and ``https://site/xyz/healthcheck``.

Capture HTTP request and response headers
*****************************************
You can configure the agent to capture specified HTTP headers as span attributes, according to the
`semantic conventions <https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-client-span>`_.

Request headers
***************
To capture HTTP request headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST`` to a comma delimited list of HTTP header names.

For example using the environment variable,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST="content-type,custom_request_header"

will extract ``content-type`` and ``custom_request_header`` from the request headers and add them as span attributes.

Request header names in HTTPX are case-insensitive, so giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.

Regular expressions may also be used to match multiple headers that correspond to the given pattern.  For example:
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST="Accept.*,X-.*"

Would match all request headers that start with ``Accept`` and ``X-``.

To capture all request headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST`` to ``".*"``.
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST=".*"

The name of the added span attribute will follow the format ``http.request.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
list containing the header values.

For example:
``http.request.header.custom_request_header = ["<value1>", "<value2>"]``

Response headers
****************
To capture HTTP response headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE`` to a comma delimited list of HTTP header names.

For example using the environment variable,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE="content-type,custom_response_header"

will extract ``content-type`` and ``custom_response_header`` from the response headers and add them as span attributes.

Response header names in HTTPX are case-insensitive, so giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.

Regular expressions may also be used to match multiple headers that correspond to the given pattern.  For example:
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE="Content.*,X-.*"

Would match all response headers that start with ``Content`` and ``X-``.

To capture all response headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE`` to ``".*"``.
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE=".*"

The name of the added span attribute will follow the format ``http.response.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
list containing the header values.

For example:
``http.response.header.custom_response_header = ["<value1>", "<value2>"]``

Sanitizing headers
******************
In order to prevent storing sensitive data such as personally identifiable information (PII), session keys, passwords,
etc, set the environment variable ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS``
to a comma delimited list of HTTP header names to be sanitized.

Regexes may be used, and all header names will be matched in a case-insensitive manner.

For example using the environment variable,
::

    export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS=".*session.*,set-cookie"

will replace the value of headers such as ``session-id`` and ``set-cookie`` with ``[REDACTED]`` in the span.

Note:
    The environment variable names used to capture HTTP headers are still experimental, and thus are subject to change.

API
---
"""

from __future__ import annotations

import logging
import typing
from collections import defaultdict
from functools import partial
from inspect import iscoroutinefunction
from timeit import default_timer
from types import TracebackType

import httpx
from wrapt import wrap_function_wrapper

from opentelemetry.instrumentation._semconv import (
    HTTP_DURATION_HISTOGRAM_BUCKETS_NEW,
    HTTP_DURATION_HISTOGRAM_BUCKETS_OLD,
    _client_duration_attrs_new,
    _client_duration_attrs_old,
    _filter_semconv_duration_attrs,
    _get_schema_url,
    _OpenTelemetrySemanticConventionStability,
    _OpenTelemetryStabilitySignalType,
    _report_new,
    _report_old,
    _set_http_host_client,
    _set_http_method,
    _set_http_net_peer_name_client,
    _set_http_network_protocol_version,
    _set_http_peer_port_client,
    _set_http_scheme,
    _set_http_status_code,
    _set_http_url,
    _set_status,
    _StabilityMode,
)
from opentelemetry.instrumentation.httpx.package import _instruments
from opentelemetry.instrumentation.httpx.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
    http_status_to_status_code,
    is_http_instrumentation_enabled,
    unwrap,
)
from opentelemetry.metrics import Histogram, MeterProvider, get_meter
from opentelemetry.propagate import inject
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv.attributes.network_attributes import (
    NETWORK_PEER_ADDRESS,
    NETWORK_PEER_PORT,
)
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.semconv.metrics.http_metrics import (
    HTTP_CLIENT_REQUEST_DURATION,
)
from opentelemetry.trace import SpanKind, Tracer, TracerProvider, get_tracer
from opentelemetry.trace.span import Span
from opentelemetry.trace.status import StatusCode
from opentelemetry.util.http import (
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST,
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE,
    OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS,
    ExcludeList,
    get_custom_header_attributes,
    get_custom_headers,
    get_excluded_urls,
    normalise_request_header_name,
    normalise_response_header_name,
    redact_url,
    sanitize_method,
)

_logger = logging.getLogger(__name__)

RequestHook = typing.Callable[[Span, "RequestInfo"], None]
ResponseHook = typing.Callable[[Span, "RequestInfo", "ResponseInfo"], None]
AsyncRequestHook = typing.Callable[
    [Span, "RequestInfo"], typing.Awaitable[typing.Any]
]
AsyncResponseHook = typing.Callable[
    [Span, "RequestInfo", "ResponseInfo"], typing.Awaitable[typing.Any]
]


class RequestInfo(typing.NamedTuple):
    """Request data passed to the request and response hooks.

    The fields are declared in the order ``method, url, headers, stream,
    extensions``, so an instance can be unpacked positionally in that order.
    """

    # HTTP method as bytes.
    method: bytes
    # Target URL of the request.
    url: httpx.URL
    # Request headers; may be ``None``.
    headers: httpx.Headers | None
    # Request body stream (sync or async); may be ``None``.
    stream: httpx.SyncByteStream | httpx.AsyncByteStream | None
    # Transport-level extensions mapping; may be ``None``.
    extensions: dict[str, typing.Any] | None
class ResponseInfo(typing.NamedTuple):
    """Response data passed to the response hooks.

    The fields are declared in the order ``status_code, headers, stream,
    extensions``, so an instance can be unpacked positionally in that order.
    """

    # Numeric HTTP status code of the response.
    status_code: int
    # Response headers; may be ``None``.
    headers: httpx.Headers | None
    # Response body stream (sync or async).
    stream: httpx.SyncByteStream | httpx.AsyncByteStream
    # Transport-level extensions mapping; may be ``None``.
    extensions: dict[str, typing.Any] | None
def _get_default_span_name(method: str) -> str: method = sanitize_method(method.strip()) if method == "_OTHER": method = "HTTP" return method def _prepare_headers(headers: httpx.Headers | None) -> httpx.Headers: return httpx.Headers(headers) def _extract_parameters( args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any] ) -> tuple[ bytes, httpx.URL | tuple[bytes, bytes, int | None, bytes], httpx.Headers | None, httpx.SyncByteStream | httpx.AsyncByteStream | None, dict[str, typing.Any], ]: if isinstance(args[0], httpx.Request): # In httpx >= 0.20.0, handle_request receives a Request object request: httpx.Request = args[0] method = request.method.encode() url = httpx.URL(str(request.url)) headers = request.headers stream = request.stream extensions = request.extensions else: # In httpx < 0.20.0, handle_request receives the parameters separately method = args[0] url = args[1] headers = kwargs.get("headers", args[2] if len(args) > 2 else None) stream = kwargs.get("stream", args[3] if len(args) > 3 else None) extensions = kwargs.get( "extensions", args[4] if len(args) > 4 else None ) return method, url, headers, stream, extensions def _normalize_url( url: httpx.URL | tuple[bytes, bytes, int | None, bytes], ) -> str: if isinstance(url, tuple): scheme, host, port, path = [ part.decode() if isinstance(part, bytes) else part for part in url ] return ( f"{scheme}://{host}:{port}{path}" if port else f"{scheme}://{host}{path}" ) return str(url) def _inject_propagation_headers(headers, args, kwargs): _headers = _prepare_headers(headers) inject(_headers) if isinstance(args[0], httpx.Request): request: httpx.Request = args[0] request.headers = _headers else: kwargs["headers"] = _headers.raw def _normalize_headers( headers: httpx.Headers | dict[str, list[str] | str] | list[tuple[bytes, bytes]] | None, ) -> dict[str, list[str]]: normalized_headers: defaultdict[str, list[str]] = defaultdict(list) if isinstance(headers, httpx.Headers): for key in headers.keys(): 
normalized_headers[key.lower()].extend( headers.get_list(key, split_commas=True) ) elif isinstance(headers, dict): for key, value in headers.items(): if isinstance(value, list): normalized_headers[key.lower()].extend(value) else: normalized_headers[key.lower()].append(value) elif isinstance(headers, list): for key, value in headers: normalized_headers[key.decode("latin-1").lower()].append( value.decode("latin-1") ) return dict(normalized_headers) def _extract_response( response: httpx.Response | tuple[int, httpx.Headers, httpx.SyncByteStream, dict[str, typing.Any]], ) -> tuple[ int, httpx.Headers, httpx.SyncByteStream | httpx.AsyncByteStream, dict[str, typing.Any], str, ]: if isinstance(response, httpx.Response): status_code = response.status_code headers = response.headers stream = response.stream extensions = response.extensions http_version = response.http_version else: status_code, headers, stream, extensions = response http_version = extensions.get("http_version", b"HTTP/1.1").decode( "ascii", errors="ignore" ) return (status_code, headers, stream, extensions, http_version) def _apply_request_client_attributes_to_span( span_attributes: dict[str, typing.Any], metric_attributes: dict[str, typing.Any], url: str | httpx.URL, method_original: str, semconv: _StabilityMode, headers: httpx.Headers | dict[str, list[str] | str] | None = None, captured_headers: list[str] | None = None, sensitive_headers: list[str] | None = None, ): url = httpx.URL(url) # http semconv transition: http.method -> http.request.method _set_http_method( span_attributes, method_original, sanitize_method(method_original), semconv, ) # http semconv transition: http.url -> url.full _set_http_url(span_attributes, redact_url(str(url)), semconv) # Set HTTP method in metric labels _set_http_method( metric_attributes, method_original, sanitize_method(method_original), semconv, ) span_attributes.update( get_custom_header_attributes( _normalize_headers(headers), captured_headers, sensitive_headers, 
normalise_request_header_name, ) ) if _report_old(semconv): # TODO: Support opt-in for url.scheme in new semconv _set_http_scheme(metric_attributes, url.scheme, semconv) if _report_new(semconv): if url.host: # http semconv transition: http.host -> server.address _set_http_host_client(span_attributes, url.host, semconv) # Add metric labels _set_http_host_client(metric_attributes, url.host, semconv) _set_http_net_peer_name_client( metric_attributes, url.host, semconv ) # http semconv transition: net.sock.peer.addr -> network.peer.address span_attributes[NETWORK_PEER_ADDRESS] = url.host if url.port: # http semconv transition: net.sock.peer.port -> network.peer.port _set_http_peer_port_client(span_attributes, url.port, semconv) span_attributes[NETWORK_PEER_PORT] = url.port # Add metric labels _set_http_peer_port_client(metric_attributes, url.port, semconv) def _apply_response_client_attributes_to_span( span: Span, status_code: int, http_version: str, semconv: _StabilityMode, headers: httpx.Headers | dict[str, list[str] | str] | None = None, captured_headers: list[str] | None = None, sensitive_headers: list[str] | None = None, ): # http semconv transition: http.status_code -> http.response.status_code # TODO: use _set_status when it's stable for http clients span_attributes = {} _set_http_status_code( span_attributes, status_code, semconv, ) http_status_code = http_status_to_status_code(status_code) span.set_status(http_status_code) span.set_attributes( get_custom_header_attributes( _normalize_headers(headers), captured_headers, sensitive_headers, normalise_response_header_name, ) ) if http_status_code == StatusCode.ERROR and _report_new(semconv): # http semconv transition: new error.type span_attributes[ERROR_TYPE] = str(status_code) if http_version and _report_new(semconv): # http semconv transition: http.flavor -> network.protocol.version _set_http_network_protocol_version( span_attributes, http_version.replace("HTTP/", ""), semconv, ) for key, val in 
span_attributes.items(): span.set_attribute(key, val) def _apply_response_client_attributes_to_metrics( span: Span | None, metric_attributes: dict[str, typing.Any], status_code: int, http_version: str, semconv: _StabilityMode, ) -> None: """Apply response attributes to metric attributes.""" # Set HTTP status code in metric attributes _set_status( span, metric_attributes, status_code, str(status_code), server_span=False, sem_conv_opt_in_mode=semconv, ) if http_version and _report_new(semconv): _set_http_network_protocol_version( metric_attributes, http_version.replace("HTTP/", ""), semconv, )
class SyncOpenTelemetryTransport(httpx.BaseTransport):
    """Sync transport class that will trace all requests made with a client.

    Args:
        transport: SyncHTTPTransport instance to wrap
        tracer_provider: Tracer provider to use
        meter_provider: Meter provider to use
        request_hook: A hook that receives the span and request that is called
            right after the span is created
        response_hook: A hook that receives the span, request, and response
            that is called right before the span ends
    """

    def __init__(
        self,
        transport: httpx.BaseTransport,
        tracer_provider: TracerProvider | None = None,
        meter_provider: MeterProvider | None = None,
        request_hook: RequestHook | None = None,
        response_hook: ResponseHook | None = None,
    ):
        _OpenTelemetrySemanticConventionStability._initialize()
        self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
            _OpenTelemetryStabilitySignalType.HTTP,
        )
        schema_url = _get_schema_url(self._sem_conv_opt_in_mode)

        self._transport = transport
        self._tracer = get_tracer(
            __name__,
            instrumenting_library_version=__version__,
            tracer_provider=tracer_provider,
            schema_url=schema_url,
        )
        meter = get_meter(
            __name__,
            __version__,
            meter_provider,
            schema_url,
        )

        # One histogram per semconv generation; each stays None when that
        # generation is not being reported.
        self._duration_histogram_old = None
        if _report_old(self._sem_conv_opt_in_mode):
            self._duration_histogram_old = meter.create_histogram(
                name=MetricInstruments.HTTP_CLIENT_DURATION,
                unit="ms",
                description="measures the duration of the outbound HTTP request",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_OLD,
            )
        self._duration_histogram_new = None
        if _report_new(self._sem_conv_opt_in_mode):
            self._duration_histogram_new = meter.create_histogram(
                name=HTTP_CLIENT_REQUEST_DURATION,
                unit="s",
                description="Duration of HTTP client requests.",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW,
            )

        self._request_hook = request_hook
        self._response_hook = response_hook
        self._excluded_urls = get_excluded_urls("HTTPX")
        self._captured_request_headers = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST
        )
        self._captured_response_headers = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE
        )
        self._sensitive_headers = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
        )

    def __enter__(self) -> SyncOpenTelemetryTransport:
        self._transport.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        self._transport.__exit__(exc_type, exc_value, traceback)

    # pylint: disable=R0914
    def handle_request(
        self,
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> (
        tuple[int, httpx.Headers, httpx.SyncByteStream, dict[str, typing.Any]]
        | httpx.Response
    ):
        """Add request info to span."""
        if not is_http_instrumentation_enabled():
            return self._transport.handle_request(*args, **kwargs)

        method, url, headers, stream, extensions = _extract_parameters(
            args, kwargs
        )

        # Excluded URLs are forwarded untouched: no span, no metrics.
        if self._excluded_urls and self._excluded_urls.url_disabled(
            _normalize_url(url)
        ):
            return self._transport.handle_request(*args, **kwargs)

        method_original = method.decode()
        span_name = _get_default_span_name(method_original)
        span_attributes = {}
        metric_attributes = {}
        # apply http client request attributes according to semconv
        _apply_request_client_attributes_to_span(
            span_attributes,
            metric_attributes,
            url,
            method_original,
            self._sem_conv_opt_in_mode,
            headers,
            self._captured_request_headers,
            self._sensitive_headers,
        )

        request_info = RequestInfo(method, url, headers, stream, extensions)

        with self._tracer.start_as_current_span(
            span_name, kind=SpanKind.CLIENT, attributes=span_attributes
        ) as span:
            exception = None
            if callable(self._request_hook):
                self._request_hook(span, request_info)

            _inject_propagation_headers(headers, args, kwargs)

            start_time = default_timer()

            try:
                response = self._transport.handle_request(*args, **kwargs)
            except Exception as exc:  # pylint: disable=W0703
                # Defer the raise so a response attached to the exception
                # can still be reported on the span.
                exception = exc
                response = getattr(exc, "response", None)
            finally:
                elapsed_time = max(default_timer() - start_time, 0)

            if isinstance(response, (httpx.Response, tuple)):
                status_code, headers, stream, extensions, http_version = (
                    _extract_response(response)
                )

                # Always apply response attributes to metrics
                _apply_response_client_attributes_to_metrics(
                    span,
                    metric_attributes,
                    status_code,
                    http_version,
                    self._sem_conv_opt_in_mode,
                )

                if span.is_recording():
                    # apply http client response attributes according to semconv
                    _apply_response_client_attributes_to_span(
                        span,
                        status_code,
                        http_version,
                        self._sem_conv_opt_in_mode,
                        headers,
                        self._captured_response_headers,
                        self._sensitive_headers,
                    )
                if callable(self._response_hook):
                    self._response_hook(
                        span,
                        request_info,
                        ResponseInfo(status_code, headers, stream, extensions),
                    )

            if exception:
                if span.is_recording() and _report_new(
                    self._sem_conv_opt_in_mode
                ):
                    span.set_attribute(
                        ERROR_TYPE, type(exception).__qualname__
                    )
                    # NOTE(review): the raise below happens before the
                    # histograms are recorded, so this metric attribute is
                    # never read and failed requests emit no duration
                    # measurement - confirm whether that is intended.
                    metric_attributes[ERROR_TYPE] = type(
                        exception
                    ).__qualname__
                raise exception.with_traceback(exception.__traceback__)

            if self._duration_histogram_old is not None:
                duration_attrs_old = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.DEFAULT,
                )
                # The old-semconv histogram is in whole milliseconds.
                self._duration_histogram_old.record(
                    max(round(elapsed_time * 1000), 0),
                    attributes=duration_attrs_old,
                )
            if self._duration_histogram_new is not None:
                duration_attrs_new = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.HTTP,
                )
                self._duration_histogram_new.record(
                    elapsed_time, attributes=duration_attrs_new
                )

        return response

    def close(self) -> None:
        self._transport.close()
class AsyncOpenTelemetryTransport(httpx.AsyncBaseTransport):
    """Async transport class that will trace all requests made with a client.

    Args:
        transport: AsyncHTTPTransport instance to wrap
        tracer_provider: Tracer provider to use
        meter_provider: Meter provider to use
        request_hook: A hook that receives the span and request that is called
            right after the span is created
        response_hook: A hook that receives the span, request, and response
            that is called right before the span ends
    """

    def __init__(
        self,
        transport: httpx.AsyncBaseTransport,
        tracer_provider: TracerProvider | None = None,
        meter_provider: MeterProvider | None = None,
        request_hook: AsyncRequestHook | None = None,
        response_hook: AsyncResponseHook | None = None,
    ):
        _OpenTelemetrySemanticConventionStability._initialize()
        self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
            _OpenTelemetryStabilitySignalType.HTTP,
        )
        schema_url = _get_schema_url(self._sem_conv_opt_in_mode)

        self._transport = transport
        self._tracer = get_tracer(
            __name__,
            instrumenting_library_version=__version__,
            tracer_provider=tracer_provider,
            schema_url=schema_url,
        )
        meter = get_meter(
            __name__,
            __version__,
            meter_provider,
            schema_url,
        )

        # One histogram per semconv generation; each stays None when that
        # generation is not being reported.
        self._duration_histogram_old = None
        if _report_old(self._sem_conv_opt_in_mode):
            self._duration_histogram_old = meter.create_histogram(
                name=MetricInstruments.HTTP_CLIENT_DURATION,
                unit="ms",
                description="measures the duration of the outbound HTTP request",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_OLD,
            )
        self._duration_histogram_new = None
        if _report_new(self._sem_conv_opt_in_mode):
            self._duration_histogram_new = meter.create_histogram(
                name=HTTP_CLIENT_REQUEST_DURATION,
                unit="s",
                description="Duration of HTTP client requests.",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW,
            )

        self._request_hook = request_hook
        self._response_hook = response_hook
        self._excluded_urls = get_excluded_urls("HTTPX")
        self._captured_request_headers = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST
        )
        self._captured_response_headers = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE
        )
        self._sensitive_headers = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
        )

    async def __aenter__(self) -> AsyncOpenTelemetryTransport:
        await self._transport.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        await self._transport.__aexit__(exc_type, exc_value, traceback)

    # pylint: disable=R0914
    async def handle_async_request(
        self, *args: typing.Any, **kwargs: typing.Any
    ) -> (
        tuple[int, httpx.Headers, httpx.AsyncByteStream, dict[str, typing.Any]]
        | httpx.Response
    ):
        """Add request info to span."""
        if not is_http_instrumentation_enabled():
            return await self._transport.handle_async_request(*args, **kwargs)

        method, url, headers, stream, extensions = _extract_parameters(
            args, kwargs
        )

        # Excluded URLs are forwarded untouched: no span, no metrics.
        if self._excluded_urls and self._excluded_urls.url_disabled(
            _normalize_url(url)
        ):
            return await self._transport.handle_async_request(*args, **kwargs)

        method_original = method.decode()
        span_name = _get_default_span_name(method_original)
        span_attributes = {}
        metric_attributes = {}
        # apply http client request attributes according to semconv
        _apply_request_client_attributes_to_span(
            span_attributes,
            metric_attributes,
            url,
            method_original,
            self._sem_conv_opt_in_mode,
            headers,
            self._captured_request_headers,
            self._sensitive_headers,
        )

        request_info = RequestInfo(method, url, headers, stream, extensions)

        with self._tracer.start_as_current_span(
            span_name, kind=SpanKind.CLIENT, attributes=span_attributes
        ) as span:
            exception = None
            if callable(self._request_hook):
                await self._request_hook(span, request_info)

            _inject_propagation_headers(headers, args, kwargs)

            start_time = default_timer()

            try:
                response = await self._transport.handle_async_request(
                    *args, **kwargs
                )
            except Exception as exc:  # pylint: disable=W0703
                # Defer the raise so a response attached to the exception
                # can still be reported on the span.
                exception = exc
                response = getattr(exc, "response", None)
            finally:
                elapsed_time = max(default_timer() - start_time, 0)

            if isinstance(response, (httpx.Response, tuple)):
                status_code, headers, stream, extensions, http_version = (
                    _extract_response(response)
                )

                # Always apply response attributes to metrics
                _apply_response_client_attributes_to_metrics(
                    span,
                    metric_attributes,
                    status_code,
                    http_version,
                    self._sem_conv_opt_in_mode,
                )

                if span.is_recording():
                    # apply http client response attributes according to semconv
                    _apply_response_client_attributes_to_span(
                        span,
                        status_code,
                        http_version,
                        self._sem_conv_opt_in_mode,
                        headers,
                        self._captured_response_headers,
                        self._sensitive_headers,
                    )

                if callable(self._response_hook):
                    await self._response_hook(
                        span,
                        request_info,
                        ResponseInfo(status_code, headers, stream, extensions),
                    )

            if exception:
                if span.is_recording() and _report_new(
                    self._sem_conv_opt_in_mode
                ):
                    span.set_attribute(
                        ERROR_TYPE, type(exception).__qualname__
                    )
                    # NOTE(review): the raise below happens before the
                    # histograms are recorded, so this metric attribute is
                    # never read and failed requests emit no duration
                    # measurement - confirm whether that is intended.
                    metric_attributes[ERROR_TYPE] = type(
                        exception
                    ).__qualname__
                raise exception.with_traceback(exception.__traceback__)

            if self._duration_histogram_old is not None:
                duration_attrs_old = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.DEFAULT,
                )
                # The old-semconv histogram is in whole milliseconds.
                self._duration_histogram_old.record(
                    max(round(elapsed_time * 1000), 0),
                    attributes=duration_attrs_old,
                )
            if self._duration_histogram_new is not None:
                duration_attrs_new = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.HTTP,
                )
                self._duration_histogram_new.record(
                    elapsed_time, attributes=duration_attrs_new
                )

        return response

    async def aclose(self) -> None:
        await self._transport.aclose()
class HTTPXClientInstrumentor(BaseInstrumentor):
    # pylint: disable=protected-access
    """An instrumentor for httpx Client and AsyncClient

    See `BaseInstrumentor`
    """

    def instrumentation_dependencies(self) -> typing.Collection[str]:
        return _instruments

    # pylint: disable=too-many-locals
    def _instrument(self, **kwargs: typing.Any):
        """Instruments httpx Client and AsyncClient

        Args:
            **kwargs: Optional arguments
                ``tracer_provider``: a TracerProvider, defaults to global
                ``meter_provider``: a MeterProvider, defaults to global
                ``request_hook``: A ``httpx.Client`` hook that receives the span and request
                    that is called right after the span is created
                ``response_hook``: A ``httpx.Client`` hook that receives the span, request,
                    and response that is called right before the span ends
                ``async_request_hook``: Async ``request_hook`` for ``httpx.AsyncClient``
                ``async_response_hook``: Async ``response_hook`` for ``httpx.AsyncClient``
        """
        tracer_provider = kwargs.get("tracer_provider")
        meter_provider = kwargs.get("meter_provider")
        request_hook = kwargs.get("request_hook")
        response_hook = kwargs.get("response_hook")
        # Async hooks that are not coroutine functions are dropped.
        async_request_hook = kwargs.get("async_request_hook")
        async_request_hook = (
            async_request_hook
            if iscoroutinefunction(async_request_hook)
            else None
        )
        async_response_hook = kwargs.get("async_response_hook")
        async_response_hook = (
            async_response_hook
            if iscoroutinefunction(async_response_hook)
            else None
        )
        excluded_urls = get_excluded_urls("HTTPX")
        captured_request_headers: list[str] = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST
        )
        captured_response_headers: list[str] = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE
        )
        sensitive_headers: list[str] = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
        )

        _OpenTelemetrySemanticConventionStability._initialize()
        sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
            _OpenTelemetryStabilitySignalType.HTTP,
        )
        schema_url = _get_schema_url(sem_conv_opt_in_mode)

        tracer = get_tracer(
            __name__,
            instrumenting_library_version=__version__,
            tracer_provider=tracer_provider,
            schema_url=schema_url,
        )
        meter = get_meter(
            __name__,
            __version__,
            meter_provider,
            schema_url,
        )
        duration_histogram_old = None
        if _report_old(sem_conv_opt_in_mode):
            duration_histogram_old = meter.create_histogram(
                name=MetricInstruments.HTTP_CLIENT_DURATION,
                unit="ms",
                description="measures the duration of the outbound HTTP request",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_OLD,
            )
        duration_histogram_new = None
        if _report_new(sem_conv_opt_in_mode):
            duration_histogram_new = meter.create_histogram(
                name=HTTP_CLIENT_REQUEST_DURATION,
                unit="s",
                description="Duration of HTTP client requests.",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW,
            )

        wrap_function_wrapper(
            "httpx",
            "HTTPTransport.handle_request",
            partial(
                self._handle_request_wrapper,
                tracer=tracer,
                duration_histogram_old=duration_histogram_old,
                duration_histogram_new=duration_histogram_new,
                sem_conv_opt_in_mode=sem_conv_opt_in_mode,
                request_hook=request_hook,
                response_hook=response_hook,
                excluded_urls=excluded_urls,
                captured_request_headers=captured_request_headers,
                captured_response_headers=captured_response_headers,
                sensitive_headers=sensitive_headers,
            ),
        )
        wrap_function_wrapper(
            "httpx",
            "AsyncHTTPTransport.handle_async_request",
            partial(
                self._handle_async_request_wrapper,
                tracer=tracer,
                duration_histogram_old=duration_histogram_old,
                duration_histogram_new=duration_histogram_new,
                sem_conv_opt_in_mode=sem_conv_opt_in_mode,
                async_request_hook=async_request_hook,
                async_response_hook=async_response_hook,
                excluded_urls=excluded_urls,
                captured_request_headers=captured_request_headers,
                captured_response_headers=captured_response_headers,
                sensitive_headers=sensitive_headers,
            ),
        )

    def _uninstrument(self, **kwargs: typing.Any):
        unwrap(httpx.HTTPTransport, "handle_request")
        unwrap(httpx.AsyncHTTPTransport, "handle_async_request")

    @staticmethod
    def _handle_request_wrapper(  # pylint: disable=too-many-locals
        wrapped: typing.Callable[..., typing.Any],
        instance: httpx.HTTPTransport,
        args: tuple[typing.Any, ...],
        kwargs: dict[str, typing.Any],
        tracer: Tracer,
        duration_histogram_old: Histogram | None,
        duration_histogram_new: Histogram | None,
        sem_conv_opt_in_mode: _StabilityMode,
        request_hook: RequestHook | None,
        response_hook: ResponseHook | None,
        excluded_urls: ExcludeList | None,
        captured_request_headers: list[str] | None = None,
        captured_response_headers: list[str] | None = None,
        sensitive_headers: list[str] | None = None,
    ):
        """Trace a sync ``handle_request`` call and record its duration."""
        if not is_http_instrumentation_enabled():
            return wrapped(*args, **kwargs)

        method, url, headers, stream, extensions = _extract_parameters(
            args, kwargs
        )

        # Excluded URLs are forwarded untouched: no span, no metrics.
        if excluded_urls and excluded_urls.url_disabled(_normalize_url(url)):
            return wrapped(*args, **kwargs)

        method_original = method.decode()
        span_name = _get_default_span_name(method_original)
        span_attributes = {}
        metric_attributes = {}
        # apply http client request attributes according to semconv
        _apply_request_client_attributes_to_span(
            span_attributes,
            metric_attributes,
            url,
            method_original,
            sem_conv_opt_in_mode,
            headers,
            captured_request_headers,
            sensitive_headers,
        )

        request_info = RequestInfo(method, url, headers, stream, extensions)

        with tracer.start_as_current_span(
            span_name, kind=SpanKind.CLIENT, attributes=span_attributes
        ) as span:
            exception = None
            if callable(request_hook):
                request_hook(span, request_info)

            _inject_propagation_headers(headers, args, kwargs)

            start_time = default_timer()

            try:
                response = wrapped(*args, **kwargs)
            except Exception as exc:  # pylint: disable=W0703
                # Defer the raise so a response attached to the exception
                # can still be reported on the span.
                exception = exc
                response = getattr(exc, "response", None)
            finally:
                elapsed_time = max(default_timer() - start_time, 0)

            if isinstance(response, (httpx.Response, tuple)):
                status_code, headers, stream, extensions, http_version = (
                    _extract_response(response)
                )

                # Always apply response attributes to metrics
                _apply_response_client_attributes_to_metrics(
                    span,
                    metric_attributes,
                    status_code,
                    http_version,
                    sem_conv_opt_in_mode,
                )

                if span.is_recording():
                    # apply http client response attributes according to semconv
                    _apply_response_client_attributes_to_span(
                        span,
                        status_code,
                        http_version,
                        sem_conv_opt_in_mode,
                        headers,
                        captured_response_headers,
                        sensitive_headers,
                    )
                if callable(response_hook):
                    response_hook(
                        span,
                        request_info,
                        ResponseInfo(status_code, headers, stream, extensions),
                    )

            if exception:
                if span.is_recording() and _report_new(sem_conv_opt_in_mode):
                    span.set_attribute(
                        ERROR_TYPE, type(exception).__qualname__
                    )
                    # NOTE(review): the raise below happens before the
                    # histograms are recorded, so this metric attribute is
                    # never read and failed requests emit no duration
                    # measurement - confirm whether that is intended.
                    metric_attributes[ERROR_TYPE] = type(
                        exception
                    ).__qualname__
                raise exception.with_traceback(exception.__traceback__)

            if duration_histogram_old is not None:
                duration_attrs_old = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.DEFAULT,
                )
                # The old-semconv histogram is in whole milliseconds.
                duration_histogram_old.record(
                    max(round(elapsed_time * 1000), 0),
                    attributes=duration_attrs_old,
                )
            if duration_histogram_new is not None:
                duration_attrs_new = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.HTTP,
                )
                duration_histogram_new.record(
                    elapsed_time, attributes=duration_attrs_new
                )

        return response

    @staticmethod
    async def _handle_async_request_wrapper(  # pylint: disable=too-many-locals
        wrapped: typing.Callable[..., typing.Awaitable[typing.Any]],
        instance: httpx.AsyncHTTPTransport,
        args: tuple[typing.Any, ...],
        kwargs: dict[str, typing.Any],
        tracer: Tracer,
        duration_histogram_old: Histogram | None,
        duration_histogram_new: Histogram | None,
        sem_conv_opt_in_mode: _StabilityMode,
        async_request_hook: AsyncRequestHook | None,
        async_response_hook: AsyncResponseHook | None,
        excluded_urls: ExcludeList | None,
        captured_request_headers: list[str] | None = None,
        captured_response_headers: list[str] | None = None,
        sensitive_headers: list[str] | None = None,
    ):
        """Trace an async ``handle_async_request`` call and record its
        duration."""
        if not is_http_instrumentation_enabled():
            return await wrapped(*args, **kwargs)

        method, url, headers, stream, extensions = _extract_parameters(
            args, kwargs
        )

        # Excluded URLs are forwarded untouched: no span, no metrics.
        if excluded_urls and excluded_urls.url_disabled(_normalize_url(url)):
            return await wrapped(*args, **kwargs)

        method_original = method.decode()
        span_name = _get_default_span_name(method_original)
        span_attributes = {}
        metric_attributes = {}
        # apply http client request attributes according to semconv
        _apply_request_client_attributes_to_span(
            span_attributes,
            metric_attributes,
            url,
            method_original,
            sem_conv_opt_in_mode,
            headers,
            captured_request_headers,
            sensitive_headers,
        )

        request_info = RequestInfo(method, url, headers, stream, extensions)

        with tracer.start_as_current_span(
            span_name, kind=SpanKind.CLIENT, attributes=span_attributes
        ) as span:
            exception = None
            if callable(async_request_hook):
                await async_request_hook(span, request_info)

            _inject_propagation_headers(headers, args, kwargs)

            start_time = default_timer()

            try:
                response = await wrapped(*args, **kwargs)
            except Exception as exc:  # pylint: disable=W0703
                # Defer the raise so a response attached to the exception
                # can still be reported on the span.
                exception = exc
                response = getattr(exc, "response", None)
            finally:
                elapsed_time = max(default_timer() - start_time, 0)

            if isinstance(response, (httpx.Response, tuple)):
                status_code, headers, stream, extensions, http_version = (
                    _extract_response(response)
                )

                # Always apply response attributes to metrics
                _apply_response_client_attributes_to_metrics(
                    span,
                    metric_attributes,
                    status_code,
                    http_version,
                    sem_conv_opt_in_mode,
                )

                if span.is_recording():
                    # apply http client response attributes according to semconv
                    _apply_response_client_attributes_to_span(
                        span,
                        status_code,
                        http_version,
                        sem_conv_opt_in_mode,
                        headers,
                        captured_response_headers,
                        sensitive_headers,
                    )

                if callable(async_response_hook):
                    await async_response_hook(
                        span,
                        request_info,
                        ResponseInfo(status_code, headers, stream, extensions),
                    )

            if exception:
                if span.is_recording() and _report_new(sem_conv_opt_in_mode):
                    span.set_attribute(
                        ERROR_TYPE, type(exception).__qualname__
                    )
                    # Kept in step with the sync wrapper and both transport
                    # classes, which also tag the metric attributes.
                    # NOTE(review): the raise below happens before the
                    # histograms are recorded, so this metric attribute is
                    # never read - confirm whether that is intended.
                    metric_attributes[ERROR_TYPE] = type(
                        exception
                    ).__qualname__
                raise exception.with_traceback(exception.__traceback__)

            if duration_histogram_old is not None:
                duration_attrs_old = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.DEFAULT,
                )
                # The old-semconv histogram is in whole milliseconds.
                duration_histogram_old.record(
                    max(round(elapsed_time * 1000), 0),
                    attributes=duration_attrs_old,
                )
            if duration_histogram_new is not None:
                duration_attrs_new = _filter_semconv_duration_attrs(
                    metric_attributes,
                    _client_duration_attrs_old,
                    _client_duration_attrs_new,
                    _StabilityMode.HTTP,
                )
                duration_histogram_new.record(
                    elapsed_time, attributes=duration_attrs_new
                )

        return response

    # pylint: disable=too-many-branches,too-many-locals
    @classmethod
    def instrument_client(
        cls,
        client: httpx.Client | httpx.AsyncClient,
        tracer_provider: TracerProvider | None = None,
        meter_provider: MeterProvider | None = None,
        request_hook: RequestHook | AsyncRequestHook | None = None,
        response_hook: ResponseHook | AsyncResponseHook | None = None,
    ) -> None:
        """Instrument httpx Client or AsyncClient

        Args:
            client: The httpx Client or AsyncClient instance
            tracer_provider: A TracerProvider, defaults to global
            meter_provider: A MeterProvider, defaults to global
            request_hook: A hook that receives the span and request that is called
                right after the span is created
            response_hook: A hook that receives the span, request, and response
                that is called right before the span ends
        """

        if getattr(client, "_is_instrumented_by_opentelemetry", False):
            _logger.warning(
                "Attempting to instrument Httpx client while already instrumented"
            )
            return

        _OpenTelemetrySemanticConventionStability._initialize()
        sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
            _OpenTelemetryStabilitySignalType.HTTP,
        )
        schema_url = _get_schema_url(sem_conv_opt_in_mode)
        tracer = get_tracer(
            __name__,
            instrumenting_library_version=__version__,
            tracer_provider=tracer_provider,
            schema_url=schema_url,
        )
        meter = get_meter(
            __name__,
            __version__,
            meter_provider,
            schema_url,
        )
        duration_histogram_old = None
        if _report_old(sem_conv_opt_in_mode):
            duration_histogram_old = meter.create_histogram(
                name=MetricInstruments.HTTP_CLIENT_DURATION,
                unit="ms",
                description="measures the duration of the outbound HTTP request",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_OLD,
            )
        duration_histogram_new = None
        if _report_new(sem_conv_opt_in_mode):
            duration_histogram_new = meter.create_histogram(
                name=HTTP_CLIENT_REQUEST_DURATION,
                unit="s",
                description="Duration of HTTP client requests.",
                explicit_bucket_boundaries_advisory=HTTP_DURATION_HISTOGRAM_BUCKETS_NEW,
            )

        # A coroutine-function hook is routed to the async wrapper only;
        # any other hook is routed to the sync wrapper only.
        if iscoroutinefunction(request_hook):
            async_request_hook = request_hook
            request_hook = None
        else:
            # request_hook already set
            async_request_hook = None

        if iscoroutinefunction(response_hook):
            async_response_hook = response_hook
            response_hook = None
        else:
            # response_hook already set
            async_response_hook = None

        excluded_urls = get_excluded_urls("HTTPX")
        captured_request_headers: list[str] = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST
        )
        captured_response_headers: list[str] = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE
        )
        sensitive_headers: list[str] = get_custom_headers(
            OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
        )

        if hasattr(client._transport, "handle_request"):
            # The same bound wrapper is reused for the default transport and
            # every mounted transport; it holds no per-transport state.
            sync_wrapper = partial(
                cls._handle_request_wrapper,
                tracer=tracer,
                duration_histogram_old=duration_histogram_old,
                duration_histogram_new=duration_histogram_new,
                sem_conv_opt_in_mode=sem_conv_opt_in_mode,
                request_hook=request_hook,
                response_hook=response_hook,
                excluded_urls=excluded_urls,
                captured_request_headers=captured_request_headers,
                captured_response_headers=captured_response_headers,
                sensitive_headers=sensitive_headers,
            )
            wrap_function_wrapper(
                client._transport, "handle_request", sync_wrapper
            )
            for transport in client._mounts.values():
                if hasattr(transport, "handle_request"):
                    wrap_function_wrapper(
                        transport, "handle_request", sync_wrapper
                    )
            client._is_instrumented_by_opentelemetry = True

        if hasattr(client._transport, "handle_async_request"):
            async_wrapper = partial(
                cls._handle_async_request_wrapper,
                tracer=tracer,
                duration_histogram_old=duration_histogram_old,
                duration_histogram_new=duration_histogram_new,
                sem_conv_opt_in_mode=sem_conv_opt_in_mode,
                async_request_hook=async_request_hook,
                async_response_hook=async_response_hook,
                excluded_urls=excluded_urls,
                captured_request_headers=captured_request_headers,
                captured_response_headers=captured_response_headers,
                sensitive_headers=sensitive_headers,
            )
            wrap_function_wrapper(
                client._transport, "handle_async_request", async_wrapper
            )
            for transport in client._mounts.values():
                if hasattr(transport, "handle_async_request"):
                    wrap_function_wrapper(
                        transport, "handle_async_request", async_wrapper
                    )
            client._is_instrumented_by_opentelemetry = True

    @staticmethod
    def uninstrument_client(client: httpx.Client | httpx.AsyncClient) -> None:
        """Disables instrumentation for the given client instance

        Args:
            client: The httpx Client or AsyncClient instance
        """
        if hasattr(client._transport, "handle_request"):
            unwrap(client._transport, "handle_request")
            for transport in client._mounts.values():
                unwrap(transport, "handle_request")
            client._is_instrumented_by_opentelemetry = False
        elif hasattr(client._transport, "handle_async_request"):
            unwrap(client._transport, "handle_async_request")
            for transport in client._mounts.values():
                unwrap(transport, "handle_async_request")
            client._is_instrumented_by_opentelemetry = False