Source code for opentelemetry.instrumentation.dbapi

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The trace integration with Database API supports libraries that follow the
Python Database API Specification v2.0.
`<https://www.python.org/dev/peps/pep-0249/>`_

Usage
-----

.. code-block:: python

    import mysql.connector
    import pyodbc

    from opentelemetry.instrumentation.dbapi import trace_integration


    # Ex: mysql.connector
    trace_integration(mysql.connector, "connect", "mysql")
    # Ex: pyodbc
    trace_integration(pyodbc, "Connection", "odbc")

API
---
"""

import functools
import logging
import re
import typing

import wrapt

from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.dbapi.version import __version__
from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment
from opentelemetry.instrumentation.utils import (
    _get_opentelemetry_values,
    unwrap,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer

_logger = logging.getLogger(__name__)


def trace_integration(
    connect_module: typing.Callable[..., typing.Any],
    connect_method_name: str,
    database_system: str,
    connection_attributes: typing.Dict = None,
    tracer_provider: typing.Optional[TracerProvider] = None,
    capture_parameters: bool = False,
    enable_commenter: bool = False,
    db_api_integration_factory=None,
):
    """Trace connections created by a DB API 2.0 library.

    This is a convenience wrapper around ``wrap_connect`` that supplies this
    module's ``__name__`` and ``__version__`` as the instrumentation name
    and version.
    https://www.python.org/dev/peps/pep-0249/

    Args:
        connect_module: Module name where connect method is available.
        connect_method_name: The connect method name.
        database_system: An identifier for the database management system
            (DBMS) product being used.
        connection_attributes: Attribute names for database, port, host and
            user in Connection object.
        tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
            use. If omitted the current configured one is used.
        capture_parameters: Configure if db.statement.parameters should be
            captured.
        enable_commenter: Flag to enable/disable sqlcommenter.
        db_api_integration_factory: The `DatabaseApiIntegration` to use. If
            none is passed the default one is used.
    """
    # Optional settings are handed to ``wrap_connect`` by keyword; only the
    # version is supplied here rather than by the caller.
    forwarded_options = {
        "version": __version__,
        "tracer_provider": tracer_provider,
        "capture_parameters": capture_parameters,
        "enable_commenter": enable_commenter,
        "db_api_integration_factory": db_api_integration_factory,
    }
    wrap_connect(
        __name__,
        connect_module,
        connect_method_name,
        database_system,
        connection_attributes,
        **forwarded_options,
    )
def wrap_connect(
    name: str,
    connect_module: typing.Callable[..., typing.Any],
    connect_method_name: str,
    database_system: str,
    connection_attributes: typing.Dict = None,
    version: str = "",
    tracer_provider: typing.Optional[TracerProvider] = None,
    capture_parameters: bool = False,
    enable_commenter: bool = False,
    db_api_integration_factory=None,
    commenter_options: dict = None,
):
    """Patch a DB API 2.0 connect callable so it returns traced connections.

    https://www.python.org/dev/peps/pep-0249/

    A failure while installing the wrapper is logged as a warning and not
    raised.

    Args:
        name: Instrumentation name passed to the integration factory (the
            default ``DatabaseApiIntegration`` uses it as the tracer name).
        connect_module: Module name where connect method is available.
        connect_method_name: The connect method name.
        database_system: An identifier for the database management system
            (DBMS) product being used.
        connection_attributes: Attribute names for database, port, host and
            user in Connection object.
        version: Instrumentation version passed to the integration factory.
        tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
            use. If omitted the current configured one is used.
        capture_parameters: Configure if db.statement.parameters should be
            captured.
        enable_commenter: Flag to enable/disable sqlcommenter.
        db_api_integration_factory: The `DatabaseApiIntegration` to use. If
            none is passed the default one is used.
        commenter_options: Configurations for tags to be appended at the sql
            query.
    """
    if not db_api_integration_factory:
        db_api_integration_factory = DatabaseApiIntegration

    # pylint: disable=unused-argument
    def wrap_connect_(
        wrapped: typing.Callable[..., typing.Any],
        instance: typing.Any,
        args: typing.Tuple[typing.Any, typing.Any],
        kwargs: typing.Dict[typing.Any, typing.Any],
    ):
        # A fresh integration object is built for every connect call.
        integration = db_api_integration_factory(
            name,
            database_system,
            connection_attributes=connection_attributes,
            version=version,
            tracer_provider=tracer_provider,
            capture_parameters=capture_parameters,
            enable_commenter=enable_commenter,
            commenter_options=commenter_options,
            connect_module=connect_module,
        )
        return integration.wrapped_connection(wrapped, args, kwargs)

    try:
        wrapt.wrap_function_wrapper(
            connect_module, connect_method_name, wrap_connect_
        )
    except Exception as ex:  # pylint: disable=broad-except
        # Best effort: instrumentation problems must not break the caller.
        _logger.warning("Failed to integrate with DB API. %s", str(ex))
def unwrap_connect(
    connect_module: typing.Callable[..., typing.Any],
    connect_method_name: str,
):
    """Remove the DB API tracing wrapper from a connect callable.

    Delegates to ``opentelemetry.instrumentation.utils.unwrap``.
    https://www.python.org/dev/peps/pep-0249/

    Args:
        connect_module: Module name where the connect method is available.
        connect_method_name: The connect method name.
    """
    unwrap(connect_module, connect_method_name)
def instrument_connection(
    name: str,
    connection,
    database_system: str,
    connection_attributes: typing.Dict = None,
    version: str = "",
    tracer_provider: typing.Optional[TracerProvider] = None,
    capture_parameters: bool = False,
    enable_commenter: bool = False,
    commenter_options: dict = None,
):
    """Wrap an existing database connection so its cursors are traced.

    A connection that is already a ``wrapt.ObjectProxy`` is returned as is,
    after logging a warning.

    Args:
        name: Instrumentation name handed to ``DatabaseApiIntegration``.
        connection: The connection to instrument.
        database_system: An identifier for the database management system
            (DBMS) product being used.
        connection_attributes: Attribute names for database, port, host and
            user in a connection object.
        version: Instrumentation version handed to
            ``DatabaseApiIntegration``.
        tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
            use. If omitted the current configured one is used.
        capture_parameters: Configure if db.statement.parameters should be
            captured.
        enable_commenter: Flag to enable/disable sqlcommenter.
        commenter_options: Configurations for tags to be appended at the sql
            query.

    Returns:
        An instrumented connection.
    """
    already_proxied = isinstance(connection, wrapt.ObjectProxy)
    if already_proxied:
        _logger.warning("Connection already instrumented")
        return connection

    integration_options = {
        "connection_attributes": connection_attributes,
        "version": version,
        "tracer_provider": tracer_provider,
        "capture_parameters": capture_parameters,
        "enable_commenter": enable_commenter,
        "commenter_options": commenter_options,
    }
    db_integration = DatabaseApiIntegration(
        name, database_system, **integration_options
    )
    # Read database/host/port/user from the live connection before proxying.
    db_integration.get_connection_attributes(connection)
    return get_traced_connection_proxy(connection, db_integration)
def uninstrument_connection(connection):
    """Strip the tracing proxy from a database connection.

    Args:
        connection: The connection to uninstrument.

    Returns:
        The object wrapped by the proxy when ``connection`` is a
        ``wrapt.ObjectProxy``; otherwise ``connection`` itself, after a
        warning is logged.
    """
    if not isinstance(connection, wrapt.ObjectProxy):
        _logger.warning("Connection is not instrumented")
        return connection

    return connection.__wrapped__
class DatabaseApiIntegration:
    """Holds tracer, settings and connection-derived span data for one DB API integration."""

    def __init__(
        self,
        name: str,
        database_system: str,
        connection_attributes=None,
        version: str = "",
        tracer_provider: typing.Optional[TracerProvider] = None,
        capture_parameters: bool = False,
        enable_commenter: bool = False,
        commenter_options: dict = None,
        connect_module: typing.Callable[..., typing.Any] = None,
    ):
        """Store the settings and obtain a tracer for ``name``/``version``.

        When ``connection_attributes`` is ``None`` the connection is assumed
        to expose ``database``, ``port``, ``host`` and ``user`` attributes
        under exactly those names.
        """
        if connection_attributes is None:
            connection_attributes = {
                "database": "database",
                "port": "port",
                "host": "host",
                "user": "user",
            }
        self.connection_attributes = connection_attributes

        self._name = name
        self._version = version
        self._tracer = get_tracer(
            self._name,
            instrumenting_library_version=self._version,
            tracer_provider=tracer_provider,
        )

        self.capture_parameters = capture_parameters
        self.enable_commenter = enable_commenter
        self.commenter_options = commenter_options
        self.database_system = database_system
        self.connect_module = connect_module

        # Filled in by get_connection_attributes().
        self.connection_props = {}
        self.span_attributes = {}
        self.name = ""
        self.database = ""

    def wrapped_connection(
        self,
        connect_method: typing.Callable[..., typing.Any],
        args: typing.Tuple[typing.Any, typing.Any],
        kwargs: typing.Dict[typing.Any, typing.Any],
    ):
        """Add object proxy to connection object."""
        new_connection = connect_method(*args, **kwargs)
        self.get_connection_attributes(new_connection)
        return get_traced_connection_proxy(new_connection, self)

    def get_connection_attributes(self, connection):
        """Populate span fields (name, database, user, host, port) from ``connection``."""
        for prop_name, attr_path in self.connection_attributes.items():
            # Allow attributes nested in connection object: walk the dotted
            # path one segment at a time, yielding None for a missing one.
            resolved = connection
            for segment in attr_path.split("."):
                resolved = getattr(resolved, segment, None)
            if resolved:
                self.connection_props[prop_name] = resolved

        self.name = self.database_system
        database = self.connection_props.get("database", "")
        self.database = database
        if database:
            # PyMySQL encodes names with utf-8
            if hasattr(database, "decode"):
                database = database.decode(errors="ignore")
                self.database = database
            self.name = self.name + "." + database

        user = self.connection_props.get("user")
        # PyMySQL encodes this data
        if user and isinstance(user, bytes):
            user = user.decode()
        if user is not None:
            self.span_attributes[SpanAttributes.DB_USER] = str(user)

        # Host and port are copied verbatim when present.
        peer_attributes = (
            ("host", SpanAttributes.NET_PEER_NAME),
            ("port", SpanAttributes.NET_PEER_PORT),
        )
        for prop_name, attribute_key in peer_attributes:
            prop_value = self.connection_props.get(prop_name)
            if prop_value is not None:
                self.span_attributes[attribute_key] = prop_value
def get_traced_connection_proxy(
    connection, db_api_integration, *args, **kwargs
):
    """Wrap ``connection`` in a proxy whose cursors are traced.

    Args:
        connection: The DB API connection object to wrap.
        db_api_integration: Integration object passed to
            ``get_traced_cursor_proxy`` for every cursor created through the
            proxy.
        *args: Forwarded to the proxy constructor, which ignores them.
        **kwargs: Forwarded to the proxy constructor, which ignores them.

    Returns:
        A ``wrapt.ObjectProxy`` subclass instance wrapping ``connection``.
    """
    # pylint: disable=abstract-method
    class TracedConnectionProxy(wrapt.ObjectProxy):
        # pylint: disable=unused-argument
        def __init__(self, connection, *args, **kwargs):
            wrapt.ObjectProxy.__init__(self, connection)

        def __getattribute__(self, name):
            # NOTE(review): nothing in this block sets a ``_connection``
            # attribute. When the first lookup yields a falsy value the
            # fallback below looks ``_connection`` up on the proxy itself;
            # presumably that raises AttributeError and attribute access then
            # falls through to ObjectProxy.__getattr__ -- confirm before
            # changing this logic.
            if object.__getattribute__(self, name):
                return object.__getattribute__(self, name)

            return object.__getattribute__(
                object.__getattribute__(self, "_connection"), name
            )

        def cursor(self, *args, **kwargs):
            # Every cursor handed out is itself wrapped for tracing.
            return get_traced_cursor_proxy(
                self.__wrapped__.cursor(*args, **kwargs), db_api_integration
            )

        def __enter__(self):
            # Return the proxy (not the wrapped object's __enter__ result) so
            # the body of the ``with`` block keeps using the traced object.
            self.__wrapped__.__enter__()
            return self

        def __exit__(self, *args, **kwargs):
            # Propagate the wrapped __exit__ result: a true value is how a
            # context manager asks for the exception to be suppressed, and
            # dropping it would silently change the connection's semantics.
            return self.__wrapped__.__exit__(*args, **kwargs)

    return TracedConnectionProxy(connection, *args, **kwargs)
class CursorTracer:
    """Runs DB API cursor calls inside client spans, optionally adding sqlcommenter data."""

    def __init__(self, db_api_integration: "DatabaseApiIntegration") -> None:
        """Cache the commenter settings and connect module of ``db_api_integration``."""
        self._db_api_integration = db_api_integration
        self._commenter_enabled = self._db_api_integration.enable_commenter
        # A missing/empty options mapping means "no per-tag overrides".
        self._commenter_options = (
            self._db_api_integration.commenter_options
            if self._db_api_integration.commenter_options
            else {}
        )
        self._connect_module = self._db_api_integration.connect_module
        # Matches one ``/* ... */`` comment at the very start of a statement.
        self._leading_comment_remover = re.compile(r"^/\*.*?\*/")

    def _populate_span(
        self,
        span: "trace_api.Span",
        cursor,
        *args: typing.Tuple[typing.Any, typing.Any],
    ):
        """Set the database attributes on ``span``; no-op when it is not recording.

        ``args`` are the positional arguments of the cursor call: ``args[0]``
        is treated as the statement and ``args[1]``, when present and
        parameter capture is enabled, as its parameters.
        """
        if not span.is_recording():
            return
        statement = self.get_statement(cursor, args)
        span.set_attribute(
            SpanAttributes.DB_SYSTEM, self._db_api_integration.database_system
        )
        span.set_attribute(
            SpanAttributes.DB_NAME, self._db_api_integration.database
        )
        span.set_attribute(SpanAttributes.DB_STATEMENT, statement)

        for (
            attribute_key,
            attribute_value,
        ) in self._db_api_integration.span_attributes.items():
            span.set_attribute(attribute_key, attribute_value)

        if self._db_api_integration.capture_parameters and len(args) > 1:
            span.set_attribute("db.statement.parameters", str(args[1]))

    def get_operation_name(self, cursor, args):  # pylint: disable=no-self-use
        """Return the first word of the statement in ``args[0]``.

        Returns ``""`` when there are no arguments, when the first argument
        is not a ``str``, or when nothing but whitespace remains once a
        leading ``/* ... */`` comment is removed.
        """
        if args and isinstance(args[0], str):
            # Strip leading comments so we get the operation name.
            tokens = self._leading_comment_remover.sub("", args[0]).split()
            # An empty, blank or comment-only statement yields no tokens;
            # indexing [0] unconditionally would raise IndexError before the
            # driver ever sees the call.
            if tokens:
                return tokens[0]
        return ""

    def get_statement(self, cursor, args):  # pylint: disable=no-self-use
        """Return ``args[0]`` as the statement, decoding ``bytes`` as UTF-8.

        Returns ``""`` when ``args`` is empty. Undecodable bytes are
        replaced rather than raising.
        """
        if not args:
            return ""
        statement = args[0]
        if isinstance(statement, bytes):
            return statement.decode("utf8", "replace")
        return statement

    def traced_execution(
        self,
        cursor,
        query_method: typing.Callable[..., typing.Any],
        *args: typing.Tuple[typing.Any, typing.Any],
        **kwargs: typing.Dict[typing.Any, typing.Any],
    ):
        """Call ``query_method(*args, **kwargs)`` inside a CLIENT span.

        The span is named after the statement's operation name, falling back
        to the database name and then to the integration name. When the
        commenter is enabled, a SQL comment is added to the statement in
        ``args[0]`` before the call; any failure while building that comment
        is logged and the original arguments are used.

        Returns:
            Whatever ``query_method`` returns.
        """
        name = self.get_operation_name(cursor, args)
        if not name:
            name = (
                self._db_api_integration.database
                if self._db_api_integration.database
                else self._db_api_integration.name
            )

        with self._db_api_integration._tracer.start_as_current_span(
            name, kind=SpanKind.CLIENT
        ) as span:
            self._populate_span(span, cursor, *args)
            if args and self._commenter_enabled:
                try:
                    args_list = list(args)
                    # NOTE(review): the driver label and ``__libpq_version__``
                    # are psycopg2-specific; with another connect module the
                    # attribute lookups presumably fail and land in the
                    # except branch below -- confirm intended scope.
                    commenter_data = dict(
                        # Psycopg2/framework information
                        db_driver=f"psycopg2:{self._connect_module.__version__.split(' ')[0]}",
                        dbapi_threadsafety=self._connect_module.threadsafety,
                        dbapi_level=self._connect_module.apilevel,
                        libpq_version=self._connect_module.__libpq_version__,
                        driver_paramstyle=self._connect_module.paramstyle,
                    )
                    if self._commenter_options.get(
                        "opentelemetry_values", True
                    ):
                        commenter_data.update(**_get_opentelemetry_values())

                    # Filter down to just the requested attributes.
                    commenter_data = {
                        k: v
                        for k, v in commenter_data.items()
                        if self._commenter_options.get(k, True)
                    }
                    statement = _add_sql_comment(
                        args_list[0], **commenter_data
                    )

                    args_list[0] = statement
                    args = tuple(args_list)

                except Exception as exc:  # pylint: disable=broad-except
                    # Best effort: never let comment generation break the
                    # actual query.
                    _logger.exception(
                        "Exception while generating sql comment: %s", exc
                    )

            return query_method(*args, **kwargs)
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
    """Wrap ``cursor`` in a proxy that traces its statement-executing calls.

    Args:
        cursor: The DB API cursor object to wrap.
        db_api_integration: Integration object used to build the
            ``CursorTracer`` that runs the traced calls.
        *args: Forwarded to the proxy constructor, which ignores them.
        **kwargs: Forwarded to the proxy constructor, which ignores them.

    Returns:
        A ``wrapt.ObjectProxy`` subclass instance wrapping ``cursor`` whose
        ``execute``, ``executemany`` and ``callproc`` go through
        ``CursorTracer.traced_execution``.
    """
    _cursor_tracer = CursorTracer(db_api_integration)

    # pylint: disable=abstract-method
    class TracedCursorProxy(wrapt.ObjectProxy):
        # pylint: disable=unused-argument
        def __init__(self, cursor, *args, **kwargs):
            wrapt.ObjectProxy.__init__(self, cursor)

        def execute(self, *args, **kwargs):
            return _cursor_tracer.traced_execution(
                self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
            )

        def executemany(self, *args, **kwargs):
            return _cursor_tracer.traced_execution(
                self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
            )

        def callproc(self, *args, **kwargs):
            return _cursor_tracer.traced_execution(
                self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
            )

        def __enter__(self):
            # Return the proxy (not the wrapped object's __enter__ result) so
            # calls made inside the ``with`` block stay traced.
            self.__wrapped__.__enter__()
            return self

        def __exit__(self, *args, **kwargs):
            # Propagate the wrapped __exit__ result: a true value is how a
            # context manager asks for the exception to be suppressed, and
            # dropping it would silently change the cursor's semantics.
            return self.__wrapped__.__exit__(*args, **kwargs)

    return TracedCursorProxy(cursor, *args, **kwargs)