Source code for opentelemetry.instrumentation.aiokafka

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

"""
Instrument aiokafka to report instrumentation-kafka produced and consumed messages

Usage
-----

.. code:: python

    import asyncio
    from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
    from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

    # Instrument kafka
    AIOKafkaInstrumentor().instrument()

    # report a span of type producer with the default settings
    async def produce():
        producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
        await producer.start()
        try:
            await producer.send_and_wait('my-topic', b'raw_bytes')
        finally:
            await producer.stop()

    # report a span of type consumer with the default settings
    async def consume():
        consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
        await consumer.start()
        try:
            async for message in consumer:
                # process message
                print(message)
        finally:
            await consumer.stop()

    asyncio.run(produce())
    asyncio.run(consume())

The _instrument() method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
async_produce_hook (Callable) - a function with extra user-defined logic to be performed before sending the message
this function signature is:
def async_produce_hook(span: Span, args, kwargs)
async_consume_hook (Callable) - a function with extra user-defined logic to be performed after consuming a message
this function signature is:
def async_consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)
for example:

.. code:: python

    import asyncio
    from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
    from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

    async def async_produce_hook(span, args, kwargs):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_async_response_hook", "some-value")

    async def async_consume_hook(span, record, args, kwargs):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")

    # instrument kafka with produce and consume hooks
    AIOKafkaInstrumentor().instrument(async_produce_hook=async_produce_hook, async_consume_hook=async_consume_hook)

    # Using kafka as normal now will automatically generate spans,
    # including user custom attributes added from the hooks
    async def produce():
        producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
        await producer.start()
        try:
            await producer.send_and_wait('my-topic', b'raw_bytes')
        finally:
            await producer.stop()

    asyncio.run(produce())

API
___
"""

from __future__ import annotations

from inspect import iscoroutinefunction
from typing import TYPE_CHECKING, Collection

import aiokafka
from wrapt import (
    wrap_function_wrapper,  # type: ignore[reportUnknownVariableType]
)

from opentelemetry import trace
from opentelemetry.instrumentation.aiokafka.package import _instruments
from opentelemetry.instrumentation.aiokafka.utils import (
    _wrap_getmany,
    _wrap_getone,
    _wrap_send,
)
from opentelemetry.instrumentation.aiokafka.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas

if TYPE_CHECKING:
    from typing import TypedDict

    from typing_extensions import Unpack

    from .utils import ConsumeHookT, ProduceHookT

    class InstrumentKwargs(TypedDict, total=False):
        tracer_provider: trace.TracerProvider
        async_produce_hook: ProduceHookT
        async_consume_hook: ConsumeHookT

    class UninstrumentKwargs(TypedDict, total=False):
        pass


[docs]class AIOKafkaInstrumentor(BaseInstrumentor): """An instrumentor for kafka module See `BaseInstrumentor` """
[docs] def instrumentation_dependencies(self) -> Collection[str]: return _instruments
def _instrument(self, **kwargs: Unpack[InstrumentKwargs]): """Instruments the kafka module Args: **kwargs: Optional arguments ``tracer_provider``: a TracerProvider, defaults to global. ``async_produce_hook``: a callable to be executed just before producing a message ``async_consume_hook``: a callable to be executed just after consuming a message """ tracer_provider = kwargs.get("tracer_provider") async_produce_hook = kwargs.get("async_produce_hook") if not iscoroutinefunction(async_produce_hook): async_produce_hook = None async_consume_hook = kwargs.get("async_consume_hook") if not iscoroutinefunction(async_consume_hook): async_consume_hook = None tracer = trace.get_tracer( __name__, __version__, tracer_provider=tracer_provider, schema_url=Schemas.V1_27_0.value, ) wrap_function_wrapper( aiokafka.AIOKafkaProducer, "send", _wrap_send(tracer, async_produce_hook), ) wrap_function_wrapper( aiokafka.AIOKafkaConsumer, "getone", _wrap_getone(tracer, async_consume_hook), ) wrap_function_wrapper( aiokafka.AIOKafkaConsumer, "getmany", _wrap_getmany(tracer, async_consume_hook), ) def _uninstrument(self, **kwargs: Unpack[UninstrumentKwargs]): unwrap(aiokafka.AIOKafkaProducer, "send") unwrap(aiokafka.AIOKafkaConsumer, "getone") unwrap(aiokafka.AIOKafkaConsumer, "getmany")