# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
.. asyncio: https://github.com/python/asyncio
The opentelemetry-instrumentation-asyncio package allows tracing asyncio applications.
The metric for coroutine, future, is generated even if there is no setting to generate a span.
Run instrumented application
------------------------------
1. coroutine
----------------
.. code:: python
# export OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE=sleep
import asyncio
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()
async def main():
await asyncio.create_task(asyncio.sleep(0.1))
asyncio.run(main())
2. future
------------
.. code:: python
# export OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED=true
import asyncio
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()
loop = asyncio.get_event_loop()
future = asyncio.Future()
future.set_result(1)
task = asyncio.ensure_future(future)
loop.run_until_complete(task)
3. to_thread
-------------
.. code:: python
# export OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE=func
import asyncio
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()
async def main():
await asyncio.to_thread(func)
def func():
pass
asyncio.run(main())
asyncio metric types
---------------------
* asyncio.process.duration (seconds) - Duration of asyncio process
* asyncio.process.count (count) - Number of asyncio process
API
---
"""
import asyncio
import functools
import sys
from asyncio import futures
from timeit import default_timer
from typing import Collection
from wrapt import wrap_function_wrapper as _wrap
from opentelemetry.instrumentation.asyncio.instrumentation_state import (
_is_instrumented,
)
from opentelemetry.instrumentation.asyncio.package import _instruments
from opentelemetry.instrumentation.asyncio.utils import (
get_coros_to_trace,
get_future_trace_enabled,
get_to_thread_to_trace,
)
from opentelemetry.instrumentation.asyncio.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.trace import get_tracer
from opentelemetry.trace.status import Status, StatusCode
ASYNCIO_PREFIX = "asyncio"
[docs]class AsyncioInstrumentor(BaseInstrumentor):
"""
An instrumentor for asyncio
See `BaseInstrumentor`
"""
methods_with_coroutine = [
"create_task",
"ensure_future",
"wait_for",
"wait",
"as_completed",
"run_coroutine_threadsafe",
]
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs):
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
)
self._meter = get_meter(
__name__, __version__, kwargs.get("meter_provider")
)
self._coros_name_to_trace = get_coros_to_trace()
self._future_active_enabled = get_future_trace_enabled()
self._to_thread_name_to_trace = get_to_thread_to_trace()
self.process_duration_histogram = self._meter.create_histogram(
name="asyncio.process.duration",
description="Duration of asyncio process",
unit="s",
)
self.process_created_counter = self._meter.create_counter(
name="asyncio.process.created",
description="Number of asyncio process",
unit="{process}",
)
for method in self.methods_with_coroutine:
self.instrument_method_with_coroutine(method)
self.instrument_gather()
self.instrument_to_thread()
self.instrument_taskgroup_create_task()
def _uninstrument(self, **kwargs):
for method in self.methods_with_coroutine:
uninstrument_method_with_coroutine(method)
uninstrument_gather()
uninstrument_to_thread()
uninstrument_taskgroup_create_task()
[docs] def instrument_method_with_coroutine(self, method_name: str):
"""
Instruments specified asyncio method.
"""
def wrap_coro_or_future(method, instance, args, kwargs):
# If the first argument is a coroutine or future,
# we decorate it with a span and return the task.
if args and len(args) > 0:
first_arg = args[0]
# Check if it's a coroutine or future and wrap it
if asyncio.iscoroutine(first_arg) or futures.isfuture(
first_arg
):
args = (self.trace_item(first_arg),) + args[1:]
# Check if it's a list and wrap each item
elif isinstance(first_arg, list):
args = (
[self.trace_item(item) for item in first_arg],
) + args[1:]
return method(*args, **kwargs)
_wrap(asyncio, method_name, wrap_coro_or_future)
[docs] def instrument_gather(self):
def wrap_coros_or_futures(method, instance, args, kwargs):
if args and len(args) > 0:
# Check if it's a coroutine or future and wrap it
wrapped_args = tuple(self.trace_item(item) for item in args)
return method(*wrapped_args, **kwargs)
return method(*args, **kwargs)
_wrap(asyncio, "gather", wrap_coros_or_futures)
[docs] def instrument_to_thread(self) -> None:
def wrap_to_thread(method, instance, args, kwargs) -> None:
if args:
first_arg = args[0]
# Wrap the first argument
wrapped_first_arg = self.trace_to_thread(first_arg)
wrapped_args = (wrapped_first_arg,) + args[1:]
return method(*wrapped_args, **kwargs)
return method(*args, **kwargs)
_wrap(asyncio, "to_thread", wrap_to_thread)
[docs] def instrument_taskgroup_create_task(self) -> None:
# TaskGroup.create_task was added in Python 3.11
if sys.version_info < (3, 11):
return
def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
if args:
coro = args[0]
wrapped_coro = self.trace_coroutine(coro)
wrapped_args = (wrapped_coro,) + args[1:]
return method(*wrapped_args, **kwargs)
return method(*args, **kwargs)
_wrap(
asyncio.TaskGroup, # pylint: disable=no-member
"create_task",
wrap_taskgroup_create_task,
)
[docs] def trace_to_thread(self, func: callable):
"""
Trace a function, but if already instrumented, skip double-wrapping.
"""
if _is_instrumented(func):
return func
start = default_timer()
func_name = getattr(func, "__name__", None)
if func_name is None and isinstance(func, functools.partial):
func_name = func.func.__name__
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX} to_thread-" + func_name)
if func_name in self._to_thread_name_to_trace
else None
)
attr = {"type": "to_thread", "name": func_name}
exception = None
try:
attr["state"] = "finished"
return func
except Exception:
attr["state"] = "exception"
raise
finally:
self.record_process(start, attr, span, exception)
[docs] def trace_item(self, coro_or_future):
"""Trace a coroutine or future item."""
# Task is already traced, return it
if isinstance(coro_or_future, asyncio.Task):
return coro_or_future
if asyncio.iscoroutine(coro_or_future):
return self.trace_coroutine(coro_or_future)
if futures.isfuture(coro_or_future):
return self.trace_future(coro_or_future)
return coro_or_future
[docs] async def trace_coroutine(self, coro):
"""
Wrap a coroutine so that we measure its duration, metrics, etc.
If already instrumented, simply 'await coro' to preserve call behavior.
"""
if _is_instrumented(coro):
return await coro
if not hasattr(coro, "__name__"):
return await coro
start = default_timer()
attr = {
"type": "coroutine",
"name": coro.__name__,
}
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX} coro-" + coro.__name__)
if coro.__name__ in self._coros_name_to_trace
else None
)
exception = None
try:
attr["state"] = "finished"
return await coro
# CancelledError is raised when a coroutine is cancelled
# before it has a chance to run. We don't want to record
# this as an error.
# Still it needs to be raised in order for `asyncio.wait_for`
# to properly work with timeout and raise accordingly `asyncio.TimeoutError`
except asyncio.CancelledError:
attr["state"] = "cancelled"
raise
except Exception as exc:
exception = exc
state = determine_state(exception)
attr["state"] = state
raise
finally:
self.record_process(start, attr, span, exception)
[docs] def trace_future(self, future):
"""
Wrap a Future's done callback. If already instrumented, skip re-wrapping.
"""
if _is_instrumented(future):
return future
start = default_timer()
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
if self._future_active_enabled
else None
)
def callback(f):
attr = {
"type": "future",
"state": (
"cancelled"
if f.cancelled()
else determine_state(f.exception())
),
}
self.record_process(
start, attr, span, None if f.cancelled() else f.exception()
)
future.add_done_callback(callback)
return future
[docs] def record_process(
self, start: float, attr: dict, span=None, exception=None
) -> None:
"""
Record the processing time, update histogram and counter, and handle span.
:param start: Start time of the process.
:param attr: Attributes for the histogram and counter.
:param span: Optional span for tracing.
:param exception: Optional exception occurred during the process.
"""
duration = max(default_timer() - start, 0)
self.process_duration_histogram.record(duration, attr)
self.process_created_counter.add(1, attr)
if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exception)
span.end()
[docs]def determine_state(exception: Exception) -> str:
if isinstance(exception, asyncio.CancelledError):
return "cancelled"
if isinstance(exception, asyncio.TimeoutError):
return "timeout"
if exception:
return "exception"
return "finished"
[docs]def uninstrument_taskgroup_create_task() -> None:
# TaskGroup.create_task was added in Python 3.11
if sys.version_info < (3, 11):
return
unwrap(asyncio.TaskGroup, "create_task") # pylint: disable=no-member
[docs]def uninstrument_to_thread() -> None:
unwrap(asyncio, "to_thread")
[docs]def uninstrument_gather() -> None:
unwrap(asyncio, "gather")
[docs]def uninstrument_method_with_coroutine(method_name: str) -> None:
"""
Uninstrument specified asyncio method.
"""
unwrap(asyncio, method_name)