Source code for opentelemetry.instrumentation.tortoiseorm

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Instrument tortoise-orm to report SQL queries.

Usage
-----

.. code:: python

    from fastapi import FastAPI
    from tortoise.contrib.fastapi import register_tortoise
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.instrumentation.tortoiseorm import TortoiseORMInstrumentor

    app = FastAPI()
    tracer = TracerProvider(resource=Resource({SERVICE_NAME: "FastAPI"}))
    TortoiseORMInstrumentor().instrument(tracer_provider=tracer)

    register_tortoise(
        app,
        db_url="sqlite://sample.db",
        modules={"models": ["example_app.db_models"]}
    )

API
---
"""
from typing import Collection

import wrapt

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.tortoiseorm.package import _instruments
from opentelemetry.instrumentation.tortoiseorm.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode

# Optional backend detection.
#
# Each Tortoise-ORM backend client module is imported inside its own
# ``try`` block; a ``ModuleNotFoundError`` (presumably because the backend's
# database driver is not installed -- confirm against tortoise packaging)
# just clears the matching ``TORTOISE_*_SUPPORT`` flag instead of failing
# the import of this module. ``TortoiseORMInstrumentor`` reads these flags
# to decide which client classes to wrap and unwrap.
#
# The imports are also needed for their side effect: they bind the
# ``tortoise.backends.<name>.client`` attribute chain that ``_uninstrument``
# dereferences.
try:
    import tortoise.backends.asyncpg.client

    TORTOISE_POSTGRES_SUPPORT = True
except ModuleNotFoundError:
    TORTOISE_POSTGRES_SUPPORT = False

try:
    import tortoise.backends.mysql.client

    TORTOISE_MYSQL_SUPPORT = True
except ModuleNotFoundError:
    TORTOISE_MYSQL_SUPPORT = False

try:
    import tortoise.backends.sqlite.client

    TORTOISE_SQLITE_SUPPORT = True
except ModuleNotFoundError:
    TORTOISE_SQLITE_SUPPORT = False

# Imported unconditionally: the pydantic model constructors in this module
# are always wrapped by ``_instrument``, so a missing module fails here.
import tortoise.contrib.pydantic.base


class TortoiseORMInstrumentor(BaseInstrumentor):
    """An instrumentor for Tortoise-ORM.

    Wraps the ``execute_*`` methods of every importable Tortoise-ORM
    database client, and the ``from_queryset*`` constructors of the Tortoise
    pydantic models, so that each call is reported as a span.

    See `BaseInstrumentor`
    """

    # Client methods wrapped on every supported backend. Shared by
    # ``_instrument`` and ``_uninstrument`` so the two cannot drift apart.
    _EXECUTE_METHODS = (
        "execute_many",
        "execute_query",
        "execute_insert",
        "execute_query_dict",
        "execute_script",
    )

    # (class name, method name) pairs wrapped in
    # ``tortoise.contrib.pydantic.base``.
    _PYDANTIC_METHODS = (
        ("PydanticModel", "from_queryset"),
        ("PydanticModel", "from_queryset_single"),
        ("PydanticListModel", "from_queryset"),
    )

    # Maps ``connection.capabilities.dialect`` to the ``db.system`` value.
    _DB_SYSTEM_BY_DIALECT = {
        "sqlite": DbSystemValues.SQLITE.value,
        "postgres": DbSystemValues.POSTGRESQL.value,
        "mysql": DbSystemValues.MYSQL.value,
    }

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package requirements declared for this instrumentor."""
        return _instruments

    @staticmethod
    def _supported_backends():
        """Return ``(module, client class name)`` per importable backend.

        Backend modules are only dereferenced when their support flag is
        set, because a failed optional import leaves the attribute unbound.
        """
        backends = []
        if TORTOISE_SQLITE_SUPPORT:
            backends.append((tortoise.backends.sqlite.client, "SqliteClient"))
        if TORTOISE_POSTGRES_SUPPORT:
            backends.append(
                (tortoise.backends.asyncpg.client, "AsyncpgDBClient")
            )
        if TORTOISE_MYSQL_SUPPORT:
            backends.append((tortoise.backends.mysql.client, "MySQLClient"))
        return backends

    @staticmethod
    def _span_name_from_query(query, default):
        """Return the first token of ``query`` (e.g. ``SELECT``).

        Falls back to ``default`` when ``query`` is not a string or contains
        no non-whitespace characters, so naming the span can never raise.
        """
        if isinstance(query, str):
            tokens = query.split(None, 1)
            if tokens:
                return tokens[0]
        return default

    def _instrument(self, **kwargs):
        """Instruments Tortoise ORM backend methods.

        Args:
            **kwargs: Optional arguments
                ``tracer_provider``: a TracerProvider, defaults to global
                ``capture_parameters``: set to True to capture SQL query
                    parameters

        Returns:
            None
        """
        # pylint: disable=attribute-defined-outside-init
        self._tracer = trace.get_tracer(
            __name__,
            __version__,
            kwargs.get("tracer_provider"),
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )
        self.capture_parameters = kwargs.get("capture_parameters", False)

        for module, client_name in self._supported_backends():
            for method in self._EXECUTE_METHODS:
                wrapt.wrap_function_wrapper(
                    module.__name__,
                    f"{client_name}.{method}",
                    self._do_execute,
                )

        for class_name, method in self._PYDANTIC_METHODS:
            wrapt.wrap_function_wrapper(
                "tortoise.contrib.pydantic.base",
                f"{class_name}.{method}",
                self._from_queryset,
            )

    def _uninstrument(self, **kwargs):
        """Remove every wrapper installed by ``_instrument``."""
        for module, client_name in self._supported_backends():
            client = getattr(module, client_name)
            for method in self._EXECUTE_METHODS:
                unwrap(client, method)

        for class_name, method in self._PYDANTIC_METHODS:
            unwrap(getattr(tortoise.contrib.pydantic.base, class_name), method)

    def _hydrate_span_from_args(self, connection, query, parameters) -> dict:
        """Get network and database attributes from connection.

        Args:
            connection: the database client the wrapped method was called on.
            query: the SQL text, or None when it could not be determined.
            parameters: the remaining positional arguments of the call; only
                recorded when ``capture_parameters`` was enabled.

        Returns:
            A dict of span attribute names to values.
        """
        span_attributes = {}

        capabilities = getattr(connection, "capabilities", None)
        if capabilities is not None:
            db_system = self._DB_SYSTEM_BY_DIALECT.get(capabilities.dialect)
            if db_system is not None:
                span_attributes[SpanAttributes.DB_SYSTEM] = db_system

        # ``database`` is checked last so it wins over ``filename`` when a
        # connection exposes both.
        for name_attr in ("filename", "database"):
            dbname = getattr(connection, name_attr, None)
            if dbname:
                span_attributes[SpanAttributes.DB_NAME] = dbname

        if query is not None:
            span_attributes[SpanAttributes.DB_STATEMENT] = query

        connection_attributes = (
            ("user", SpanAttributes.DB_USER),
            ("host", SpanAttributes.NET_PEER_NAME),
            ("port", SpanAttributes.NET_PEER_PORT),
        )
        for conn_attr, span_key in connection_attributes:
            value = getattr(connection, conn_attr, None)
            if value:
                span_attributes[span_key] = value

        if self.capture_parameters and parameters:
            span_attributes["db.statement.parameters"] = str(parameters)

        return span_attributes

    async def _do_execute(self, func, instance, args, kwargs):
        """Run a wrapped client ``execute_*`` call inside a CLIENT span.

        The span is named after the first whitespace-delimited token of the
        SQL text. If the SQL text is missing, blank or not a string, the
        name of the wrapped method is used instead, so that the
        instrumentation never prevents the database call from running.

        Args:
            func: the original (wrapped) client method.
            instance: the client the method was called on.
            args: positional arguments of the call; ``args[0]`` is treated
                as the SQL text and ``args[1:]`` as its parameters.
            kwargs: keyword arguments of the call.

        Returns:
            Whatever the wrapped method returns.

        Raises:
            Any exception raised by the wrapped method, after the span has
            been marked as errored.
        """
        # NOTE(review): the keyword fallback assumes tortoise names the SQL
        # parameter ``query`` -- confirm against the installed version.
        query = args[0] if args else kwargs.get("query")
        name = self._span_name_from_query(
            query, getattr(func, "__name__", "execute")
        )

        with self._tracer.start_as_current_span(
            name, kind=SpanKind.CLIENT
        ) as span:
            if span.is_recording():
                span_attributes = self._hydrate_span_from_args(
                    instance,
                    query,
                    args[1:],
                )
                for attribute, value in span_attributes.items():
                    span.set_attribute(attribute, value)

            try:
                result = await func(*args, **kwargs)
            except Exception:  # pylint: disable=W0703
                if span.is_recording():
                    span.set_status(Status(StatusCode.ERROR))
                raise

        return result

    async def _from_queryset(self, func, modelcls, args, kwargs):
        """Run a wrapped pydantic ``from_queryset*`` call in an INTERNAL span.

        The span is named ``pydantic.<wrapped function name>``. When the
        model class has a truthy ``Config`` attribute, its ``title`` (or the
        class name if no title is set) is recorded as ``pydantic.model``.

        Args:
            func: the original (wrapped) constructor.
            modelcls: the object the constructor was called on.
            args: positional arguments of the call.
            kwargs: keyword arguments of the call.

        Returns:
            Whatever the wrapped constructor returns.

        Raises:
            Any exception raised by the wrapped constructor, after the span
            has been marked as errored.
        """
        name = f"pydantic.{func.__name__}"

        with self._tracer.start_as_current_span(
            name,
            kind=SpanKind.INTERNAL,
        ) as span:
            if span.is_recording():
                model_config = getattr(modelcls, "Config", None)
                if model_config:
                    model_title = getattr(
                        modelcls.Config,
                        "title",
                        modelcls.__name__,
                    )
                    if model_title:
                        span.set_attribute("pydantic.model", model_title)

            try:
                result = await func(*args, **kwargs)
            except Exception:  # pylint: disable=W0703
                if span.is_recording():
                    span.set_status(Status(StatusCode.ERROR))
                raise

        return result