OpenTelemetry gRPC Instrumentation

Module contents

Usage Client

import logging

import grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    ConsoleSpanExporter,
    SimpleSpanProcessor,
)

try:
    from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
    from gen import helloworld_pb2, helloworld_pb2_grpc

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)

grpc_client_instrumentor = GrpcInstrumentorClient()
grpc_client_instrumentor.instrument()

def run():
    """Send one SayHello request to the Greeter server on localhost:50051 and print the reply."""
    request = helloworld_pb2.HelloRequest(name="YOU")
    with grpc.insecure_channel("localhost:50051") as channel:
        greeter = helloworld_pb2_grpc.GreeterStub(channel)
        response = greeter.SayHello(request)

    # The reply object remains usable after the channel has been closed.
    print("Greeter client received: " + response.message)


if __name__ == "__main__":
    logging.basicConfig()
    run()

Usage Server

import logging
from concurrent import futures

import grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    ConsoleSpanExporter,
    SimpleSpanProcessor,
)

try:
    from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
    from gen import helloworld_pb2, helloworld_pb2_grpc

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()

class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Greeter service implementation used by the example server."""

    def SayHello(self, request, context):
        """Return a HelloReply whose message greets ``request.name``."""
        greeting = "Hello, %s!" % request.name
        return helloworld_pb2.HelloReply(message=greeting)


def serve():
    """Start the Greeter gRPC server on port 50051 and block until it terminates."""
    executor = futures.ThreadPoolExecutor()
    grpc_server = grpc.server(executor)

    # Register the service implementation before the server starts.
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), grpc_server)
    grpc_server.add_insecure_port("[::]:50051")

    grpc_server.start()
    grpc_server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig()
    serve()

You can also add the interceptor manually, rather than using GrpcInstrumentorServer:

from opentelemetry.instrumentation.grpc import server_interceptor

server = grpc.server(futures.ThreadPoolExecutor(),
                     interceptors = [server_interceptor()])

Usage Aio Client

import logging
import asyncio

import grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    ConsoleSpanExporter,
    SimpleSpanProcessor,
)

try:
    from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
    from gen import helloworld_pb2, helloworld_pb2_grpc

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)

grpc_client_instrumentor = GrpcAioInstrumentorClient()
grpc_client_instrumentor.instrument()

async def run():
    """Send one SayHello request over an asyncio gRPC channel and print the reply.

    ``grpc.aio`` channels are asynchronous context managers, so the channel
    must be entered with ``async with``; a plain ``with`` fails because the
    channel does not implement the synchronous context-manager protocol.
    """
    async with grpc.aio.insecure_channel("localhost:50051") as channel:

        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = await stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))

    print("Greeter client received: " + response.message)


if __name__ == "__main__":
    logging.basicConfig()
    asyncio.run(run())

You can also add the interceptor manually, rather than using GrpcAioInstrumentorClient:

from opentelemetry.instrumentation.grpc import aio_client_interceptors

channel = grpc.aio.insecure_channel("localhost:12345", interceptors=aio_client_interceptors())

Usage Aio Server

import logging
import asyncio

import grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    ConsoleSpanExporter,
    SimpleSpanProcessor,
)

try:
    from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
    from gen import helloworld_pb2, helloworld_pb2_grpc

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)

grpc_server_instrumentor = GrpcAioInstrumentorServer()
grpc_server_instrumentor.instrument()

class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Asynchronous Greeter service implementation used by the example aio server."""

    async def SayHello(self, request, context):
        """Return a HelloReply whose message greets ``request.name``."""
        greeting = "Hello, %s!" % request.name
        return helloworld_pb2.HelloReply(message=greeting)


async def serve():
    """Start the asyncio Greeter gRPC server on port 50051 and wait until it terminates."""
    aio_server = grpc.aio.server()

    # Register the service implementation before the server starts.
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), aio_server)
    aio_server.add_insecure_port("[::]:50051")

    await aio_server.start()
    await aio_server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig()
    asyncio.run(serve())

You can also add the interceptor manually, rather than using GrpcAioInstrumentorServer:

from opentelemetry.instrumentation.grpc import aio_server_interceptor

server = grpc.aio.server(interceptors = [aio_server_interceptor()])

Filters

If you prefer to filter specific requests to be instrumented, you can specify the condition by assigning filters to instrumentors.

You can write a global server instrumentor as follows:

from opentelemetry.instrumentation.grpc import filters, GrpcInstrumentorServer

grpc_server_instrumentor = GrpcInstrumentorServer(
    filter_ = filters.any_of(
        filters.method_name("SimpleMethod"),
        filters.method_name("ComplexMethod"),
    )
)
grpc_server_instrumentor.instrument()

You can also use the filters directly on the provided interceptors:

my_interceptor = server_interceptor(
    filter_ = filters.negate(filters.method_name("TestMethod"))
)
server = grpc.server(futures.ThreadPoolExecutor(),
                     interceptors = [my_interceptor])

