OpenTelemetry Pika Instrumentation
Instrument pika to trace RabbitMQ applications.
Usage
Start broker backend
docker run -p 5672:5672 rabbitmq
Run instrumented task
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
PikaInstrumentor().instrument()
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
PikaInstrumentor also supports instrumentation of a single channel
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=channel)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
pika_instrumentation.uninstrument_channel(channel=channel)
PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider
PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)
PikaInstrumentor also supports instrumenting with hooks that will be called when producing or consuming a message. The hooks should be of type “Callable[[Span, bytes, BasicProperties], None]” where the first parameter is the span, the second parameter is the message body and the third parameter is the message properties
def publish_hook(span: Span, body: bytes, properties: BasicProperties):
span.set_attribute("messaging.payload", body.decode())
def consume_hook(span: Span, body: bytes, properties: BasicProperties):
span.set_attribute("messaging.id", properties.message_id)
PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook)
Consumer Instrumentation
For consumer instrumentation, pika supports two consuming modes:
Consumers using the basic_consume method which accepts a callback. This is supported for global instrumentation (PikaInstrumentor().instrument()) as well channel specific instrumentation (PikaInstrumentor().instrument_channel(channel))
Consumers using the consume method which returns a generator over messages. This is supported for global instrumentations only (PikaInstrumentor().instrument())
API
- class opentelemetry.instrumentation.pika.PikaInstrumentor(*args, **kwargs)[source]
Bases:
BaseInstrumentor
- CONSUMER_CALLBACK_ATTR = 'on_message_callback'
- static instrument_channel(channel, tracer_provider=None, publish_hook=<function dummy_callback>, consume_hook=<function dummy_callback>)[source]
- Return type:
- instrumentation_dependencies()[source]
Return a list of python packages with versions that the will be instrumented.
The format should be the same as used in requirements.txt or pyproject.toml.
For example, if an instrumentation instruments requests 1.x, this method should look like: :rtype:
Collection
[str
]- def instrumentation_dependencies(self) -> Collection[str]:
return [‘requests ~= 1.0’]
This will ensure that the instrumentation will only be used when the specified library is present in the environment.