# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.. asyncio: https://github.com/python/asyncio
The opentelemetry-instrumentation-asycnio package allows tracing asyncio applications.
The metric for coroutine, future, is generated even if there is no setting to generate a span.
Run instrumented application
------------------------------
1. coroutine
----------------
.. code:: python
# export OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE=sleep
import asyncio
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()
async def main():
await asyncio.create_task(asyncio.sleep(0.1))
asyncio.run(main())
2. future
------------
.. code:: python
# export OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED=true
loop = asyncio.get_event_loop()
future = asyncio.Future()
future.set_result(1)
task = asyncio.ensure_future(future)
loop.run_until_complete(task)
3. to_thread
-------------
.. code:: python
import asyncio
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()
async def main():
await asyncio.to_thread(func)
def func():
pass
asyncio.run(main())
asyncio metric types
---------------------
* asyncio.process.duration (seconds) - Duration of asyncio process
* asyncio.process.count (count) - Number of asyncio process
API
---
"""
import asyncio
import sys
from asyncio import futures
from timeit import default_timer
from typing import Collection
from wrapt import wrap_function_wrapper as _wrap
from opentelemetry.instrumentation.asyncio.package import _instruments
from opentelemetry.instrumentation.asyncio.utils import (
get_coros_to_trace,
get_future_trace_enabled,
get_to_thread_to_trace,
)
from opentelemetry.instrumentation.asyncio.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.trace import get_tracer
from opentelemetry.trace.status import Status, StatusCode
ASYNCIO_PREFIX = "asyncio"
[docs]class AsyncioInstrumentor(BaseInstrumentor):
"""
An instrumentor for asyncio
See `BaseInstrumentor`
"""
methods_with_coroutine = [
"create_task",
"ensure_future",
"wait_for",
"wait",
"as_completed",
"run_coroutine_threadsafe",
]
def __init__(self):
super().__init__()
self.process_duration_histogram = None
self.process_created_counter = None
self._tracer = None
self._meter = None
self._coros_name_to_trace: set = set()
self._to_thread_name_to_trace: set = set()
self._future_active_enabled: bool = False
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs):
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
)
self._meter = get_meter(
__name__, __version__, kwargs.get("meter_provider")
)
self._coros_name_to_trace = get_coros_to_trace()
self._future_active_enabled = get_future_trace_enabled()
self._to_thread_name_to_trace = get_to_thread_to_trace()
self.process_duration_histogram = self._meter.create_histogram(
name="asyncio.process.duration",
description="Duration of asyncio process",
unit="s",
)
self.process_created_counter = self._meter.create_counter(
name="asyncio.process.created",
description="Number of asyncio process",
unit="{process}",
)
for method in self.methods_with_coroutine:
self.instrument_method_with_coroutine(method)
self.instrument_gather()
self.instrument_to_thread()
self.instrument_taskgroup_create_task()
def _uninstrument(self, **kwargs):
for method in self.methods_with_coroutine:
uninstrument_method_with_coroutine(method)
uninstrument_gather()
uninstrument_to_thread()
uninstrument_taskgroup_create_task()
[docs] def instrument_method_with_coroutine(self, method_name: str):
"""
Instruments specified asyncio method.
"""
def wrap_coro_or_future(method, instance, args, kwargs):
# If the first argument is a coroutine or future,
# we decorate it with a span and return the task.
if args and len(args) > 0:
first_arg = args[0]
# Check if it's a coroutine or future and wrap it
if asyncio.iscoroutine(first_arg) or futures.isfuture(
first_arg
):
args = (self.trace_item(first_arg),) + args[1:]
# Check if it's a list and wrap each item
elif isinstance(first_arg, list):
args = (
[self.trace_item(item) for item in first_arg],
) + args[1:]
return method(*args, **kwargs)
_wrap(asyncio, method_name, wrap_coro_or_future)
[docs] def instrument_gather(self):
def wrap_coros_or_futures(method, instance, args, kwargs):
if args and len(args) > 0:
# Check if it's a coroutine or future and wrap it
wrapped_args = tuple(self.trace_item(item) for item in args)
return method(*wrapped_args, **kwargs)
return method(*args, **kwargs)
_wrap(asyncio, "gather", wrap_coros_or_futures)
[docs] def instrument_to_thread(self) -> None:
# to_thread was added in Python 3.9
if sys.version_info < (3, 9):
return
def wrap_to_thread(method, instance, args, kwargs) -> None:
if args:
first_arg = args[0]
# Wrap the first argument
wrapped_first_arg = self.trace_to_thread(first_arg)
wrapped_args = (wrapped_first_arg,) + args[1:]
return method(*wrapped_args, **kwargs)
return method(*args, **kwargs)
_wrap(asyncio, "to_thread", wrap_to_thread)
[docs] def instrument_taskgroup_create_task(self) -> None:
# TaskGroup.create_task was added in Python 3.11
if sys.version_info < (3, 11):
return
def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
if args:
coro = args[0]
wrapped_coro = self.trace_coroutine(coro)
wrapped_args = (wrapped_coro,) + args[1:]
return method(*wrapped_args, **kwargs)
return method(*args, **kwargs)
_wrap(
asyncio.TaskGroup, # pylint: disable=no-member
"create_task",
wrap_taskgroup_create_task,
)
[docs] def trace_to_thread(self, func: callable):
"""Trace a function."""
start = default_timer()
span = (
self._tracer.start_span(
f"{ASYNCIO_PREFIX} to_thread-" + func.__name__
)
if func.__name__ in self._to_thread_name_to_trace
else None
)
attr = {"type": "to_thread", "name": func.__name__}
exception = None
try:
attr["state"] = "finished"
return func
except Exception:
attr["state"] = "exception"
raise
finally:
self.record_process(start, attr, span, exception)
[docs] def trace_item(self, coro_or_future):
"""Trace a coroutine or future item."""
# Task is already traced, return it
if isinstance(coro_or_future, asyncio.Task):
return coro_or_future
if asyncio.iscoroutine(coro_or_future):
return self.trace_coroutine(coro_or_future)
if futures.isfuture(coro_or_future):
return self.trace_future(coro_or_future)
return coro_or_future
[docs] async def trace_coroutine(self, coro):
start = default_timer()
attr = {
"type": "coroutine",
"name": coro.__name__,
}
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX} coro-" + coro.__name__)
if coro.__name__ in self._coros_name_to_trace
else None
)
exception = None
try:
attr["state"] = "finished"
return await coro
# CancelledError is raised when a coroutine is cancelled
# before it has a chance to run. We don't want to record
# this as an error.
except asyncio.CancelledError:
attr["state"] = "cancelled"
except Exception as exc:
exception = exc
state = determine_state(exception)
attr["state"] = state
raise
finally:
self.record_process(start, attr, span, exception)
[docs] def trace_future(self, future):
start = default_timer()
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
if self._future_active_enabled
else None
)
def callback(f):
exception = f.exception()
attr = {
"type": "future",
}
state = determine_state(exception)
attr["state"] = state
self.record_process(start, attr, span, exception)
future.add_done_callback(callback)
return future
[docs] def record_process(
self, start: float, attr: dict, span=None, exception=None
) -> None:
"""
Record the processing time, update histogram and counter, and handle span.
:param start: Start time of the process.
:param attr: Attributes for the histogram and counter.
:param span: Optional span for tracing.
:param exception: Optional exception occurred during the process.
"""
duration = max(default_timer() - start, 0)
self.process_duration_histogram.record(duration, attr)
self.process_created_counter.add(1, attr)
if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exception)
span.end()
[docs]def determine_state(exception: Exception) -> str:
if isinstance(exception, asyncio.CancelledError):
return "cancelled"
if isinstance(exception, asyncio.TimeoutError):
return "timeout"
if exception:
return "exception"
return "finished"
[docs]def uninstrument_taskgroup_create_task() -> None:
# TaskGroup.create_task was added in Python 3.11
if sys.version_info < (3, 11):
return
unwrap(asyncio.TaskGroup, "create_task") # pylint: disable=no-member
[docs]def uninstrument_to_thread() -> None:
# to_thread was added in Python 3.9
if sys.version_info < (3, 9):
return
unwrap(asyncio, "to_thread")
[docs]def uninstrument_gather() -> None:
unwrap(asyncio, "gather")
[docs]def uninstrument_method_with_coroutine(method_name: str) -> None:
"""
Uninstrument specified asyncio method.
"""
unwrap(asyncio, method_name)