OpenTelemetry Celery Instrumentation

Instrument celery to trace Celery applications.

Usage

  • Start broker backend

docker run -p 5672:5672 rabbitmq
  • Run instrumented task

from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()

app = Celery("tasks", broker="amqp://localhost")

@app.task
def add(x, y):
    return x + y

add.delay(42, 50)

Setting up tracing

When tracing a celery worker process, tracing and instrumention both must be initialized after the celery worker process is initialized. This is required for any tracing components that might use threading to work correctly such as the BatchSpanProcessor. Celery provides a signal called worker_process_init that can be used to accomplish this as shown in the example above.

API

class opentelemetry.instrumentation.celery.CeleryGetter(*args, **kwds)[source]

Bases: Getter

get(carrier, key)[source]

Function that can retrieve zero or more values from the carrier. In the case that the value does not exist, returns None.

Parameters:
  • carrier – An object which contains values that are used to construct a Context.

  • key – key of a field in carrier.

Returns: first value of the propagation key or None if the key doesn’t

exist.

keys(carrier)[source]

Function that can retrieve all the keys in a carrier object.

Parameters:

carrier – An object which contains values that are used to construct a Context.

Returns:

list of keys from the carrier.

class opentelemetry.instrumentation.celery.CeleryInstrumentor(*args, **kwargs)[source]

Bases: BaseInstrumentor

metrics = None
task_id_to_start_time = {}
instrumentation_dependencies()[source]

Return a list of python packages with versions that the will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method should look like: :rtype: Collection[str]

def instrumentation_dependencies(self) -> Collection[str]:

return [‘requests ~= 1.0’]

This will ensure that the instrumentation will only be used when the specified library is present in the environment.

update_task_duration_time(task_id)[source]
create_celery_metrics(meter)[source]
Return type:

None