# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from logging import getLogger
from typing import Any, Generator, Mapping
from uuid_utils import uuid7
from opentelemetry._opamp import messages
from opentelemetry._opamp.proto import opamp_pb2
from opentelemetry._opamp.transport.base import HttpTransport
from opentelemetry._opamp.transport.requests import RequestsTransport
from opentelemetry._opamp.version import __version__
from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
attach,
detach,
set_value,
)
from opentelemetry.util.types import AnyValue
_logger = getLogger(__name__)
_DEFAULT_OPAMP_TIMEOUT_MS = 1_000
_OPAMP_HTTP_HEADERS = {
"Content-Type": "application/x-protobuf",
"User-Agent": "OTel-OpAMP-Python/" + __version__,
}
_HANDLED_CAPABILITIES = (
opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsStatus
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsHeartbeat
| opamp_pb2.AgentCapabilities.AgentCapabilities_AcceptsRemoteConfig
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsRemoteConfig
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsEffectiveConfig
)
[docs]class OpAMPClient:
"""
OpAMPClient implement the helpers for building and sending messages that the agent will use.
"""
def __init__(
self,
*,
endpoint: str,
headers: Mapping[str, str] | None = None,
timeout_millis: int = _DEFAULT_OPAMP_TIMEOUT_MS,
agent_identifying_attributes: Mapping[str, AnyValue],
agent_non_identifying_attributes: Mapping[str, AnyValue] | None = None,
transport: HttpTransport | None = None,
# this matches requests but can be mapped to other http libraries APIs
tls_certificate: str | bool = True,
tls_client_certificate: str | None = None,
tls_client_key: str | None = None,
):
self._timeout_millis = timeout_millis
self._transport = (
RequestsTransport() if transport is None else transport
)
self._endpoint = endpoint
headers = headers or {}
self._headers = {**_OPAMP_HTTP_HEADERS, **headers}
self._tls_certificate = tls_certificate
self._tls_client_certificate = tls_client_certificate
self._tls_client_key = tls_client_key
self._agent_description = messages.build_agent_description(
identifying_attributes=agent_identifying_attributes,
non_identifying_attributes=agent_non_identifying_attributes,
)
self._sequence_num: int = 0
self._instance_uid: bytes = uuid7().bytes
self._remote_config_status: opamp_pb2.RemoteConfigStatus | None = None
self._effective_config: opamp_pb2.EffectiveConfig | None = None
[docs] def build_agent_disconnect_message(self) -> bytes:
message = messages.build_agent_disconnect_message(
instance_uid=self._instance_uid,
sequence_num=self._sequence_num,
capabilities=_HANDLED_CAPABILITIES,
)
data = messages.encode_message(message)
return data
[docs] def build_heartbeat_message(self) -> bytes:
message = messages.build_heartbeat_message(
instance_uid=self._instance_uid,
sequence_num=self._sequence_num,
capabilities=_HANDLED_CAPABILITIES,
)
data = messages.encode_message(message)
return data
[docs] def update_effective_config(
self,
effective_config: Mapping[str, Any],
content_type: str,
) -> opamp_pb2.EffectiveConfig:
self._effective_config = messages.build_effective_config_message(
config=effective_config, content_type=content_type
)
return self._effective_config
[docs] def update_remote_config_status(
self,
remote_config_hash: bytes,
status: opamp_pb2.RemoteConfigStatuses.ValueType,
error_message: str = "",
) -> opamp_pb2.RemoteConfigStatus | None:
status_changed = (
not self._remote_config_status
or self._remote_config_status.last_remote_config_hash
!= remote_config_hash
or self._remote_config_status.status != status
or self._remote_config_status.error_message != error_message
)
# if the status changed update we return the RemoteConfigStatus message so that we can send it to the server
if status_changed:
_logger.debug(
"Update remote config status changed for %s",
remote_config_hash,
)
self._remote_config_status = (
messages.build_remote_config_status_message(
last_remote_config_hash=remote_config_hash,
status=status,
error_message=error_message,
)
)
return self._remote_config_status
return None
[docs] def build_remote_config_status_response_message(
self, remote_config_status: opamp_pb2.RemoteConfigStatus
) -> bytes:
message = messages.build_remote_config_status_response_message(
instance_uid=self._instance_uid,
sequence_num=self._sequence_num,
capabilities=_HANDLED_CAPABILITIES,
remote_config_status=remote_config_status,
)
data = messages.encode_message(message)
return data
[docs] def build_full_state_message(self) -> bytes:
message = messages.build_full_state_message(
instance_uid=self._instance_uid,
agent_description=self._agent_description,
remote_config_status=self._remote_config_status,
sequence_num=self._sequence_num,
effective_config=self._effective_config,
capabilities=_HANDLED_CAPABILITIES,
)
data = messages.encode_message(message)
return data
[docs] def send(self, data: bytes):
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
response = self._transport.send(
url=self._endpoint,
headers=self._headers,
data=data,
timeout_millis=self._timeout_millis,
tls_certificate=self._tls_certificate,
tls_client_certificate=self._tls_client_certificate,
tls_client_key=self._tls_client_key,
)
return response
finally:
self._sequence_num += 1
detach(token)
[docs] @staticmethod
def decode_remote_config(
remote_config: opamp_pb2.AgentRemoteConfig,
) -> Generator[tuple[str, Mapping[str, AnyValue]]]:
for config_file, config in messages.decode_remote_config(
remote_config
):
yield config_file, config