Source code for opentelemetry.instrumentation.asyncio

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.. asyncio: https://github.com/python/asyncio

The opentelemetry-instrumentation-asyncio package allows tracing asyncio applications.
Metrics for coroutines and futures are generated even when no span-generation setting is configured.

Run instrumented application
------------------------------
1. coroutine
----------------
.. code:: python

    # export OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE=sleep

    import asyncio
    from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor

    AsyncioInstrumentor().instrument()

    async def main():
        await asyncio.create_task(asyncio.sleep(0.1))

    asyncio.run(main())

2. future
------------
.. code:: python

    # export OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED=true

    loop = asyncio.get_event_loop()

    future = asyncio.Future()
    future.set_result(1)
    task = asyncio.ensure_future(future)
    loop.run_until_complete(task)

3. to_thread
-------------
.. code:: python

    import asyncio
    from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor

    AsyncioInstrumentor().instrument()

    async def main():
        await asyncio.to_thread(func)

    def func():
        pass

    asyncio.run(main())


asyncio metric types
---------------------

* asyncio.process.duration (seconds) - Duration of asyncio process
* asyncio.process.created (count) - Number of asyncio processes


API
---
"""
import asyncio
import sys
from asyncio import futures
from timeit import default_timer
from typing import Collection

from wrapt import wrap_function_wrapper as _wrap

from opentelemetry.instrumentation.asyncio.package import _instruments
from opentelemetry.instrumentation.asyncio.utils import (
    get_coros_to_trace,
    get_future_trace_enabled,
    get_to_thread_to_trace,
)
from opentelemetry.instrumentation.asyncio.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.trace import get_tracer
from opentelemetry.trace.status import Status, StatusCode

ASYNCIO_PREFIX = "asyncio"


class AsyncioInstrumentor(BaseInstrumentor):
    """
    An instrumentor for asyncio

    Wraps the task/future scheduling entry points of the ``asyncio`` module
    so that every coroutine, future and ``to_thread`` call is measured
    (duration histogram + counter) and, when enabled through the
    configuration helpers, traced with a span.

    See `BaseInstrumentor`
    """

    # asyncio functions whose first positional argument is a coroutine,
    # a future, or a list of them.
    methods_with_coroutine = [
        "create_task",
        "ensure_future",
        "wait_for",
        "wait",
        "as_completed",
        "run_coroutine_threadsafe",
    ]

    def __init__(self):
        super().__init__()
        # Metric instruments; created in _instrument().
        self.process_duration_histogram = None
        self.process_created_counter = None

        self._tracer = None
        self._meter = None
        # Names of coroutines / to_thread functions that get a span.
        self._coros_name_to_trace: set = set()
        self._to_thread_name_to_trace: set = set()
        # Whether futures get a span.
        self._future_active_enabled: bool = False

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package requirements of this instrumentation."""
        return _instruments

    def _instrument(self, **kwargs):
        """
        Create the tracer, meter and metric instruments, load the tracing
        configuration, and wrap the asyncio entry points.

        :param kwargs: May contain ``tracer_provider`` and
            ``meter_provider``; both are optional.
        """
        self._tracer = get_tracer(
            __name__, __version__, kwargs.get("tracer_provider")
        )
        self._meter = get_meter(
            __name__, __version__, kwargs.get("meter_provider")
        )

        self._coros_name_to_trace = get_coros_to_trace()
        self._future_active_enabled = get_future_trace_enabled()
        self._to_thread_name_to_trace = get_to_thread_to_trace()

        self.process_duration_histogram = self._meter.create_histogram(
            name="asyncio.process.duration",
            description="Duration of asyncio process",
            unit="s",
        )
        self.process_created_counter = self._meter.create_counter(
            name="asyncio.process.created",
            description="Number of asyncio process",
            unit="{process}",
        )

        for method in self.methods_with_coroutine:
            self.instrument_method_with_coroutine(method)

        self.instrument_gather()
        self.instrument_to_thread()
        self.instrument_taskgroup_create_task()

    def _uninstrument(self, **kwargs):
        """Remove every wrapper installed by ``_instrument``."""
        for method in self.methods_with_coroutine:
            uninstrument_method_with_coroutine(method)
        uninstrument_gather()
        uninstrument_to_thread()
        uninstrument_taskgroup_create_task()

    def instrument_method_with_coroutine(self, method_name: str):
        """
        Instruments specified asyncio method.

        :param method_name: Name of the ``asyncio`` module attribute to wrap.
        """

        def wrap_coro_or_future(method, instance, args, kwargs):
            # If the first argument is a coroutine or future,
            # we decorate it with a span and return the task.
            if args:
                first_arg = args[0]
                # Check if it's a coroutine or future and wrap it
                if asyncio.iscoroutine(first_arg) or futures.isfuture(
                    first_arg
                ):
                    args = (self.trace_item(first_arg),) + args[1:]
                # Check if it's a list and wrap each item
                elif isinstance(first_arg, list):
                    args = (
                        [self.trace_item(item) for item in first_arg],
                    ) + args[1:]
            return method(*args, **kwargs)

        _wrap(asyncio, method_name, wrap_coro_or_future)

    def instrument_gather(self):
        """Wrap ``asyncio.gather`` so each positional awaitable is traced."""

        def wrap_coros_or_futures(method, instance, args, kwargs):
            # trace_item returns non-awaitables unchanged, and an empty
            # ``args`` simply yields an empty tuple.
            wrapped_args = tuple(self.trace_item(item) for item in args)
            return method(*wrapped_args, **kwargs)

        _wrap(asyncio, "gather", wrap_coros_or_futures)

    def instrument_to_thread(self) -> None:
        """Wrap ``asyncio.to_thread`` so the submitted function is traced."""
        # to_thread was added in Python 3.9
        if sys.version_info < (3, 9):
            return

        def wrap_to_thread(method, instance, args, kwargs):
            if args:
                # Wrap the first argument (the function to run)
                wrapped_first_arg = self.trace_to_thread(args[0])
                wrapped_args = (wrapped_first_arg,) + args[1:]
                return method(*wrapped_args, **kwargs)
            return method(*args, **kwargs)

        _wrap(asyncio, "to_thread", wrap_to_thread)

    def instrument_taskgroup_create_task(self) -> None:
        """Wrap ``asyncio.TaskGroup.create_task`` to trace its coroutine."""
        # TaskGroup.create_task was added in Python 3.11
        if sys.version_info < (3, 11):
            return

        def wrap_taskgroup_create_task(method, instance, args, kwargs):
            if args:
                wrapped_coro = self.trace_coroutine(args[0])
                wrapped_args = (wrapped_coro,) + args[1:]
                return method(*wrapped_args, **kwargs)
            return method(*args, **kwargs)

        _wrap(
            asyncio.TaskGroup,  # pylint: disable=no-member
            "create_task",
            wrap_taskgroup_create_task,
        )

    def trace_to_thread(self, func: callable):
        """
        Return a callable that runs ``func`` and records its execution.

        The duration, counter and optional span cover the actual call of
        ``func`` rather than the moment it was handed to ``to_thread``.

        :param func: The callable that will be executed.
        :return: A wrapper with the same call signature as ``func``.
        """
        # Local import keeps this method self-contained.
        import functools  # pylint: disable=import-outside-toplevel

        func_name = getattr(func, "__name__", None)
        if func_name is None:
            # e.g. functools.partial has no __name__; fall back to the
            # wrapped callable's name, then to the type name.
            func_name = getattr(
                getattr(func, "func", None), "__name__", type(func).__name__
            )

        @functools.wraps(func)
        def traced_func(*args, **kwargs):
            start = default_timer()
            span = (
                self._tracer.start_span(
                    f"{ASYNCIO_PREFIX} to_thread-" + func_name
                )
                if func_name in self._to_thread_name_to_trace
                else None
            )
            attr = {
                "type": "to_thread",
                "name": func_name,
                "state": "finished",
            }
            exception = None
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                exception = exc
                attr["state"] = "exception"
                raise
            finally:
                self.record_process(start, attr, span, exception)

        return traced_func

    def trace_item(self, coro_or_future):
        """
        Trace a coroutine or future item.

        :param coro_or_future: Any object; only coroutines and futures
            are wrapped.
        :return: The traced coroutine/future, or the argument unchanged.
        """
        # Task is already traced, return it
        if isinstance(coro_or_future, asyncio.Task):
            return coro_or_future
        if asyncio.iscoroutine(coro_or_future):
            return self.trace_coroutine(coro_or_future)
        if futures.isfuture(coro_or_future):
            return self.trace_future(coro_or_future)
        return coro_or_future

    async def trace_coroutine(self, coro):
        """
        Await ``coro`` while recording its duration, state and optional span.

        :param coro: The coroutine object to await.
        :return: Whatever ``coro`` returns.
        :raises asyncio.CancelledError: Re-raised so cancellation still
            propagates to the caller.
        :raises Exception: Any exception raised by ``coro`` is re-raised.
        """
        start = default_timer()
        attr = {
            "type": "coroutine",
            "name": coro.__name__,
        }
        span = (
            self._tracer.start_span(f"{ASYNCIO_PREFIX} coro-" + coro.__name__)
            if coro.__name__ in self._coros_name_to_trace
            else None
        )
        exception = None
        try:
            attr["state"] = "finished"
            return await coro
        # Cancellation is not recorded as an error on the span
        # (``exception`` stays None), but it must be re-raised: swallowing
        # it would make the cancelled task look like it completed normally.
        except asyncio.CancelledError:
            attr["state"] = "cancelled"
            raise
        except Exception as exc:
            exception = exc
            attr["state"] = determine_state(exception)
            raise
        finally:
            self.record_process(start, attr, span, exception)

    def trace_future(self, future):
        """
        Attach a done-callback to ``future`` that records its outcome.

        :param future: The future to observe.
        :return: The same ``future`` object.
        """
        start = default_timer()
        span = (
            self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
            if self._future_active_enabled
            else None
        )

        def callback(f):
            # Future.exception() raises CancelledError on a cancelled
            # future, so check for cancellation before calling it.
            if f.cancelled():
                exception = None
                state = "cancelled"
            else:
                exception = f.exception()
                state = determine_state(exception)
            attr = {
                "type": "future",
                "state": state,
            }
            self.record_process(start, attr, span, exception)

        future.add_done_callback(callback)
        return future

    def record_process(
        self, start: float, attr: dict, span=None, exception=None
    ) -> None:
        """
        Record the processing time, update histogram and counter, and handle span.

        :param start: Start time of the process.
        :param attr: Attributes for the histogram and counter.
        :param span: Optional span for tracing.
        :param exception: Optional exception occurred during the process.
        """
        # Clamp to zero so a negative duration is never recorded.
        duration = max(default_timer() - start, 0)
        self.process_duration_histogram.record(duration, attr)
        self.process_created_counter.add(1, attr)

        if span:
            if span.is_recording() and exception:
                span.set_status(Status(StatusCode.ERROR))
                span.record_exception(exception)
            span.end()
