# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
# pylint: disable=too-many-lines
"""
Instrument to report system (CPU, memory, network) and
process (CPU, memory, garbage collection) metrics. By default, the
following metrics are configured:
.. code:: python
{
"system.cpu.time": ["idle", "user", "system", "irq"],
"system.cpu.utilization": ["idle", "user", "system", "irq"],
"system.memory.usage": ["used", "free", "cached"],
"system.memory.utilization": ["used", "free", "cached"],
"system.swap.usage": ["used", "free"],
"system.swap.utilization": ["used", "free"],
"system.disk.io": ["read", "write"],
"system.disk.operations": ["read", "write"],
"system.disk.time": ["read", "write"],
"system.network.dropped.packets": ["transmit", "receive"],
"system.network.packets": ["transmit", "receive"],
"system.network.errors": ["transmit", "receive"],
"system.network.io": ["transmit", "receive"],
"system.network.connections": ["family", "type"],
"system.thread_count": None
"process.context_switches": ["involuntary", "voluntary"],
"process.cpu.time": ["user", "system"],
"process.cpu.utilization": None,
"process.memory.usage": None,
"process.memory.virtual": None,
"process.disk.io": ["read", "write"],
"process.open_file_descriptor.count": None,
"process.thread.count": None,
"process.runtime.memory": ["rss", "vms"],
"process.runtime.cpu.time": ["user", "system"],
"process.runtime.gc_count": None,
"cpython.gc.collections": None,
"cpython.gc.collected_objects": None,
"cpython.gc.uncollectable_objects": None,
"process.runtime.thread_count": None,
"process.runtime.cpu.utilization": None,
"process.runtime.context_switches": ["involuntary", "voluntary"],
}
Usage
-----
.. code:: python
from opentelemetry.metrics import set_meter_provider
from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
exporter = ConsoleMetricExporter()
set_meter_provider(MeterProvider([PeriodicExportingMetricReader(exporter)]))
SystemMetricsInstrumentor().instrument()
# metrics are collected asynchronously
input("...")
# to configure custom metrics
configuration = {
"system.memory.usage": ["used", "free", "cached"],
"system.cpu.time": ["idle", "user", "system", "irq"],
"system.network.io": ["transmit", "receive"],
"process.memory.usage": None,
"process.memory.virtual": None,
"process.cpu.time": ["user", "system"],
"process.context_switches": ["involuntary", "voluntary"],
}
SystemMetricsInstrumentor(config=configuration).instrument()
Out-of-spec `process.runtime` prefixed metrics are deprecated and will be removed in future versions, users are encouraged to move
to the `process` metrics.
API
---
"""
from __future__ import annotations
import fnmatch
import gc
import logging
import os
import sys
import threading
from platform import python_implementation
from typing import Any, Collection, Iterable
import psutil
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.system_metrics.environment_variables import (
OTEL_PYTHON_SYSTEM_METRICS_EXCLUDED_METRICS,
)
from opentelemetry.instrumentation.system_metrics.package import _instruments
from opentelemetry.instrumentation.system_metrics.version import __version__
from opentelemetry.metrics import CallbackOptions, Observation, get_meter
from opentelemetry.semconv._incubating.attributes.cpython_attributes import (
CPYTHON_GC_GENERATION,
)
from opentelemetry.semconv._incubating.metrics.process_metrics import (
create_process_cpu_utilization,
)
_logger = logging.getLogger(__name__)
_DEFAULT_CONFIG: dict[str, list[str] | None] = {
"system.cpu.time": ["idle", "user", "system", "irq"],
"system.cpu.utilization": ["idle", "user", "system", "irq"],
"system.memory.usage": ["used", "free", "cached"],
"system.memory.utilization": ["used", "free", "cached"],
"system.swap.usage": ["used", "free"],
"system.swap.utilization": ["used", "free"],
"system.disk.io": ["read", "write"],
"system.disk.operations": ["read", "write"],
"system.disk.time": ["read", "write"],
"system.network.dropped.packets": ["transmit", "receive"],
"system.network.packets": ["transmit", "receive"],
"system.network.errors": ["transmit", "receive"],
"system.network.io": ["transmit", "receive"],
"system.network.connections": ["family", "type"],
"system.thread_count": None,
"process.context_switches": ["involuntary", "voluntary"],
"process.cpu.time": ["user", "system"],
"process.cpu.utilization": ["user", "system"],
"process.memory.usage": None,
"process.memory.virtual": None,
"process.open_file_descriptor.count": None,
"process.thread.count": None,
"process.disk.io": ["read", "write"],
"process.runtime.memory": ["rss", "vms"],
"process.runtime.cpu.time": ["user", "system"],
"process.runtime.gc_count": None,
"cpython.gc.collections": None,
"cpython.gc.collected_objects": None,
"cpython.gc.uncollectable_objects": None,
"process.runtime.thread_count": None,
"process.runtime.cpu.utilization": None,
"process.runtime.context_switches": ["involuntary", "voluntary"],
}
if sys.platform == "darwin":
# see https://github.com/giampaolo/psutil/issues/1219
_DEFAULT_CONFIG.pop("system.network.connections")
def _build_default_config() -> dict[str, list[str] | None]:
excluded_metrics: list[str] = [
pat.strip()
for pat in os.environ.get(
OTEL_PYTHON_SYSTEM_METRICS_EXCLUDED_METRICS, ""
).split(",")
if pat.strip()
]
if excluded_metrics:
return {
key: value
for key, value in _DEFAULT_CONFIG.items()
if not any(fnmatch.fnmatch(key, pat) for pat in excluded_metrics)
}
return _DEFAULT_CONFIG
[docs]class SystemMetricsInstrumentor(BaseInstrumentor):
def __init__(
self,
labels: dict[str, str] | None = None,
config: dict[str, list[str] | None] | None = None,
):
super().__init__()
self._config = _build_default_config() if config is None else config
self._labels = {} if labels is None else labels
self._meter = None
self._python_implementation = python_implementation().lower()
self._proc = psutil.Process(os.getpid())
self._system_cpu_time_labels = self._labels.copy()
self._system_cpu_utilization_labels = self._labels.copy()
self._system_memory_usage_labels = self._labels.copy()
self._system_memory_utilization_labels = self._labels.copy()
self._system_swap_usage_labels = self._labels.copy()
self._system_swap_utilization_labels = self._labels.copy()
self._system_disk_io_labels = self._labels.copy()
self._system_disk_operations_labels = self._labels.copy()
self._system_disk_time_labels = self._labels.copy()
self._system_disk_merged_labels = self._labels.copy()
self._system_network_dropped_packets_labels = self._labels.copy()
self._system_network_packets_labels = self._labels.copy()
self._system_network_errors_labels = self._labels.copy()
self._system_network_io_labels = self._labels.copy()
self._system_network_connections_labels = self._labels.copy()
self._system_thread_count_labels = self._labels.copy()
self._context_switches_labels = self._labels.copy()
self._cpu_time_labels = self._labels.copy()
self._cpu_utilization_labels = self._labels.copy()
self._memory_usage_labels = self._labels.copy()
self._memory_virtual_labels = self._labels.copy()
self._process_disk_io_labels = self._labels.copy()
self._open_file_descriptor_count_labels = self._labels.copy()
self._thread_count_labels = self._labels.copy()
self._runtime_memory_labels = self._labels.copy()
self._runtime_cpu_time_labels = self._labels.copy()
self._runtime_gc_count_labels = self._labels.copy()
self._runtime_gc_collections_labels = self._labels.copy()
self._runtime_gc_collected_objects_labels = self._labels.copy()
self._runtime_gc_uncollectable_objects_labels = self._labels.copy()
self._runtime_thread_count_labels = self._labels.copy()
self._runtime_cpu_utilization_labels = self._labels.copy()
self._runtime_context_switches_labels = self._labels.copy()
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs: Any):
# pylint: disable=too-many-branches,too-many-statements
meter_provider = kwargs.get("meter_provider")
self._meter = get_meter(
__name__,
__version__,
meter_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
# system metrics
if "system.cpu.time" in self._config:
self._meter.create_observable_counter(
name="system.cpu.time",
callbacks=[self._get_system_cpu_time],
description="System CPU time",
unit="s",
)
# FIXME: double check this is divided by cpu core
if "system.cpu.utilization" in self._config:
self._meter.create_observable_gauge(
name="system.cpu.utilization",
callbacks=[self._get_system_cpu_utilization],
description="System CPU utilization",
unit="1",
)
if "system.memory.usage" in self._config:
self._meter.create_observable_gauge(
name="system.memory.usage",
callbacks=[self._get_system_memory_usage],
description="System memory usage",
unit="By",
)
if "system.memory.utilization" in self._config:
self._meter.create_observable_gauge(
name="system.memory.utilization",
callbacks=[self._get_system_memory_utilization],
description="System memory utilization",
unit="1",
)
# FIXME: system.swap is gone in favour of system.paging
if "system.swap.usage" in self._config:
self._meter.create_observable_gauge(
name="system.swap.usage",
callbacks=[self._get_system_swap_usage],
description="System swap usage",
unit="pages",
)
if "system.swap.utilization" in self._config:
self._meter.create_observable_gauge(
name="system.swap.utilization",
callbacks=[self._get_system_swap_utilization],
description="System swap utilization",
unit="1",
)
# TODO Add _get_system_swap_page_faults
# self._meter.create_observable_counter(
# name="system.swap.page_faults",
# callbacks=[self._get_system_swap_page_faults],
# description="System swap page faults",
# unit="faults",
# value_type=int,
# )
# TODO Add _get_system_swap_page_operations
# self._meter.create_observable_counter(
# name="system.swap.page_operations",
# callbacks=self._get_system_swap_page_operations,
# description="System swap page operations",
# unit="operations",
# value_type=int,
# )
if "system.disk.io" in self._config:
self._meter.create_observable_counter(
name="system.disk.io",
callbacks=[self._get_system_disk_io],
description="System disk IO",
unit="By",
)
if "system.disk.operations" in self._config:
self._meter.create_observable_counter(
name="system.disk.operations",
callbacks=[self._get_system_disk_operations],
description="System disk operations",
unit="operations",
)
# FIXME: this has been replaced by system.disk.operation.time
if "system.disk.time" in self._config:
self._meter.create_observable_counter(
name="system.disk.time",
callbacks=[self._get_system_disk_time],
description="System disk time",
unit="s",
)
# TODO Add _get_system_filesystem_usage
# self.accumulator.register_valueobserver(
# callback=self._get_system_filesystem_usage,
# name="system.filesystem.usage",
# description="System filesystem usage",
# unit="By",
# value_type=int,
# )
# TODO Add _get_system_filesystem_utilization
# self._meter.create_observable_gauge(
# callback=self._get_system_filesystem_utilization,
# name="system.filesystem.utilization",
# description="System filesystem utilization",
# unit="1",
# value_type=float,
# )
# TODO Filesystem information can be obtained with os.statvfs in Unix-like
# OSs, how to do the same in Windows?
# FIXME: this is now just system.network.dropped
if "system.network.dropped.packets" in self._config:
self._meter.create_observable_counter(
name="system.network.dropped_packets",
callbacks=[self._get_system_network_dropped_packets],
description="System network dropped_packets",
unit="packets",
)
if "system.network.packets" in self._config:
self._meter.create_observable_counter(
name="system.network.packets",
callbacks=[self._get_system_network_packets],
description="System network packets",
unit="packets",
)
if "system.network.errors" in self._config:
self._meter.create_observable_counter(
name="system.network.errors",
callbacks=[self._get_system_network_errors],
description="System network errors",
unit="errors",
)
if "system.network.io" in self._config:
self._meter.create_observable_counter(
name="system.network.io",
callbacks=[self._get_system_network_io],
description="System network io",
unit="By",
)
if "system.network.connections" in self._config:
self._meter.create_observable_up_down_counter(
name="system.network.connections",
callbacks=[self._get_system_network_connections],
description="System network connections",
unit="connections",
)
# FIXME: this is gone
if "system.thread_count" in self._config:
self._meter.create_observable_gauge(
name="system.thread_count",
callbacks=[self._get_system_thread_count],
description="System active threads count",
)
# process metrics
if "process.cpu.time" in self._config:
self._meter.create_observable_counter(
name="process.cpu.time",
callbacks=[self._get_cpu_time],
description="Total CPU seconds broken down by different states.",
unit="s",
)
if "process.cpu.utilization" in self._config:
create_process_cpu_utilization(
self._meter, callbacks=[self._get_cpu_utilization]
)
if (
"process.context_switches" in self._config
and self._can_read_context_switches()
):
self._meter.create_observable_counter(
name="process.context_switches",
callbacks=[self._get_context_switches],
description="Number of times the process has been context switched.",
)
if "process.memory.usage" in self._config:
self._meter.create_observable_up_down_counter(
name="process.memory.usage",
callbacks=[self._get_memory_usage],
description="The amount of physical memory in use.",
unit="By",
)
if "process.memory.virtual" in self._config:
self._meter.create_observable_up_down_counter(
name="process.memory.virtual",
callbacks=[self._get_memory_virtual],
description="The amount of committed virtual memory.",
unit="By",
)
if (
sys.platform != "win32"
and "process.open_file_descriptor.count" in self._config
):
self._meter.create_observable_up_down_counter(
name="process.open_file_descriptor.count",
callbacks=[self._get_open_file_descriptors],
description="Number of file descriptors in use by the process.",
)
if "process.thread.count" in self._config:
self._meter.create_observable_up_down_counter(
name="process.thread.count",
callbacks=[self._get_thread_count],
description="Process threads count.",
)
if "process.disk.io" in self._config:
self._meter.create_observable_counter(
name="process.disk.io",
callbacks=[self._get_process_disk_io],
description="Disk bytes transferred for the process.",
unit="By",
)
# FIXME: process.runtime keys are deprecated and will be removed in subsequent releases.
# When removing them, remember to clean also the callbacks and labels
if "process.runtime.memory" in self._config:
self._meter.create_observable_up_down_counter(
name=f"process.runtime.{self._python_implementation}.memory",
callbacks=[self._get_runtime_memory],
description=f"Runtime {self._python_implementation} memory",
unit="By",
)
if "process.runtime.cpu.time" in self._config:
self._meter.create_observable_counter(
name=f"process.runtime.{self._python_implementation}.cpu_time",
callbacks=[self._get_runtime_cpu_time],
description=f"Runtime {self._python_implementation} CPU time",
unit="s",
)
if "process.runtime.gc_count" in self._config:
if self._python_implementation == "pypy":
_logger.warning(
"The process.runtime.gc_count metric won't be collected because the interpreter is PyPy"
)
else:
self._meter.create_observable_counter(
name=f"process.runtime.{self._python_implementation}.gc_count",
callbacks=[self._get_runtime_gc_count],
description=f"Runtime {self._python_implementation} GC count",
unit="By",
)
if "cpython.gc.collections" in self._config:
if self._python_implementation == "pypy":
_logger.warning(
"The cpython.gc.collections metric won't be collected because the interpreter is PyPy"
)
else:
self._meter.create_observable_counter(
name="cpython.gc.collections",
callbacks=[self._get_runtime_gc_collections],
description="The number of times a generation was collected since interpreter start.",
unit="{collection}",
)
if "cpython.gc.collected_objects" in self._config:
if self._python_implementation == "pypy":
_logger.warning(
"The cpython.gc.collected_objects metric won't be collected because the interpreter is PyPy"
)
else:
self._meter.create_observable_counter(
name="cpython.gc.collected_objects",
callbacks=[self._get_runtime_gc_collected_objects],
description="The total number of objects collected since interpreter start.",
unit="{object}",
)
if "cpython.gc.uncollectable_objects" in self._config:
if self._python_implementation == "pypy":
_logger.warning(
"The cpython.gc.uncollectable_objects metric won't be collected because the interpreter is PyPy"
)
else:
self._meter.create_observable_counter(
name="cpython.gc.uncollectable_objects",
callbacks=[self._get_runtime_gc_uncollectable_objects],
description="The total number of uncollectable objects found since interpreter start.",
unit="{object}",
)
if "process.runtime.thread_count" in self._config:
self._meter.create_observable_up_down_counter(
name=f"process.runtime.{self._python_implementation}.thread_count",
callbacks=[self._get_runtime_thread_count],
description="Runtime active threads count",
)
if "process.runtime.cpu.utilization" in self._config:
self._meter.create_observable_gauge(
name=f"process.runtime.{self._python_implementation}.cpu.utilization",
callbacks=[self._get_runtime_cpu_utilization],
description="Runtime CPU utilization",
unit="1",
)
if (
"process.runtime.context_switches" in self._config
and self._can_read_context_switches()
):
self._meter.create_observable_counter(
name=f"process.runtime.{self._python_implementation}.context_switches",
callbacks=[self._get_runtime_context_switches],
description="Runtime context switches",
unit="switches",
)
def _uninstrument(self, **kwargs: Any):
pass
def _can_read_context_switches(self) -> bool:
"""On Google Cloud Run psutil is not able to read context switches, catch it before creating the observable instrument"""
try:
self._proc.num_ctx_switches()
return True
except NotImplementedError:
return False
def _get_open_file_descriptors(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for Number of file descriptors in use by the process"""
yield Observation(
self._proc.num_fds(),
self._open_file_descriptor_count_labels.copy(),
)
def _get_system_cpu_time(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for system CPU time"""
for cpu, times in enumerate(psutil.cpu_times(percpu=True)):
for metric in self._config["system.cpu.time"]:
if hasattr(times, metric):
self._system_cpu_time_labels["state"] = metric
self._system_cpu_time_labels["cpu"] = cpu + 1
yield Observation(
getattr(times, metric),
self._system_cpu_time_labels.copy(),
)
def _get_system_cpu_utilization(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for system CPU utilization"""
for cpu, times_percent in enumerate(
psutil.cpu_times_percent(percpu=True)
):
for metric in self._config["system.cpu.utilization"]:
if hasattr(times_percent, metric):
self._system_cpu_utilization_labels["state"] = metric
self._system_cpu_utilization_labels["cpu"] = cpu + 1
yield Observation(
getattr(times_percent, metric) / 100,
self._system_cpu_utilization_labels.copy(),
)
def _get_system_memory_usage(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for memory usage"""
virtual_memory = psutil.virtual_memory()
for metric in self._config["system.memory.usage"]:
self._system_memory_usage_labels["state"] = metric
if hasattr(virtual_memory, metric):
yield Observation(
getattr(virtual_memory, metric),
self._system_memory_usage_labels.copy(),
)
def _get_system_memory_utilization(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for memory utilization"""
system_memory = psutil.virtual_memory()
for metric in self._config["system.memory.utilization"]:
self._system_memory_utilization_labels["state"] = metric
if hasattr(system_memory, metric):
yield Observation(
getattr(system_memory, metric) / system_memory.total,
self._system_memory_utilization_labels.copy(),
)
def _get_system_swap_usage(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for swap usage"""
system_swap = psutil.swap_memory()
for metric in self._config["system.swap.usage"]:
self._system_swap_usage_labels["state"] = metric
if hasattr(system_swap, metric):
yield Observation(
getattr(system_swap, metric),
self._system_swap_usage_labels.copy(),
)
def _get_system_swap_utilization(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for swap utilization"""
system_swap = psutil.swap_memory()
for metric in self._config["system.swap.utilization"]:
if hasattr(system_swap, metric):
self._system_swap_utilization_labels["state"] = metric
yield Observation(
(
getattr(system_swap, metric) / system_swap.total
if system_swap.total
else 0
),
self._system_swap_utilization_labels.copy(),
)
def _get_system_disk_io(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for disk IO"""
for device, counters in psutil.disk_io_counters(perdisk=True).items():
for metric in self._config["system.disk.io"]:
if hasattr(counters, f"{metric}_bytes"):
self._system_disk_io_labels["device"] = device
self._system_disk_io_labels["direction"] = metric
yield Observation(
getattr(counters, f"{metric}_bytes"),
self._system_disk_io_labels.copy(),
)
def _get_system_disk_operations(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for disk operations"""
for device, counters in psutil.disk_io_counters(perdisk=True).items():
for metric in self._config["system.disk.operations"]:
if hasattr(counters, f"{metric}_count"):
self._system_disk_operations_labels["device"] = device
self._system_disk_operations_labels["direction"] = metric
yield Observation(
getattr(counters, f"{metric}_count"),
self._system_disk_operations_labels.copy(),
)
def _get_system_disk_time(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for disk time"""
for device, counters in psutil.disk_io_counters(perdisk=True).items():
for metric in self._config["system.disk.time"]:
if hasattr(counters, f"{metric}_time"):
self._system_disk_time_labels["device"] = device
self._system_disk_time_labels["direction"] = metric
yield Observation(
getattr(counters, f"{metric}_time") / 1000,
self._system_disk_time_labels.copy(),
)
def _get_system_disk_merged(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for disk merged operations"""
# FIXME The units in the spec is 1, it seems like it should be
# operations or the value type should be Double
for device, counters in psutil.disk_io_counters(perdisk=True).items():
for metric in self._config["system.disk.time"]:
if hasattr(counters, f"{metric}_merged_count"):
self._system_disk_merged_labels["device"] = device
self._system_disk_merged_labels["direction"] = metric
yield Observation(
getattr(counters, f"{metric}_merged_count"),
self._system_disk_merged_labels.copy(),
)
def _get_system_network_dropped_packets(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for network dropped packets"""
for device, counters in psutil.net_io_counters(pernic=True).items():
for metric in self._config["system.network.dropped.packets"]:
in_out = {"receive": "in", "transmit": "out"}[metric]
if hasattr(counters, f"drop{in_out}"):
self._system_network_dropped_packets_labels["device"] = (
device
)
self._system_network_dropped_packets_labels[
"direction"
] = metric
yield Observation(
getattr(counters, f"drop{in_out}"),
self._system_network_dropped_packets_labels.copy(),
)
def _get_system_network_packets(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for network packets"""
for device, counters in psutil.net_io_counters(pernic=True).items():
for metric in self._config["system.network.packets"]:
recv_sent = {"receive": "recv", "transmit": "sent"}[metric]
if hasattr(counters, f"packets_{recv_sent}"):
self._system_network_packets_labels["device"] = device
self._system_network_packets_labels["direction"] = metric
yield Observation(
getattr(counters, f"packets_{recv_sent}"),
self._system_network_packets_labels.copy(),
)
def _get_system_network_errors(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for network errors"""
for device, counters in psutil.net_io_counters(pernic=True).items():
for metric in self._config["system.network.errors"]:
in_out = {"receive": "in", "transmit": "out"}[metric]
if hasattr(counters, f"err{in_out}"):
self._system_network_errors_labels["device"] = device
self._system_network_errors_labels["direction"] = metric
yield Observation(
getattr(counters, f"err{in_out}"),
self._system_network_errors_labels.copy(),
)
def _get_system_network_io(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for network IO"""
for device, counters in psutil.net_io_counters(pernic=True).items():
for metric in self._config["system.network.io"]:
recv_sent = {"receive": "recv", "transmit": "sent"}[metric]
if hasattr(counters, f"bytes_{recv_sent}"):
self._system_network_io_labels["device"] = device
self._system_network_io_labels["direction"] = metric
yield Observation(
getattr(counters, f"bytes_{recv_sent}"),
self._system_network_io_labels.copy(),
)
def _get_system_network_connections(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for network connections"""
# TODO How to find the device identifier for a particular
# connection?
connection_counters = {}
for net_connection in psutil.net_connections():
for metric in self._config["system.network.connections"]:
self._system_network_connections_labels["protocol"] = {
1: "tcp",
2: "udp",
}[net_connection.type.value]
self._system_network_connections_labels["state"] = (
net_connection.status
)
self._system_network_connections_labels[metric] = getattr(
net_connection, metric
)
connection_counters_key = tuple(
sorted(self._system_network_connections_labels.items())
)
if connection_counters_key in connection_counters:
connection_counters[connection_counters_key]["counter"] += 1
else:
connection_counters[connection_counters_key] = {
"counter": 1,
"labels": self._system_network_connections_labels.copy(),
}
for connection_counter in connection_counters.values():
yield Observation(
connection_counter["counter"],
connection_counter["labels"],
)
def _get_system_thread_count(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for active thread count"""
yield Observation(
threading.active_count(), self._system_thread_count_labels
)
# process callbacks
def _get_context_switches(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for context switches"""
ctx_switches = self._proc.num_ctx_switches()
for metric in self._config["process.context_switches"]:
if hasattr(ctx_switches, metric):
self._context_switches_labels["type"] = metric
yield Observation(
getattr(ctx_switches, metric),
self._context_switches_labels.copy(),
)
def _get_cpu_time(self, options: CallbackOptions) -> Iterable[Observation]:
"""Observer callback for CPU time"""
proc_cpu = self._proc.cpu_times()
for metric in self._config["process.cpu.time"]:
if hasattr(proc_cpu, metric):
self._cpu_time_labels["type"] = metric
yield Observation(
getattr(proc_cpu, metric),
self._cpu_time_labels.copy(),
)
def _get_cpu_utilization(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for CPU utilization"""
proc_cpu_percent = self._proc.cpu_percent()
# may return None so add a default of 1 in case
num_cpus = psutil.cpu_count() or 1
yield Observation(
proc_cpu_percent / 100 / num_cpus,
self._cpu_utilization_labels.copy(),
)
def _get_memory_usage(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for memory usage"""
proc_memory = self._proc.memory_info()
if hasattr(proc_memory, "rss"):
yield Observation(
getattr(proc_memory, "rss"),
self._memory_usage_labels.copy(),
)
def _get_memory_virtual(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for memory virtual"""
proc_memory = self._proc.memory_info()
if hasattr(proc_memory, "vms"):
yield Observation(
getattr(proc_memory, "vms"),
self._memory_virtual_labels.copy(),
)
def _get_thread_count(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for active thread count"""
yield Observation(
self._proc.num_threads(), self._thread_count_labels.copy()
)
def _get_process_disk_io(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for process disk IO"""
try:
proc_disk = self._proc.io_counters()
except (AttributeError, NotImplementedError, PermissionError):
# io_counters() is not available on all platforms (e.g. macOS)
return
for metric in self._config["process.disk.io"]:
if metric == "read":
self._process_disk_io_labels["direction"] = "read"
yield Observation(
proc_disk.read_bytes, self._process_disk_io_labels.copy()
)
elif metric == "write":
self._process_disk_io_labels["direction"] = "write"
yield Observation(
proc_disk.write_bytes, self._process_disk_io_labels.copy()
)
# runtime callbacks
def _get_runtime_memory(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for runtime memory"""
proc_memory = self._proc.memory_info()
for metric in self._config["process.runtime.memory"]:
if hasattr(proc_memory, metric):
self._runtime_memory_labels["type"] = metric
yield Observation(
getattr(proc_memory, metric),
self._runtime_memory_labels.copy(),
)
def _get_runtime_cpu_time(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for runtime CPU time"""
proc_cpu = self._proc.cpu_times()
for metric in self._config["process.runtime.cpu.time"]:
if hasattr(proc_cpu, metric):
self._runtime_cpu_time_labels["type"] = metric
yield Observation(
getattr(proc_cpu, metric),
self._runtime_cpu_time_labels.copy(),
)
def _get_runtime_gc_count(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for garbage collection"""
for index, count in enumerate(gc.get_count()):
self._runtime_gc_count_labels["count"] = str(index)
yield Observation(count, self._runtime_gc_count_labels.copy())
def _get_runtime_gc_collections(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for garbage collection"""
for index, stat in enumerate(gc.get_stats()):
self._runtime_gc_collections_labels[CPYTHON_GC_GENERATION] = index
# TODO: remove this a few releases after 1.40.0
self._runtime_gc_collections_labels["generation"] = str(index)
yield Observation(
stat["collections"], self._runtime_gc_collections_labels.copy()
)
def _get_runtime_gc_collected_objects(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for garbage collection collected objects"""
for index, stat in enumerate(gc.get_stats()):
self._runtime_gc_collected_objects_labels[
CPYTHON_GC_GENERATION
] = index
# TODO: remove this a few releases after 1.40.0
self._runtime_gc_collected_objects_labels["generation"] = str(
index
)
yield Observation(
stat["collected"],
self._runtime_gc_collected_objects_labels.copy(),
)
def _get_runtime_gc_uncollectable_objects(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for garbage collection uncollectable objects"""
for index, stat in enumerate(gc.get_stats()):
self._runtime_gc_uncollectable_objects_labels[
CPYTHON_GC_GENERATION
] = index
# TODO: remove this a few releases after 1.40.0
self._runtime_gc_uncollectable_objects_labels["generation"] = str(
index
)
yield Observation(
stat["uncollectable"],
self._runtime_gc_uncollectable_objects_labels.copy(),
)
def _get_runtime_thread_count(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for runtime active thread count"""
yield Observation(
self._proc.num_threads(), self._runtime_thread_count_labels.copy()
)
def _get_runtime_cpu_utilization(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for runtime CPU utilization"""
proc_cpu_percent = self._proc.cpu_percent()
yield Observation(
proc_cpu_percent / 100,
self._runtime_cpu_utilization_labels.copy(),
)
def _get_runtime_context_switches(
self, options: CallbackOptions
) -> Iterable[Observation]:
"""Observer callback for runtime context switches"""
ctx_switches = self._proc.num_ctx_switches()
for metric in self._config["process.runtime.context_switches"]:
if hasattr(ctx_switches, metric):
self._runtime_context_switches_labels["type"] = metric
yield Observation(
getattr(ctx_switches, metric),
self._runtime_context_switches_labels.copy(),
)