Source code for opentelemetry.instrumentation.threading

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
Instrument threading to propagate OpenTelemetry context.

Usage
-----

.. code-block:: python

    from opentelemetry.instrumentation.threading import ThreadingInstrumentor

    ThreadingInstrumentor().instrument()

This library provides instrumentation for the ``threading`` module to ensure that
the OpenTelemetry context is propagated across threads. It is important to note
that this instrumentation does not produce any telemetry data on its own. It
merely ensures that the context is correctly propagated when threads are used.


When instrumented, new threads created using threading.Thread, threading.Timer,
or within futures.ThreadPoolExecutor will have the current OpenTelemetry
context attached, and this context will be re-activated in the thread's
run method or the executor's worker thread.
"""

from __future__ import annotations

import threading
from concurrent import futures
from typing import TYPE_CHECKING, Any, Callable, Collection

from wrapt import (
    wrap_function_wrapper,  # type: ignore[reportUnknownVariableType]
)

from opentelemetry import context
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.threading.package import _instruments
from opentelemetry.instrumentation.utils import unwrap

# The names below exist only for static type checkers. At runtime
# TYPE_CHECKING is False, so nothing in this block is executed; the
# ``from __future__ import annotations`` import at the top of the module keeps
# annotations unevaluated, which is why the wrapper signatures can still
# mention ``R`` and ``HasOtelContext``.
if TYPE_CHECKING:
    from typing import Protocol, TypeVar

    # Return type of a wrapped callable, used in the wrapper signatures.
    R = TypeVar("R")

    # Structural type for any object that carries an ``_otel_context``
    # attribute (the attribute the ``start`` wrapper sets on the instance).
    class HasOtelContext(Protocol):
        _otel_context: context.Context


class ThreadingInstrumentor(BaseInstrumentor):
    """Instrumentor that carries the OpenTelemetry context across threads.

    Instrumenting wraps ``start`` and ``run`` on ``threading.Thread`` and
    ``threading.Timer`` and ``submit`` on ``futures.ThreadPoolExecutor``.
    The context that is current when ``start``/``submit`` is called is
    captured and re-attached around the code that later executes.
    """

    # Names of the methods that get wrapped. The double underscore makes
    # them class-private (name-mangled).
    __WRAPPER_START_METHOD = "start"
    __WRAPPER_RUN_METHOD = "run"
    __WRAPPER_SUBMIT_METHOD = "submit"

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the ``_instruments`` collection from the package module."""
        return _instruments

    def _instrument(self, **kwargs: Any) -> None:
        """Install all wrappers (threads, timers, thread pool executors)."""
        self._instrument_thread()
        self._instrument_timer()
        self._instrument_thread_pool()

    def _uninstrument(self, **kwargs: Any) -> None:
        """Remove all wrappers installed by :meth:`_instrument`."""
        self._uninstrument_thread()
        self._uninstrument_timer()
        self._uninstrument_thread_pool()

    @staticmethod
    def _instrument_thread() -> None:
        """Wrap ``threading.Thread.start`` and ``threading.Thread.run``."""
        wrap_function_wrapper(
            threading.Thread,
            ThreadingInstrumentor.__WRAPPER_START_METHOD,
            ThreadingInstrumentor.__wrap_threading_start,
        )
        wrap_function_wrapper(
            threading.Thread,
            ThreadingInstrumentor.__WRAPPER_RUN_METHOD,
            ThreadingInstrumentor.__wrap_threading_run,
        )

    @staticmethod
    def _instrument_timer() -> None:
        """Wrap ``threading.Timer.start`` and ``threading.Timer.run``."""
        wrap_function_wrapper(
            threading.Timer,
            ThreadingInstrumentor.__WRAPPER_START_METHOD,
            ThreadingInstrumentor.__wrap_threading_start,
        )
        wrap_function_wrapper(
            threading.Timer,
            ThreadingInstrumentor.__WRAPPER_RUN_METHOD,
            ThreadingInstrumentor.__wrap_threading_run,
        )

    @staticmethod
    def _instrument_thread_pool() -> None:
        """Wrap ``futures.ThreadPoolExecutor.submit``."""
        wrap_function_wrapper(
            futures.ThreadPoolExecutor,
            ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
            ThreadingInstrumentor.__wrap_thread_pool_submit,
        )

    @staticmethod
    def _uninstrument_thread() -> None:
        """Unwrap ``threading.Thread.start`` and ``threading.Thread.run``."""
        unwrap(threading.Thread, ThreadingInstrumentor.__WRAPPER_START_METHOD)
        unwrap(threading.Thread, ThreadingInstrumentor.__WRAPPER_RUN_METHOD)

    @staticmethod
    def _uninstrument_timer() -> None:
        """Unwrap ``threading.Timer.start`` and ``threading.Timer.run``."""
        unwrap(threading.Timer, ThreadingInstrumentor.__WRAPPER_START_METHOD)
        unwrap(threading.Timer, ThreadingInstrumentor.__WRAPPER_RUN_METHOD)

    @staticmethod
    def _uninstrument_thread_pool() -> None:
        """Unwrap ``futures.ThreadPoolExecutor.submit``."""
        unwrap(
            futures.ThreadPoolExecutor,
            ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
        )

    @staticmethod
    def __wrap_threading_start(
        call_wrapped: Callable[[], None],
        instance: HasOtelContext,
        args: tuple[()],
        kwargs: dict[str, Any],
    ) -> None:
        """Wrapper for ``start``: remember the caller's current context.

        The context is stored on the thread object as ``_otel_context`` so
        the ``run`` wrapper can pick it up later.
        """
        instance._otel_context = context.get_current()
        return call_wrapped(*args, **kwargs)

    @staticmethod
    def __wrap_threading_run(
        call_wrapped: Callable[..., R],
        instance: HasOtelContext,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> R:
        """Wrapper for ``run``: execute under the context saved by ``start``.

        If the instance has no ``_otel_context`` attribute, ``run`` is
        called without attaching anything. A context that was attached is
        always detached again, even when ``run`` raises.
        """
        token = None
        try:
            if hasattr(instance, "_otel_context"):
                token = context.attach(instance._otel_context)
            return call_wrapped(*args, **kwargs)
        finally:
            if token is not None:
                context.detach(token)

    @staticmethod
    def __wrap_thread_pool_submit(
        call_wrapped: Callable[..., R],
        instance: futures.ThreadPoolExecutor,
        args: tuple[Callable[..., Any], ...],
        kwargs: dict[str, Any],
    ) -> R:
        """Wrapper for ``submit``: run the callable under the caller's context.

        The context current at ``submit`` time is captured. The first
        positional argument (the callable) is replaced by a function that
        attaches that context, invokes the original callable and detaches
        the context afterwards. The remaining arguments are passed through
        unchanged.
        """
        if not args:
            # No callable was passed positionally, so there is nothing to
            # wrap. Let the wrapped ``submit`` handle (or reject) the call
            # itself instead of failing here with an IndexError on args[0].
            return call_wrapped(*args, **kwargs)

        # The submitted callable is the first positional argument.
        original_func = args[0]
        otel_context = context.get_current()

        def wrapped_func(*func_args: Any, **func_kwargs: Any) -> R:
            token = None
            try:
                token = context.attach(otel_context)
                return original_func(*func_args, **func_kwargs)
            finally:
                if token is not None:
                    context.detach(token)

        # Replace the original callable with the context-restoring one.
        new_args: tuple[Callable[..., Any], ...] = (wrapped_func,) + args[1:]
        return call_wrapped(*new_args, **kwargs)