OpenTelemetry confluent-kafka Instrumentation

pypi

This library allows tracing requests made by the confluent-kafka library.

Installation

pip install opentelemetry-instrumentation-confluent-kafka

References

Instrument confluent-kafka-python to report instrumentation-confluent-kafka produced and consumed messages

Usage

from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from confluent_kafka import Producer, Consumer

# Instrument kafka
ConfluentKafkaInstrumentor().instrument()

# report a span of type producer with the default settings
conf1 = {'bootstrap.servers': "localhost:9092"}
producer = Producer(conf1)
producer.produce('my-topic',b'raw_bytes')
conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
# report a span of type consumer with the default settings
consumer = Consumer(conf2)

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        running = True
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write(f"{msg.topic() [{msg.partition()}] reached end at offset {msg.offset()}}")
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

basic_consume_loop(consumer, "my-topic")
---
The _instrument method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider

instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message

this function signature is:

def instrument_producer(producer: Producer, tracer_provider=None)

instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message

this function signature is:

def instrument_consumer(consumer: Consumer, tracer_provider=None)

for example:

from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

from confluent_kafka import Producer, Consumer

inst = ConfluentKafkaInstrumentor()

p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'})
c = confluent_kafka.Consumer({
    'bootstrap.servers': 'localhost:29092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

# instrument confluent kafka with produce and consume hooks
p = inst.instrument_producer(p, tracer_provider)
c = inst.instrument_consumer(c, tracer_provider=tracer_provider)

# Using kafka as normal now will automatically generate spans,
# including user custom attributes added from the hooks
conf = {'bootstrap.servers': "localhost:9092"}
p.produce('my-topic',b'raw_bytes')
msg = c.poll()

___

class opentelemetry.instrumentation.confluent_kafka.AutoInstrumentedProducer[source]

Bases: Producer

produce(topic, value=None, *args, **kwargs)[source]
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])[source]

Produce message to topic. This is an asynchronous operation, an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.

Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.

Parameters:
  • topic (str) – Topic to produce message to

  • value (str|bytes) – Message payload

  • key (str|bytes) – Message key

  • partition (int) – Partition to produce to, else uses the configured built-in partitioner.

  • on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery

  • timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.

  • headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)

Return type:

None

Raises:
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)

  • KafkaException – for other errors, see exception code

  • NotImplementedError – if timestamp is specified without underlying library support.

class opentelemetry.instrumentation.confluent_kafka.AutoInstrumentedConsumer(config)[source]

Bases: Consumer

poll(timeout=-1)[source]
poll([timeout=None])[source]

Consumes a single message, calls callbacks and returns events.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None), or an event or error (see error().code() for specifics).

Parameters:

timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)). (Seconds)

Returns:

A Message object or None on timeout

Return type:

Message or None

Raises:

RuntimeError if called on a closed consumer

consume(*args, **kwargs)[source]
consume([num_messages=1][, timeout=-1])[source]

Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors for each Message in the list (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.

Parameters:
  • num_messages (int) – The maximum number of messages to return (default: 1).

  • timeout (float) – The maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)

Returns:

A list of Message objects (possibly empty on timeout)

Return type:

list(Message)

Raises:
  • RuntimeError – if called on a closed consumer

  • KafkaError – in case of internal error

  • ValueError – if num_messages > 1M

class opentelemetry.instrumentation.confluent_kafka.ProxiedProducer(producer, tracer)[source]

Bases: Producer

flush(timeout=-1)[source]
flush([timeout])[source]

Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.

Param:

float timeout: Maximum time to block (requires librdkafka >= v0.9.4). (Seconds)

Returns:

Number of messages still in queue.

Note

See poll() for a description on what callbacks may be triggered.

poll(timeout=-1)[source]
poll([timeout])[source]

Polls the producer for events and calls the corresponding callbacks (if registered).

Callbacks:

Parameters:

timeout (float) – Maximum time to block waiting for events. (Seconds)

Returns:

Number of events processed (callbacks served)

Return type:

int

produce(topic, value=None, *args, **kwargs)[source]
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])[source]

Produce message to topic. This is an asynchronous operation, an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.

Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.

Parameters:
  • topic (str) – Topic to produce message to

  • value (str|bytes) – Message payload

  • key (str|bytes) – Message key

  • partition (int) – Partition to produce to, else uses the configured built-in partitioner.

  • on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery

  • timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.

  • headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)

Return type:

None

Raises:
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)

  • KafkaException – for other errors, see exception code

  • NotImplementedError – if timestamp is specified without underlying library support.

original_producer()[source]
class opentelemetry.instrumentation.confluent_kafka.ProxiedConsumer(consumer, tracer)[source]

Bases: Consumer

