# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
The integration with PostgreSQL supports the `Psycopg`_ library. It can be enabled by
using ``PsycopgInstrumentor``.
.. _Psycopg: https://www.psycopg.org/psycopg3/docs/
Usage
-----
.. code-block:: python
import psycopg
from opentelemetry.instrumentation.psycopg import PsycopgInstrumentor
# Call instrument() to wrap all database connections
PsycopgInstrumentor().instrument()
cnx = psycopg.connect(database='Database')
cursor = cnx.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS test (testField INTEGER)")
cursor.execute("INSERT INTO test (testField) VALUES (123)")
cursor.close()
cnx.close()
.. code-block:: python
import psycopg
from opentelemetry.instrumentation.psycopg import PsycopgInstrumentor
# Alternatively, use instrument_connection for an individual connection
cnx = psycopg.connect(database='Database')
instrumented_cnx = PsycopgInstrumentor().instrument_connection(cnx)
cursor = instrumented_cnx.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS test (testField INTEGER)")
cursor.execute("INSERT INTO test (testField) VALUES (123)")
cursor.close()
instrumented_cnx.close()
Configuration
-------------
SQLCommenter
************
You can optionally configure Psycopg instrumentation to enable sqlcommenter which enriches
the query with contextual information. Queries made after setting up trace integration with
sqlcommenter enabled will have configurable key-value pairs appended to them, e.g.
``"select * from auth_users; /*traceparent=00-01234567-abcd-01*/"``. This supports context
propagation between database client and server when database log records are enabled.
For more information, see:
* `Semantic Conventions - Database Spans <https://github.com/open-telemetry/semantic-conventions/blob/main/docs/db/database-spans.md#sql-commenter>`_
* `sqlcommenter <https://google.github.io/sqlcommenter/>`_
.. code:: python
from opentelemetry.instrumentation.psycopg import PsycopgInstrumentor
PsycopgInstrumentor().instrument(enable_commenter=True)
SQLCommenter with commenter_options
***********************************
The key-value pairs appended to the query can be configured using
``commenter_options``. When sqlcommenter is enabled, all available KVs/tags
are calculated by default. ``commenter_options`` supports *opting out*
of specific KVs.
.. code:: python
from opentelemetry.instrumentation.psycopg import PsycopgInstrumentor
# Opts into sqlcomment for Psycopg trace integration.
# Opts out of tags for libpq_version, db_driver.
PsycopgInstrumentor().instrument(
enable_commenter=True,
commenter_options={
"libpq_version": False,
"db_driver": False,
}
)
Available commenter_options
###########################
The following sqlcomment key-values can be opted out of through ``commenter_options``:
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| Commenter Option | Description | Example |
+===========================+===========================================================+===========================================================================+
| ``db_driver`` | Database driver name with version. | ``psycopg='3.1.9'`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``dbapi_threadsafety`` | DB-API threadsafety value: 0-3 or unknown. | ``dbapi_threadsafety=2`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``dbapi_level`` | DB-API API level: 1.0, 2.0, or unknown. | ``dbapi_level='2.0'`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``driver_paramstyle`` | DB-API paramstyle for SQL statement parameter. | ``driver_paramstyle='pyformat'`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``libpq_version`` | PostgreSQL libpq version | ``libpq_version=140001`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
| ``opentelemetry_values`` | OpenTelemetry context as traceparent at time of query. | ``traceparent='00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01'`` |
+---------------------------+-----------------------------------------------------------+---------------------------------------------------------------------------+
SQLComment in span attribute
****************************
If sqlcommenter is enabled, you can opt into the inclusion of sqlcomment in
the query span ``db.statement`` and/or ``db.query.text`` attribute for your
needs. If ``commenter_options`` have been set, the span attribute comment
will also be configured by this setting.
.. code:: python
from opentelemetry.instrumentation.psycopg import PsycopgInstrumentor
# Opts into sqlcomment for Psycopg trace integration.
# Opts into sqlcomment for `db.statement` and/or `db.query.text` span attribute.
PsycopgInstrumentor().instrument(
enable_commenter=True,
enable_attribute_commenter=True,
)
Warning:
Capture of sqlcomment in ``db.statement``/``db.query.text`` may have high cardinality without platform normalization. See `Semantic Conventions for database spans <https://opentelemetry.io/docs/specs/semconv/database/database-spans/#generating-a-summary-of-the-query-text>`_ for more information.
API
---
"""
from __future__ import annotations
import logging
from typing import Any, Callable, Collection, TypeVar
import psycopg # pylint: disable=import-self
from psycopg.sql import Composable # pylint: disable=no-name-in-module
from opentelemetry.instrumentation import dbapi
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.psycopg.package import _instruments
from opentelemetry.instrumentation.psycopg.version import __version__
from opentelemetry.trace import TracerProvider
_logger = logging.getLogger(__name__)
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
ConnectionT = TypeVar(
"ConnectionT", psycopg.Connection, psycopg.AsyncConnection
)
CursorT = TypeVar("CursorT", psycopg.Cursor, psycopg.AsyncCursor)
[docs]class PsycopgInstrumentor(BaseInstrumentor):
_CONNECTION_ATTRIBUTES = {
"database": "info.dbname",
"port": "info.port",
"host": "info.host",
"user": "info.user",
}
_DATABASE_SYSTEM = "postgresql"
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs: Any):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""
tracer_provider = kwargs.get("tracer_provider")
enable_sqlcommenter = kwargs.get("enable_commenter", False)
commenter_options = kwargs.get("commenter_options", {})
enable_attribute_commenter = kwargs.get(
"enable_attribute_commenter", False
)
capture_parameters = kwargs.get("capture_parameters", False)
dbapi.wrap_connect(
__name__,
psycopg,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
enable_attribute_commenter=enable_attribute_commenter,
capture_parameters=capture_parameters,
)
dbapi.wrap_connect(
__name__,
psycopg.Connection, # pylint: disable=no-member
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
enable_attribute_commenter=enable_attribute_commenter,
capture_parameters=capture_parameters,
)
dbapi.wrap_connect(
__name__,
psycopg.AsyncConnection, # pylint: disable=no-member
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiAsyncIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
enable_attribute_commenter=enable_attribute_commenter,
capture_parameters=capture_parameters,
)
def _uninstrument(self, **kwargs: Any):
""" "Disable Psycopg instrumentation"""
dbapi.unwrap_connect(psycopg, "connect") # pylint: disable=no-member
dbapi.unwrap_connect(
psycopg.Connection,
"connect", # pylint: disable=no-member
)
dbapi.unwrap_connect(
psycopg.AsyncConnection,
"connect", # pylint: disable=no-member
)
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
[docs] @staticmethod
def instrument_connection(
connection: ConnectionT, tracer_provider: TracerProvider | None = None
) -> ConnectionT:
"""Enable instrumentation in a psycopg connection.
Args:
connection: psycopg.Connection
The psycopg connection object to be instrumented.
tracer_provider: opentelemetry.trace.TracerProvider, optional
The TracerProvider to use for instrumentation. If not provided,
the global TracerProvider will be used.
Returns:
An instrumented psycopg connection object.
"""
if not hasattr(connection, "_is_instrumented_by_opentelemetry"):
connection._is_instrumented_by_opentelemetry = False
if not connection._is_instrumented_by_opentelemetry:
setattr(
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
)
if isinstance(connection, psycopg.AsyncConnection):
connection.cursor_factory = _new_cursor_async_factory(
tracer_provider=tracer_provider
)
else:
connection.cursor_factory = _new_cursor_factory(
tracer_provider=tracer_provider
)
connection._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
"Attempting to instrument Psycopg connection while already instrumented"
)
return connection
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
[docs] @staticmethod
def uninstrument_connection(connection: ConnectionT) -> ConnectionT:
connection.cursor_factory = getattr(
connection, _OTEL_CURSOR_FACTORY_KEY, None
)
return connection
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
[docs]class DatabaseApiIntegration(dbapi.DatabaseApiIntegration):
[docs] def wrapped_connection(
self,
connect_method: Callable[..., Any],
args: tuple[Any, Any],
kwargs: dict[Any, Any],
):
"""Add object proxy to connection object."""
base_cursor_factory = kwargs.pop("cursor_factory", None)
new_factory_kwargs = {"db_api": self}
if base_cursor_factory:
new_factory_kwargs["base_factory"] = base_cursor_factory
kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs)
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return connection
[docs]class DatabaseApiAsyncIntegration(dbapi.DatabaseApiIntegration):
[docs] async def wrapped_connection(
self,
connect_method: Callable[..., Any],
args: tuple[Any, Any],
kwargs: dict[Any, Any],
):
"""Add object proxy to connection object."""
base_cursor_factory = kwargs.pop("cursor_factory", None)
new_factory_kwargs = {"db_api": self}
if base_cursor_factory:
new_factory_kwargs["base_factory"] = base_cursor_factory
kwargs["cursor_factory"] = _new_cursor_async_factory(
**new_factory_kwargs
)
connection = await connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return connection
[docs]class CursorTracer(dbapi.CursorTracer):
[docs] def get_operation_name(self, cursor: CursorT, args: list[Any]) -> str:
if not args:
return ""
statement = args[0]
if isinstance(statement, Composable):
statement = statement.as_string(cursor)
# `statement` can be empty string. See #2643
if statement and isinstance(statement, str):
# Strip leading comments so we get the operation name.
return self._leading_comment_remover.sub("", statement).split()[0]
return ""
[docs] def get_statement(self, cursor: CursorT, args: list[Any]) -> str:
if not args:
return ""
statement = args[0]
if isinstance(statement, Composable):
statement = statement.as_string(cursor)
return statement
def _new_cursor_factory(
db_api: DatabaseApiIntegration | None = None,
base_factory: type[psycopg.Cursor] | None = None,
tracer_provider: TracerProvider | None = None,
):
if not db_api:
db_api = DatabaseApiIntegration(
__name__,
PsycopgInstrumentor._DATABASE_SYSTEM,
connection_attributes=PsycopgInstrumentor._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
base_factory = base_factory or psycopg.Cursor
_cursor_tracer = CursorTracer(db_api)
class TracedCursorFactory(base_factory):
def execute(self, *args: Any, **kwargs: Any):
return _cursor_tracer.traced_execution(
self, super().execute, *args, **kwargs
)
def executemany(self, *args: Any, **kwargs: Any):
return _cursor_tracer.traced_execution(
self, super().executemany, *args, **kwargs
)
def callproc(self, *args: Any, **kwargs: Any):
return _cursor_tracer.traced_execution(
self, super().callproc, *args, **kwargs
)
return TracedCursorFactory
def _new_cursor_async_factory(
db_api: DatabaseApiAsyncIntegration | None = None,
base_factory: type[psycopg.AsyncCursor] | None = None,
tracer_provider: TracerProvider | None = None,
):
if not db_api:
db_api = DatabaseApiAsyncIntegration(
__name__,
PsycopgInstrumentor._DATABASE_SYSTEM,
connection_attributes=PsycopgInstrumentor._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
base_factory = base_factory or psycopg.AsyncCursor
_cursor_tracer = CursorTracer(db_api)
class TracedCursorAsyncFactory(base_factory):
async def execute(self, *args: Any, **kwargs: Any):
return await _cursor_tracer.traced_execution_async(
self, super().execute, *args, **kwargs
)
async def executemany(self, *args: Any, **kwargs: Any):
return await _cursor_tracer.traced_execution_async(
self, super().executemany, *args, **kwargs
)
async def callproc(self, *args: Any, **kwargs: Any):
return await _cursor_tracer.traced_execution_async(
self, super().callproc, *args, **kwargs
)
return TracedCursorAsyncFactory