# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `sqlalchemy`_ to report SQL queries.
There are two options for instrumenting code. The first option is to use
the ``opentelemetry-instrument`` executable which will automatically
instrument your SQLAlchemy engine. The second is to programmatically enable
instrumentation via the following code:
.. _sqlalchemy: https://pypi.org/project/sqlalchemy/
SQLCOMMENTER
****************************************
You can optionally configure SQLAlchemy instrumentation to enable sqlcommenter which enriches
the query with contextual information.
Usage
-----
.. code:: python
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
SQLAlchemyInstrumentor().instrument(enable_commenter=True, commenter_options={})
For example,
::
Invoking engine.execute("select * from auth_users") will lead to sql query "select * from auth_users" but when SQLCommenter is enabled
the query will get appended with some configurable tags like "select * from auth_users /*tag=value*/;"
SQLCommenter Configurations
***************************
We can configure the tags to be appended to the sqlquery log by adding configuration inside commenter_options(default:{}) keyword
db_driver = True(Default) or False
For example,
::
Enabling this flag will add any underlying driver like psycopg2 /*db_driver='psycopg2'*/
db_framework = True(Default) or False
For example,
::
Enabling this flag will add db_framework and it's version /*db_framework='sqlalchemy:0.41b0'*/
opentelemetry_values = True(Default) or False
For example,
::
Enabling this flag will add traceparent values /*traceparent='00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01'*/
Usage
-----
.. code:: python
from sqlalchemy import create_engine
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
import sqlalchemy
engine = create_engine("sqlite:///:memory:")
SQLAlchemyInstrumentor().instrument(
engine=engine,
)
# of the async variant of SQLAlchemy
from sqlalchemy.ext.asyncio import create_async_engine
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
import sqlalchemy
engine = create_async_engine("sqlite:///:memory:")
SQLAlchemyInstrumentor().instrument(
engine=engine.sync_engine
)
API
---
"""
from collections.abc import Sequence
from typing import Collection
import sqlalchemy
from packaging.version import parse as parse_version
from sqlalchemy.engine.base import Engine
from wrapt import wrap_function_wrapper as _w
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.sqlalchemy.engine import (
EngineTracer,
_wrap_connect,
_wrap_create_async_engine,
_wrap_create_engine,
)
from opentelemetry.instrumentation.sqlalchemy.package import _instruments
from opentelemetry.instrumentation.sqlalchemy.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.trace import get_tracer
[docs]class SQLAlchemyInstrumentor(BaseInstrumentor):
"""An instrumentor for SQLAlchemy
See `BaseInstrumentor`
"""
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs):
"""Instruments SQLAlchemy engine creation methods and the engine
if passed as an argument.
Args:
**kwargs: Optional arguments
``engine``: a SQLAlchemy engine instance
``engines``: a list of SQLAlchemy engine instances
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``enable_commenter``: bool to enable sqlcommenter, defaults to False
``commenter_options``: dict of sqlcommenter config, defaults to {}
Returns:
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise.
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
meter_provider = kwargs.get("meter_provider")
meter = get_meter(
__name__,
__version__,
meter_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
connections_usage = meter.create_up_down_counter(
name=MetricInstruments.DB_CLIENT_CONNECTIONS_USAGE,
unit="connections",
description="The number of connections that are currently in state described by the state attribute.",
)
enable_commenter = kwargs.get("enable_commenter", False)
commenter_options = kwargs.get("commenter_options", {})
_w(
"sqlalchemy",
"create_engine",
_wrap_create_engine(
tracer, connections_usage, enable_commenter, commenter_options
),
)
_w(
"sqlalchemy.engine",
"create_engine",
_wrap_create_engine(
tracer, connections_usage, enable_commenter, commenter_options
),
)
_w(
"sqlalchemy.engine.base",
"Engine.connect",
_wrap_connect(tracer),
)
if parse_version(sqlalchemy.__version__).release >= (1, 4):
_w(
"sqlalchemy.ext.asyncio",
"create_async_engine",
_wrap_create_async_engine(
tracer,
connections_usage,
enable_commenter,
commenter_options,
),
)
if kwargs.get("engine") is not None:
return EngineTracer(
tracer,
kwargs.get("engine"),
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
if kwargs.get("engines") is not None and isinstance(
kwargs.get("engines"), Sequence
):
return [
EngineTracer(
tracer,
engine,
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
for engine in kwargs.get("engines")
]
return None
def _uninstrument(self, **kwargs):
unwrap(sqlalchemy, "create_engine")
unwrap(sqlalchemy.engine, "create_engine")
unwrap(Engine, "connect")
if parse_version(sqlalchemy.__version__).release >= (1, 4):
unwrap(sqlalchemy.ext.asyncio, "create_async_engine")
EngineTracer.remove_all_event_listeners()