# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `Boto`_ to trace service requests.
There are two options for instrumenting code. The first option is to use the
``opentelemetry-instrument`` executable which will automatically
instrument your Boto client. The second is to programmatically enable
instrumentation via the following code:
.. _boto: https://pypi.org/project/boto/
Usage
-----
.. code:: python
from opentelemetry.instrumentation.boto import BotoInstrumentor
import boto
# Instrument Boto
BotoInstrumentor().instrument()
# This will create a span with Boto-specific attributes
ec2 = boto.ec2.connect_to_region("us-west-2")
ec2.get_all_instances()
API
---
"""
import logging
from inspect import currentframe
from typing import Collection
from boto.connection import AWSAuthConnection, AWSQueryConnection
from wrapt import wrap_function_wrapper
from opentelemetry.instrumentation.boto.package import _instruments
from opentelemetry.instrumentation.boto.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer
logger = logging.getLogger(__name__)
SERVICE_PARAMS_BLOCK_LIST = {"s3": ["params.Body"]}
def _get_instance_region_name(instance):
region = getattr(instance, "region", None)
if not region:
return None
if isinstance(region, str):
return region.split(":")[1]
return region.name
[docs]class BotoInstrumentor(BaseInstrumentor):
"""A instrumentor for Boto
See `BaseInstrumentor`
"""
def __init__(self):
super().__init__()
self._original_boto = None
[docs] def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs):
# AWSQueryConnection and AWSAuthConnection are two different classes
# called by different services for connection.
# For example EC2 uses AWSQueryConnection and S3 uses
# AWSAuthConnection
# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__,
__version__,
kwargs.get("tracer_provider"),
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
wrap_function_wrapper(
"boto.connection",
"AWSQueryConnection.make_request",
self._patched_query_request,
)
wrap_function_wrapper(
"boto.connection",
"AWSAuthConnection.make_request",
self._patched_auth_request,
)
def _uninstrument(self, **kwargs):
unwrap(AWSQueryConnection, "make_request")
unwrap(AWSAuthConnection, "make_request")
def _common_request( # pylint: disable=too-many-locals
self,
args_name,
traced_args,
operation_name,
original_func,
instance,
args,
kwargs,
):
endpoint_name = getattr(instance, "host").split(".")[0]
with self._tracer.start_as_current_span(
f"{endpoint_name}.command",
kind=SpanKind.CONSUMER,
) as span:
span.set_attribute("endpoint", endpoint_name)
if args:
http_method = args[0]
span.set_attribute("http_method", http_method.lower())
# Original func returns a boto.connection.HTTPResponse object
result = original_func(*args, **kwargs)
if span.is_recording():
add_span_arg_tags(
span,
endpoint_name,
args,
args_name,
traced_args,
)
# Obtaining region name
region_name = _get_instance_region_name(instance)
meta = {
"aws.agent": "boto",
"aws.operation": operation_name,
}
if region_name:
meta["aws.region"] = region_name
for key, value in meta.items():
span.set_attribute(key, value)
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE, getattr(result, "status")
)
span.set_attribute(
SpanAttributes.HTTP_METHOD, getattr(result, "_method")
)
return result
def _patched_query_request(self, original_func, instance, args, kwargs):
return self._common_request(
("operation_name", "params", "path", "verb"),
["operation_name", "params", "path"],
args[0] if args else None,
original_func,
instance,
args,
kwargs,
)
def _patched_auth_request(self, original_func, instance, args, kwargs):
frame = currentframe().f_back
operation_name = None
while frame:
if frame.f_code.co_name == "make_request":
operation_name = frame.f_back.f_code.co_name
break
frame = frame.f_back
return self._common_request(
(
"method",
"path",
"headers",
"data",
"host",
"auth_path",
"sender",
),
["path", "data", "host"],
operation_name,
original_func,
instance,
args,
kwargs,
)
[docs]def flatten_dict(dict_, sep=".", prefix=""):
"""
Returns a normalized dict of depth 1 with keys in order of embedding
"""
# NOTE: This should probably be in `opentelemetry.instrumentation.utils`.
# adapted from https://stackoverflow.com/a/19647596
return (
{
prefix + sep + k if prefix else k: v
for kk, vv in dict_.items()
for k, v in flatten_dict(vv, sep, kk).items()
}
if isinstance(dict_, dict)
else {prefix: dict_}
)