def determine_state(exception: Exception) -> str:
    """
    Map the outcome of an asyncio operation to a state label.

    :param exception: The exception that ended the operation, or a falsy
        value (e.g. ``None``) when it completed normally.
    :return: ``"cancelled"``, ``"timeout"``, ``"exception"`` or
        ``"finished"``.
    """
    # Checked in order: the first matching exception type wins.
    labelled_types = (
        (asyncio.CancelledError, "cancelled"),
        (asyncio.TimeoutError, "timeout"),
    )
    for exc_type, label in labelled_types:
        if isinstance(exception, exc_type):
            return label
    return "exception" if exception else "finished"
def uninstrument_taskgroup_create_task() -> None:
    """Remove the wrapper from ``asyncio.TaskGroup.create_task``."""
    # TaskGroup.create_task was added in Python 3.11; on older
    # interpreters there is nothing to unwrap.
    if sys.version_info >= (3, 11):
        unwrap(asyncio.TaskGroup, "create_task")  # pylint: disable=no-member
def uninstrument_to_thread() -> None:
    """Remove the wrapper from ``asyncio.to_thread``."""
    # to_thread was added in Python 3.9; on older interpreters there is
    # nothing to unwrap.
    if sys.version_info >= (3, 9):
        unwrap(asyncio, "to_thread")
def uninstrument_gather() -> None:
    """Remove the wrapper from ``asyncio.gather``."""
    unwrap(asyncio, "gather")
def uninstrument_method_with_coroutine(method_name: str) -> None:
    """
    Remove the wrapper from the named ``asyncio`` module attribute.

    :param method_name: Name of the ``asyncio`` attribute to unwrap.
    """
    unwrap(asyncio, method_name)