Source code for opentelemetry.instrumentation.urllib3

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This library allows tracing HTTP requests made by the
`urllib3 <https://urllib3.readthedocs.io/>`_ library.

Usage
-----
.. code-block:: python

    import urllib3
    from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor

    def strip_query_params(url: str) -> str:
        return url.split("?")[0]

    URLLib3Instrumentor().instrument(
        # Remove all query params from the URL attribute on the span.
        url_filter=strip_query_params,
    )

    http = urllib3.PoolManager()
    response = http.request("GET", "https://www.example.org/")

Configuration
-------------

Request/Response hooks
**********************

The urllib3 instrumentation supports extending tracing behavior with the help of
request and response hooks. These are functions that are called back by the instrumentation
right after a Span is created for a request and right before the span is finished processing a response respectively.
The hooks can be configured as follows:

.. code:: python

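    # `request` is an instance of urllib3.connectionpool.HTTPConnectionPool
    # `headers` is the dict of request headers that will be sent
    # `body` is the request body passed to `urlopen` (may be None)
    def request_hook(span, request, headers, body):
        pass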

    # `request` is an instance of urllib3.connectionpool.HTTPConnectionPool
    # `response` is an instance of urllib3.response.HTTPResponse
    def response_hook(span, request, response):
        pass

    URLLib3Instrumentor().instrument(
        request_hook=request_hook, response_hook=response_hook
    )

Exclude lists
*************

To exclude certain URLs from being tracked, set the environment variable ``OTEL_PYTHON_URLLIB3_EXCLUDED_URLS``
(or ``OTEL_PYTHON_EXCLUDED_URLS`` as fallback) with comma delimited regexes representing which URLs to exclude.

For example,

::

    export OTEL_PYTHON_URLLIB3_EXCLUDED_URLS="client/.*/info,healthcheck"

will exclude requests such as ``https://site/client/123/info`` and ``https://site/xyz/healthcheck``.

API
---
"""

import collections.abc
import io
import typing
from timeit import default_timer
from typing import Collection

import urllib3.connectionpool
import wrapt

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.urllib3.package import _instruments
from opentelemetry.instrumentation.urllib3.version import __version__
from opentelemetry.instrumentation.utils import (
    http_status_to_status_code,
    is_http_instrumentation_enabled,
    suppress_http_instrumentation,
    unwrap,
)
from opentelemetry.metrics import Histogram, get_meter
from opentelemetry.propagate import inject
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer
from opentelemetry.trace.status import Status
from opentelemetry.util.http import (
    ExcludeList,
    get_excluded_urls,
    parse_excluded_urls,
)
from opentelemetry.util.http.httplib import set_ip_on_next_http_connection

_excluded_urls_from_env = get_excluded_urls("URLLIB3")

_UrlFilterT = typing.Optional[typing.Callable[[str], str]]
_RequestHookT = typing.Optional[
    typing.Callable[
        [
            Span,
            urllib3.connectionpool.HTTPConnectionPool,
            typing.Dict,
            typing.Optional[str],
        ],
        None,
    ]
]
_ResponseHookT = typing.Optional[
    typing.Callable[
        [
            Span,
            urllib3.connectionpool.HTTPConnectionPool,
            urllib3.response.HTTPResponse,
        ],
        None,
    ]
]

_URL_OPEN_ARG_TO_INDEX_MAPPING = {
    "method": 0,
    "url": 1,
    "body": 2,
}


class URLLib3Instrumentor(BaseInstrumentor):
    """Instrumentor that enables tracing and metrics for urllib3 requests."""

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package requirements declared in ``_instruments``."""
        return _instruments

    def _instrument(self, **kwargs):
        """Instruments the urllib3 module

        Args:
            **kwargs: Optional arguments
                ``tracer_provider``: a TracerProvider, defaults to global.
                ``meter_provider``: a MeterProvider passed to ``get_meter``
                    (``None`` when not supplied).
                ``request_hook``: An optional callback that is invoked right
                    after a span is created.
                ``response_hook``: An optional callback which is invoked right
                    before the span is finished processing a response.
                ``url_filter``: A callback to process the requested URL prior
                    to adding it as a span attribute.
                ``excluded_urls``: A string containing a comma-delimited list
                    of regexes used to exclude URLs from tracking
        """
        # Tracer and meter are both created against the same schema version.
        schema_url = "https://opentelemetry.io/schemas/1.11.0"
        tracer = get_tracer(
            __name__,
            __version__,
            kwargs.get("tracer_provider"),
            schema_url=schema_url,
        )
        meter = get_meter(
            __name__,
            __version__,
            kwargs.get("meter_provider"),
            schema_url=schema_url,
        )

        # (name, unit, description) for each histogram, in the positional
        # order expected by the module-level ``_instrument`` function.
        histogram_specs = (
            (
                MetricInstruments.HTTP_CLIENT_DURATION,
                "ms",
                "Measures the duration of outbound HTTP requests.",
            ),
            (
                MetricInstruments.HTTP_CLIENT_REQUEST_SIZE,
                "By",
                "Measures the size of HTTP request messages.",
            ),
            (
                MetricInstruments.HTTP_CLIENT_RESPONSE_SIZE,
                "By",
                "Measures the size of HTTP response messages.",
            ),
        )
        duration, request_size, response_size = [
            meter.create_histogram(
                name=metric_name, unit=metric_unit, description=metric_help
            )
            for metric_name, metric_unit, metric_help in histogram_specs
        ]

        request_hook = kwargs.get("request_hook")
        response_hook = kwargs.get("response_hook")
        url_filter = kwargs.get("url_filter")

        # An explicit ``excluded_urls`` kwarg takes precedence over the list
        # read from the environment at import time.
        raw_excluded_urls = kwargs.get("excluded_urls")
        if raw_excluded_urls is None:
            effective_excluded_urls = _excluded_urls_from_env
        else:
            effective_excluded_urls = parse_excluded_urls(raw_excluded_urls)

        # Resolves to the module-level ``_instrument`` function; method names
        # are not in scope inside a method body.
        _instrument(
            tracer,
            duration,
            request_size,
            response_size,
            request_hook=request_hook,
            response_hook=response_hook,
            url_filter=url_filter,
            excluded_urls=effective_excluded_urls,
        )

    def _uninstrument(self, **kwargs):
        """Remove the instrumentation via the module-level ``_uninstrument``."""
        _uninstrument()
# Maps the integer ``version`` found on a response object to the value
# recorded under ``SpanAttributes.HTTP_FLAVOR``. Versions not listed here
# fall back to "1.0".
_HTTP_VERSION_TO_FLAVOR = {
    10: "1.0",
    11: "1.1",
    20: "2.0",
}


def _instrument(
    tracer: Tracer,
    duration_histogram: Histogram,
    request_size_histogram: Histogram,
    response_size_histogram: Histogram,
    request_hook: _RequestHookT = None,
    response_hook: _ResponseHookT = None,
    url_filter: _UrlFilterT = None,
    excluded_urls: typing.Optional[ExcludeList] = None,
):
    """Wrap ``HTTPConnectionPool.urlopen`` with tracing and metrics.

    Args:
        tracer: Tracer used to create one CLIENT span per ``urlopen`` call.
        duration_histogram: Receives the request duration in milliseconds.
        request_size_histogram: Receives the request body size, when it can
            be determined.
        response_size_histogram: Receives the response size taken from the
            ``Content-Length`` header, when it can be parsed.
        request_hook: Optional callable invoked as
            ``request_hook(span, pool, headers, body)`` right after the span
            is created.
        response_hook: Optional callable invoked as
            ``response_hook(span, pool, response)`` after the response has
            been applied to the span.
        url_filter: Optional callable applied to the URL before it is used
            as a span attribute and matched against ``excluded_urls``.
        excluded_urls: Optional exclude list; matching URLs are passed
            through without instrumentation.
    """

    def instrumented_urlopen(wrapped, instance, args, kwargs):
        """wrapt-style wrapper: ``wrapped`` is the original ``urlopen``,
        ``instance`` the connection pool it was called on."""
        if not is_http_instrumentation_enabled():
            return wrapped(*args, **kwargs)

        url = _get_url(instance, args, kwargs, url_filter)
        if excluded_urls and excluded_urls.url_disabled(url):
            return wrapped(*args, **kwargs)

        method = _get_url_open_arg("method", args, kwargs).upper()
        # Replaces kwargs["headers"] with a copy so that context injection
        # below does not mutate the caller's dict.
        headers = _prepare_headers(kwargs)
        body = _get_url_open_arg("body", args, kwargs)

        span_name = method.strip()
        span_attributes = {
            SpanAttributes.HTTP_METHOD: method,
            SpanAttributes.HTTP_URL: url,
        }

        with tracer.start_as_current_span(
            span_name, kind=SpanKind.CLIENT, attributes=span_attributes
        ) as span, set_ip_on_next_http_connection(span):
            if callable(request_hook):
                request_hook(span, instance, headers, body)
            inject(headers)

            # NOTE(review): presumably suppresses nested HTTP instrumentation
            # (see the ``is_http_instrumentation_enabled`` guard above) while
            # the real request runs - confirm against the utils module.
            with suppress_http_instrumentation():
                start_time = default_timer()
                response = wrapped(*args, **kwargs)
                # default_timer() is in seconds; the histogram unit is ms.
                elapsed_time = round((default_timer() - start_time) * 1000)

            _apply_response(span, response)
            if callable(response_hook):
                response_hook(span, instance, response)

            request_size = _get_body_size(body)
            response_size = _get_response_size(response)

            metric_attributes = _create_metric_attributes(
                instance, response, method
            )

            duration_histogram.record(
                elapsed_time, attributes=metric_attributes
            )
            # Sizes are only recorded when they could be determined.
            if request_size is not None:
                request_size_histogram.record(
                    request_size, attributes=metric_attributes
                )
            if response_size is not None:
                response_size_histogram.record(
                    response_size, attributes=metric_attributes
                )

            return response

    wrapt.wrap_function_wrapper(
        urllib3.connectionpool.HTTPConnectionPool,
        "urlopen",
        instrumented_urlopen,
    )


def _get_url_open_arg(name: str, args: typing.List, kwargs: typing.Mapping):
    """Return the ``urlopen`` argument ``name``, passed positionally or by keyword.

    Positional lookup uses ``_URL_OPEN_ARG_TO_INDEX_MAPPING``; when the
    argument was not passed positionally the keyword value (or ``None``) is
    returned.
    """
    arg_idx = _URL_OPEN_ARG_TO_INDEX_MAPPING.get(name)
    if arg_idx is not None:
        try:
            return args[arg_idx]
        except IndexError:
            # Not passed positionally; fall through to the keyword lookup.
            pass
    return kwargs.get(name)


def _get_url(
    instance: urllib3.connectionpool.HTTPConnectionPool,
    args: typing.List,
    kwargs: typing.Mapping,
    url_filter: _UrlFilterT,
) -> str:
    """Build the URL for a ``urlopen`` call.

    A value starting with ``/`` is treated as a path and prefixed with the
    pool's scheme, host and (non-default) port; anything else is used as is.
    ``url_filter``, when given, is applied to the result.
    """
    url_or_path = _get_url_open_arg("url", args, kwargs)
    if not url_or_path.startswith("/"):
        url = url_or_path
    else:
        url = instance.scheme + "://" + instance.host
        if _should_append_port(instance.scheme, instance.port):
            url += ":" + str(instance.port)
        url += url_or_path

    if url_filter:
        return url_filter(url)
    return url


def _get_body_size(body: object) -> typing.Optional[int]:
    """Return the size of a request body, or ``None`` if it cannot be determined.

    ``None`` bodies count as size 0.
    """
    if body is None:
        return 0
    if isinstance(body, collections.abc.Sized):
        return len(body)
    if isinstance(body, io.BytesIO):
        return body.getbuffer().nbytes
    return None


def _get_response_size(
    response: urllib3.response.HTTPResponse,
) -> typing.Optional[int]:
    """Return the response size from the ``Content-Length`` header.

    Returns 0 when the header is absent and ``None`` when its value cannot
    be parsed as an integer, so that a malformed header sent by a server
    does not raise out of the instrumented call.
    """
    try:
        return int(response.headers.get("Content-Length", 0))
    except (TypeError, ValueError):
        return None


def _should_append_port(scheme: str, port: typing.Optional[int]) -> bool:
    """Return whether ``port`` must be spelled out in a URL for ``scheme``.

    Missing ports and the defaults (80 for http, 443 for https) are omitted.
    """
    if not port:
        return False
    if scheme == "http" and port == 80:
        return False
    if scheme == "https" and port == 443:
        return False
    return True


def _prepare_headers(urlopen_kwargs: typing.Dict) -> typing.Dict:
    """Install a private copy of the request headers into ``urlopen_kwargs``.

    Returns the copy (an empty dict when no headers were passed) so the
    caller can add entries to it.
    """
    headers = urlopen_kwargs.get("headers")

    # avoid modifying original headers on inject
    headers = headers.copy() if headers is not None else {}
    urlopen_kwargs["headers"] = headers

    return headers


def _apply_response(span: Span, response: urllib3.response.HTTPResponse):
    """Record the response status code and derived span status on ``span``.

    Does nothing when the span is not recording.
    """
    if not span.is_recording():
        return

    span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.status)
    span.set_status(Status(http_status_to_status_code(response.status)))


def _create_metric_attributes(
    instance: urllib3.connectionpool.HTTPConnectionPool,
    response: urllib3.response.HTTPResponse,
    method: str,
) -> dict:
    """Build the attribute dict attached to every recorded metric.

    ``HTTP_FLAVOR`` is only included when the response exposes a truthy
    ``version`` attribute.
    """
    metric_attributes = {
        SpanAttributes.HTTP_METHOD: method,
        SpanAttributes.HTTP_HOST: instance.host,
        SpanAttributes.HTTP_SCHEME: instance.scheme,
        SpanAttributes.HTTP_STATUS_CODE: response.status,
        SpanAttributes.NET_PEER_NAME: instance.host,
        SpanAttributes.NET_PEER_PORT: instance.port,
    }

    # Default to None so response objects without ``version`` do not raise.
    version = getattr(response, "version", None)
    if version:
        metric_attributes[SpanAttributes.HTTP_FLAVOR] = (
            _HTTP_VERSION_TO_FLAVOR.get(version, "1.0")
        )

    return metric_attributes


def _uninstrument():
    """Restore the original ``HTTPConnectionPool.urlopen``."""
    unwrap(urllib3.connectionpool.HTTPConnectionPool, "urlopen")