Source code for opentelemetry.instrumentation.exceptions

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
Instrument uncaught exceptions to emit OpenTelemetry logs.

Usage
-----

.. code-block:: python

    from opentelemetry.instrumentation.exceptions import (
        UnhandledExceptionInstrumentor,
    )

    UnhandledExceptionInstrumentor().instrument()

This instrumentation captures uncaught process exceptions, uncaught thread
exceptions, and unhandled asyncio task exceptions and emits them as
OpenTelemetry logs.
"""

from __future__ import annotations

import asyncio
import sys
import threading
from collections.abc import Collection
from types import TracebackType
from typing import Any

from wrapt import (
    wrap_function_wrapper,  # type: ignore[reportUnknownVariableType]
)

from opentelemetry._logs import Logger, SeverityNumber, get_logger
from opentelemetry.instrumentation.exceptions.package import _instruments
from opentelemetry.instrumentation.exceptions.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas


[docs]class UnhandledExceptionInstrumentor(BaseInstrumentor): """Emit logs for uncaught exceptions and unhandled asyncio exceptions.""" def __init__(self): super().__init__() self._logger: Logger | None = None # pylint: disable-next=no-self-use
[docs] def instrumentation_dependencies(self) -> Collection[str]: return _instruments
def _instrument(self, **kwargs: Any): self._logger = get_logger( __name__, __version__, kwargs.get("logger_provider"), schema_url=Schemas.V1_37_0.value, ) self._install_sys_hook() self._install_threading_hook() self._install_asyncio_hook() def _uninstrument(self, **kwargs: Any): self._restore_sys_hook() self._restore_threading_hook() self._restore_asyncio_hook() self._logger = None def _install_sys_hook(self) -> None: wrap_function_wrapper( sys, "excepthook", self._wrap_sys_excepthook, ) @staticmethod def _restore_sys_hook() -> None: unwrap(sys, "excepthook") def _install_threading_hook(self) -> None: wrap_function_wrapper( threading, "excepthook", self._wrap_threading_excepthook, ) @staticmethod def _restore_threading_hook() -> None: unwrap(threading, "excepthook") def _install_asyncio_hook(self) -> None: wrap_function_wrapper( asyncio.BaseEventLoop, "call_exception_handler", self._wrap_asyncio_call_exception_handler, ) @staticmethod def _restore_asyncio_hook() -> None: unwrap(asyncio.BaseEventLoop, "call_exception_handler") def _emit_exception( self, exc: BaseException, *, severity_text: str, severity_number: SeverityNumber, event_name: str, ) -> None: # BaseException includes process-control signals like KeyboardInterrupt. if not isinstance(exc, Exception) or self._logger is None: return try: self._logger.emit( event_name=event_name, body=str(exc), severity_text=severity_text, severity_number=severity_number, exception=exc, ) # Logging must never replace the original unhandled exception path. # pylint: disable-next=broad-exception-caught except Exception: # pragma: no cover pass def _wrap_sys_excepthook( self, wrapped, instance, args: tuple[type[BaseException], BaseException, TracebackType | None], kwargs: dict[str, Any], ) -> None: _, exc, _ = args self._emit_exception( exc, severity_text="FATAL", severity_number=SeverityNumber.FATAL, event_name=type(exc).__name__, ) wrapped(*args, **kwargs) def _wrap_threading_excepthook( self, wrapped, instance, args: tuple[threading.ExceptHookArgs], kwargs: dict[str, Any], ) -> None: (hook_args,) = args self._emit_exception( hook_args.exc_value, severity_text="ERROR", severity_number=SeverityNumber.ERROR, event_name=hook_args.exc_type.__name__, ) wrapped(*args, **kwargs) def _wrap_asyncio_call_exception_handler( self, wrapped, instance: asyncio.AbstractEventLoop, args: tuple[dict[str, Any]], kwargs: dict[str, Any], ) -> None: context = args[0] if args else kwargs.get("context") if context: exc = context.get("exception") if isinstance(exc, BaseException): message = context.get("message") self._emit_exception( exc, severity_text="ERROR", severity_number=SeverityNumber.ERROR, event_name=str(message) if message else type(exc).__name__, ) wrapped(*args, **kwargs)
__all__ = ["UnhandledExceptionInstrumentor"]