Source code for opentelemetry.instrumentation.httpx

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Usage
-----

Instrumenting all clients
*************************

When using the instrumentor, all clients will automatically trace requests.

.. code-block:: python

     import httpx
     from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

     url = "https://some.url/get"
     HTTPXClientInstrumentor().instrument()

     with httpx.Client() as client:
          response = client.get(url)

     async with httpx.AsyncClient() as client:
          response = await client.get(url)

Instrumenting single clients
****************************

If you only want to instrument requests for specific client instances, you can
use the `instrument_client` method.


.. code-block:: python

    import httpx
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

    url = "https://some.url/get"

    with httpx.Client() as client:
        HTTPXClientInstrumentor.instrument_client(client)
        response = client.get(url)

    async with httpx.AsyncClient() as client:
        HTTPXClientInstrumentor.instrument_client(client)
        response = await client.get(url)


Uninstrument
************

If you need to uninstrument clients, there are two options available.

.. code-block:: python

     import httpx
     from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

     HTTPXClientInstrumentor().instrument()
     client = httpx.Client()

     # Uninstrument a specific client
     HTTPXClientInstrumentor.uninstrument_client(client)

     # Uninstrument all clients
     HTTPXClientInstrumentor().uninstrument()


Using transports directly
*************************

If you don't want to use the instrumentor class, you can use the transport classes directly.


.. code-block:: python

    import httpx
    from opentelemetry.instrumentation.httpx import (
        AsyncOpenTelemetryTransport,
        SyncOpenTelemetryTransport,
    )

    url = "https://some.url/get"
    transport = httpx.HTTPTransport()
    telemetry_transport = SyncOpenTelemetryTransport(transport)

    with httpx.Client(transport=telemetry_transport) as client:
        response = client.get(url)

    transport = httpx.AsyncHTTPTransport()
    telemetry_transport = AsyncOpenTelemetryTransport(transport)

    async with httpx.AsyncClient(transport=telemetry_transport) as client:
        response = await client.get(url)


Request and response hooks
***************************

The instrumentation supports specifying request and response hooks. These are functions that get called back by the instrumentation right after a span is created for a request
and right before the span is finished while processing a response.

.. note::

    The request hook receives the raw arguments provided to the transport layer. The response hook receives the raw return values from the transport layer.

The hooks can be configured as follows:


.. code-block:: python

    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

    def request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    def response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    async def async_request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    async def async_response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    HTTPXClientInstrumentor().instrument(
        request_hook=request_hook,
        response_hook=response_hook,
        async_request_hook=async_request_hook,
        async_response_hook=async_response_hook
    )


Or if you are using the transport classes directly:


.. code-block:: python

    from opentelemetry.instrumentation.httpx import SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport

    def request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    def response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    async def async_request_hook(span, request):
        # method, url, headers, stream, extensions = request
        pass

    async def async_response_hook(span, request, response):
        # method, url, headers, stream, extensions = request
        # status_code, headers, stream, extensions = response
        pass

    transport = httpx.HTTPTransport()
    telemetry_transport = SyncOpenTelemetryTransport(
        transport,
        request_hook=request_hook,
        response_hook=response_hook
    )

    async_transport = httpx.AsyncHTTPTransport()
    async_telemetry_transport = AsyncOpenTelemetryTransport(
        async_transport,
        request_hook=async_request_hook,
        response_hook=async_response_hook
    )