The filter_ option also applies to both global and manual client instrumentors.

Environment variable

If you’d like to exclude specific services from instrumentation, you can use the OTEL_PYTHON_GRPC_EXCLUDED_SERVICES environment variable.

For example, if you assign "GRPCTestServer,GRPCHealthServer" to the variable, then the global interceptor automatically adds the filters to exclude requests to services GRPCTestServer and GRPCHealthServer.

class opentelemetry.instrumentation.grpc.GrpcInstrumentorServer(*args, **kwargs)[source]

Bases: BaseInstrumentor

Globally instrument the grpc server.

Usage:

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()

If you want to add a filter so that only requests matching
the condition are intercepted, pass ``filter_`` to GrpcInstrumentorServer.

grpc_server_instrumentor = GrpcInstrumentorServer(
    filter_=filters.method_prefix("SimpleMethod"))
grpc_server_instrumentor.instrument()
instrumentation_dependencies()[source]

Return a list of python packages with versions that will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method (return type: Collection[str]) should look like:

def instrumentation_dependencies(self) -> Collection[str]:
    return ['requests ~= 1.0']

This will ensure that the instrumentation will only be used when the specified library is present in the environment.

class opentelemetry.instrumentation.grpc.GrpcAioInstrumentorServer(*args, **kwargs)[source]

Bases: BaseInstrumentor

Globally instrument the grpc.aio server.

Usage:

grpc_aio_server_instrumentor = GrpcAioInstrumentorServer()
grpc_aio_server_instrumentor.instrument()
instrumentation_dependencies()[source]

Return a list of python packages with versions that will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method (return type: Collection[str]) should look like:

def instrumentation_dependencies(self) -> Collection[str]:
    return ['requests ~= 1.0']

This will ensure that the instrumentation will only be used when the specified library is present in the environment.

class opentelemetry.instrumentation.grpc.GrpcInstrumentorClient(*args, **kwargs)[source]

Bases: BaseInstrumentor

Globally instrument the grpc client

Usage:

grpc_client_instrumentor = GrpcInstrumentorClient()
grpc_client_instrumentor.instrument()

If you want to add a filter so that only requests matching
the condition are intercepted, pass the ``filter_`` option to GrpcInstrumentorClient.

grpc_client_instrumentor = GrpcInstrumentorClient(
    filter_=filters.negate(filters.health_check())
)
grpc_client_instrumentor.instrument()
instrumentation_dependencies()[source]

Return a list of python packages with versions that will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method (return type: Collection[str]) should look like:

def instrumentation_dependencies(self) -> Collection[str]:
    return ['requests ~= 1.0']

This will ensure that the instrumentation will only be used when the specified library is present in the environment.

wrapper_fn(original_func, instance, args, kwargs)[source]
class opentelemetry.instrumentation.grpc.GrpcAioInstrumentorClient(*args, **kwargs)[source]

Bases: BaseInstrumentor

Globally instrument the grpc.aio client.

Usage:

grpc_aio_client_instrumentor = GrpcAioInstrumentorClient()
grpc_aio_client_instrumentor.instrument()
instrumentation_dependencies()[source]

Return a list of python packages with versions that will be instrumented.

The format should be the same as used in requirements.txt or pyproject.toml.

For example, if an instrumentation instruments requests 1.x, this method (return type: Collection[str]) should look like:

def instrumentation_dependencies(self) -> Collection[str]:
    return ['requests ~= 1.0']

This will ensure that the instrumentation will only be used when the specified library is present in the environment.

opentelemetry.instrumentation.grpc.client_interceptor(tracer_provider=None, filter_=None, request_hook=None, response_hook=None)[source]

Create a gRPC client channel interceptor.

Parameters:
  • tracer_provider – The tracer provider used to create the tracer for client-side spans.

  • filter_ – filter function that returns True if a gRPC request matches the condition. Default is None, which intercepts all requests.

Returns:

An invocation-side interceptor object.

opentelemetry.instrumentation.grpc.server_interceptor(tracer_provider=None, filter_=None)[source]

Create a gRPC server interceptor.

Parameters:
  • tracer_provider – The tracer provider used to create the tracer for server-side spans.

  • filter_ – filter function that returns True if a gRPC request matches the condition. Default is None, which intercepts all requests.

Returns:

A service-side interceptor object.

opentelemetry.instrumentation.grpc.aio_client_interceptors(tracer_provider=None, filter_=None, request_hook=None, response_hook=None)[source]

Create a gRPC client channel interceptor.

Parameters:

tracer_provider – The tracer provider used to create the tracer for client-side spans.

Returns:

An invocation-side interceptor object.

opentelemetry.instrumentation.grpc.aio_server_interceptor(tracer_provider=None, filter_=None)[source]

Create a gRPC aio server interceptor.

Parameters:

tracer_provider – The tracer provider used to create the tracer for server-side spans.

Returns:

A service-side interceptor object.