# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import atexit
import logging
import queue
import random
import threading
from typing import Any, Callable
from opentelemetry._opamp.callbacks import MessageData, OpAMPCallbacks
from opentelemetry._opamp.client import OpAMPClient
from opentelemetry._opamp.proto import opamp_pb2
logger = logging.getLogger(__name__)
def _safe_invoke(function: Callable[..., Any], *args: Any) -> None:
function_name = "<unknown>"
try:
function_name = function.__name__
function(*args)
except Exception as exc: # pylint: disable=broad-exception-caught
logger.error(
"Error when invoking function '%s'", function_name, exc_info=exc
)
class _Job:
"""
Represents a single request job, with retry/backoff metadata.
"""
def __init__(
self,
payload: Any,
max_retries: int = 1,
initial_backoff: float = 1.0,
callback: Callable[..., None] | None = None,
):
self.payload = payload
self.attempt = 0
self.max_retries = max_retries
self.initial_backoff = initial_backoff
# callback is called after OpAMP message handler is executed
self.callback = callback
def should_retry(self) -> bool:
"""Checks if we should retry again"""
return self.attempt <= self.max_retries
def delay(self) -> float:
"""Calculate the delay before next retry"""
assert self.attempt > 0
return (
self.initial_backoff
* (2 ** (self.attempt - 1))
* random.uniform(0.8, 1.2)
)
class OpAMPAgent:
    """
    OpAMPAgent handles:
    - periodic “heartbeat” calls enqueued at a fixed interval
    - on-demand calls via send()
    - exponential backoff retry on failures
    - immediate cancellation of all jobs on shutdown
    """

    def __init__(
        self,
        *,
        interval: float = 30,
        callbacks: OpAMPCallbacks,
        max_retries: int = 10,
        heartbeat_max_retries: int = 1,
        initial_backoff: float = 1.0,
        client: OpAMPClient,
    ):
        """
        :param interval: seconds between heartbeat calls
        :param callbacks: OpAMPCallbacks instance for receiving client events
        :param max_retries: how many times to retry a failed job for ad-hoc messages
        :param heartbeat_max_retries: how many times to retry a failed heartbeat job
        :param initial_backoff: base seconds for exponential backoff
        :param client: an OpAMPClient instance
        """
        self._interval = interval
        self._callbacks = callbacks
        self._max_retries = max_retries
        self._heartbeat_max_retries = heartbeat_max_retries
        self._initial_backoff = initial_backoff

        self._queue: queue.Queue[_Job] = queue.Queue()
        self._stop = threading.Event()

        self._worker = threading.Thread(
            name="OpAMPAgentWorker", target=self._run_worker, daemon=True
        )
        self._scheduler = threading.Thread(
            name="OpAMPAgentScheduler", target=self._run_scheduler, daemon=True
        )
        # start scheduling only after connection with server has been established
        self._schedule = False

        self._client = client

    def _enable_scheduler(self) -> None:
        """Allow the scheduler thread to start enqueuing heartbeat jobs."""
        self._schedule = True
        logger.debug("Connected with endpoint, enabling heartbeat")

    def start(self) -> None:
        """
        Starts the scheduler and worker threads.

        Also registers stop() to run at interpreter exit and enqueues the
        initial full-state message. Heartbeats are enabled by that job's
        callback, which the worker runs once the job has been handled
        (whether or not the send succeeded).
        """
        self._stop.clear()
        self._worker.start()
        self._scheduler.start()
        atexit.register(self.stop)

        # enqueue the connection message so we can then enable heartbeat
        payload = self._client.build_full_state_message()
        self.send(
            payload,
            max_retries=self._max_retries,
            callback=self._enable_scheduler,
        )

    def send(
        self,
        payload: Any,
        max_retries: int | None = None,
        callback: Callable[..., None] | None = None,
    ) -> None:
        """
        Enqueue an on-demand request.

        :param payload: the message passed to the client's ``send()``
        :param max_retries: retry budget for this job; defaults to the agent's
            ``max_retries``
        :param callback: optional callable run by the worker, with no arguments,
            after the job has been handled
        """
        if not self._worker.is_alive():
            logger.warning(
                "Called send() but worker thread is not alive. Worker threads is started with start()"
            )

        if max_retries is None:
            max_retries = self._max_retries

        job = _Job(
            payload,
            max_retries=max_retries,
            initial_backoff=self._initial_backoff,
            callback=callback,
        )
        self._queue.put(job)
        logger.debug("On-demand job enqueued: %r", payload)

    def _run_scheduler(self) -> None:
        """
        After we made a connection, periodically enqueue “heartbeat” jobs until stop is signaled.

        A failure while building a heartbeat message is logged and skipped, so
        one bad tick does not terminate the scheduler thread for good.
        """
        # pylint: disable=broad-exception-caught
        while not self._stop.wait(self._interval):
            if not self._schedule:
                continue
            try:
                payload = self._client.build_heartbeat_message()
            except Exception as exc:
                logger.error("Failed to build heartbeat message", exc_info=exc)
                continue
            job = _Job(
                payload=payload,
                max_retries=self._heartbeat_max_retries,
                initial_backoff=self._initial_backoff,
            )
            self._queue.put(job)
            logger.debug("Periodic job enqueued")

    def _run_worker(self) -> None:
        """
        Worker loop: pull jobs, attempt the message handler, retry on failure with backoff.

        Every dequeued job is marked done exactly once, even if handling it fails.
        """
        while not self._stop.is_set():
            try:
                job: _Job = self._queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                self._handle_job(job)
            finally:
                self._queue.task_done()

    def _handle_job(self, job: _Job) -> None:
        """
        Send one job, dispatch the server's reply, then run the job's callback.

        Unexpected errors are logged rather than raised, so that they cannot
        terminate the worker thread and strand every later job in the queue.
        """
        # pylint: disable=broad-exception-caught
        try:
            message = self._send_with_retries(job)
            if message is not None:
                self._process_message(message)
        except Exception as exc:
            logger.error(
                "Unexpected error while handling job %r",
                job.payload,
                exc_info=exc,
            )

        # The callback runs whether or not the send succeeded. For the job
        # enqueued by start(), this means heartbeats are enabled even if the
        # initial full-state message could not be delivered.
        if job.callback is not None:
            try:
                job.callback()
            except Exception as exc:
                logger.warning("Callback for job failed: %s", exc)

    def _send_with_retries(self, job: _Job) -> opamp_pb2.ServerToAgent | None:
        """
        Send ``job.payload`` through the client, retrying with backoff on failure.

        :param job: the job to send; its ``attempt`` counter is updated in place
        :returns: the client's response, or None if the job was dropped after
            exhausting its retries or abandoned because stop was signaled
        """
        # pylint: disable=broad-exception-caught
        while job.should_retry() and not self._stop.is_set():
            try:
                message = self._client.send(job.payload)
            except Exception as exc:
                job.attempt += 1
                _safe_invoke(
                    self._callbacks.on_connect_failed,
                    self,
                    self._client,
                    exc,
                )
                logger.warning(
                    "Job %r failed attempt %d/%d: %s",
                    job.payload,
                    job.attempt,
                    job.max_retries + 1,
                    exc,
                )

                if not job.should_retry():
                    logger.error(
                        "Job %r dropped after max retries", job.payload
                    )
                    return None

                # exponential backoff with +/- 20% jitter, interruptible by stop event
                delay = job.delay()
                logger.debug("Retrying in %.1fs", delay)
                if self._stop.wait(delay):
                    # stop requested during backoff: abandon job
                    logger.debug(
                        "Stop signaled, abandoning job %r", job.payload
                    )
                    return None
                continue

            _safe_invoke(self._callbacks.on_connect, self, self._client)
            logger.debug("Job succeeded: %r", job.payload)
            return message
        return None

    def _process_message(self, message: opamp_pb2.ServerToAgent) -> None:
        """
        Dispatch a ServerToAgent message to the registered callbacks.

        If the message carries an ``error_response``, it is forwarded to
        ``on_error`` and nothing else happens. Otherwise, if the server set the
        ReportFullState flag, a full-state message is enqueued via send(). The
        message is then delivered to ``on_message`` as MessageData.
        """
        if message.HasField("error_response"):
            _safe_invoke(
                self._callbacks.on_error,
                self,
                self._client,
                message.error_response,
            )
            return

        if message.flags & opamp_pb2.ServerToAgentFlags_ReportFullState:
            logger.debug("Server requested full state report")
            payload = self._client.build_full_state_message()
            self.send(payload)

        msg_data = MessageData.from_server_message(message)
        _safe_invoke(
            self._callbacks.on_message,
            self,
            self._client,
            msg_data,
        )

    def stop(self, timeout: float | None = None) -> None:
        """
        Signal the server that we are disconnecting, then tell the threads to exit.

        Safe to call before start() or more than once: thread join errors are
        logged rather than raised.

        :param timeout: seconds to wait for each thread to join
        """
        # Before exiting send signal the server we are disconnecting to free our resources
        # This is not required by the spec but is helpful in practice
        logger.debug("Stopping OpAMPAgent: sending AgentDisconnect")
        payload = self._client.build_agent_disconnect_message()
        try:
            self._client.send(payload)
        except Exception:  # pylint: disable=broad-exception-caught
            logger.debug(
                "Stopping OpAMPAgent: failed to send AgentDisconnect message"
            )

        logger.debug("Stopping OpAMPAgent: signaling threads")
        # Signal threads to exit
        self._stop.set()
        # don't crash if the user calls stop() before start() or calls stop() multiple times
        try:
            self._worker.join(timeout=timeout)
        except RuntimeError as exc:
            logger.warning(
                "Stopping OpAMPAgent: worker thread failed to join %s", exc
            )
        try:
            self._scheduler.join(timeout=timeout)
        except RuntimeError as exc:
            logger.warning(
                "Stopping OpAMPAgent: scheduler thread failed to join %s", exc
            )
        logger.debug("OpAMPAgent stopped")