# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This library allows tracing HTTP requests made by the
`urllib3 <https://urllib3.readthedocs.io/>`_ library.
Usage
-----
.. code-block:: python

    import urllib3
    from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor

    def strip_query_params(url: str) -> str:
        return url.split("?")[0]

    URLLib3Instrumentor().instrument(
        # Remove all query params from the URL attribute on the span.
        url_filter=strip_query_params,
    )

    http = urllib3.PoolManager()
    response = http.request("GET", "https://www.example.org/")

Configuration
-------------
Request/Response hooks
**********************
The urllib3 instrumentation supports extending tracing behavior with the help of
request and response hooks. The request hook is called right after a span is created
for a request; the response hook is called right before the span ends, once the response has been received.
The hooks can be configured as follows:
.. code:: python

    # `pool` is an instance of urllib3.connectionpool.HTTPConnectionPool
    # `headers` is the dict of request headers that will be sent
    # `body` is the request body (may be None)
    def request_hook(span, pool, headers, body):
        pass

    # `pool` is an instance of urllib3.connectionpool.HTTPConnectionPool
    # `response` is an instance of urllib3.response.HTTPResponse
    def response_hook(span, pool, response):
        pass

    URLLib3Instrumentor().instrument(
        request_hook=request_hook, response_hook=response_hook
    )

Exclude lists
*************
To exclude certain URLs from being tracked, set the environment variable ``OTEL_PYTHON_URLLIB3_EXCLUDED_URLS``
(or ``OTEL_PYTHON_EXCLUDED_URLS`` as fallback) with comma delimited regexes representing which URLs to exclude.
For example,

::

    export OTEL_PYTHON_URLLIB3_EXCLUDED_URLS="client/.*/info,healthcheck"

