OpenTelemetry Pika Instrumentation

Instrument pika to trace RabbitMQ applications.

Usage

  • Start broker backend

docker run -p 5672:5672 rabbitmq
  • Run instrumented task

import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor

PikaInstrumentor().instrument()

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
  • PikaInstrumentor also supports instrumentation of a single channel

import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=channel)


channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')

pika_instrumentation.uninstrument_channel(channel=channel)
  • PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider

PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)
  • PikaInstrumentor also supports instrumenting with hooks that will be called when producing or consuming a message. The hooks should be of type “Callable[[Span, bytes, BasicProperties], None]” where the first parameter is the span, the second parameter is the message body and the third parameter is the message properties

def publish_hook(span: Span, body: bytes, properties: BasicProperties):
    span.set_attribute("messaging.payload", body.decode())

def consume_hook(span: Span, body: bytes, properties: BasicProperties):
    span.set_attribute("messaging.id", properties.message_id)

PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook)

Consumer Instrumentation

For consumer instrumentation, pika supports two consuming modes:

  • Consumers using the basic_consume method which accepts a callback. This is supported for global instrumentation (PikaInstrumentor().instrument()) as well channel specific instrumentation (PikaInstrumentor().instrument_channel(channel))

  • Consumers using the consume method which returns a generator over messages. This is supported for global instrumentations only (PikaInstrumentor().instrument())

API

class opentelemetry.instrumentation.pika.PikaInstrumentor(*args, **kwargs)[source]

Bases: BaseInstrumentor

CONSUMER_CALLBACK_ATTR = 'on_message_callback'
static instrument_channel(channel, tracer_provider=None, publish_hook=<function dummy_callback>, consume_hook=<function dummy_callback>)[source]
Return type:

None

static uninstrument_channel(channel)[source]
Return type:

None

instrumentation_dependencies()[source]

Return a list of python packages with versions that the will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method should look like: :rtype: Collection[str]

def instrumentation_dependencies(self) -> Collection[str]:

return [‘requests ~= 1.0’]

This will ensure that the instrumentation will only be used when the specified library is present in the environment.