committed(partitions, timeout=-1)[source]
committed(partitions[, timeout=None])[source]

Retrieve committed offsets for the specified partitions.

Parameters:
  • partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.

  • timeout (float) – Request timeout (seconds).

Returns:

List of topic+partitions with offset and possibly error set.

Return type:

list(TopicPartition)

Raises:

KafkaException

Raises:

RuntimeError if called on a closed consumer

commit(*args, **kwargs)[source]
commit([message=None][, offsets=None][, asynchronous=True])[source]

Commit a message or a list of offsets.

The message and offsets parameters are mutually exclusive. If neither is set, the current partition assignment’s offsets are used instead. Use this method to commit offsets if you have ‘enable.auto.commit’ set to False.

Parameters:
  • message (confluent_kafka.Message) – Commit the message’s offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.

  • offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.

  • asynchronous (bool) – If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.

Return type:

None|list(TopicPartition)

Raises:

KafkaException

Raises:

RuntimeError if called on a closed consumer

consume(*args, **kwargs)[source]
consume([num_messages=1][, timeout=-1])[source]

Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors for each Message in the list (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.

Parameters:
  • num_messages (int) – The maximum number of messages to return (default: 1).

  • timeout (float) – The maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)

Returns:

A list of Message objects (possibly empty on timeout)

Return type:

list(Message)

Raises:
  • RuntimeError – if called on a closed consumer

  • KafkaError – in case of internal error

  • ValueError – if num_messages > 1M

get_watermark_offsets(partition, timeout=-1, *args, **kwargs)[source]
get_watermark_offsets(partition[, timeout=None][, cached=False])[source]

Retrieve low and high offsets for the specified partition.

Parameters:
  • partition (TopicPartition) – Topic+partition to return offsets for.

  • timeout (float) – Request timeout (seconds). Ignored if cached=True.

  • cached (bool) – Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.

Returns:

Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.

Return type:

tuple(int,int)

Raises:

KafkaException

Raises:

RuntimeError if called on a closed consumer

offsets_for_times(partitions, timeout=-1)[source]
offsets_for_times(partitions[, timeout=None])[source]

Look up offsets by timestamp for the specified partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.

param list(TopicPartition) partitions:

topic+partitions with timestamps in the TopicPartition.offset field.

param float timeout:

Request timeout (seconds).

returns:

List of topic+partition with offset field set and possibly error set

rtype:

list(TopicPartition)

raises:

KafkaException

raises:

RuntimeError if called on a closed consumer

poll(timeout=-1)[source]
poll([timeout=None])[source]

Consumes a single message, calls callbacks and returns events.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None), or an event or error (see error().code() for specifics).

Parameters:

timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)). (Seconds)

Returns:

A Message object or None on timeout

Return type:

Message or None

Raises:

RuntimeError if called on a closed consumer

subscribe(topics, on_assign=<function ProxiedConsumer.<lambda>>, *args, **kwargs)[source]
subscribe(topics[, on_assign=None][, on_revoke=None][, on_lost=None])[source]

Set subscription to supplied list of topics This replaces a previous subscription.

Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:

consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
Parameters:
  • topics (list(str)) – List of topics (strings) to subscribe to.

  • on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.

  • on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.

  • on_lost (callable) – callback to provide handling in the case the partition assignment has been lost. If not specified, lost partition events will be delivered to on_revoke, if specified. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

Raises:

KafkaException

Raises:

RuntimeError if called on a closed consumer

on_assign(consumer, partitions)
on_revoke(consumer, partitions)
on_lost(consumer, partitions)
Parameters:
  • consumer (Consumer) – Consumer instance.

  • partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.

original_consumer()[source]
class opentelemetry.instrumentation.confluent_kafka.ConfluentKafkaInstrumentor(*args, **kwargs)[source]

Bases: BaseInstrumentor

An instrumentor for confluent kafka module See BaseInstrumentor

static instrument_producer(producer, tracer_provider=None)[source]
Return type:

ProxiedProducer

static instrument_consumer(consumer, tracer_provider=None)[source]
Return type:

ProxiedConsumer

static uninstrument_producer(producer)[source]
Return type:

Producer

static uninstrument_consumer(consumer)[source]
Return type:

Consumer

instrumentation_dependencies()[source]

Return a list of python packages with versions that the will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method should look like: :rtype: Collection[str]

def instrumentation_dependencies(self) -> Collection[str]:

return [‘requests ~= 1.0’]

This will ensure that the instrumentation will only be used when the specified library is present in the environment.

static wrap_produce(func, instance, tracer, args, kwargs)[source]
static wrap_poll(func, instance, tracer, args, kwargs)[source]
static wrap_consume(func, instance, tracer, args, kwargs)[source]