
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Instrument confluent-kafka-python to report traces for produced and consumed messages.

Usage
-----

.. code-block:: python

    import sys

    from confluent_kafka import Consumer, KafkaError, KafkaException, Producer

    from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

    # Instrument kafka
    ConfluentKafkaInstrumentor().instrument()

    # report a span of type producer with the default settings
    conf1 = {'bootstrap.servers': "localhost:9092"}
    producer = Producer(conf1)
    producer.produce('my-topic', b'raw_bytes')
    conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
    # report a span of type consumer with the default settings
    consumer = Consumer(conf2)

    def basic_consume_loop(consumer, topics):
        try:
            consumer.subscribe(topics)
            running = True
            while running:
                msg = consumer.poll(timeout=1.0)
                if msg is None: continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n")
                    else:
                        raise KafkaException(msg.error())
                else:
                    msg_process(msg)
        finally:
            # Close down consumer to commit final offsets.
            consumer.close()

    basic_consume_loop(consumer, ["my-topic"])
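
The loop above hands each message to a user-supplied ``msg_process`` callback,
which this package does not define. A minimal sketch of such a callback (the
name and body are illustrative only):

.. code-block:: python

    def msg_process(msg):
        # msg is a confluent_kafka.Message; value() returns the raw payload bytes
        print(f"{msg.topic()} [{msg.partition()}] @ {msg.offset()}: {msg.value()}")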

The _instrument method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider

instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending a message;
    its signature is:

        def instrument_producer(producer: Producer, tracer_provider=None)

instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message;
    its signature is:

        def instrument_consumer(consumer: Consumer, tracer_provider=None)

for example:

.. code:: python

    from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

    from confluent_kafka import Producer, Consumer

    inst = ConfluentKafkaInstrumentor()

    p = Producer({'bootstrap.servers': 'localhost:29092'})
    c = Consumer({
        'bootstrap.servers': 'localhost:29092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    })

    # instrument confluent kafka with produce and consume hooks;
    # tracer_provider is optional and defaults to the global tracer provider
    p = inst.instrument_producer(p, tracer_provider=tracer_provider)
    c = inst.instrument_consumer(c, tracer_provider=tracer_provider)

    # Using kafka as normal now will automatically generate spans,
    # including user custom attributes added from the hooks
    p.produce('my-topic', b'raw_bytes')
    msg = c.poll()
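
Instrumentation of individual clients can be reverted with the corresponding
static methods; a short sketch reusing ``p`` and ``c`` from above:

.. code:: python

    p = ConfluentKafkaInstrumentor.uninstrument_producer(p)
    c = ConfluentKafkaInstrumentor.uninstrument_consumer(c)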

"""
from typing import Collection

import confluent_kafka
import wrapt
from confluent_kafka import Consumer, Producer

from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import Tracer

from .package import _instruments
from .utils import (
    KafkaPropertiesExtractor,
    _create_new_consume_span,
    _end_current_consume_span,
    _enrich_span,
    _get_span_name,
    _kafka_setter,
)
from .version import __version__


class AutoInstrumentedProducer(Producer):
    # This method is deliberately implemented in order to allow wrapt to wrap this function
    def produce(
        self, topic, value=None, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
        super().produce(topic, value, *args, **kwargs)


class AutoInstrumentedConsumer(Consumer):
    def __init__(self, config):
        super().__init__(config)
        self._current_consume_span = None

    # This method is deliberately implemented in order to allow wrapt to wrap this function
    def poll(self, timeout=-1):  # pylint: disable=useless-super-delegation
        return super().poll(timeout)

    # This method is deliberately implemented in order to allow wrapt to wrap this function
    def consume(
        self, *args, **kwargs
    ):  # pylint: disable=useless-super-delegation
        return super().consume(*args, **kwargs)
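

# When ConfluentKafkaInstrumentor().instrument() is active, the classes above
# replace confluent_kafka.Producer/Consumer, and the wrapt wrappers installed
# around their produce/poll/consume methods create the spans (see _instrument).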


class ProxiedProducer(Producer):
    def __init__(self, producer: Producer, tracer: Tracer):
        self._producer = producer
        self._tracer = tracer

    def flush(self, timeout=-1):
        # propagate the underlying client's return value (the number of
        # messages still waiting to be delivered)
        return self._producer.flush(timeout)

    def poll(self, timeout=-1):
        # propagate the number of events served by the underlying client
        return self._producer.poll(timeout)

    def produce(
        self, topic, value=None, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg
        new_kwargs = kwargs.copy()
        new_kwargs["topic"] = topic
        new_kwargs["value"] = value

        return ConfluentKafkaInstrumentor.wrap_produce(
            self._producer.produce, self, self._tracer, args, new_kwargs
        )

    def original_producer(self):
        return self._producer


class ProxiedConsumer(Consumer):
    def __init__(self, consumer: Consumer, tracer: Tracer):
        self._consumer = consumer
        self._tracer = tracer
        self._current_consume_span = None
        self._current_context_token = None

    def committed(self, partitions, timeout=-1):
        return self._consumer.committed(partitions, timeout)

    def commit(self, *args, **kwargs):
        return self._consumer.commit(*args, **kwargs)

    def consume(self, *args, **kwargs):
        return ConfluentKafkaInstrumentor.wrap_consume(
            self._consumer.consume,
            self,
            self._tracer,
            args,
            kwargs,
        )

    def get_watermark_offsets(
        self, partition, timeout=-1, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg
        return self._consumer.get_watermark_offsets(
            partition, timeout, *args, **kwargs
        )

    def offsets_for_times(self, partitions, timeout=-1):
        return self._consumer.offsets_for_times(partitions, timeout)

    def poll(self, timeout=-1):
        return ConfluentKafkaInstrumentor.wrap_poll(
            self._consumer.poll, self, self._tracer, [timeout], {}
        )

    def subscribe(
        self, topics, on_assign=lambda *args: None, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg
        self._consumer.subscribe(topics, on_assign, *args, **kwargs)

    def original_consumer(self):
        return self._consumer
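

# Note: wrap_poll/wrap_consume (below) deliberately leave the consume span open
# when poll()/consume() returns, so that user processing of the message(s)
# happens inside it; the span is ended via _end_current_consume_span on the
# next poll()/consume() call.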


class ConfluentKafkaInstrumentor(BaseInstrumentor):
    """An instrumentor for the confluent kafka module.

    See `BaseInstrumentor`.
    """

    # pylint: disable=attribute-defined-outside-init
    @staticmethod
    def instrument_producer(
        producer: Producer, tracer_provider=None
    ) -> ProxiedProducer:
        tracer = trace.get_tracer(
            __name__,
            __version__,
            tracer_provider=tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        manual_producer = ProxiedProducer(producer, tracer)

        return manual_producer

    @staticmethod
    def instrument_consumer(
        consumer: Consumer, tracer_provider=None
    ) -> ProxiedConsumer:
        tracer = trace.get_tracer(
            __name__,
            __version__,
            tracer_provider=tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        manual_consumer = ProxiedConsumer(consumer, tracer)

        return manual_consumer

    @staticmethod
    def uninstrument_producer(producer: Producer) -> Producer:
        if isinstance(producer, ProxiedProducer):
            return producer.original_producer()
        return producer

    @staticmethod
    def uninstrument_consumer(consumer: Consumer) -> Consumer:
        if isinstance(consumer, ProxiedConsumer):
            return consumer.original_consumer()
        return consumer

    def instrumentation_dependencies(self) -> Collection[str]:
        return _instruments

    def _instrument(self, **kwargs):
        self._original_kafka_producer = confluent_kafka.Producer
        self._original_kafka_consumer = confluent_kafka.Consumer
        confluent_kafka.Producer = AutoInstrumentedProducer
        confluent_kafka.Consumer = AutoInstrumentedConsumer

        tracer_provider = kwargs.get("tracer_provider")
        tracer = trace.get_tracer(
            __name__,
            __version__,
            tracer_provider=tracer_provider,
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        self._tracer = tracer

        def _inner_wrap_produce(func, instance, args, kwargs):
            return ConfluentKafkaInstrumentor.wrap_produce(
                func, instance, self._tracer, args, kwargs
            )

        def _inner_wrap_poll(func, instance, args, kwargs):
            return ConfluentKafkaInstrumentor.wrap_poll(
                func, instance, self._tracer, args, kwargs
            )

        def _inner_wrap_consume(func, instance, args, kwargs):
            return ConfluentKafkaInstrumentor.wrap_consume(
                func, instance, self._tracer, args, kwargs
            )

        wrapt.wrap_function_wrapper(
            AutoInstrumentedProducer,
            "produce",
            _inner_wrap_produce,
        )

        wrapt.wrap_function_wrapper(
            AutoInstrumentedConsumer,
            "poll",
            _inner_wrap_poll,
        )

        wrapt.wrap_function_wrapper(
            AutoInstrumentedConsumer,
            "consume",
            _inner_wrap_consume,
        )

    def _uninstrument(self, **kwargs):
        confluent_kafka.Producer = self._original_kafka_producer
        confluent_kafka.Consumer = self._original_kafka_consumer
        unwrap(AutoInstrumentedProducer, "produce")
        unwrap(AutoInstrumentedConsumer, "poll")
        # also unwrap "consume", which _instrument wrapped above
        unwrap(AutoInstrumentedConsumer, "consume")
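
    # The static helpers below contain the tracing logic shared by the wrapt
    # wrappers installed in _instrument and by the Proxied* wrapper classes.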

    @staticmethod
    def wrap_produce(func, instance, tracer, args, kwargs):
        topic = kwargs.get("topic")
        if not topic:
            topic = args[0]

        span_name = _get_span_name("send", topic)
        with tracer.start_as_current_span(
            name=span_name, kind=trace.SpanKind.PRODUCER
        ) as span:
            headers = KafkaPropertiesExtractor.extract_produce_headers(
                args, kwargs
            )
            if headers is None:
                headers = []
                kwargs["headers"] = headers

            topic = KafkaPropertiesExtractor.extract_produce_topic(args)
            _enrich_span(
                span,
                topic,
                operation=MessagingOperationValues.RECEIVE,
            )  # Replace
            propagate.inject(
                headers,
                setter=_kafka_setter,
            )
            return func(*args, **kwargs)

    @staticmethod
    def wrap_poll(func, instance, tracer, args, kwargs):
        if instance._current_consume_span:
            _end_current_consume_span(instance)

        with tracer.start_as_current_span(
            "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
        ):
            record = func(*args, **kwargs)
            if record:
                _create_new_consume_span(instance, tracer, [record])
                _enrich_span(
                    instance._current_consume_span,
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    operation=MessagingOperationValues.PROCESS,
                )
            instance._current_context_token = context.attach(
                trace.set_span_in_context(instance._current_consume_span)
            )

        return record

    @staticmethod
    def wrap_consume(func, instance, tracer, args, kwargs):
        if instance._current_consume_span:
            _end_current_consume_span(instance)

        with tracer.start_as_current_span(
            "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
        ):
            records = func(*args, **kwargs)
            if len(records) > 0:
                _create_new_consume_span(instance, tracer, records)
                _enrich_span(
                    instance._current_consume_span,
                    records[0].topic(),
                    operation=MessagingOperationValues.PROCESS,
                )

        instance._current_context_token = context.attach(
            trace.set_span_in_context(instance._current_consume_span)
        )

        return records