will exclude requests such as ``https://site/client/123/info`` and ``https://site/xyz/healthcheck``.
API
---
"""
import collections.abc
import io
import typing
from timeit import default_timer
from typing import Collection
import urllib3.connectionpool
import wrapt
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.urllib3.package import _instruments
from opentelemetry.instrumentation.urllib3.version import __version__
from opentelemetry.instrumentation.utils import (
http_status_to_status_code,
is_http_instrumentation_enabled,
suppress_http_instrumentation,
unwrap,
)
from opentelemetry.metrics import Histogram, get_meter
from opentelemetry.propagate import inject
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer
from opentelemetry.trace.status import Status
from opentelemetry.util.http import (
ExcludeList,
get_excluded_urls,
parse_excluded_urls,
)
from opentelemetry.util.http.httplib import set_ip_on_next_http_connection
# Exclusion list computed once, at import time. It is the fallback used by
# URLLib3Instrumentor._instrument when no ``excluded_urls`` keyword is given.
_excluded_urls_from_env = get_excluded_urls("URLLIB3")
# Optional callback that receives the request URL and returns the string to
# record on the span.
_UrlFilterT = typing.Optional[typing.Callable[[str], str]]
# Optional request hook. It is invoked as
# ``request_hook(span, pool, headers, body)`` and its return value is ignored.
_RequestHookT = typing.Optional[
typing.Callable[
[
Span,
urllib3.connectionpool.HTTPConnectionPool,
typing.Dict,
typing.Optional[str],
],
None,
]
]
# Optional response hook. It is invoked as
# ``response_hook(span, pool, response)`` and its return value is ignored.
_ResponseHookT = typing.Optional[
typing.Callable[
[
Span,
urllib3.connectionpool.HTTPConnectionPool,
urllib3.response.HTTPResponse,
],
None,
]
]
# Positional index of each ``urlopen`` argument that _get_url_open_arg may need
# to read from ``args`` when the caller did not pass it by keyword.
_URL_OPEN_ARG_TO_INDEX_MAPPING = {
"method": 0,
"url": 1,
"body": 2,
}
class URLLib3Instrumentor(BaseInstrumentor):
    """Instrumentor that adds tracing and metrics to urllib3 requests.

    Instrumenting wraps ``HTTPConnectionPool.urlopen`` (via the module-level
    ``_instrument`` helper); uninstrumenting removes that wrapper again.
    """

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package requirements declared in ``_instruments``."""
        return _instruments

    def _instrument(self, **kwargs):
        """Instruments the urllib3 module

        Args:
            **kwargs: Optional arguments
                ``tracer_provider``: a TracerProvider, defaults to global.
                ``meter_provider``: a MeterProvider used to create the HTTP
                    client histograms; ``None`` is forwarded to ``get_meter``
                    when it is not supplied.
                ``request_hook``: An optional callback that is invoked right after a span is created.
                ``response_hook``: An optional callback which is invoked right before the span is finished processing a response.
                ``url_filter``: A callback to process the requested URL prior
                    to adding it as a span attribute.
                ``excluded_urls``: A string containing a comma-delimited
                    list of regexes used to exclude URLs from tracking
        """
        tracer_provider = kwargs.get("tracer_provider")
        tracer = get_tracer(
            __name__,
            __version__,
            tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        excluded_urls = kwargs.get("excluded_urls")

        meter_provider = kwargs.get("meter_provider")
        meter = get_meter(
            __name__,
            __version__,
            meter_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        duration_histogram = meter.create_histogram(
            name=MetricInstruments.HTTP_CLIENT_DURATION,
            unit="ms",
            description="Measures the duration of outbound HTTP requests.",
        )
        request_size_histogram = meter.create_histogram(
            name=MetricInstruments.HTTP_CLIENT_REQUEST_SIZE,
            unit="By",
            description="Measures the size of HTTP request messages.",
        )
        response_size_histogram = meter.create_histogram(
            name=MetricInstruments.HTTP_CLIENT_RESPONSE_SIZE,
            unit="By",
            description="Measures the size of HTTP response messages.",
        )

        # Inside a method body the bare name ``_instrument`` resolves to the
        # module-level function, not to this method.
        _instrument(
            tracer,
            duration_histogram,
            request_size_histogram,
            response_size_histogram,
            request_hook=kwargs.get("request_hook"),
            response_hook=kwargs.get("response_hook"),
            url_filter=kwargs.get("url_filter"),
            # An explicit ``excluded_urls`` string takes precedence over the
            # list computed from the environment at import time.
            excluded_urls=(
                _excluded_urls_from_env
                if excluded_urls is None
                else parse_excluded_urls(excluded_urls)
            ),
        )

    def _uninstrument(self, **kwargs):
        """Remove the urllib3 instrumentation; ``kwargs`` are ignored."""
        _uninstrument()
def _instrument(
    tracer: Tracer,
    duration_histogram: Histogram,
    request_size_histogram: Histogram,
    response_size_histogram: Histogram,
    request_hook: _RequestHookT = None,
    response_hook: _ResponseHookT = None,
    url_filter: _UrlFilterT = None,
    excluded_urls: ExcludeList = None,
):
    """Wrap ``HTTPConnectionPool.urlopen`` with tracing and metrics.

    Args:
        tracer: Tracer used to start one CLIENT span per request.
        duration_histogram: Receives the request duration in milliseconds.
        request_size_histogram: Receives the request body size when it can be
            determined.
        response_size_histogram: Receives the response ``Content-Length``
            (0 when the header is missing or not a valid integer).
        request_hook: Optional callable invoked as
            ``request_hook(span, pool, headers, body)`` before the request.
        response_hook: Optional callable invoked as
            ``response_hook(span, pool, response)`` after the response.
        url_filter: Optional callable applied to the URL before it is used.
        excluded_urls: Optional exclusion list; matching URLs are not traced.
    """

    def instrumented_urlopen(wrapped, instance, args, kwargs):
        # Pass straight through when instrumentation is disabled or the URL
        # has been excluded.
        if not is_http_instrumentation_enabled():
            return wrapped(*args, **kwargs)

        url = _get_url(instance, args, kwargs, url_filter)
        if excluded_urls and excluded_urls.url_disabled(url):
            return wrapped(*args, **kwargs)

        method = _get_url_open_arg("method", args, kwargs).upper()
        # Replaces kwargs["headers"] with a copy so that context injection
        # never mutates the caller's own header mapping.
        headers = _prepare_headers(kwargs)
        body = _get_url_open_arg("body", args, kwargs)

        span_name = method.strip()
        span_attributes = {
            SpanAttributes.HTTP_METHOD: method,
            SpanAttributes.HTTP_URL: url,
        }

        with tracer.start_as_current_span(
            span_name, kind=SpanKind.CLIENT, attributes=span_attributes
        ) as span, set_ip_on_next_http_connection(span):
            if callable(request_hook):
                request_hook(span, instance, headers, body)
            inject(headers)

            # Suppress nested HTTP instrumentation while the real call runs.
            with suppress_http_instrumentation():
                start_time = default_timer()
                response = wrapped(*args, **kwargs)
                elapsed_time = round((default_timer() - start_time) * 1000)

            _apply_response(span, response)
            if callable(response_hook):
                response_hook(span, instance, response)

            request_size = _get_body_size(body)
            try:
                response_size = int(response.headers.get("Content-Length", 0))
            except (TypeError, ValueError):
                # A malformed Content-Length must not turn a successful
                # request into an instrumentation failure; treat it like a
                # missing header.
                response_size = 0

            metric_attributes = _create_metric_attributes(
                instance, response, method
            )

            duration_histogram.record(
                elapsed_time, attributes=metric_attributes
            )
            # ``None`` means the body size could not be determined.
            if request_size is not None:
                request_size_histogram.record(
                    request_size, attributes=metric_attributes
                )
            response_size_histogram.record(
                response_size, attributes=metric_attributes
            )

            return response

    wrapt.wrap_function_wrapper(
        urllib3.connectionpool.HTTPConnectionPool,
        "urlopen",
        instrumented_urlopen,
    )
def _get_url_open_arg(name: str, args: typing.List, kwargs: typing.Mapping):
    """Return the ``urlopen`` argument called ``name``.

    The positional slot listed in ``_URL_OPEN_ARG_TO_INDEX_MAPPING`` is tried
    first. If ``name`` has no slot, or ``args`` is too short to contain it,
    the value is looked up in ``kwargs`` (``None`` when absent).
    """
    position = _URL_OPEN_ARG_TO_INDEX_MAPPING.get(name)
    if position is None:
        return kwargs.get(name)
    try:
        return args[position]
    except IndexError:
        # Not supplied positionally; it can only have come in by keyword.
        return kwargs.get(name)
def _get_url(
    instance: urllib3.connectionpool.HTTPConnectionPool,
    args: typing.List,
    kwargs: typing.Mapping,
    url_filter: _UrlFilterT,
) -> str:
    """Build the URL for a ``urlopen`` call.

    A target starting with ``/`` is treated as a path and prefixed with the
    pool's scheme, host and (when ``_should_append_port`` says so) port; any
    other target is used unchanged. The result is passed through
    ``url_filter`` when one is provided.
    """
    target = _get_url_open_arg("url", args, kwargs)
    if target.startswith("/"):
        origin = instance.scheme + "://" + instance.host
        if _should_append_port(instance.scheme, instance.port):
            origin += ":" + str(instance.port)
        full_url = origin + target
    else:
        full_url = target

    return url_filter(full_url) if url_filter else full_url
def _get_body_size(body: object) -> typing.Optional[int]:
    """Return the size of a request body, or ``None`` if it is unknown.

    ``None`` counts as an empty body (0). Objects that support ``len()``
    report that length, ``io.BytesIO`` reports the size of its buffer, and
    anything else yields ``None``.
    """
    if body is None:
        size = 0
    elif isinstance(body, collections.abc.Sized):
        size = len(body)
    elif isinstance(body, io.BytesIO):
        size = body.getbuffer().nbytes
    else:
        size = None
    return size
def _should_append_port(scheme: str, port: typing.Optional[int]) -> bool:
    """Tell whether ``port`` has to be written explicitly in the URL.

    Returns ``False`` when there is no port, or when the port is the
    well-known one for the scheme (80 for http, 443 for https); ``True``
    otherwise.
    """
    if not port:
        return False
    implicit_pairs = (("http", 80), ("https", 443))
    return (scheme, port) not in implicit_pairs
def _prepare_headers(urlopen_kwargs: typing.Dict) -> typing.Dict:
    """Install a private header mapping in ``urlopen_kwargs`` and return it.

    The caller's headers are copied (or an empty dict is created when none
    were given) so that later mutation does not touch the caller's object.
    """
    supplied = urlopen_kwargs.get("headers")
    if supplied is None:
        private_headers = {}
    else:
        private_headers = supplied.copy()
    urlopen_kwargs["headers"] = private_headers
    return private_headers
def _apply_response(span: Span, response: urllib3.response.HTTPResponse):
    """Record the response status on ``span``.

    Does nothing when the span is not recording. Otherwise sets the HTTP
    status code attribute and the span status derived from that code.
    """
    if span.is_recording():
        span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.status)
        span_status = Status(http_status_to_status_code(response.status))
        span.set_status(span_status)
def _create_metric_attributes(
    instance: urllib3.connectionpool.HTTPConnectionPool,
    response: urllib3.response.HTTPResponse,
    method: str,
) -> dict:
    """Build the attribute set attached to every metric data point.

    Args:
        instance: Connection pool the request went through; supplies host,
            scheme and port.
        response: Response object; supplies the status code and, when
            present, the protocol version.
        method: HTTP method of the request.

    Returns:
        A dict of metric attributes. ``HTTP_FLAVOR`` is included only when
        the response exposes a truthy ``version``.
    """
    metric_attributes = {
        SpanAttributes.HTTP_METHOD: method,
        SpanAttributes.HTTP_HOST: instance.host,
        SpanAttributes.HTTP_SCHEME: instance.scheme,
        SpanAttributes.HTTP_STATUS_CODE: response.status,
        SpanAttributes.NET_PEER_NAME: instance.host,
        SpanAttributes.NET_PEER_PORT: instance.port,
    }

    # Default to None so response-like objects without a ``version``
    # attribute simply produce no flavor instead of raising AttributeError.
    version = getattr(response, "version", None)
    if version:
        # 10 / 11 / 20 are the integer codes for HTTP/1.0, 1.1 and 2.
        # Any other value keeps the previous "1.0" fallback.
        flavor_by_version = {10: "1.0", 11: "1.1", 20: "2.0"}
        metric_attributes[SpanAttributes.HTTP_FLAVOR] = flavor_by_version.get(
            version, "1.0"
        )

    return metric_attributes
def _uninstrument():
    """Undo the wrapping of ``HTTPConnectionPool.urlopen``."""
    pool_class = urllib3.connectionpool.HTTPConnectionPool
    unwrap(pool_class, "urlopen")