OpenTelemetry aiokafka instrumentation

Installation

pip install opentelemetry-instrumentation-aiokafka

API

Instrument aiokafka to report spans for produced and consumed Kafka messages.

Usage

import asyncio
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

# Instrument kafka
AIOKafkaInstrumentor().instrument()

# report a span of type producer with the default settings
async def produce():
    producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
    await producer.start()
    try:
        await producer.send_and_wait('my-topic', b'raw_bytes')
    finally:
        await producer.stop()

# report a span of type consumer with the default settings
async def consume():
    consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
    await consumer.start()
    try:
        async for message in consumer:
            # process message
            print(message)
    finally:
        await consumer.stop()

asyncio.run(produce())
asyncio.run(consume())
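Note that consume() above iterates until the consumer is stopped or its task is cancelled, so asyncio.run(consume()) does not return on its own. A minimal sketch of one way to bound such a loop, assuming a hypothetical helper take_messages (not part of aiokafka or of this instrumentation) that accepts any async iterator. A generator named fake_stream stands in for the consumer so the snippet runs without a broker; whether cancelling a pending fetch like this suits a real consumer is an assumption to verify for your setup.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def take_messages(source: AsyncIterator[Any], limit: int, timeout: float) -> List[Any]:
    """Collect up to `limit` items, waiting at most `timeout` seconds for each one."""
    collected: List[Any] = []
    iterator = source.__aiter__()
    while len(collected) < limit:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except (asyncio.TimeoutError, StopAsyncIteration):
            # Stop on a stalled or exhausted source instead of waiting forever.
            break
        collected.append(item)
    return collected


async def fake_stream():
    # Hypothetical stand-in for a consumer: yields two records, then stalls.
    yield b"first"
    yield b"second"
    await asyncio.sleep(3600)


messages = asyncio.run(take_messages(fake_stream(), limit=5, timeout=0.1))
print(messages)
```

With a real consumer, the same helper would be called between consumer.start() and consumer.stop(), passing the consumer object as the source.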

The _instrument() method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider

async_produce_hook (Callable) - a function with extra user-defined logic to be performed before sending the message. This function's signature is:
    def async_produce_hook(span: Span, args, kwargs)

async_consume_hook (Callable) - a function with extra user-defined logic to be performed after consuming a message. This function's signature is:
    def async_consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)

For example:

import asyncio
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

async def async_produce_hook(span, args, kwargs):
    if span and span.is_recording():
        span.set_attribute("custom_user_attribute_from_produce_hook", "some-value")

async def async_consume_hook(span, record, args, kwargs):
    if span and span.is_recording():
        span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")

# instrument kafka with produce and consume hooks
AIOKafkaInstrumentor().instrument(async_produce_hook=async_produce_hook, async_consume_hook=async_consume_hook)

# Using kafka as normal now will automatically generate spans,
# including user custom attributes added from the hooks
async def produce():
    producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
    await producer.start()
    try:
        await producer.send_and_wait('my-topic', b'raw_bytes')
    finally:
        await producer.stop()

asyncio.run(produce())
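The hook arguments can carry more than static attributes. A minimal sketch, assuming that args and kwargs mirror the positional and keyword arguments of the intercepted send call with the topic first, which the description above does not spell out. The helper topic_from_send_call and the attribute name app.kafka.topic are hypothetical, and FakeSpan only stands in for a real span so the snippet runs without a broker or the OpenTelemetry SDK.

```python
import asyncio
from typing import Any, Dict, Optional, Tuple


def topic_from_send_call(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Optional[str]:
    """Best-effort topic lookup; assumes the first positional arg or `topic=`."""
    if args:
        return args[0]
    return kwargs.get("topic")


async def async_produce_hook(span, args, kwargs):
    # Same parameters as the produce hook described above.
    if span and span.is_recording():
        topic = topic_from_send_call(args, kwargs)
        if topic is not None:
            span.set_attribute("app.kafka.topic", topic)


class FakeSpan:
    """Hypothetical stand-in exposing only the two span methods the hook uses."""

    def __init__(self):
        self.attributes = {}

    def is_recording(self):
        return True

    def set_attribute(self, key, value):
        self.attributes[key] = value


span = FakeSpan()
asyncio.run(async_produce_hook(span, ("my-topic", b"raw_bytes"), {}))
print(span.attributes)
```

The hook would be registered in the same way as in the example above, through the async_produce_hook keyword argument of instrument().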

API

class opentelemetry.instrumentation.aiokafka.AIOKafkaInstrumentor(*args, **kwargs)[source]

Bases: BaseInstrumentor

An instrumentor for the aiokafka module. See BaseInstrumentor.

instrumentation_dependencies()[source]

Return a list of Python packages, with versions, that will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method should look like:

def instrumentation_dependencies(self) -> Collection[str]:
    return ['requests ~= 1.0']

This will ensure that the instrumentation will only be used when the specified library is present in the environment.

Return type: Collection[str]
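The presence check described above can be approximated with the standard library. A rough sketch, assuming a hypothetical helper is_distribution_installed that only tests whether a distribution is installed and ignores version specifiers such as ~= 1.0. It illustrates the idea and is not how BaseInstrumentor performs its own check.

```python
from importlib.metadata import PackageNotFoundError, version


def is_distribution_installed(name: str) -> bool:
    """Return True if a distribution called `name` is installed in this environment."""
    try:
        version(name)
    except PackageNotFoundError:
        return False
    return True


# The result for aiokafka depends on the local environment.
print(is_distribution_installed("aiokafka"))
# A made-up distribution name that should not resolve anywhere.
print(is_distribution_installed("surely-not-an-installed-package-0xdead"))
```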