Source code for opentelemetry.instrumentation.asyncclick

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

"""
Instrument `asyncclick`_ CLI applications. The instrumentor will avoid instrumenting
well-known servers (e.g. *flask run* and *uvicorn*) to avoid unexpected effects
like every request having the same Trace ID.



.. _asyncclick: https://pypi.org/project/asyncclick/

Usage
-----

.. code-block:: python

    import asyncio
    import asyncclick
    from opentelemetry.instrumentation.asyncclick import AsyncClickInstrumentor

    AsyncClickInstrumentor().instrument()

    @asyncclick.command()
    async def hello():
        asyncclick.echo(f'Hello world!')

    if __name__ == "__main__":
        asyncio.run(hello())

API
---
"""

from __future__ import annotations

import os
import sys
from functools import partial
from logging import getLogger
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Collection,
    TypeVar,
)

import asyncclick
from typing_extensions import ParamSpec, Unpack
from wrapt import (
    wrap_function_wrapper,  # type: ignore[reportUnknownVariableType]
)

from opentelemetry import trace
from opentelemetry.instrumentation.asyncclick.package import _instruments
from opentelemetry.instrumentation.asyncclick.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
    unwrap,
)
from opentelemetry.semconv._incubating.attributes.process_attributes import (
    PROCESS_COMMAND_ARGS,
    PROCESS_EXECUTABLE_NAME,
    PROCESS_EXIT_CODE,
    PROCESS_PID,
)
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.trace.status import StatusCode

if TYPE_CHECKING:
    from typing import TypedDict

    class InstrumentKwargs(TypedDict, total=False):
        tracer_provider: trace.TracerProvider

    class UninstrumentKwargs(TypedDict, total=False):
        pass


_logger = getLogger(__name__)


T = TypeVar("T")
P = ParamSpec("P")


async def _command_invoke_wrapper(
    wrapped: Callable[P, Awaitable[T]],
    instance: asyncclick.core.Command,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    tracer: trace.Tracer,
) -> T:
    # Subclasses of Command include groups and CLI runners, but
    # we only want to instrument the actual commands which are
    # instances of Command itself.
    if instance.__class__ != asyncclick.Command:
        return await wrapped(*args, **kwargs)

    ctx = args[0]

    span_name = ctx.info_name
    span_attributes = {
        PROCESS_COMMAND_ARGS: sys.argv,
        PROCESS_EXECUTABLE_NAME: sys.argv[0],
        PROCESS_EXIT_CODE: 0,
        PROCESS_PID: os.getpid(),
    }

    with tracer.start_as_current_span(
        name=span_name,
        kind=trace.SpanKind.INTERNAL,
        attributes=span_attributes,
    ) as span:
        try:
            result = await wrapped(*args, **kwargs)
            return result
        except Exception as exc:
            span.set_status(StatusCode.ERROR, str(exc))
            if span.is_recording():
                span.set_attribute(ERROR_TYPE, type(exc).__qualname__)
                span.set_attribute(
                    PROCESS_EXIT_CODE, getattr(exc, "exit_code", 1)
                )
            raise


# pylint: disable=no-self-use
[docs]class AsyncClickInstrumentor(BaseInstrumentor): """An instrumentor for asyncclick"""
[docs] def instrumentation_dependencies(self) -> Collection[str]: return _instruments
def _instrument(self, **kwargs: Unpack[InstrumentKwargs]) -> None: tracer_provider = kwargs.get("tracer_provider") tracer = trace.get_tracer( __name__, __version__, tracer_provider, ) wrap_function_wrapper( asyncclick.core.Command, "invoke", partial(_command_invoke_wrapper, tracer=tracer), ) def _uninstrument(self, **kwargs: Unpack["UninstrumentKwargs"]) -> None: unwrap(asyncclick.core.Command, "invoke")