# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
#
"""
Instrument `redis`_ to report Redis queries.
.. _redis: https://pypi.org/project/redis/
Instrument All Clients
----------------------
The easiest way to instrument all redis client instances is by
``RedisInstrumentor().instrument()``:
.. code:: python
from opentelemetry.instrumentation.redis import RedisInstrumentor
import redis
# Instrument redis
RedisInstrumentor().instrument()
# This will report a span with the default settings
client = redis.StrictRedis(host="localhost", port=6379)
client.get("my-key")
Async Redis clients (i.e. ``redis.asyncio.Redis``) are also instrumented in the same way:
.. code:: python
from opentelemetry.instrumentation.redis import RedisInstrumentor
import redis.asyncio
# Instrument redis
RedisInstrumentor().instrument()
# This will report a span with the default settings
async def redis_get():
client = redis.asyncio.Redis(host="localhost", port=6379)
await client.get("my-key")
.. note::
Calling the ``instrument`` method will instrument the client classes, so any client
created after the ``instrument`` call will be instrumented. To instrument only a
single client, use :func:`RedisInstrumentor.instrument_client` method.
Instrument Single Client
------------------------
The :func:`RedisInstrumentor.instrument_client` can instrument a connection instance. This is useful when there are multiple clients with a different redis database index.
Or, you might have a different connection pool used for an application function you
don't want instrumented.
.. code:: python
from opentelemetry.instrumentation.redis import RedisInstrumentor
import redis
instrumented_client = redis.Redis()
not_instrumented_client = redis.Redis()
# Instrument redis
RedisInstrumentor.instrument_client(client=instrumented_client)
# This will report a span with the default settings
instrumented_client.get("my-key")
# This will not have a span
not_instrumented_client.get("my-key")
.. warning::
All client instances created after calling ``RedisInstrumentor().instrument`` will
be instrumented. To avoid instrumenting all clients, use
:func:`RedisInstrumentor.instrument_client` .
Request/Response Hooks
----------------------
.. code:: python
from opentelemetry.instrumentation.redis import RedisInstrumentor
import redis
def request_hook(span, instance, args, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")
def response_hook(span, instance, response):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")
# Instrument redis with hooks
RedisInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)
# This will report a span with the default settings and the custom attributes added from the hooks
client = redis.StrictRedis(host="localhost", port=6379)
client.get("my-key")
Suppress Instrumentation
------------------------
You can use the ``suppress_instrumentation`` context manager to prevent instrumentation
from being applied to specific Redis operations. This is useful when you want to avoid
creating spans for internal operations, health checks, or during specific code paths.
.. code:: python
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.utils import suppress_instrumentation
import redis
# Instrument redis
RedisInstrumentor().instrument()
client = redis.StrictRedis(host="localhost", port=6379)
# This will report a span
client.get("my-key")
# This will NOT report a span
with suppress_instrumentation():
client.get("internal-key")
client.set("cache-key", "value")
# This will report a span again
client.get("another-key")
API
---
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Callable, Collection
import redis
from wrapt import wrap_function_wrapper
from opentelemetry import trace
from opentelemetry.instrumentation._semconv import (
_get_schema_url_for_signal_types,
_get_semconv_opt_in_modes,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_set_db_statement,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.redis.package import _instruments
from opentelemetry.instrumentation.redis.util import (
_add_create_attributes,
_add_search_attributes,
_build_span_meta_data_for_pipeline,
_build_span_name,
_format_command_args,
_set_connection_attributes,
)
from opentelemetry.instrumentation.redis.version import __version__
from opentelemetry.instrumentation.utils import (
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.trace import (
StatusCode,
Tracer,
TracerProvider,
get_tracer,
)
if TYPE_CHECKING:
from typing import Awaitable
import redis.asyncio.client
import redis.asyncio.cluster
import redis.client
import redis.cluster
import redis.connection
from opentelemetry.instrumentation.redis.custom_types import (
AsyncPipelineInstance,
AsyncRedisInstance,
PipelineInstance,
R,
RedisInstance,
RequestHook,
ResponseHook,
)
_logger = logging.getLogger(__name__)
_REDIS_ASYNCIO_VERSION = (4, 2, 0)
_REDIS_CLUSTER_VERSION = (4, 1, 0)
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2)
_CLIENT_ASYNCIO_SUPPORT = redis.VERSION >= _REDIS_ASYNCIO_VERSION
_CLIENT_ASYNCIO_CLUSTER_SUPPORT = (
redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION
)
_CLIENT_CLUSTER_SUPPORT = redis.VERSION >= _REDIS_CLUSTER_VERSION
_CLIENT_BEFORE_V3 = redis.VERSION < (3, 0, 0)
if _CLIENT_ASYNCIO_SUPPORT:
import redis.asyncio
_INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry"
def _traced_execute_factory(
tracer: Tracer,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
):
sem_conv_opt_in_modes = _get_semconv_opt_in_modes(
(
_OpenTelemetryStabilitySignalType.DATABASE,
_OpenTelemetryStabilitySignalType.HTTP,
)
)
db_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.DATABASE
]
http_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.HTTP
]
def _traced_execute_command(
func: Callable[..., R],
instance: RedisInstance,
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> R:
if not is_instrumentation_enabled():
return func(*args, **kwargs)
query = _format_command_args(args)
name = _build_span_name(instance, args)
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span_attrs = {}
_set_db_statement(span_attrs, query, db_sem_conv_opt_in_mode)
span_attrs["db.redis.args_length"] = len(args)
# Set all DB attributes
for key, value in span_attrs.items():
span.set_attribute(key, value)
_set_connection_attributes(
span,
instance,
db_sem_conv_opt_in_mode,
http_sem_conv_opt_in_mode,
)
if span.name == "redis.create_index":
_add_create_attributes(span, args)
if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = func(*args, **kwargs)
if span.is_recording():
if span.name == "redis.search":
_add_search_attributes(span, response, args)
if callable(response_hook):
response_hook(span, instance, response)
return response
return _traced_execute_command
def _traced_execute_pipeline_factory(
tracer: Tracer,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
):
sem_conv_opt_in_modes = _get_semconv_opt_in_modes(
(
_OpenTelemetryStabilitySignalType.DATABASE,
_OpenTelemetryStabilitySignalType.HTTP,
)
)
db_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.DATABASE
]
http_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.HTTP
]
def _traced_execute_pipeline(
func: Callable[..., R],
instance: PipelineInstance,
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> R:
if not is_instrumentation_enabled():
return func(*args, **kwargs)
(
command_stack,
resource,
span_name,
) = _build_span_meta_data_for_pipeline(instance)
exception = None
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span_attrs = {}
_set_db_statement(
span_attrs, resource, db_sem_conv_opt_in_mode
)
span_attrs["db.redis.pipeline_length"] = len(command_stack)
# Set all DB attributes
for key, value in span_attrs.items():
span.set_attribute(key, value)
_set_connection_attributes(
span,
instance,
db_sem_conv_opt_in_mode,
http_sem_conv_opt_in_mode,
)
response = None
try:
response = func(*args, **kwargs)
except redis.WatchError as watch_exception:
span.set_status(StatusCode.UNSET)
exception = watch_exception
if callable(response_hook):
response_hook(span, instance, response)
if exception:
raise exception
return response
return _traced_execute_pipeline
def _async_traced_execute_factory(
tracer: Tracer,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
):
sem_conv_opt_in_modes = _get_semconv_opt_in_modes(
(
_OpenTelemetryStabilitySignalType.DATABASE,
_OpenTelemetryStabilitySignalType.HTTP,
)
)
db_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.DATABASE
]
http_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.HTTP
]
async def _async_traced_execute_command(
func: Callable[..., Awaitable[R]],
instance: AsyncRedisInstance,
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> Awaitable[R]:
if not is_instrumentation_enabled():
return await func(*args, **kwargs)
query = _format_command_args(args)
name = _build_span_name(instance, args)
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span_attrs = {}
_set_db_statement(span_attrs, query, db_sem_conv_opt_in_mode)
span_attrs["db.redis.args_length"] = len(args)
# Set all DB attributes
for key, value in span_attrs.items():
span.set_attribute(key, value)
_set_connection_attributes(
span,
instance,
db_sem_conv_opt_in_mode,
http_sem_conv_opt_in_mode,
)
if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = await func(*args, **kwargs)
if callable(response_hook):
response_hook(span, instance, response)
return response
return _async_traced_execute_command
def _async_traced_execute_pipeline_factory(
tracer: Tracer,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
):
sem_conv_opt_in_modes = _get_semconv_opt_in_modes(
(
_OpenTelemetryStabilitySignalType.DATABASE,
_OpenTelemetryStabilitySignalType.HTTP,
)
)
db_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.DATABASE
]
http_sem_conv_opt_in_mode = sem_conv_opt_in_modes[
_OpenTelemetryStabilitySignalType.HTTP
]
async def _async_traced_execute_pipeline(
func: Callable[..., Awaitable[R]],
instance: AsyncPipelineInstance,
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> Awaitable[R]:
if not is_instrumentation_enabled():
return await func(*args, **kwargs)
(
command_stack,
resource,
span_name,
) = _build_span_meta_data_for_pipeline(instance)
exception = None
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span_attrs = {}
_set_db_statement(
span_attrs, resource, db_sem_conv_opt_in_mode
)
span_attrs["db.redis.pipeline_length"] = len(command_stack)
# Set all DB attributes
for key, value in span_attrs.items():
span.set_attribute(key, value)
_set_connection_attributes(
span,
instance,
db_sem_conv_opt_in_mode,
http_sem_conv_opt_in_mode,
)
response = None
try:
response = await func(*args, **kwargs)
except redis.WatchError as watch_exception:
span.set_status(StatusCode.UNSET)
exception = watch_exception
if callable(response_hook):
response_hook(span, instance, response)
if exception:
raise exception
return response
return _async_traced_execute_pipeline
# pylint: disable=R0915
def _instrument(
tracer: Tracer,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
):
_traced_execute_command = _traced_execute_factory(
tracer, request_hook, response_hook
)
_traced_execute_pipeline = _traced_execute_pipeline_factory(
tracer, request_hook, response_hook
)
pipeline_class = "BasePipeline" if _CLIENT_BEFORE_V3 else "Pipeline"
redis_class = "StrictRedis" if _CLIENT_BEFORE_V3 else "Redis"
wrap_function_wrapper(
"redis", f"{redis_class}.execute_command", _traced_execute_command
)
wrap_function_wrapper(
"redis.client",
f"{pipeline_class}.execute",
_traced_execute_pipeline,
)
wrap_function_wrapper(
"redis.client",
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
if _CLIENT_CLUSTER_SUPPORT:
wrap_function_wrapper(
"redis.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
)
_async_traced_execute_command = _async_traced_execute_factory(
tracer, request_hook, response_hook
)
_async_traced_execute_pipeline = _async_traced_execute_pipeline_factory(
tracer, request_hook, response_hook
)
if _CLIENT_ASYNCIO_SUPPORT:
wrap_function_wrapper(
"redis.asyncio",
f"{redis_class}.execute_command",
_async_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.client",
f"{pipeline_class}.execute",
_async_traced_execute_pipeline,
)
wrap_function_wrapper(
"redis.asyncio.client",
f"{pipeline_class}.immediate_execute_command",
_async_traced_execute_command,
)
if _CLIENT_ASYNCIO_CLUSTER_SUPPORT:
wrap_function_wrapper(
"redis.asyncio.cluster",
"RedisCluster.execute_command",
_async_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.cluster",
"ClusterPipeline.execute",
_async_traced_execute_pipeline,
)
def _instrument_client(
client,
tracer: Tracer,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
):
# first, handle async clients and cluster clients
_async_traced_execute = _async_traced_execute_factory(
tracer, request_hook, response_hook
)
_async_traced_execute_pipeline = _async_traced_execute_pipeline_factory(
tracer, request_hook, response_hook
)
if _CLIENT_ASYNCIO_SUPPORT and isinstance(client, redis.asyncio.Redis):
def _async_pipeline_wrapper(func, instance, args, kwargs):
result = func(*args, **kwargs)
wrap_function_wrapper(
result, "execute", _async_traced_execute_pipeline
)
wrap_function_wrapper(
result, "immediate_execute_command", _async_traced_execute
)
return result
wrap_function_wrapper(client, "execute_command", _async_traced_execute)
wrap_function_wrapper(client, "pipeline", _async_pipeline_wrapper)
return
if _CLIENT_ASYNCIO_CLUSTER_SUPPORT and isinstance(
client, redis.asyncio.RedisCluster
):
def _async_cluster_pipeline_wrapper(func, instance, args, kwargs):
result = func(*args, **kwargs)
wrap_function_wrapper(
result, "execute", _async_traced_execute_pipeline
)
return result
wrap_function_wrapper(client, "execute_command", _async_traced_execute)
wrap_function_wrapper(
client, "pipeline", _async_cluster_pipeline_wrapper
)
return
# for redis.client.Redis, redis.Cluster and v3.0.0 redis.client.StrictRedis
# the wrappers are the same
_traced_execute = _traced_execute_factory(
tracer, request_hook, response_hook
)
_traced_execute_pipeline = _traced_execute_pipeline_factory(
tracer, request_hook, response_hook
)
def _pipeline_wrapper(func, instance, args, kwargs):
result = func(*args, **kwargs)
wrap_function_wrapper(result, "execute", _traced_execute_pipeline)
wrap_function_wrapper(
result, "immediate_execute_command", _traced_execute
)
return result
wrap_function_wrapper(
client,
"execute_command",
_traced_execute,
)
wrap_function_wrapper(
client,
"pipeline",
_pipeline_wrapper,
)
[docs]class RedisInstrumentor(BaseInstrumentor):
@staticmethod
def _get_tracer(**kwargs):
# Initialize semantic conventions opt-in if needed
_OpenTelemetrySemanticConventionStability._initialize()
# Redis instrumentation supports both DATABASE and HTTP signal types
signal_types = [
_OpenTelemetryStabilitySignalType.DATABASE,
_OpenTelemetryStabilitySignalType.HTTP,
]
tracer_provider = kwargs.get("tracer_provider")
return get_tracer(
__name__,
__version__,
tracer_provider=tracer_provider,
schema_url=_get_schema_url_for_signal_types(signal_types),
)
[docs] def instrument(
self,
tracer_provider: TracerProvider | None = None,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
**kwargs,
):
"""Instruments all Redis/StrictRedis/RedisCluster and async client instances.
Args:
tracer_provider: A TracerProvider, defaults to global.
request_hook:
a function with extra user-defined logic to run before performing the request.
The ``args`` is a tuple, where items are
command arguments. For example ``client.set("mykey", "value", ex=5)`` would
have ``args`` as ``('SET', 'mykey', 'value', 'EX', 5)``.
The ``kwargs`` represents occasional ``options`` passed by redis. For example,
if you use ``client.set("mykey", "value", get=True)``, the ``kwargs`` would be
``{'get': True}``.
response_hook:
a function with extra user-defined logic to run after the request is complete.
The ``args`` represents the response.
"""
super().instrument(
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
**kwargs,
)
def _instrument(self, **kwargs: Any):
"""Instruments the redis module
Args:
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global.
``request_hook``: An optional callback that is invoked right after a span is created.
``response_hook``: An optional callback which is invoked right before the span is finished processing a response.
"""
_instrument(
self._get_tracer(**kwargs),
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
)
def _uninstrument(self, **kwargs: Any):
if _CLIENT_BEFORE_V3:
unwrap(redis.StrictRedis, "execute_command")
unwrap(redis.StrictRedis, "pipeline")
unwrap(redis.Redis, "pipeline")
unwrap(
redis.client.BasePipeline, # pylint:disable=no-member
"execute",
)
unwrap(
redis.client.BasePipeline, # pylint:disable=no-member
"immediate_execute_command",
)
else:
unwrap(redis.Redis, "execute_command")
unwrap(redis.Redis, "pipeline")
unwrap(redis.client.Pipeline, "execute")
unwrap(redis.client.Pipeline, "immediate_execute_command")
if _CLIENT_CLUSTER_SUPPORT:
unwrap(redis.cluster.RedisCluster, "execute_command")
unwrap(redis.cluster.ClusterPipeline, "execute")
if _CLIENT_ASYNCIO_SUPPORT:
unwrap(redis.asyncio.Redis, "execute_command")
unwrap(redis.asyncio.Redis, "pipeline")
unwrap(redis.asyncio.client.Pipeline, "execute")
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")
if _CLIENT_ASYNCIO_CLUSTER_SUPPORT:
unwrap(redis.asyncio.cluster.RedisCluster, "execute_command")
unwrap(redis.asyncio.cluster.ClusterPipeline, "execute")
[docs] @staticmethod
def instrument_client(
client: redis.StrictRedis
| redis.Redis
| redis.asyncio.Redis
| redis.cluster.RedisCluster
| redis.asyncio.cluster.RedisCluster,
tracer_provider: TracerProvider | None = None,
request_hook: RequestHook | None = None,
response_hook: ResponseHook | None = None,
):
"""Instrument the provided Redis Client. The client can be sync or async.
Cluster client is also supported.
Args:
client: The redis client.
tracer_provider: A TracerProvider, defaults to global.
request_hook: a function with extra user-defined logic to run before
performing the request.
The ``args`` is a tuple, where items are
command arguments. For example ``client.set("mykey", "value", ex=5)`` would
have ``args`` as ``('SET', 'mykey', 'value', 'EX', 5)``.
The ``kwargs`` represents occasional ``options`` passed by redis. For example,
if you use ``client.set("mykey", "value", get=True)``, the ``kwargs`` would be
``{'get': True}``.
response_hook: a function with extra user-defined logic to run after
the request is complete.
The ``args`` represents the response.
"""
if not hasattr(client, _INSTRUMENTATION_ATTR):
setattr(client, _INSTRUMENTATION_ATTR, False)
if not getattr(client, _INSTRUMENTATION_ATTR):
_instrument_client(
client,
RedisInstrumentor._get_tracer(tracer_provider=tracer_provider),
request_hook=request_hook,
response_hook=response_hook,
)
setattr(client, _INSTRUMENTATION_ATTR, True)
else:
_logger.warning(
"Attempting to instrument Redis connection while already instrumented"
)
[docs] @staticmethod
def uninstrument_client(
client: redis.StrictRedis
| redis.Redis
| redis.asyncio.Redis
| redis.cluster.RedisCluster
| redis.asyncio.cluster.RedisCluster,
):
"""Disables instrumentation for the given client instance
Args:
client: The redis client
"""
if getattr(client, _INSTRUMENTATION_ATTR):
# for all clients we need to unwrap execute_command and pipeline functions
unwrap(client, "execute_command")
# the method was creating a pipeline and wrapping the functions of the
# created instance. any pipelines created before un-instrumenting will
# remain instrumented (pipelines should usually have a short span)
unwrap(client, "pipeline")
else:
_logger.warning(
"Attempting to un-instrument Redis connection that wasn't instrumented"
)
[docs] def instrumentation_dependencies(self) -> Collection[str]:
"""Return a list of python packages with versions that the will be instrumented."""
return _instruments