Source code for opentelemetry.instrumentation.pika.pika_instrumentor

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=unnecessary-dunder-call

from logging import getLogger
from typing import Any, Collection, Dict, Optional

import pika
import wrapt
from packaging import version
from pika.adapters import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pika import utils
from opentelemetry.instrumentation.pika.package import _instruments
from opentelemetry.instrumentation.pika.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import Tracer, TracerProvider

# Module-level logger; used below to report redundant instrument /
# uninstrument attempts on a channel.
_LOG = getLogger(__name__)
# NOTE(review): this key is not referenced anywhere else in the visible part
# of this module -- confirm whether it is still needed.
_CTX_KEY = "__otel_task_span"

# Names of channel methods that get replaced by a decorated version and must
# be restored from their ``_original_function`` attribute on uninstrument.
_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"]


def _consumer_callback_attribute_name() -> str:
    """Return the consumer-info attribute name that holds the callback.

    The name depends on the installed pika version: ``"consumer_cb"`` for
    versions older than 1.0.0 and ``"on_message_callback"`` for 1.0.0 and
    newer.
    """
    installed = version.parse(pika.__version__)
    if installed < version.parse("1.0.0"):
        return "consumer_cb"
    return "on_message_callback"


class PikaInstrumentor(BaseInstrumentor):  # type: ignore
    """Instrumentor that adds tracing to pika blocking channels.

    ``instrument()`` wraps ``BlockingConnection.channel`` so that every
    channel it returns is passed through :meth:`instrument_channel`. A single
    channel can also be instrumented / uninstrumented directly through the
    static :meth:`instrument_channel` / :meth:`uninstrument_channel` methods.
    """

    # Attribute on pika's consumer-info objects that stores the consumer
    # callback; resolved once, based on the installed pika version.
    CONSUMER_CALLBACK_ATTR = _consumer_callback_attribute_name()

    # pylint: disable=attribute-defined-outside-init
    @staticmethod
    def _instrument_blocking_channel_consumers(
        channel: BlockingChannel,
        tracer: Tracer,
        consume_hook: utils.HookT = utils.dummy_callback,
    ) -> Any:
        """Replace every consumer callback on ``channel`` with a decorated one.

        Iterates ``channel._consumer_infos`` and, for each entry that has a
        callback, stores the decorated callback back on the consumer-info
        object. The original callback is kept on the decorated one as
        ``_original_callback`` so it can be restored later.

        Returns ``None``.
        """
        callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
        for consumer_tag, consumer_info in channel._consumer_infos.items():
            consumer_callback = getattr(consumer_info, callback_attr, None)
            if consumer_callback is None:
                continue
            # This method runs again after every ``basic_consume`` call, so
            # consumers registered earlier are already decorated; skip them
            # to avoid stacking wrappers on the same callback.
            if hasattr(consumer_callback, "_original_callback"):
                continue
            decorated_callback = utils._decorate_callback(
                consumer_callback,
                tracer,
                consumer_tag,
                consume_hook,
            )
            setattr(
                decorated_callback,
                "_original_callback",
                consumer_callback,
            )
            setattr(consumer_info, callback_attr, decorated_callback)

    @staticmethod
    def _instrument_basic_publish(
        channel: BlockingChannel,
        tracer: Tracer,
        publish_hook: utils.HookT = utils.dummy_callback,
    ) -> None:
        """Replace ``channel.basic_publish`` with a decorated version.

        The original function is kept on the decorated one as
        ``_original_function`` so it can be restored on uninstrument.
        """
        original_function = getattr(channel, "basic_publish")
        decorated_function = utils._decorate_basic_publish(
            original_function, channel, tracer, publish_hook
        )
        setattr(decorated_function, "_original_function", original_function)
        channel.basic_publish = decorated_function

    @staticmethod
    def _instrument_channel_functions(
        channel: BlockingChannel,
        tracer: Tracer,
        publish_hook: utils.HookT = utils.dummy_callback,
    ) -> None:
        """Instrument the publishing functions present on ``channel``."""
        if hasattr(channel, "basic_publish"):
            PikaInstrumentor._instrument_basic_publish(
                channel, tracer, publish_hook
            )

    @staticmethod
    def _uninstrument_channel_functions(channel: BlockingChannel) -> None:
        """Restore the original channel functions and unwrap ``basic_consume``."""
        for function_name in _FUNCTIONS_TO_UNINSTRUMENT:
            if not hasattr(channel, function_name):
                continue
            function = getattr(channel, function_name)
            if hasattr(function, "_original_function"):
                setattr(channel, function_name, function._original_function)
        unwrap(channel, "basic_consume")

    # NOTE(review): the comment originally placed here read "Make sure that
    # the spans are created inside hash them set as parent and not as
    # brothers". Its wording is garbled; presumably it concerns span
    # parenting -- confirm the intent with the authors.
    @staticmethod
    def instrument_channel(
        channel: BlockingChannel,
        tracer_provider: Optional[TracerProvider] = None,
        publish_hook: utils.HookT = utils.dummy_callback,
        consume_hook: utils.HookT = utils.dummy_callback,
    ) -> None:
        """Instrument a single blocking channel.

        Decorates the callbacks of consumers already registered on the
        channel, wraps ``basic_consume`` so later consumers are decorated
        too, and replaces ``basic_publish`` with a decorated version.

        Args:
            channel: The channel to instrument.
            tracer_provider: Provider used to obtain the tracer; passed
                through to ``trace.get_tracer``.
            publish_hook: Hook forwarded to the ``basic_publish`` decorator.
            consume_hook: Hook forwarded to the consumer-callback decorator.

        If the channel is already marked as instrumented, a warning is logged
        and nothing else happens.
        """
        if getattr(channel, "_is_instrumented_by_opentelemetry", False):
            _LOG.warning(
                "Attempting to instrument Pika channel while already instrumented!"
            )
            return
        tracer = trace.get_tracer(
            __name__,
            __version__,
            tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )
        PikaInstrumentor._instrument_blocking_channel_consumers(
            channel, tracer, consume_hook
        )
        PikaInstrumentor._decorate_basic_consume(channel, tracer, consume_hook)
        PikaInstrumentor._instrument_channel_functions(
            channel, tracer, publish_hook
        )
        # Record the state so a second instrument call is rejected above and
        # uninstrument_channel knows there is something to undo.
        channel._is_instrumented_by_opentelemetry = True

    @staticmethod
    def uninstrument_channel(channel: BlockingChannel) -> None:
        """Undo :meth:`instrument_channel` on a single channel.

        Restores the original consumer callbacks, restores the original
        channel functions, unwraps ``basic_consume`` and clears the
        instrumented flag. If the channel is not marked as instrumented, an
        error is logged and nothing else happens.
        """
        if not getattr(channel, "_is_instrumented_by_opentelemetry", False):
            _LOG.error(
                "Attempting to uninstrument Pika channel while already uninstrumented!"
            )
            return

        callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
        for client_info in channel._consumer_infos.values():
            consumer_callback = getattr(client_info, callback_attr, None)
            if hasattr(consumer_callback, "_original_callback"):
                # Put the original callback back on the consumer-info object;
                # the dict entry itself must stay a consumer-info object.
                setattr(
                    client_info,
                    callback_attr,
                    consumer_callback._original_callback,
                )
        PikaInstrumentor._uninstrument_channel_functions(channel)
        channel._is_instrumented_by_opentelemetry = False

    def _decorate_channel_function(
        self,
        tracer_provider: Optional[TracerProvider],
        publish_hook: utils.HookT = utils.dummy_callback,
        consume_hook: utils.HookT = utils.dummy_callback,
    ) -> None:
        """Wrap ``BlockingConnection.channel`` to instrument returned channels."""

        def wrapper(wrapped, instance, args, kwargs):
            channel = wrapped(*args, **kwargs)
            self.instrument_channel(
                channel,
                tracer_provider=tracer_provider,
                publish_hook=publish_hook,
                consume_hook=consume_hook,
            )
            return channel

        wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)

    @staticmethod
    def _decorate_basic_consume(
        channel: BlockingChannel,
        tracer: Optional[Tracer],
        consume_hook: utils.HookT = utils.dummy_callback,
    ) -> None:
        """Wrap ``channel.basic_consume`` so new consumers get decorated.

        After the wrapped ``basic_consume`` returns, the channel's consumers
        are passed through ``_instrument_blocking_channel_consumers`` again.
        """

        def wrapper(wrapped, instance, args, kwargs):
            return_value = wrapped(*args, **kwargs)
            PikaInstrumentor._instrument_blocking_channel_consumers(
                channel, tracer, consume_hook
            )
            return return_value

        wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper)

    def _instrument(self, **kwargs: Dict[str, Any]) -> None:
        """Enable instrumentation for all channels created from now on.

        Recognised keyword arguments: ``tracer_provider``, ``publish_hook``
        and ``consume_hook``; the hooks default to ``utils.dummy_callback``.
        """
        tracer_provider: Optional[TracerProvider] = kwargs.get(
            "tracer_provider", None
        )
        publish_hook: utils.HookT = kwargs.get(
            "publish_hook", utils.dummy_callback
        )
        consume_hook: utils.HookT = kwargs.get(
            "consume_hook", utils.dummy_callback
        )

        setattr(self, "__opentelemetry_tracer_provider", tracer_provider)
        self._decorate_channel_function(
            tracer_provider,
            publish_hook=publish_hook,
            consume_hook=consume_hook,
        )

    def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
        """Drop the stored tracer provider and unwrap ``BlockingConnection.channel``."""
        if hasattr(self, "__opentelemetry_tracer_provider"):
            delattr(self, "__opentelemetry_tracer_provider")
        unwrap(BlockingConnection, "channel")

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package requirements declared for this instrumentation."""
        return _instruments