API
---
"""
import inspect
import logging
import typing
from types import TracebackType

import httpx

from opentelemetry import context
from opentelemetry.instrumentation.httpx.package import _instruments
from opentelemetry.instrumentation.httpx.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import http_status_to_status_code
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.span import Span
from opentelemetry.trace.status import Status
from opentelemetry.util.http import remove_url_credentials

# Module-level logger, used for the instrument/uninstrument warnings.
_logger = logging.getLogger(__name__)

# Raw URL tuple accepted by the legacy (pre-Request-object) transport API.
# NOTE(review): presumably (scheme, host, port, path) -- confirm against httpx.
URL = typing.Tuple[bytes, bytes, typing.Optional[int], bytes]
# Raw header list: pairs of bytes (presumably (name, value) -- confirm).
Headers = typing.List[typing.Tuple[bytes, bytes]]
# Synchronous hooks: called with the span and the request info (plus the
# response info for response hooks); the return value is not used.
RequestHook = typing.Callable[[Span, "RequestInfo"], None]
ResponseHook = typing.Callable[[Span, "RequestInfo", "ResponseInfo"], None]
# Asynchronous hooks: same arguments as the synchronous hooks, but they
# return an awaitable.
AsyncRequestHook = typing.Callable[
    [Span, "RequestInfo"], typing.Awaitable[typing.Any]
]
AsyncResponseHook = typing.Callable[
    [Span, "RequestInfo", "ResponseInfo"], typing.Awaitable[typing.Any]
]


class RequestInfo(typing.NamedTuple):
    """Request data passed to request and response hooks.

    The fields mirror, in order, the values extracted from the transport
    call: method, url, headers, stream and extensions.
    """

    method: bytes
    url: URL
    headers: typing.Optional[Headers]
    stream: typing.Union[httpx.SyncByteStream, httpx.AsyncByteStream, None]
    extensions: typing.Optional[dict]
class ResponseInfo(typing.NamedTuple):
    """Response data passed to response hooks.

    The fields mirror, in order, the values taken from the transport's
    return value: status code, headers, stream and extensions.
    """

    status_code: int
    headers: typing.Optional[Headers]
    stream: typing.Iterable[bytes]
    extensions: typing.Optional[dict]
def _get_default_span_name(method: str) -> str:
    """Return the default span name: ``method`` without surrounding whitespace."""
    return method.strip()


def _apply_status_code(span: Span, status_code: int) -> None:
    """Record the HTTP status code on ``span`` and set the span status from it.

    Nothing is written when the span is not recording.
    """
    if not span.is_recording():
        return

    span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)
    span.set_status(Status(http_status_to_status_code(status_code)))


def _prepare_attributes(method: bytes, url: URL) -> typing.Dict[str, str]:
    """Build the initial span attributes (HTTP method and URL) for a request.

    The method is decoded and upper-cased; the URL is rendered through
    ``httpx.URL`` so both raw URLs and ``httpx.URL`` objects give a string.
    """
    return {
        SpanAttributes.HTTP_METHOD: method.decode().upper(),
        SpanAttributes.HTTP_URL: str(httpx.URL(url)),
    }


def _prepare_headers(headers: typing.Optional[Headers]) -> httpx.Headers:
    """Wrap ``headers`` in an ``httpx.Headers`` instance."""
    return httpx.Headers(headers)


def _find_request(args, kwargs) -> typing.Optional[httpx.Request]:
    """Return the ``httpx.Request`` given to the transport, if there is one.

    The request is normally the first positional argument, but the transport
    signature also allows ``request=...`` as a keyword. ``None`` means the
    call uses the legacy API with separate method/url/... arguments.
    """
    candidate = args[0] if args else kwargs.get("request")
    if isinstance(candidate, httpx.Request):
        return candidate
    return None


def _extract_parameters(args, kwargs):
    """Extract ``(method, url, headers, stream, extensions)`` from a transport call.

    Args:
        args: positional arguments the transport method was called with
        kwargs: keyword arguments the transport method was called with

    Returns:
        A 5-tuple ``(method, url, headers, stream, extensions)``.
    """
    request = _find_request(args, kwargs)
    if request is not None:
        # In httpx >= 0.20.0, handle_request receives a Request object
        method = request.method.encode()
        # Credentials are removed before the URL is used for span attributes.
        url = httpx.URL(remove_url_credentials(str(request.url)))
        headers = request.headers
        stream = request.stream
        extensions = request.extensions
    else:
        # In httpx < 0.20.0, handle_request receives the parameters separately
        method = args[0]
        url = args[1]
        # Keyword values win over positional ones; missing values are None.
        headers = kwargs.get("headers", args[2] if len(args) > 2 else None)
        stream = kwargs.get("stream", args[3] if len(args) > 3 else None)
        extensions = kwargs.get(
            "extensions", args[4] if len(args) > 4 else None
        )

    return method, url, headers, stream, extensions


def _inject_propagation_headers(headers, args, kwargs):
    """Inject trace-propagation headers into the outgoing request.

    A copy of ``headers`` is built and handed to the propagator. The result
    is written back onto the ``httpx.Request`` when one was passed, or into
    ``kwargs["headers"]`` (raw form) for the legacy API.
    """
    _headers = _prepare_headers(headers)
    inject(_headers)

    request = _find_request(args, kwargs)
    if request is not None:
        request.headers = _headers
    else:
        kwargs["headers"] = _headers.raw
class SyncOpenTelemetryTransport(httpx.BaseTransport):
    """Sync transport class that will trace all requests made with a client.

    Args:
        transport: SyncHTTPTransport instance to wrap
        tracer_provider: Tracer provider to use
        request_hook: A hook that receives the span and request that is called
            right after the span is created
        response_hook: A hook that receives the span, request, and response
            that is called right before the span ends
    """

    def __init__(
        self,
        transport: httpx.BaseTransport,
        tracer_provider: typing.Optional[TracerProvider] = None,
        request_hook: typing.Optional[RequestHook] = None,
        response_hook: typing.Optional[ResponseHook] = None,
    ):
        self._transport = transport
        self._request_hook = request_hook
        self._response_hook = response_hook
        self._tracer = get_tracer(
            __name__,
            instrumenting_library_version=__version__,
            tracer_provider=tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

    def __enter__(self) -> "SyncOpenTelemetryTransport":
        # Enter the wrapped transport, but hand back the tracing wrapper.
        self._transport.__enter__()
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        self._transport.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        *args,
        **kwargs,
    ) -> typing.Union[
        typing.Tuple[int, "Headers", httpx.SyncByteStream, dict],
        httpx.Response,
    ]:
        """Add request info to span."""
        wrapped = self._transport
        if context.get_value("suppress_instrumentation"):
            # Instrumentation is suppressed: forward the call untouched.
            return wrapped.handle_request(*args, **kwargs)

        request_info = RequestInfo(*_extract_parameters(args, kwargs))
        attributes = _prepare_attributes(
            request_info.method, request_info.url
        )
        name = _get_default_span_name(attributes[SpanAttributes.HTTP_METHOD])

        with self._tracer.start_as_current_span(
            name, kind=SpanKind.CLIENT, attributes=attributes
        ) as span:
            if self._request_hook is not None:
                self._request_hook(span, request_info)

            # Propagation headers are injected after the request hook ran.
            _inject_propagation_headers(request_info.headers, args, kwargs)
            response = wrapped.handle_request(*args, **kwargs)

            if isinstance(response, httpx.Response):
                status_code, resp_headers, resp_stream, resp_extensions = (
                    response.status_code,
                    response.headers,
                    response.stream,
                    response.extensions,
                )
            else:
                # Legacy transports return a plain 4-tuple.
                (
                    status_code,
                    resp_headers,
                    resp_stream,
                    resp_extensions,
                ) = response

            _apply_status_code(span, status_code)

            if self._response_hook is not None:
                self._response_hook(
                    span,
                    request_info,
                    ResponseInfo(
                        status_code,
                        resp_headers,
                        resp_stream,
                        resp_extensions,
                    ),
                )

            return response

    def close(self) -> None:
        self._transport.close()
class AsyncOpenTelemetryTransport(httpx.AsyncBaseTransport):
    """Async transport class that will trace all requests made with a client.

    Hooks may be coroutine functions or plain callables: the result of a hook
    is awaited only when it is awaitable, so a synchronous hook does not
    break the request.

    Args:
        transport: AsyncHTTPTransport instance to wrap
        tracer_provider: Tracer provider to use
        request_hook: A hook that receives the span and request that is called
            right after the span is created
        response_hook: A hook that receives the span, request, and response
            that is called right before the span ends
    """

    def __init__(
        self,
        transport: httpx.AsyncBaseTransport,
        tracer_provider: typing.Optional[TracerProvider] = None,
        request_hook: typing.Optional[
            typing.Union[AsyncRequestHook, RequestHook]
        ] = None,
        response_hook: typing.Optional[
            typing.Union[AsyncResponseHook, ResponseHook]
        ] = None,
    ):
        self._transport = transport
        self._tracer = get_tracer(
            __name__,
            instrumenting_library_version=__version__,
            tracer_provider=tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )
        self._request_hook = request_hook
        self._response_hook = response_hook

    async def __aenter__(self) -> "AsyncOpenTelemetryTransport":
        # Enter the wrapped transport, but hand back the tracing wrapper.
        await self._transport.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        await self._transport.__aexit__(exc_type, exc_value, traceback)

    @staticmethod
    async def _run_hook(hook, *hook_args) -> None:
        """Call ``hook`` and await its result only if the result is awaitable.

        Awaiting unconditionally raises ``TypeError`` when a synchronous hook
        (which returns ``None``) is configured on this transport.
        """
        outcome = hook(*hook_args)
        if inspect.isawaitable(outcome):
            await outcome

    async def handle_async_request(
        self, *args, **kwargs
    ) -> typing.Union[
        typing.Tuple[int, "Headers", httpx.AsyncByteStream, dict],
        httpx.Response,
    ]:
        """Add request info to span."""
        if context.get_value("suppress_instrumentation"):
            # Instrumentation is suppressed: forward the call untouched.
            return await self._transport.handle_async_request(*args, **kwargs)

        method, url, headers, stream, extensions = _extract_parameters(
            args, kwargs
        )
        span_attributes = _prepare_attributes(method, url)
        span_name = _get_default_span_name(
            span_attributes[SpanAttributes.HTTP_METHOD]
        )
        request_info = RequestInfo(method, url, headers, stream, extensions)

        with self._tracer.start_as_current_span(
            span_name, kind=SpanKind.CLIENT, attributes=span_attributes
        ) as span:
            if self._request_hook is not None:
                await self._run_hook(self._request_hook, span, request_info)

            # Propagation headers are injected after the request hook ran.
            _inject_propagation_headers(headers, args, kwargs)
            response = await self._transport.handle_async_request(
                *args, **kwargs
            )

            if isinstance(response, httpx.Response):
                status_code = response.status_code
                response_headers = response.headers
                response_stream = response.stream
                response_extensions = response.extensions
            else:
                # Legacy transports return a plain 4-tuple.
                (
                    status_code,
                    response_headers,
                    response_stream,
                    response_extensions,
                ) = response

            _apply_status_code(span, status_code)

            if self._response_hook is not None:
                await self._run_hook(
                    self._response_hook,
                    span,
                    request_info,
                    ResponseInfo(
                        status_code,
                        response_headers,
                        response_stream,
                        response_extensions,
                    ),
                )

            return response

    async def aclose(self) -> None:
        await self._transport.aclose()
class _InstrumentedClient(httpx.Client):
    """``httpx.Client`` subclass whose transport is wrapped for tracing.

    The tracer provider and hooks are read from class attributes, which the
    instrumentor assigns before this class replaces ``httpx.Client``.
    """

    _tracer_provider = None
    _request_hook = None
    _response_hook = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Keep the unwrapped transport so the client can be uninstrumented.
        inner_transport = self._transport
        self._original_transport = inner_transport
        self._is_instrumented_by_opentelemetry = True

        config = _InstrumentedClient
        self._transport = SyncOpenTelemetryTransport(
            inner_transport,
            tracer_provider=config._tracer_provider,
            request_hook=config._request_hook,
            response_hook=config._response_hook,
        )


class _InstrumentedAsyncClient(httpx.AsyncClient):
    """``httpx.AsyncClient`` subclass whose transport is wrapped for tracing.

    The tracer provider and hooks are read from class attributes, which the
    instrumentor assigns before this class replaces ``httpx.AsyncClient``.
    """

    _tracer_provider = None
    _request_hook = None
    _response_hook = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Keep the unwrapped transport so the client can be uninstrumented.
        inner_transport = self._transport
        self._original_transport = inner_transport
        self._is_instrumented_by_opentelemetry = True

        config = _InstrumentedAsyncClient
        self._transport = AsyncOpenTelemetryTransport(
            inner_transport,
            tracer_provider=config._tracer_provider,
            request_hook=config._request_hook,
            response_hook=config._response_hook,
        )
class HTTPXClientInstrumentor(BaseInstrumentor):
    # pylint: disable=protected-access,attribute-defined-outside-init
    """An instrumentor for httpx Client and AsyncClient

    See `BaseInstrumentor`
    """

    def instrumentation_dependencies(self) -> typing.Collection[str]:
        return _instruments

    def _instrument(self, **kwargs):
        """Instruments httpx Client and AsyncClient

        Args:
            **kwargs: Optional arguments
                ``tracer_provider``: a TracerProvider, defaults to global
                ``request_hook``: A ``httpx.Client`` hook that receives the span and request
                    that is called right after the span is created
                ``response_hook``: A ``httpx.Client`` hook that receives the span, request,
                    and response that is called right before the span ends
                ``async_request_hook``: Async ``request_hook`` for ``httpx.AsyncClient``;
                    falls back to ``request_hook`` when omitted
                ``async_response_hook``: Async ``response_hook`` for ``httpx.AsyncClient``;
                    falls back to ``response_hook`` when omitted
        """
        # Remember the real classes so _uninstrument can put them back.
        self._original_client = httpx.Client
        self._original_async_client = httpx.AsyncClient

        request_hook = kwargs.get("request_hook")
        response_hook = kwargs.get("response_hook")
        async_request_hook = kwargs.get("async_request_hook", request_hook)
        async_response_hook = kwargs.get("async_response_hook", response_hook)

        # Non-callable hook values are ignored.
        if callable(request_hook):
            _InstrumentedClient._request_hook = request_hook
        if callable(async_request_hook):
            _InstrumentedAsyncClient._request_hook = async_request_hook
        if callable(response_hook):
            _InstrumentedClient._response_hook = response_hook
        if callable(async_response_hook):
            _InstrumentedAsyncClient._response_hook = async_response_hook

        tracer_provider = kwargs.get("tracer_provider")
        _InstrumentedClient._tracer_provider = tracer_provider
        _InstrumentedAsyncClient._tracer_provider = tracer_provider

        # From here on, new clients are created from the instrumented classes.
        httpx.Client = _InstrumentedClient
        httpx.AsyncClient = _InstrumentedAsyncClient

    def _uninstrument(self, **kwargs):
        """Restore the original httpx client classes and clear stored settings."""
        httpx.Client = self._original_client
        httpx.AsyncClient = self._original_async_client

        for instrumented in (_InstrumentedClient, _InstrumentedAsyncClient):
            instrumented._tracer_provider = None
            instrumented._request_hook = None
            instrumented._response_hook = None

    @staticmethod
    def instrument_client(
        client: typing.Union[httpx.Client, httpx.AsyncClient],
        tracer_provider: typing.Optional[TracerProvider] = None,
        request_hook: typing.Union[
            typing.Optional[RequestHook], typing.Optional[AsyncRequestHook]
        ] = None,
        response_hook: typing.Union[
            typing.Optional[ResponseHook], typing.Optional[AsyncResponseHook]
        ] = None,
    ) -> None:
        """Instrument httpx Client or AsyncClient

        The client's transport is wrapped in the matching OpenTelemetry
        transport. The previous transport is kept on the client so that
        `uninstrument_client` can restore it. A client that is already
        instrumented is left untouched and a warning is logged.

        Args:
            client: The httpx Client or AsyncClient instance
            tracer_provider: A TracerProvider, defaults to global
            request_hook: A hook that receives the span and request that is called
                right after the span is created
            response_hook: A hook that receives the span, request, and response
                that is called right before the span ends
        """
        # pylint: disable=protected-access
        if not hasattr(client, "_is_instrumented_by_opentelemetry"):
            client._is_instrumented_by_opentelemetry = False

        if client._is_instrumented_by_opentelemetry:
            _logger.warning(
                "Attempting to instrument Httpx client while already instrumented"
            )
            return

        # NOTE(review): these checks use the current httpx.Client /
        # httpx.AsyncClient names, which _instrument rebinds to the
        # instrumented subclasses -- confirm that is intended for clients
        # created before global instrumentation.
        if isinstance(client, httpx.Client):
            client._original_transport = client._transport
            transport = client._transport or httpx.HTTPTransport()
            client._transport = SyncOpenTelemetryTransport(
                transport,
                tracer_provider=tracer_provider,
                request_hook=request_hook,
                response_hook=response_hook,
            )
            client._is_instrumented_by_opentelemetry = True
        elif isinstance(client, httpx.AsyncClient):
            # Saved for async clients too; without it uninstrument_client
            # cannot restore the transport.
            client._original_transport = client._transport
            transport = client._transport or httpx.AsyncHTTPTransport()
            client._transport = AsyncOpenTelemetryTransport(
                transport,
                tracer_provider=tracer_provider,
                request_hook=request_hook,
                response_hook=response_hook,
            )
            client._is_instrumented_by_opentelemetry = True

    @staticmethod
    def uninstrument_client(
        client: typing.Union[httpx.Client, httpx.AsyncClient]
    ):
        """Disables instrumentation for the given client instance

        Restores the transport saved at instrumentation time. A warning is
        logged when the client has no saved transport.

        Args:
            client: The httpx Client or AsyncClient instance
        """
        if hasattr(client, "_original_transport"):
            client._transport = client._original_transport
            del client._original_transport
            client._is_instrumented_by_opentelemetry = False
        else:
            _logger.warning(
                "Attempting to uninstrument Httpx "
                "client while already uninstrumented"
            )