# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
The integration with PostgreSQL supports the aiopg library,
it can be enabled by using ``AiopgInstrumentor``.
.. aiopg: https://github.com/aio-libs/aiopg
Usage
-----
.. code-block:: python
import asyncio
import aiopg
from opentelemetry.instrumentation.aiopg import AiopgInstrumentor
# Call instrument() to wrap all database connections
AiopgInstrumentor().instrument()
dsn = 'user=user password=password host=127.0.0.1'
async def connect():
cnx = await aiopg.connect(dsn)
cursor = await cnx.cursor()
await cursor.execute("CREATE TABLE IF NOT EXISTS test (testField INTEGER)")
await cursor.execute("INSERT INTO test (testField) VALUES (123)")
cursor.close()
cnx.close()
async def create_pool():
pool = await aiopg.create_pool(dsn)
cnx = await pool.acquire()
cursor = await cnx.cursor()
await cursor.execute("CREATE TABLE IF NOT EXISTS test (testField INTEGER)")
await cursor.execute("INSERT INTO test (testField) VALUES (123)")
cursor.close()
cnx.close()
asyncio.run(connect())
asyncio.run(create_pool())
.. code-block:: python
import asyncio
import aiopg
from opentelemetry.instrumentation.aiopg import AiopgInstrumentor
dsn = 'user=user password=password host=127.0.0.1'
# Alternatively, use instrument_connection for an individual connection
async def go():
cnx = await aiopg.connect(dsn)
instrumented_cnx = AiopgInstrumentor().instrument_connection(cnx)
cursor = await instrumented_cnx.cursor()
await cursor.execute("CREATE TABLE IF NOT EXISTS test (testField INTEGER)")
await cursor.execute("INSERT INTO test (testField) VALUES (123)")
cursor.close()
instrumented_cnx.close()
asyncio.run(go())
API
---
"""
from typing import Collection
from opentelemetry.instrumentation.aiopg import wrappers
from opentelemetry.instrumentation.aiopg.package import _instruments
from opentelemetry.instrumentation.aiopg.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
[docs]class AiopgInstrumentor(BaseInstrumentor):
_CONNECTION_ATTRIBUTES = {
"database": "info.dbname",
"port": "info.port",
"host": "info.host",
"user": "info.user",
}
_DATABASE_SYSTEM = "postgresql"
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs):
"""Integrate with PostgreSQL aiopg library.
aiopg: https://github.com/aio-libs/aiopg
"""
tracer_provider = kwargs.get("tracer_provider")
wrappers.wrap_connect(
__name__,
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
wrappers.wrap_create_pool(
__name__,
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
# pylint:disable=no-self-use
def _uninstrument(self, **kwargs):
""" "Disable aiopg instrumentation"""
wrappers.unwrap_connect()
wrappers.unwrap_create_pool()
# pylint:disable=no-self-use
[docs] def instrument_connection(self, connection, tracer_provider=None):
"""Enable instrumentation in a aiopg connection.
Args:
connection: The connection to instrument.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
Returns:
An instrumented connection.
"""
return wrappers.instrument_connection(
__name__,
connection,
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
[docs] def uninstrument_connection(self, connection):
"""Disable instrumentation in a aiopg connection.
Args:
connection: The connection to uninstrument.
Returns:
An uninstrumented connection.
"""
return wrappers.uninstrument_connection(connection)