# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
The trace integration with Database API supports libraries that follow the
Python Database API Specification v2.0.
`<https://www.python.org/dev/peps/pep-0249/>`_
Usage
-----
The DB-API instrumentor and its utilities provide common, core functionality for
database framework or object relation mapper (ORM) instrumentations. Users will
typically instrument database client code with those framework/ORM-specific
instrumentations, instead of directly using this DB-API integration. Features
such as sqlcommenter can be configured at framework/ORM level as well. See full
list at `instrumentation`_.
.. _instrumentation: https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation
If an instrumentation for your needs does not exist, then DB-API integration can
be used directly as follows.
.. code-block:: python
import mysql.connector
import pyodbc
from opentelemetry.instrumentation.dbapi import (
trace_integration,
wrap_connect,
)
# Example: mysql.connector
trace_integration(mysql.connector, "connect", "mysql")
# Example: pyodbc
trace_integration(pyodbc, "Connection", "odbc")
# Or, directly call wrap_connect for more configurability.
wrap_connect(__name__, mysql.connector, "connect", "mysql")
wrap_connect(__name__, pyodbc, "Connection", "odbc")
Configuration
-------------
SQLCommenter
************
You can optionally enable sqlcommenter which enriches the query with contextual
information. Queries made after setting up trace integration with sqlcommenter
enabled will have configurable key-value pairs appended to them, e.g.
``"select * from auth_users; /*traceparent=00-01234567-abcd-01*/"``. This
supports context propagation between database client and server when database log
records are enabled. For more information, see:
* `Semantic Conventions - Database Spans <https://github.com/open-telemetry/semantic-conventions/blob/main/docs/db/database-spans.md#sql-commenter>`_
* `sqlcommenter <https://google.github.io/sqlcommenter/>`_
.. code:: python
import mysql.connector
from opentelemetry.instrumentation.dbapi import wrap_connect
# Opts into sqlcomment for MySQL trace integration.
wrap_connect(
__name__,
mysql.connector,
"connect",
"mysql",
enable_commenter=True,
)
SQLCommenter with commenter_options
***********************************
The key-value pairs appended to the query can be configured using
``commenter_options``. When sqlcommenter is enabled, all available KVs/tags
are calculated by default. ``commenter_options`` supports *opting out*
of specific KVs.
.. code:: python
import mysql.connector
from opentelemetry.instrumentation.dbapi import wrap_connect
# Opts into sqlcomment for MySQL trace integration.
# Opts out of tags for libpq_version, db_driver.
wrap_connect(
__name__,
mysql.connector,
"connect",
"mysql",
enable_commenter=True,
commenter_options={
"libpq_version": False,
"db_driver": False,
}
)
Available commenter_options
###########################
The following sqlcomment key-values can be opted out of through ``commenter_options``:
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| Commenter Option | Description | Example |
+===========================+===========================================================+===========================================================================+
| ``db_driver`` | Database driver name with version. | ``mysql.connector=2.2.9`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``dbapi_threadsafety`` | DB-API threadsafety value: 0-3 or unknown. | ``dbapi_threadsafety=2`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``dbapi_level`` | DB-API API level: 1.0, 2.0, or unknown. | ``dbapi_level=2.0`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``driver_paramstyle`` | DB-API paramstyle for SQL statement parameter. | ``driver_paramstyle='pyformat'`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``libpq_version`` | PostgreSQL libpq version (checked for PostgreSQL only). | ``libpq_version=140001`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``mysql_client_version`` | MySQL client version (checked for MySQL only). | ``mysql_client_version='123'`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``opentelemetry_values`` | OpenTelemetry context as traceparent at time of query. | ``traceparent='00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01'`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
SQLComment in span attribute
****************************
If sqlcommenter is enabled, you can opt into the inclusion of sqlcomment in
the query span ``db.statement`` and/or ``db.query.text`` attribute for your
needs. If ``commenter_options`` have been set, the span attribute comment
will also be configured by this setting.
.. code:: python
import mysql.connector
from opentelemetry.instrumentation.dbapi import wrap_connect
# Opts into sqlcomment for MySQL trace integration.
# Opts into sqlcomment for `db.statement` and/or `db.query.text` span attribute.
wrap_connect(
__name__,
mysql.connector,
"connect",
"mysql",
enable_commenter=True,
enable_attribute_commenter=True,
)
API
---
"""
from __future__ import annotations
import functools
import logging
import re
import time
from typing import Any, Awaitable, Callable, Generic, TypeVar
from wrapt import wrap_function_wrapper
try:
# wrapt 2.0.0+
from wrapt import ( # pylint: disable=no-name-in-module
BaseObjectProxy,
ObjectProxy,
)
except ImportError:
from wrapt import ObjectProxy
from wrapt import ObjectProxy as BaseObjectProxy
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation._semconv import (
_get_schema_url_for_signal_types,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_report_new,
_set_db_name,
_set_db_statement,
_set_db_system,
_set_db_user,
_set_http_net_peer_name_client,
_set_http_peer_port_client,
)
from opentelemetry.instrumentation.dbapi.version import __version__
from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment
from opentelemetry.instrumentation.utils import (
_get_opentelemetry_values,
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.metrics import MeterProvider, get_meter
from opentelemetry.semconv._incubating.metrics.db_metrics import (
create_db_client_operation_duration,
create_db_client_response_returned_rows,
)
from opentelemetry.semconv.attributes.db_attributes import (
DB_NAMESPACE,
DB_OPERATION_NAME,
DB_SYSTEM_NAME,
)
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.util._importlib_metadata import version as util_version
_DB_DRIVER_ALIASES = {
"MySQLdb": "mysqlclient",
}
_logger = logging.getLogger(__name__)
ConnectionT = TypeVar("ConnectionT")
CursorT = TypeVar("CursorT")
[docs]def trace_integration(
connect_module: Callable[..., Any],
connect_method_name: str,
database_system: str,
connection_attributes: dict[str, Any] | None = None,
tracer_provider: TracerProvider | None = None,
capture_parameters: bool = False,
enable_commenter: bool = False,
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
enable_attribute_commenter: bool = False,
commenter_options: dict[str, Any] | None = None,
meter_provider: MeterProvider | None = None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Args:
connect_module: Module name where connect method is available.
connect_method_name: The connect method name.
database_system: An identifier for the database management system (DBMS)
product being used.
connection_attributes: Attribute names for database, port, host and
user in Connection object.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If omitted the current configured one is used.
capture_parameters: Configure if db.statement.parameters should be captured.
enable_commenter: Flag to enable/disable sqlcommenter.
db_api_integration_factory: The `DatabaseApiIntegration` to use. If none is passed the
default one is used.
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` and/or `db.query.text` span attribute. Only available if enable_commenter=True.
commenter_options: Configurations for tags to be appended at the sql query.
meter_provider: The :class:`opentelemetry.metrics.MeterProvider` to
use. If omitted the current configured one is used.
"""
wrap_connect(
__name__,
connect_module,
connect_method_name,
database_system,
connection_attributes,
version=__version__,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
enable_commenter=enable_commenter,
db_api_integration_factory=db_api_integration_factory,
enable_attribute_commenter=enable_attribute_commenter,
commenter_options=commenter_options,
meter_provider=meter_provider,
)
# pylint: disable-next=too-many-positional-arguments
[docs]def wrap_connect(
name: str,
connect_module: Callable[..., Any],
connect_method_name: str,
database_system: str,
connection_attributes: dict[str, Any] | None = None,
version: str = "",
tracer_provider: TracerProvider | None = None,
capture_parameters: bool = False,
enable_commenter: bool = False,
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
commenter_options: dict[str, Any] | None = None,
enable_attribute_commenter: bool = False,
meter_provider: MeterProvider | None = None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Args:
connect_module: Module name where connect method is available.
connect_method_name: The connect method name.
database_system: An identifier for the database management system (DBMS)
product being used.
connection_attributes: Attribute names for database, port, host and
user in Connection object.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If omitted the current configured one is used.
capture_parameters: Configure if db.statement.parameters should be captured.
enable_commenter: Flag to enable/disable sqlcommenter.
db_api_integration_factory: The `DatabaseApiIntegration` to use. If none is passed the
default one is used.
commenter_options: Configurations for tags to be appended at the sql query.
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` and/or `db.query.text` span attribute. Only available if enable_commenter=True.
meter_provider: The :class:`opentelemetry.metrics.MeterProvider` to
use. If omitted the current configured one is used.
"""
db_api_integration_factory = (
db_api_integration_factory or DatabaseApiIntegration
)
# pylint: disable=unused-argument
def wrap_connect_(
wrapped: Callable[..., Any],
instance: Any,
args: tuple[Any, Any],
kwargs: dict[Any, Any],
):
db_integration = db_api_integration_factory(
name,
database_system,
connection_attributes=connection_attributes,
version=version,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
enable_commenter=enable_commenter,
commenter_options=commenter_options,
connect_module=connect_module,
enable_attribute_commenter=enable_attribute_commenter,
meter_provider=meter_provider,
)
return db_integration.wrapped_connection(wrapped, args, kwargs)
try:
wrap_function_wrapper(
connect_module, connect_method_name, wrap_connect_
)
except Exception as ex: # pylint: disable=broad-except
_logger.warning("Failed to integrate with DB API. %s", str(ex))
[docs]def unwrap_connect(
connect_module: Callable[..., Any], connect_method_name: str
):
"""Disable integration with DB API library.
https://www.python.org/dev/peps/pep-0249/
Args:
connect_module: Module name where the connect method is available.
connect_method_name: The connect method name.
"""
unwrap(connect_module, connect_method_name)
# pylint: disable-next=too-many-positional-arguments
[docs]def instrument_connection(
name: str,
connection: ConnectionT | TracedConnectionProxy[ConnectionT],
database_system: str,
connection_attributes: dict[str, Any] | None = None,
version: str = "",
tracer_provider: TracerProvider | None = None,
capture_parameters: bool = False,
enable_commenter: bool = False,
commenter_options: dict[str, Any] | None = None,
connect_module: Callable[..., Any] | None = None,
enable_attribute_commenter: bool = False,
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
meter_provider: MeterProvider | None = None,
) -> TracedConnectionProxy[ConnectionT]:
"""Enable instrumentation in a database connection.
Args:
name: The instrumentation module name.
connection: The connection to instrument.
database_system: An identifier for the database management system (DBMS)
product being used.
connection_attributes: Attribute names for database, port, host and
user in a connection object.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If omitted the current configured one is used.
capture_parameters: Configure if db.statement.parameters should be captured.
enable_commenter: Flag to enable/disable sqlcommenter.
commenter_options: Configurations for tags to be appended at the sql query.
connect_module: Module name where connect method is available.
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` and/or `db.query.text` span attribute. Only available if enable_commenter=True.
db_api_integration_factory: A class or factory function to use as a
replacement for :class:`DatabaseApiIntegration`. Can be used to
obtain connection attributes from the connect method instead of
from the connection itself (as done by the pymssql intrumentor).
meter_provider: The :class:`opentelemetry.metrics.MeterProvider` to
use. If omitted the current configured one is used.
Returns:
An instrumented connection.
"""
if isinstance(connection, BaseObjectProxy):
_logger.warning("Connection already instrumented")
return connection
db_api_integration_factory = (
db_api_integration_factory or DatabaseApiIntegration
)
db_integration = db_api_integration_factory(
name,
database_system,
connection_attributes=connection_attributes,
version=version,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
enable_commenter=enable_commenter,
commenter_options=commenter_options,
connect_module=connect_module,
enable_attribute_commenter=enable_attribute_commenter,
meter_provider=meter_provider,
)
db_integration.get_connection_attributes(connection)
return get_traced_connection_proxy(connection, db_integration)
[docs]def uninstrument_connection(
connection: ConnectionT | TracedConnectionProxy[ConnectionT],
) -> ConnectionT:
"""Disable instrumentation in a database connection.
Args:
connection: The connection to uninstrument.
Returns:
An uninstrumented connection.
"""
if isinstance(connection, BaseObjectProxy):
return connection.__wrapped__
_logger.warning("Connection is not instrumented")
return connection
[docs]class DatabaseApiIntegration:
def __init__(
self,
name: str,
database_system: str,
connection_attributes: dict[str, Any] | None = None,
version: str = "",
tracer_provider: TracerProvider | None = None,
capture_parameters: bool = False,
enable_commenter: bool = False,
commenter_options: dict[str, Any] | None = None,
connect_module: Callable[..., Any] | None = None,
enable_attribute_commenter: bool = False,
meter_provider: MeterProvider | None = None,
):
# Initialize semantic conventions opt-in if needed
_OpenTelemetrySemanticConventionStability._initialize()
self._sem_conv_opt_in_mode_db = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.DATABASE,
)
self._sem_conv_opt_in_mode_http = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.HTTP,
)
if connection_attributes is None:
self.connection_attributes = {
"database": "database",
"port": "port",
"host": "host",
"user": "user",
}
else:
self.connection_attributes = connection_attributes
self._name = name
self._version = version
self._tracer = get_tracer(
self._name,
instrumenting_library_version=self._version,
tracer_provider=tracer_provider,
schema_url=_get_schema_url_for_signal_types(
[
_OpenTelemetryStabilitySignalType.DATABASE,
_OpenTelemetryStabilitySignalType.HTTP,
]
),
)
self._meter = None
self._duration_histogram = None
self._returned_rows_histogram = None
if _report_new(self._sem_conv_opt_in_mode_db):
self._meter = get_meter(
self._name,
self._version,
meter_provider,
schema_url=_get_schema_url_for_signal_types(
[_OpenTelemetryStabilitySignalType.DATABASE]
),
)
self._duration_histogram = create_db_client_operation_duration(
self._meter
)
self._returned_rows_histogram = (
create_db_client_response_returned_rows(self._meter)
)
self.capture_parameters = capture_parameters
self.enable_commenter = enable_commenter
self.commenter_options = commenter_options
self.enable_attribute_commenter = enable_attribute_commenter
self.database_system = database_system
self.connection_props: dict[str, Any] = {}
self.span_attributes: dict[str, Any] = {}
self.name = ""
self.database = ""
self._server_address: str | None = None
self._server_port: int | None = None
self.connect_module = connect_module
self.commenter_data = self.calculate_commenter_data()
def _get_db_version(self, db_driver: str) -> str:
if db_driver in _DB_DRIVER_ALIASES:
return util_version(_DB_DRIVER_ALIASES[db_driver])
db_version = ""
try:
db_version = self.connect_module.__version__
except AttributeError:
db_version = "unknown"
return db_version
[docs] def wrapped_connection(
self,
connect_method: Callable[..., ConnectionT],
args: tuple[Any, ...],
kwargs: dict[Any, Any],
) -> TracedConnectionProxy[ConnectionT]:
"""Add object proxy to connection object."""
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return get_traced_connection_proxy(connection, self)
[docs] def get_connection_attributes(self, connection: object) -> None:
# Populate span fields using connection
for key, value in self.connection_attributes.items():
# Allow attributes nested in connection object
attribute = functools.reduce(
lambda attribute, attribute_value: getattr(
attribute, attribute_value, None
),
value.split("."),
connection,
)
if attribute:
self.connection_props[key] = attribute
self.name = self.database_system
self.database = self.connection_props.get("database", "")
if self.database:
# PyMySQL encodes names with utf-8
if hasattr(self.database, "decode"):
self.database = self.database.decode(errors="ignore")
self.name += "." + self.database
user = self.connection_props.get("user")
# PyMySQL encodes this data
if user and isinstance(user, bytes):
user = user.decode()
if user is not None:
_set_db_user(
self.span_attributes, str(user), self._sem_conv_opt_in_mode_db
)
host = self.connection_props.get("host")
if host is not None:
_set_http_net_peer_name_client(
self.span_attributes,
host,
self._sem_conv_opt_in_mode_http,
)
self._server_address = host
port = self.connection_props.get("port")
if port is not None:
_set_http_peer_port_client(
self.span_attributes, port, self._sem_conv_opt_in_mode_http
)
self._server_port = port
# pylint: disable=abstract-method,no-member
[docs]class TracedConnectionProxy(BaseObjectProxy, Generic[ConnectionT]):
# pylint: disable=unused-argument
def __init__(
self,
connection: ConnectionT,
db_api_integration: DatabaseApiIntegration | None = None,
):
BaseObjectProxy.__init__(self, connection)
self._self_db_api_integration = db_api_integration
def __getattribute__(self, name: str):
if object.__getattribute__(self, name):
return object.__getattribute__(self, name)
return object.__getattribute__(
object.__getattribute__(self, "_connection"), name
)
[docs] def cursor(self, *args: Any, **kwargs: Any):
return get_traced_cursor_proxy(
self.__wrapped__.cursor(*args, **kwargs),
self._self_db_api_integration,
)
def __enter__(self):
self.__wrapped__.__enter__()
return self
def __exit__(self, *args: Any, **kwargs: Any):
self.__wrapped__.__exit__(*args, **kwargs)
[docs]def get_traced_connection_proxy(
connection: ConnectionT,
db_api_integration: DatabaseApiIntegration | None,
*args: Any,
**kwargs: Any,
) -> TracedConnectionProxy[ConnectionT]:
return TracedConnectionProxy(connection, db_api_integration)
[docs]class CursorTracer(Generic[CursorT]):
def __init__(self, db_api_integration: DatabaseApiIntegration) -> None:
self._db_api_integration = db_api_integration
self._commenter_enabled = self._db_api_integration.enable_commenter
self._commenter_options = (
self._db_api_integration.commenter_options
if self._db_api_integration.commenter_options
else {}
)
self._enable_attribute_commenter = (
self._db_api_integration.enable_attribute_commenter
)
self._connect_module = self._db_api_integration.connect_module
self._leading_comment_remover = re.compile(r"^/\*.*?\*/")
def _capture_mysql_version(self, cursor) -> None:
"""Lazy capture of mysql-connector client version using cursor, if applicable"""
if (
self._db_api_integration.database_system == "mysql"
and self._db_api_integration.connect_module.__name__
== "mysql.connector"
and not self._db_api_integration.commenter_data[
"mysql_client_version"
]
):
try:
# Autoinstrumentation and some programmatic calls
client_version = cursor._cnx._cmysql.get_client_info()
except AttributeError:
# Other programmatic instrumentation with reassigned wrapped connection
try:
client_version = (
cursor._connection._cmysql.get_client_info()
)
except AttributeError as exc:
_logger.debug(
"Could not set mysql_client_version: %s", exc
)
client_version = "unknown"
self._db_api_integration.commenter_data["mysql_client_version"] = (
client_version
)
def _get_commenter_data(self) -> dict:
"""Uses DB-API integration to return commenter data for sqlcomment"""
commenter_data = dict(self._db_api_integration.commenter_data)
if self._commenter_options.get("opentelemetry_values", True):
commenter_data.update(**_get_opentelemetry_values())
return {
k: v
for k, v in commenter_data.items()
if self._commenter_options.get(k, True)
}
def _update_args_with_added_sql_comment(self, args, cursor) -> tuple:
"""Updates args with cursor info and adds sqlcomment to query statement"""
try:
args_list = list(args)
self._capture_mysql_version(cursor)
commenter_data = self._get_commenter_data()
# Convert sql statement to string, handling psycopg2.sql.Composable object
if hasattr(args_list[0], "as_string"):
args_list[0] = args_list[0].as_string(cursor.connection)
args_list[0] = str(args_list[0])
statement = _add_sql_comment(args_list[0], **commenter_data)
args_list[0] = statement
args = tuple(args_list)
except Exception as exc: # pylint: disable=broad-except
_logger.exception(
"Exception while generating sql comment: %s", exc
)
return args
def _populate_span(
self,
span: trace_api.Span,
cursor: CursorT,
*args: tuple[Any, ...],
):
if not span.is_recording():
return
statement = self.get_statement(cursor, args)
sem_conv_mode = self._db_api_integration._sem_conv_opt_in_mode_db
span_attrs = {}
_set_db_system(
span_attrs,
self._db_api_integration.database_system,
sem_conv_mode,
)
_set_db_name(
span_attrs,
self._db_api_integration.database,
sem_conv_mode,
)
_set_db_statement(span_attrs, statement, sem_conv_mode)
# Set all collected attributes
span.set_attributes(span_attrs)
for (
attribute_key,
attribute_value,
) in self._db_api_integration.span_attributes.items():
span.set_attribute(attribute_key, attribute_value)
if self._db_api_integration.capture_parameters and len(args) > 1:
span.set_attribute("db.statement.parameters", str(args[1]))
[docs] def get_operation_name(
self, cursor: CursorT, args: tuple[Any, ...]
) -> str: # pylint: disable=no-self-use
if args and isinstance(args[0], str):
# Strip leading comments so we get the operation name.
return self._leading_comment_remover.sub("", args[0]).split()[0]
return ""
[docs] def get_statement(self, cursor: CursorT, args: tuple[Any, ...]): # pylint: disable=no-self-use
if not args:
return ""
statement = args[0]
if isinstance(statement, bytes):
return statement.decode("utf8", "replace")
return statement
def _get_metric_attributes(
self,
operation_name: str,
error: Exception | None,
) -> dict[str, Any]:
attributes: dict[str, Any] = {
DB_SYSTEM_NAME: self._db_api_integration.database_system,
}
if self._db_api_integration.database:
attributes[DB_NAMESPACE] = self._db_api_integration.database
if operation_name:
attributes[DB_OPERATION_NAME] = operation_name
if self._db_api_integration._server_address is not None:
attributes[SERVER_ADDRESS] = (
self._db_api_integration._server_address
)
if self._db_api_integration._server_port is not None:
attributes[SERVER_PORT] = self._db_api_integration._server_port
if error is not None:
attributes[ERROR_TYPE] = type(error).__qualname__
return attributes
def _record_metrics(
self,
cursor: CursorT,
operation_name: str,
start_time: float,
error: Exception | None,
) -> None:
if not _report_new(self._db_api_integration._sem_conv_opt_in_mode_db):
# DB Metrics are not supported without Database semconv opt-in
return
elapsed = time.perf_counter() - start_time
attributes = self._get_metric_attributes(operation_name, error)
self._db_api_integration._duration_histogram.record(
elapsed, attributes=attributes
)
if error is None:
rowcount = getattr(cursor, "rowcount", None)
if isinstance(rowcount, int) and rowcount >= 0:
self._db_api_integration._returned_rows_histogram.record(
rowcount, attributes=attributes
)
[docs] def traced_execution(
self,
cursor: CursorT,
query_method: Callable[..., Any],
*args: tuple[Any, ...],
**kwargs: dict[Any, Any],
):
if not is_instrumentation_enabled():
return query_method(*args, **kwargs)
operation_name = self.get_operation_name(cursor, args)
name = operation_name
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)
with self._db_api_integration._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
if span.is_recording():
if args and self._commenter_enabled:
if self._enable_attribute_commenter:
# sqlcomment is added to executed query and db.statement and/or db.query.text span attribute
args = self._update_args_with_added_sql_comment(
args, cursor
)
self._populate_span(span, cursor, *args)
else:
# sqlcomment is only added to executed query
# so db.statement and/or db.query.text are set before add_sql_comment
self._populate_span(span, cursor, *args)
args = self._update_args_with_added_sql_comment(
args, cursor
)
else:
# no sqlcomment anywhere
self._populate_span(span, cursor, *args)
start_time = time.perf_counter()
error: Exception | None = None
try:
return query_method(*args, **kwargs)
except Exception as exc:
error = exc
raise
finally:
self._record_metrics(cursor, operation_name, start_time, error)
[docs] async def traced_execution_async(
self,
cursor: CursorT,
query_method: Callable[..., Awaitable[Any]],
*args: tuple[Any, ...],
**kwargs: dict[Any, Any],
):
operation_name = self.get_operation_name(cursor, args)
name = operation_name
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)
with self._db_api_integration._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
if span.is_recording():
if args and self._commenter_enabled:
if self._enable_attribute_commenter:
# sqlcomment is added to executed query and db.statement and/or db.query.text span attribute
args = self._update_args_with_added_sql_comment(
args, cursor
)
self._populate_span(span, cursor, *args)
else:
# sqlcomment is only added to executed query
# so db.statement and/or db.query.text are set before add_sql_comment
self._populate_span(span, cursor, *args)
args = self._update_args_with_added_sql_comment(
args, cursor
)
else:
# no sqlcomment anywhere
self._populate_span(span, cursor, *args)
start_time = time.perf_counter()
error: Exception | None = None
try:
return await query_method(*args, **kwargs)
except Exception as exc:
error = exc
raise
finally:
self._record_metrics(cursor, operation_name, start_time, error)
# pylint: disable=abstract-method,no-member
[docs]class TracedCursorProxy(ObjectProxy, Generic[CursorT]):
# pylint: disable=unused-argument
def __init__(
self,
cursor: CursorT,
db_api_integration: DatabaseApiIntegration,
):
ObjectProxy.__init__(self, cursor)
self._self_cursor_tracer = CursorTracer[CursorT](db_api_integration)
[docs] def execute(self, *args: Any, **kwargs: Any):
return self._self_cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
)
[docs] def executemany(self, *args: Any, **kwargs: Any):
return self._self_cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
)
[docs] def callproc(self, *args: Any, **kwargs: Any):
return self._self_cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
)
def __enter__(self):
self.__wrapped__.__enter__()
return self
def __exit__(self, *args, **kwargs):
self.__wrapped__.__exit__(*args, **kwargs)
[docs]def get_traced_cursor_proxy(
cursor: CursorT,
db_api_integration: DatabaseApiIntegration,
*args: Any,
**kwargs: Any,
) -> TracedCursorProxy[CursorT]:
return TracedCursorProxy(cursor, db_api_integration)