# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The integration with PostgreSQL supports the `Psycopg`_ library, it can be enabled by
using ``Psycopg2Instrumentor``.
.. _Psycopg: http://initd.org/psycopg/
SQLCOMMENTER
*****************************************
You can optionally configure Psycopg2 instrumentation to enable sqlcommenter which enriches
the query with contextual information.
Usage
-----
.. code:: python
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
Psycopg2Instrumentor().instrument(enable_commenter=True, commenter_options={})
For example,
::
Invoking cursor.execute("select * from auth_users") will lead to sql query "select * from auth_users" but when SQLCommenter is enabled
the query will get appended with some configurable tags like "select * from auth_users /*tag=value*/;"
SQLCommenter Configurations
***************************
We can configure the tags to be appended to the sqlquery log by adding configuration inside commenter_options(default:{}) keyword
db_driver = True(Default) or False
For example,
::
Enabling this flag will add psycopg2 and it's version which is /*psycopg2%%3A2.9.3*/
dbapi_threadsafety = True(Default) or False
For example,
::
Enabling this flag will add threadsafety /*dbapi_threadsafety=2*/
dbapi_level = True(Default) or False
For example,
::
Enabling this flag will add dbapi_level /*dbapi_level='2.0'*/
libpq_version = True(Default) or False
For example,
::
Enabling this flag will add libpq_version /*libpq_version=140001*/
driver_paramstyle = True(Default) or False
For example,
::
Enabling this flag will add driver_paramstyle /*driver_paramstyle='pyformat'*/
opentelemetry_values = True(Default) or False
For example,
::
Enabling this flag will add traceparent values /*traceparent='00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01'*/
Usage
-----
.. code-block:: python
import psycopg2
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
Psycopg2Instrumentor().instrument()
cnx = psycopg2.connect(database='Database')
cursor = cnx.cursor()
cursor.execute("INSERT INTO test (testField) VALUES (123)")
cursor.close()
cnx.close()
API
---
"""
import logging
import typing
from typing import Collection
import psycopg2
from psycopg2.extensions import (
cursor as pg_cursor, # pylint: disable=no-name-in-module
)
from psycopg2.sql import Composed # pylint: disable=no-name-in-module
from opentelemetry.instrumentation import dbapi
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.psycopg2.package import _instruments
from opentelemetry.instrumentation.psycopg2.version import __version__
_logger = logging.getLogger(__name__)
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
[docs]class Psycopg2Instrumentor(BaseInstrumentor):
_CONNECTION_ATTRIBUTES = {
"database": "info.dbname",
"port": "info.port",
"host": "info.host",
"user": "info.user",
}
_DATABASE_SYSTEM = "postgresql"
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""
tracer_provider = kwargs.get("tracer_provider")
enable_sqlcommenter = kwargs.get("enable_commenter", False)
commenter_options = kwargs.get("commenter_options", {})
dbapi.wrap_connect(
__name__,
psycopg2,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
)
def _uninstrument(self, **kwargs):
""" "Disable Psycopg2 instrumentation"""
dbapi.unwrap_connect(psycopg2, "connect")
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
[docs] @staticmethod
def instrument_connection(connection, tracer_provider=None):
if not hasattr(connection, "_is_instrumented_by_opentelemetry"):
connection._is_instrumented_by_opentelemetry = False
if not connection._is_instrumented_by_opentelemetry:
setattr(
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
)
connection.cursor_factory = _new_cursor_factory(
tracer_provider=tracer_provider
)
connection._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
"Attempting to instrument Psycopg connection while already instrumented"
)
return connection
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
[docs] @staticmethod
def uninstrument_connection(connection):
connection.cursor_factory = getattr(
connection, _OTEL_CURSOR_FACTORY_KEY, None
)
return connection
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
[docs]class DatabaseApiIntegration(dbapi.DatabaseApiIntegration):
[docs] def wrapped_connection(
self,
connect_method: typing.Callable[..., typing.Any],
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
"""Add object proxy to connection object."""
base_cursor_factory = kwargs.pop("cursor_factory", None)
new_factory_kwargs = {"db_api": self}
if base_cursor_factory:
new_factory_kwargs["base_factory"] = base_cursor_factory
kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs)
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return connection
[docs]class CursorTracer(dbapi.CursorTracer):
[docs] def get_operation_name(self, cursor, args):
if not args:
return ""
statement = args[0]
if isinstance(statement, Composed):
statement = statement.as_string(cursor)
if isinstance(statement, str):
# Strip leading comments so we get the operation name.
return self._leading_comment_remover.sub("", statement).split()[0]
return ""
[docs] def get_statement(self, cursor, args):
if not args:
return ""
statement = args[0]
if isinstance(statement, Composed):
statement = statement.as_string(cursor)
return statement
def _new_cursor_factory(db_api=None, base_factory=None, tracer_provider=None):
if not db_api:
db_api = DatabaseApiIntegration(
__name__,
Psycopg2Instrumentor._DATABASE_SYSTEM,
connection_attributes=Psycopg2Instrumentor._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
base_factory = base_factory or pg_cursor
_cursor_tracer = CursorTracer(db_api)
class TracedCursorFactory(base_factory):
def execute(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().execute, *args, **kwargs
)
def executemany(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().executemany, *args, **kwargs
)
def callproc(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().callproc, *args, **kwargs
)
return TracedCursorFactory