OpenTelemetry kafka-python integration
Installation
pip install opentelemetry-instrumentation-kafka-python
API
Instrument kafka-python to report spans for produced and consumed messages.
Usage
.. code:: python

    from opentelemetry.instrumentation.kafka import KafkaInstrumentor
    from kafka import KafkaProducer, KafkaConsumer

    # Instrument kafka
    KafkaInstrumentor().instrument()

    # report a span of type producer with the default settings
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    producer.send('my-topic', b'raw_bytes')

    # report a span of type consumer with the default settings
    consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # process message
        pass
The _instrument() method accepts the following keyword args:
- tracer_provider (TracerProvider) - an optional tracer provider
- produce_hook (Callable) - a function with extra user-defined logic to be performed before sending the message. This function's signature is: def produce_hook(span: Span, args, kwargs)
- consume_hook (Callable) - a function with extra user-defined logic to be performed after consuming a message. This function's signature is: def consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)
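The keyword args described above might be used along the following lines. This is a minimal sketch, not the project's own example: the attribute names and the helper instrument_with_hooks are hypothetical, and it assumes that keyword args given to instrument() are forwarded to _instrument().

```python
def produce_hook(span, args, kwargs):
    # Runs before a message is sent. Only touch the span if it is recording.
    if span and span.is_recording():
        span.set_attribute("custom_user_attribute_from_produce_hook", "some-value")


def consume_hook(span, record, args, kwargs):
    # Runs after a message has been consumed; `record` is the consumed message.
    if span and span.is_recording():
        span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")


def instrument_with_hooks(tracer_provider=None):
    # Hypothetical helper. The import is done lazily so the hooks above can be
    # defined and unit-tested without the instrumentation package installed.
    from opentelemetry.instrumentation.kafka import KafkaInstrumentor

    KafkaInstrumentor().instrument(
        tracer_provider=tracer_provider,  # optional; None leaves the default in place
        produce_hook=produce_hook,
        consume_hook=consume_hook,
    )
```

Calling instrument_with_hooks() once at application start-up, before any KafkaProducer or KafkaConsumer is used, would register both hooks. Keeping the hooks as plain module-level functions makes them easy to test with a stub span.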
API
- class opentelemetry.instrumentation.kafka.KafkaInstrumentor(*args, **kwargs)
Bases: BaseInstrumentor
An instrumentor for the kafka module. See BaseInstrumentor.
- instrumentation_dependencies()
Return a list of Python packages, with versions, that will be instrumented.
The format should be the same as used in requirements.txt or pyproject.toml.
For example, if an instrumentation instruments requests 1.x, this method should look like:

    def instrumentation_dependencies(self) -> Collection[str]:
        return ['requests ~= 1.0']

This will ensure that the instrumentation will only be used when the specified library is present in the environment.
Return type: Collection[str]
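As a rough illustration of what "present in the environment" means, the standard library can report whether a distribution is installed. This is only a sketch under that assumption; it is not how OpenTelemetry performs its own dependency check, and the helper name installed_version is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version string of a distribution, or None if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Report whether the library targeted by this instrumentation is available.
if installed_version("kafka-python") is None:
    print("kafka-python is not installed; there would be nothing to instrument")
```

A check like this only shows presence; comparing the version against a specifier such as '~= 1.0' would need additional logic.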