OpenTelemetry confluent-kafka Instrumentation
This library allows tracing requests made by the confluent-kafka library, reporting spans for produced and consumed messages.
Installation
pip install opentelemetry-instrumentation-confluent-kafka
Usage
import sys

from confluent_kafka import Consumer, KafkaError, KafkaException, Producer
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

# Instrument kafka
ConfluentKafkaInstrumentor().instrument()

# report a span of type producer with the default settings
conf1 = {'bootstrap.servers': "localhost:9092"}
producer = Producer(conf1)
producer.produce('my-topic', b'raw_bytes')

# report a span of type consumer with the default settings
conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
consumer = Consumer(conf2)

def msg_process(msg):
    # application-specific message handling
    print(msg.value())

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        running = True
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n")
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

basic_consume_loop(consumer, ["my-topic"])
- The _instrument() method accepts the following keyword args:
  - tracer_provider (TracerProvider) - an optional tracer provider
  - instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message; this function's signature is:
    def instrument_producer(producer: Producer, tracer_provider=None)
  - instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message; this function's signature is:
    def instrument_consumer(consumer: Consumer, tracer_provider=None)
for example:
from opentelemetry import trace
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from confluent_kafka import Producer, Consumer

inst = ConfluentKafkaInstrumentor()
tracer_provider = trace.get_tracer_provider()

p = Producer({'bootstrap.servers': 'localhost:29092'})
c = Consumer({
    'bootstrap.servers': 'localhost:29092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

# instrument the producer and consumer directly
p = inst.instrument_producer(p, tracer_provider=tracer_provider)
c = inst.instrument_consumer(c, tracer_provider=tracer_provider)

# Using kafka as normal now will automatically generate spans
p.produce('my-topic', b'raw_bytes')
msg = c.poll()
- class opentelemetry.instrumentation.confluent_kafka.AutoInstrumentedProducer[source]
Bases: Producer
- produce(topic, value=None, *args, **kwargs)[source]
  Produce message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.
  Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.
  - Parameters:
    topic (str) – Topic to produce message to
    partition (int) – Partition to produce to, else uses the configured built-in partitioner.
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
    timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
    headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
  - Return type:
    None
  - Raises:
    BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
    KafkaException – for other errors, see exception code
    NotImplementedError – if timestamp is specified without underlying library support.
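To make the parameters above concrete, here is a minimal sketch of producing with a delivery callback and headers; the broker address, topic name, and the report_delivery callback are illustrative assumptions, not part of this package's API:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def report_delivery(err, msg):
    # Called from poll()/flush() once the message is delivered or permanently fails.
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer.produce(
    'my-topic',
    value=b'raw_bytes',
    on_delivery=report_delivery,
    headers={'origin': b'docs-example'},  # requires librdkafka >= v0.11.4
)
producer.flush()  # serves the delivery report callback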
- class opentelemetry.instrumentation.confluent_kafka.AutoInstrumentedConsumer(config)[source]
Bases: Consumer
- poll(timeout=-1)[source]
  Consumes a single message, calls callbacks and returns events.
  The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and events or errors (see error().code() for specifics).
  - Parameters:
    timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)). (Seconds)
  - Returns:
    A Message object or None on timeout
  - Return type:
    Message or None
  - Raises:
    RuntimeError – if called on a closed consumer
- consume(*args, **kwargs)[source]
  Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method.
  The application must check each returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.
  - Parameters:
    num_messages (int) – The maximum number of messages to return (default: 1).
    timeout (float) – The maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)
  - Returns:
    A list of Message objects (possibly empty on timeout)
  - Return type:
    list(Message)
  - Raises:
    RuntimeError – if called on a closed consumer
    KafkaError – in case of internal error
    ValueError – if num_messages > 1M
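A short sketch of batch consumption using the parameters above, assuming consumer was created and subscribed as in the Usage section:

batch = consumer.consume(num_messages=10, timeout=1.0)  # possibly empty on timeout
for msg in batch:
    if msg.error():
        # error().code() distinguishes events such as _PARTITION_EOF from real errors
        print(f"error/event: {msg.error()}")
    else:
        print(f"{msg.topic()} [{msg.partition()}] at offset {msg.offset()}")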
- close()[source]
  Close down and terminate the Kafka Consumer.
  Actions performed:
  - Stops consuming.
  - Commits offsets, unless the consumer property ‘enable.auto.commit’ is set to False.
  - Leaves the consumer group.
  - Return type:
    None
- class opentelemetry.instrumentation.confluent_kafka.ProxiedProducer(producer, tracer)[source]
Bases: Producer
- flush(timeout=-1)[source]
  - Parameters:
    timeout (float) – Maximum time to block (requires librdkafka >= v0.9.4). (Seconds)
  - Returns:
    Number of messages still in queue.
  Note: See poll() for a description of what callbacks may be triggered.
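Because flush() returns the number of messages still queued, a bounded shutdown can check the result; a minimal sketch assuming a producer with pending messages:

remaining = producer.flush(timeout=5.0)  # block for up to 5 seconds
if remaining > 0:
    print(f"{remaining} messages were not delivered before the timeout")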
- poll(timeout=-1)[source]
  Polls the producer for events and calls the corresponding callbacks (if registered).
  Callbacks:
  - on_delivery callbacks from produce()
  - …
- purge(in_queue=True, in_flight=True, blocking=True)[source]
  Purge messages currently handled by the producer instance. The application will need to call poll() or flush() afterwards to serve the delivery report callbacks of the purged messages.
  - Parameters:
    in_queue (bool) – Purge messages from internal queues. By default, true.
    in_flight (bool) – Purge messages in flight to or from the broker. By default, true.
    blocking (bool) – If set to False, will not wait on background thread queue purging to finish. By default, true.
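A sketch of abandoning pending messages during shutdown, assuming an existing producer; per the description above, poll() or flush() must still be called to serve the delivery callbacks of the purged messages:

producer.purge()  # defaults: in_queue=True, in_flight=True, blocking=True
producer.flush()  # serves the delivery report callbacks of the purged messages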
- produce(topic, value=None, *args, **kwargs)[source]
  Produce message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.
  Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.
  - Parameters:
    topic (str) – Topic to produce message to
    partition (int) – Partition to produce to, else uses the configured built-in partitioner.
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
    timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
    headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
  - Return type:
    None
  - Raises:
    BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
    KafkaException – for other errors, see exception code
    NotImplementedError – if timestamp is specified without underlying library support.
- original_producer()[source]
  Returns the original, uninstrumented Producer wrapped by this proxy.
- class opentelemetry.instrumentation.confluent_kafka.ProxiedConsumer(consumer, tracer)[source]
Bases: Consumer
- close()[source]
  Close down and terminate the Kafka Consumer.
  Actions performed:
  - Stops consuming.
  - Commits offsets, unless the consumer property ‘enable.auto.commit’ is set to False.
  - Leaves the consumer group.
  - Return type:
    None
- committed(partitions, timeout=-1)[source]
  Retrieve committed offsets for the specified partitions.
  - Parameters:
    partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.
    timeout (float) – Request timeout (seconds).
  - Returns:
    List of topic+partitions with offset and possibly error set.
  - Return type:
    list(TopicPartition)
  - Raises:
    KafkaException
    RuntimeError – if called on a closed consumer
- commit(*args, **kwargs)[source]
  Commit a message or a list of offsets.
  The message and offsets parameters are mutually exclusive. If neither is set, the current partition assignment’s offsets are used instead. Use this method to commit offsets if you have ‘enable.auto.commit’ set to False.
  - Parameters:
    message (confluent_kafka.Message) – Commit the message’s offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.
    offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
    asynchronous (bool) – If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.
  - Return type:
    None|list(TopicPartition)
  - Raises:
    KafkaException
    RuntimeError – if called on a closed consumer
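A sketch of manual, synchronous commits, assuming a consumer configured with 'enable.auto.commit': False; handle_message is a hypothetical application handler:

msg = consumer.poll(timeout=1.0)
if msg is not None and msg.error() is None:
    handle_message(msg)  # hypothetical application handler
    # Blocks until the commit succeeds or fails; returns the committed offsets.
    offsets = consumer.commit(message=msg, asynchronous=False)
    print(f"committed: {offsets}")  # list(TopicPartition); check each for errors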
- consume(*args, **kwargs)[source]
  Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method.
  The application must check each returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.
  - Parameters:
    num_messages (int) – The maximum number of messages to return (default: 1).
    timeout (float) – The maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)
  - Returns:
    A list of Message objects (possibly empty on timeout)
  - Return type:
    list(Message)
  - Raises:
    RuntimeError – if called on a closed consumer
    KafkaError – in case of internal error
    ValueError – if num_messages > 1M
- get_watermark_offsets(partition, timeout=-1, *args, **kwargs)[source]
  Retrieve low and high offsets for the specified partition.
  - Parameters:
    partition (TopicPartition) – Topic+partition to return offsets for.
    timeout (float) – Request timeout (seconds). Ignored if cached=True.
    cached (bool) – Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
  - Returns:
    Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.
  - Return type:
    tuple(int, int)
  - Raises:
    KafkaException
    RuntimeError – if called on a closed consumer
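For example, a partition's backlog can be inspected through its watermark offsets; a sketch assuming a consumer and a topic named my-topic:

from confluent_kafka import TopicPartition

watermarks = consumer.get_watermark_offsets(TopicPartition('my-topic', 0), timeout=5.0)
if watermarks is not None:  # None means the request timed out
    low, high = watermarks
    print(f"partition 0 holds offsets {low} through {high - 1}")  # high is last offset + 1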
- offsets_for_times(partitions, timeout=-1)[source]
  Look up offsets by timestamp for the specified partitions.
  The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
  - Parameters:
    partitions (list(TopicPartition)) – topic+partitions with timestamps in the TopicPartition.offset field.
    timeout (float) – Request timeout (seconds).
  - Returns:
    List of topic+partition with offset field set and possibly error set
  - Return type:
    list(TopicPartition)
  - Raises:
    KafkaException
    RuntimeError – if called on a closed consumer
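A sketch of finding the first offset at or after a point in time, assuming a consumer and a topic named my-topic:

import time
from confluent_kafka import TopicPartition

one_hour_ago_ms = int((time.time() - 3600) * 1000)
# Per the description above, the timestamp travels in the TopicPartition.offset field.
query = [TopicPartition('my-topic', 0, one_hour_ago_ms)]
for tp in consumer.offsets_for_times(query, timeout=5.0):
    print(f"{tp.topic} [{tp.partition}] -> offset {tp.offset}")  # -1 if past the last message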
- poll(timeout=-1)[source]
  Consumes a single message, calls callbacks and returns events.
  The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and events or errors (see error().code() for specifics).
  - Parameters:
    timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)). (Seconds)
  - Returns:
    A Message object or None on timeout
  - Return type:
    Message or None
  - Raises:
    RuntimeError – if called on a closed consumer
- subscribe(topics[, on_assign=None][, on_revoke=None][, on_lost=None])[source]
  Set subscription to the supplied list of topics. This replaces a previous subscription.
  Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:
  consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
  - Parameters:
    topics (list(str)) – List of topics (strings) to subscribe to.
    on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.
    on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
    on_lost (callable) – callback to provide handling in the case the partition assignment has been lost. If not specified, lost partition events will be delivered to on_revoke, if specified. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
  - Raises:
    KafkaException
    RuntimeError – if called on a closed consumer
  Callback signatures:
  - on_assign(consumer, partitions)
  - on_revoke(consumer, partitions)
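A sketch combining a regexp subscription with the rebalance callbacks described above; the callback bodies are illustrative assumptions:

def on_assign(consumer, partitions):
    # Inspect or override offsets before consumption of the new assignment starts.
    print(f"assigned: {partitions}")

def on_revoke(consumer, partitions):
    # Last chance to commit offsets to a custom store before the partitions are given up.
    print(f"revoked: {partitions}")

consumer.subscribe(["^my_topic.*", "not_a_regex"], on_assign=on_assign, on_revoke=on_revoke)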
- original_consumer()[source]
  Returns the original, uninstrumented Consumer wrapped by this proxy.
- class opentelemetry.instrumentation.confluent_kafka.ConfluentKafkaInstrumentor(*args, **kwargs)[source]
Bases: BaseInstrumentor
An instrumentor for the confluent_kafka module. See BaseInstrumentor.
- static instrument_producer(producer, tracer_provider=None)[source]
- Return type:
ProxiedProducer
- static instrument_consumer(consumer, tracer_provider=None)[source]
- Return type:
ProxiedConsumer
- static uninstrument_producer(producer)[source]
- Return type:
Producer
- static uninstrument_consumer(consumer)[source]
- Return type:
Consumer
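These static methods mirror instrument_producer and instrument_consumer, returning the original client; a sketch assuming p and c were instrumented as in the example under Usage:

p = ConfluentKafkaInstrumentor.uninstrument_producer(p)  # plain Producer again
c = ConfluentKafkaInstrumentor.uninstrument_consumer(c)  # plain Consumer again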
- instrumentation_dependencies()[source]
  Return a list of python packages with versions that will be instrumented.
  The format should be the same as used in requirements.txt or pyproject.toml.
  For example, if an instrumentation instruments requests 1.x, this method should look like:
  def instrumentation_dependencies(self) -> Collection[str]:
      return ['requests ~= 1.0']
  This will ensure that the instrumentation will only be used when the specified library is present in the environment.
  - Return type:
    Collection[str]
- static wrap_produce(func, instance, tracer, args, kwargs)[source]
- static wrap_poll(func, instance, tracer, args, kwargs)[source]
- static wrap_consume(func, instance, tracer, args, kwargs)[source]
- static wrap_close(func, instance)[source]