Source code for opentelemetry.instrumentation.kafka

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

"""
Instrument kafka-python to report spans for produced and consumed messages

Usage
-----

.. code:: python

    from opentelemetry.instrumentation.kafka import KafkaInstrumentor
    from kafka import KafkaProducer, KafkaConsumer

    # Instrument kafka
    KafkaInstrumentor().instrument()

    # report a span of type producer with the default settings
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    producer.send('my-topic', b'raw_bytes')

    def process_msg(message):
        print(message)

    # report a span of type consumer with the default settings
    consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # process message
        process_msg(message)

The ``_instrument()`` method accepts the following keyword arguments:

* ``tracer_provider`` (TracerProvider) - an optional tracer provider.
* ``produce_hook`` (Callable) - a function with extra user-defined logic to be
  performed before sending the message. Its signature is
  ``def produce_hook(span: Span, args, kwargs)``.
* ``consume_hook`` (Callable) - a function with extra user-defined logic to be
  performed after consuming a message. Its signature is
  ``def consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)``.

For example:

.. code:: python

    from opentelemetry.instrumentation.kafka import KafkaInstrumentor
    from kafka import KafkaProducer, KafkaConsumer

    def produce_hook(span, args, kwargs):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_produce_hook", "some-value")

    def consume_hook(span, record, args, kwargs):
        if span and span.is_recording():
            span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")

    # instrument kafka with produce and consume hooks
    KafkaInstrumentor().instrument(produce_hook=produce_hook, consume_hook=consume_hook)

    # Using kafka as normal now will automatically generate spans,
    # including user custom attributes added from the hooks
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    producer.send('my-topic', b'raw_bytes')

    def process_msg(message):
        print(message)

    consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # process message
        process_msg(message)

API
___
"""

from importlib.metadata import PackageNotFoundError, distribution
from typing import Collection

import kafka
from wrapt import wrap_function_wrapper

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.kafka.package import (
    _instruments_any,
    _instruments_kafka_python,
    _instruments_kafka_python_ng,
)
from opentelemetry.instrumentation.kafka.utils import _wrap_next, _wrap_send
from opentelemetry.instrumentation.kafka.version import __version__
from opentelemetry.instrumentation.utils import unwrap


class KafkaInstrumentor(BaseInstrumentor):
    """Instrumentor that patches the ``kafka`` module to emit spans.

    While active, ``kafka.KafkaProducer.send`` and
    ``kafka.KafkaConsumer.__next__`` are wrapped with tracing wrappers.

    See `BaseInstrumentor`
    """

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the requirement matching the installed kafka distribution.

        Returns:
            A one-element tuple holding the requirement for the first
            distribution found (``kafka-python-ng`` is probed before
            ``kafka-python``), or ``_instruments_any`` when neither is
            installed.
        """
        # Determine which package of kafka-python is installed.
        # Right now there are two packages, kafka-python and kafka-python-ng.
        # The latter is a fork of the former because the former is connected
        # to a pypi namespace that the current maintainers cannot access
        # https://github.com/dpkp/kafka-python/issues/2431
        probes = (
            ("kafka-python-ng", _instruments_kafka_python_ng),
            ("kafka-python", _instruments_kafka_python),
        )
        for dist_name, requirement in probes:
            try:
                distribution(dist_name)
            except PackageNotFoundError:
                # Not installed under this name; try the next candidate.
                continue
            return (requirement,)

        return _instruments_any

    def _instrument(self, **kwargs):
        """Instruments the kafka module

        Args:
            **kwargs: Optional arguments
                ``tracer_provider``: a TracerProvider, defaults to global.
                ``produce_hook``: a callable to be executed just before producing a message
                ``consume_hook``: a callable to be executed just after consuming a message
        """
        tracer = trace.get_tracer(
            __name__,
            __version__,
            tracer_provider=kwargs.get("tracer_provider"),
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        # Patch the producer first, then the consumer. Hooks that were not
        # supplied are passed through to the wrapper factories as None.
        send_wrapper = _wrap_send(tracer, kwargs.get("produce_hook"))
        wrap_function_wrapper(kafka.KafkaProducer, "send", send_wrapper)

        next_wrapper = _wrap_next(tracer, kwargs.get("consume_hook"))
        wrap_function_wrapper(kafka.KafkaConsumer, "__next__", next_wrapper)

    def _uninstrument(self, **kwargs):
        """Remove the wrappers installed by ``_instrument``."""
        patched = (
            (kafka.KafkaProducer, "send"),
            (kafka.KafkaConsumer, "__next__"),
        )
        for owner, attribute in patched:
            unwrap(owner, attribute)