# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
The integration with MongoDB supports the `pymongo`_ library; it can be
enabled using the ``PymongoInstrumentor``.
.. _pymongo: https://pypi.org/project/pymongo
Usage
-----
.. code:: python
from pymongo import MongoClient
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
PymongoInstrumentor().instrument()
client = MongoClient()
db = client["MongoDB_Database"]
collection = db["MongoDB_Collection"]
collection.find_one()
API
---
The `instrument` method accepts the following keyword args:
* tracer_provider (``TracerProvider``) - an optional tracer provider
* request_hook (``Callable[[Span, CommandStartedEvent], None]``) - a function with extra user-defined logic to be performed before querying mongodb
* response_hook (``Callable[[Span, CommandSucceededEvent], None]``) - a function with extra user-defined logic to be performed after the query returns with a successful response
* failed_hook (``Callable[[Span, CommandFailedEvent], None]``) - a function with extra user-defined logic to be performed after the query returns with a failed response
* capture_statement (``bool``) - an optional value to enable capturing the database statement that is being executed
For example:
.. code:: python
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
from pymongo import MongoClient
def request_hook(span, event):
# request hook logic
pass
def response_hook(span, event):
# response hook logic
pass
def failed_hook(span, event):
# failed hook logic
pass
# Instrument pymongo with hooks
PymongoInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook, failed_hook=failed_hook)
# This will create a span with pymongo specific attributes, including custom attributes added from the hooks
client = MongoClient()
db = client["MongoDB_Database"]
collection = db["MongoDB_Collection"]
collection.find_one()
"""
from __future__ import annotations
from logging import getLogger
from typing import Any, Callable, Collection, TypeVar
from pymongo import monitoring
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pymongo.package import _instruments
from opentelemetry.instrumentation.pymongo.utils import (
COMMAND_TO_ATTRIBUTE_MAPPING,
)
from opentelemetry.instrumentation.pymongo.version import __version__
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.semconv._incubating.attributes.db_attributes import (
DB_MONGODB_COLLECTION,
DB_NAME,
DB_STATEMENT,
DB_SYSTEM,
)
from opentelemetry.semconv._incubating.attributes.net_attributes import (
NET_PEER_NAME,
NET_PEER_PORT,
)
from opentelemetry.semconv.trace import DbSystemValues
from opentelemetry.trace import SpanKind, Tracer, get_tracer
from opentelemetry.trace.span import Span
from opentelemetry.trace.status import Status, StatusCode
# Module-level logger, named after this module.
_LOG = getLogger(__name__)
# Signatures of the user-supplied hooks: each one receives the span and the
# pymongo monitoring event being handled, and returns nothing.
RequestHookT = Callable[[Span, monitoring.CommandStartedEvent], None]
ResponseHookT = Callable[[Span, monitoring.CommandSucceededEvent], None]
FailedHookT = Callable[[Span, monitoring.CommandFailedEvent], None]
# Type variable constrained to the three pymongo command monitoring event
# types; used to annotate helpers that accept any of them.
CommandEvent = TypeVar(
"CommandEvent",
monitoring.CommandStartedEvent,
monitoring.CommandSucceededEvent,
monitoring.CommandFailedEvent,
)
def dummy_callback(span: Span, event: CommandEvent):
    """No-op hook used as the default request/response/failed callback.

    Args:
        span: The span associated with the command (ignored).
        event: The pymongo monitoring event (ignored).

    Returns:
        None.
    """
    return None
class CommandTracer(monitoring.CommandListener):
    """pymongo command listener that records one CLIENT span per command.

    A span is opened in ``started`` and kept in an internal dictionary, keyed
    by ``_get_span_dict_key(event)``, until the matching ``succeeded`` or
    ``failed`` event pops and ends it.
    """

    def __init__(
        self,
        tracer: Tracer,
        request_hook: RequestHookT = dummy_callback,
        response_hook: ResponseHookT = dummy_callback,
        failed_hook: FailedHookT = dummy_callback,
        capture_statement: bool = False,
    ):
        """Store the tracer, the hooks and the statement-capture flag.

        Args:
            tracer: Tracer used to start a span for every started command.
            request_hook: Called with ``(span, event)`` when a command starts.
            response_hook: Called with ``(span, event)`` when a command
                succeeds.
            failed_hook: Called with ``(span, event)`` when a command fails.
            capture_statement: When true, the command payload is appended to
                the recorded statement attribute.
        """
        self._tracer = tracer
        # In-flight spans, keyed by _get_span_dict_key(event).
        self._span_dict = {}
        # Listeners are checked against this flag on every event; clearing it
        # turns all three handlers into no-ops.
        self.is_enabled = True
        self.start_hook = request_hook
        self.success_hook = response_hook
        self.failed_hook = failed_hook
        self.capture_statement = capture_statement

    def started(self, event: monitoring.CommandStartedEvent):
        """Method to handle a pymongo CommandStartedEvent"""
        if not self.is_enabled or not is_instrumentation_enabled():
            return
        command_name = event.command_name
        span_name = f"{event.database_name}.{command_name}"
        statement = self._get_statement_by_command_name(command_name, event)
        collection = _get_command_collection_name(event)

        # Bind the name before the ``try`` so the handler below can tell
        # "span was never created" apart from "a later step failed"; without
        # this, a failing ``start_span`` made the handler itself raise
        # UnboundLocalError.
        span = None
        try:
            span = self._tracer.start_span(span_name, kind=SpanKind.CLIENT)
            if span.is_recording():
                span.set_attribute(DB_SYSTEM, DbSystemValues.MONGODB.value)
                span.set_attribute(DB_NAME, event.database_name)
                span.set_attribute(DB_STATEMENT, statement)
                if collection:
                    span.set_attribute(DB_MONGODB_COLLECTION, collection)
                if event.connection_id is not None:
                    span.set_attribute(NET_PEER_NAME, event.connection_id[0])
                    span.set_attribute(NET_PEER_PORT, event.connection_id[1])
            # A misbehaving user hook must not prevent the span from being
            # tracked, so its exceptions are logged and swallowed.
            try:
                self.start_hook(span, event)
            except (
                Exception  # noqa pylint: disable=broad-except
            ) as hook_exception:  # noqa pylint: disable=broad-except
                _LOG.exception(hook_exception)

            # Add Span to dictionary
            self._span_dict[_get_span_dict_key(event)] = span
        except Exception as ex:  # noqa pylint: disable=broad-except
            if span is not None and span.is_recording():
                span.set_status(Status(StatusCode.ERROR, str(ex)))
                span.end()
                self._pop_span(event)

    def succeeded(self, event: monitoring.CommandSucceededEvent):
        """Method to handle a pymongo CommandSucceededEvent"""
        if not self.is_enabled or not is_instrumentation_enabled():
            return
        span = self._pop_span(event)
        if span is None:
            # No span was recorded for this request; nothing to end.
            return
        if span.is_recording():
            try:
                self.success_hook(span, event)
            except (
                Exception  # noqa pylint: disable=broad-except
            ) as hook_exception:  # noqa pylint: disable=broad-except
                _LOG.exception(hook_exception)
        span.end()

    def failed(self, event: monitoring.CommandFailedEvent):
        """Method to handle a pymongo CommandFailedEvent"""
        if not self.is_enabled or not is_instrumentation_enabled():
            return
        span = self._pop_span(event)
        if span is None:
            # No span was recorded for this request; nothing to end.
            return
        if span.is_recording():
            span.set_status(
                Status(
                    StatusCode.ERROR,
                    event.failure.get("errmsg", "Unknown error"),
                )
            )
            try:
                self.failed_hook(span, event)
            except (
                Exception  # noqa pylint: disable=broad-except
            ) as hook_exception:  # noqa pylint: disable=broad-except
                _LOG.exception(hook_exception)
        span.end()

    def _pop_span(self, event: CommandEvent) -> Span | None:
        """Remove and return the span stored for ``event``, or ``None``."""
        return self._span_dict.pop(_get_span_dict_key(event), None)

    def _get_statement_by_command_name(
        self, command_name: str, event: CommandEvent
    ) -> str:
        """Build the statement string recorded on the span.

        The result is always the command name. When ``capture_statement`` is
        enabled and the command carries a truthy value under the key that
        ``COMMAND_TO_ATTRIBUTE_MAPPING`` associates with ``command_name``,
        that value is appended after a space.
        """
        statement = command_name
        command_attribute = COMMAND_TO_ATTRIBUTE_MAPPING.get(command_name)
        command = event.command.get(command_attribute)
        if command and self.capture_statement:
            statement += " " + str(command)
        return statement
def _get_command_collection_name(event: CommandEvent) -> str | None:
    """Return the collection name carried by ``event``, if there is one.

    The value stored in the command document under the command's own name is
    used; anything that is not a non-empty string yields ``None``.
    """
    candidate = event.command.get(event.command_name)
    if candidate and isinstance(candidate, str):
        return candidate
    return None
def _get_span_dict_key(
    event: CommandEvent,
) -> int | tuple[int, tuple[str, int | None]]:
    """Return the dictionary key under which the span for ``event`` is stored.

    The key is ``(request_id, connection_id)`` when the event has a connection
    id, and the bare ``request_id`` otherwise.
    """
    connection = event.connection_id
    if connection is None:
        return event.request_id
    return event.request_id, connection
class PymongoInstrumentor(BaseInstrumentor):
    """Instrumentor that traces pymongo commands through a ``CommandTracer``."""

    _commandtracer_instance: CommandTracer | None = None
    # The instrumentation for PyMongo is based on the event listener interface
    # https://api.mongodb.com/python/current/api/pymongo/monitoring.html.
    # This interface only allows registering listeners and does not provide
    # an unregister API. In order to provide a mechanism to disable
    # instrumentation, an enabled flag is implemented in CommandTracer;
    # it is checked in the different listeners.

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the ``_instruments`` collection declared by this package."""
        return _instruments

    def _instrument(self, **kwargs: Any):
        """Integrate with pymongo to trace it using event listener.
        https://api.mongodb.com/python/current/api/pymongo/monitoring.html

        Args:
            tracer_provider: The `TracerProvider` to use. If none is passed the
                current configured one is used.
            request_hook: Optional callable invoked with ``(span, event)``
                when a command starts.
            response_hook: Optional callable invoked with ``(span, event)``
                when a command succeeds.
            failed_hook: Optional callable invoked with ``(span, event)``
                when a command fails.
            capture_statement: Optional flag enabling capture of the command
                payload in the statement attribute.
        """
        tracer_provider = kwargs.get("tracer_provider")
        # ``or`` rather than a ``get`` default: an explicit ``None`` must also
        # fall back to the no-op hook, otherwise every command would try to
        # call ``None``.
        request_hook = kwargs.get("request_hook") or dummy_callback
        response_hook = kwargs.get("response_hook") or dummy_callback
        failed_hook = kwargs.get("failed_hook") or dummy_callback
        capture_statement = bool(kwargs.get("capture_statement", False))

        tracer = get_tracer(
            __name__,
            __version__,
            tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        command_tracer = self._commandtracer_instance
        if command_tracer is None:
            # Create and register a CommandTracer only the first time; a
            # registered listener cannot be unregistered again.
            command_tracer = CommandTracer(
                tracer,
                request_hook=request_hook,
                response_hook=response_hook,
                failed_hook=failed_hook,
                capture_statement=capture_statement,
            )
            self._commandtracer_instance = command_tracer
            monitoring.register(command_tracer)
        else:
            # Already registered: reuse the listener, but apply the settings
            # of this call so re-instrumenting does not silently keep the
            # previous tracer, hooks and capture flag.
            command_tracer._tracer = tracer  # pylint: disable=protected-access
            command_tracer.start_hook = request_hook
            command_tracer.success_hook = response_hook
            command_tracer.failed_hook = failed_hook
            command_tracer.capture_statement = capture_statement
        command_tracer.is_enabled = True

    def _uninstrument(self, **kwargs: Any):
        """Disable the registered listener, if one was ever created."""
        if self._commandtracer_instance is not None:
            self._commandtracer_instance.is_enabled = False