File: //var/opt/nydus/ops/mysql/opentelemetry/sdk/metrics/_internal/__init__.py
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from atexit import register, unregister
from logging import getLogger
from threading import Lock
from time import time_ns
from typing import Optional, Sequence
# This kind of import is needed to avoid Sphinx errors.
import mysql.opentelemetry.sdk.metrics
from mysql.opentelemetry.metrics import (
Counter as APICounter,
Histogram as APIHistogram,
Meter as APIMeter,
MeterProvider as APIMeterProvider,
NoOpMeter,
ObservableCounter as APIObservableCounter,
ObservableGauge as APIObservableGauge,
ObservableUpDownCounter as APIObservableUpDownCounter,
UpDownCounter as APIUpDownCounter,
)
from mysql.opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
from mysql.opentelemetry.sdk.metrics._internal.instrument import (
_Counter,
_Histogram,
_ObservableCounter,
_ObservableGauge,
_ObservableUpDownCounter,
_UpDownCounter,
)
from mysql.opentelemetry.sdk.metrics._internal.measurement_consumer import (
MeasurementConsumer,
SynchronousMeasurementConsumer,
)
from mysql.opentelemetry.sdk.metrics._internal.sdk_configuration import SdkConfiguration
from mysql.opentelemetry.sdk.resources import Resource
from mysql.opentelemetry.sdk.util.instrumentation import InstrumentationScope
from mysql.opentelemetry.util._once import Once
_logger = getLogger(__name__)
class Meter(APIMeter):
"""See `mysql.opentelemetry.metrics.Meter`."""
def __init__(
self,
instrumentation_scope: InstrumentationScope,
measurement_consumer: MeasurementConsumer,
):
super().__init__(
name=instrumentation_scope.name,
version=instrumentation_scope.version,
schema_url=instrumentation_scope.schema_url,
)
self._instrumentation_scope = instrumentation_scope
self._measurement_consumer = measurement_consumer
self._instrument_id_instrument = {}
self._instrument_id_instrument_lock = Lock()
def create_counter(self, name, unit="", description="") -> APICounter:
(
is_instrument_registered,
instrument_id,
) = self._is_instrument_registered(name, _Counter, unit, description)
if is_instrument_registered:
# FIXME #2558 go through all views here and check if this
# instrument registration conflict can be fixed. If it can be, do
# not log the following warning.
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
APICounter.__name__,
unit,
description,
)
with self._instrument_id_instrument_lock:
return self._instrument_id_instrument[instrument_id]
instrument = _Counter(
name,
self._instrumentation_scope,
self._measurement_consumer,
unit,
description,
)
with self._instrument_id_instrument_lock:
self._instrument_id_instrument[instrument_id] = instrument
return instrument
def create_up_down_counter(self, name, unit="", description="") -> APIUpDownCounter:
(
is_instrument_registered,
instrument_id,
) = self._is_instrument_registered(name, _UpDownCounter, unit, description)
if is_instrument_registered:
# FIXME #2558 go through all views here and check if this
# instrument registration conflict can be fixed. If it can be, do
# not log the following warning.
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
APIUpDownCounter.__name__,
unit,
description,
)
with self._instrument_id_instrument_lock:
return self._instrument_id_instrument[instrument_id]
instrument = _UpDownCounter(
name,
self._instrumentation_scope,
self._measurement_consumer,
unit,
description,
)
with self._instrument_id_instrument_lock:
self._instrument_id_instrument[instrument_id] = instrument
return instrument
def create_observable_counter(
self, name, callbacks=None, unit="", description=""
) -> APIObservableCounter:
(
is_instrument_registered,
instrument_id,
) = self._is_instrument_registered(name, _ObservableCounter, unit, description)
if is_instrument_registered:
# FIXME #2558 go through all views here and check if this
# instrument registration conflict can be fixed. If it can be, do
# not log the following warning.
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
APIObservableCounter.__name__,
unit,
description,
)
with self._instrument_id_instrument_lock:
return self._instrument_id_instrument[instrument_id]
instrument = _ObservableCounter(
name,
self._instrumentation_scope,
self._measurement_consumer,
callbacks,
unit,
description,
)
self._measurement_consumer.register_asynchronous_instrument(instrument)
with self._instrument_id_instrument_lock:
self._instrument_id_instrument[instrument_id] = instrument
return instrument
def create_histogram(self, name, unit="", description="") -> APIHistogram:
(
is_instrument_registered,
instrument_id,
) = self._is_instrument_registered(name, _Histogram, unit, description)
if is_instrument_registered:
# FIXME #2558 go through all views here and check if this
# instrument registration conflict can be fixed. If it can be, do
# not log the following warning.
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
APIHistogram.__name__,
unit,
description,
)
with self._instrument_id_instrument_lock:
return self._instrument_id_instrument[instrument_id]
instrument = _Histogram(
name,
self._instrumentation_scope,
self._measurement_consumer,
unit,
description,
)
with self._instrument_id_instrument_lock:
self._instrument_id_instrument[instrument_id] = instrument
return instrument
def create_observable_gauge(
self, name, callbacks=None, unit="", description=""
) -> APIObservableGauge:
(
is_instrument_registered,
instrument_id,
) = self._is_instrument_registered(name, _ObservableGauge, unit, description)
if is_instrument_registered:
# FIXME #2558 go through all views here and check if this
# instrument registration conflict can be fixed. If it can be, do
# not log the following warning.
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
APIObservableGauge.__name__,
unit,
description,
)
with self._instrument_id_instrument_lock:
return self._instrument_id_instrument[instrument_id]
instrument = _ObservableGauge(
name,
self._instrumentation_scope,
self._measurement_consumer,
callbacks,
unit,
description,
)
self._measurement_consumer.register_asynchronous_instrument(instrument)
with self._instrument_id_instrument_lock:
self._instrument_id_instrument[instrument_id] = instrument
return instrument
def create_observable_up_down_counter(
self, name, callbacks=None, unit="", description=""
) -> APIObservableUpDownCounter:
(
is_instrument_registered,
instrument_id,
) = self._is_instrument_registered(
name, _ObservableUpDownCounter, unit, description
)
if is_instrument_registered:
# FIXME #2558 go through all views here and check if this
# instrument registration conflict can be fixed. If it can be, do
# not log the following warning.
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
APIObservableUpDownCounter.__name__,
unit,
description,
)
with self._instrument_id_instrument_lock:
return self._instrument_id_instrument[instrument_id]
instrument = _ObservableUpDownCounter(
name,
self._instrumentation_scope,
self._measurement_consumer,
callbacks,
unit,
description,
)
self._measurement_consumer.register_asynchronous_instrument(instrument)
with self._instrument_id_instrument_lock:
self._instrument_id_instrument[instrument_id] = instrument
return instrument
class MeterProvider(APIMeterProvider):
r"""See `mysql.opentelemetry.metrics.MeterProvider`.
Args:
metric_readers: Register metric readers to collect metrics from the SDK
on demand. Each :class:`mysql.opentelemetry.sdk.metrics.export.MetricReader` is
completely independent and will collect separate streams of
metrics. TODO: reference ``PeriodicExportingMetricReader`` usage with push
exporters here.
resource: The resource representing what the metrics emitted from the SDK pertain to.
shutdown_on_exit: If true, registers an `atexit` handler to call
`MeterProvider.shutdown`
views: The views to configure the metric output the SDK
By default, instruments which do not match any :class:`mysql.opentelemetry.sdk.metrics.view.View` (or if no :class:`mysql.opentelemetry.sdk.metrics.view.View`\ s
are provided) will report metrics with the default aggregation for the
instrument's kind. To disable instruments by default, configure a match-all
:class:`mysql.opentelemetry.sdk.metrics.view.View` with `DropAggregation` and then create :class:`mysql.opentelemetry.sdk.metrics.view.View`\ s to re-enable
individual instruments:
.. code-block:: python
:caption: Disable default views
MeterProvider(
views=[
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="mycounter"),
],
# ...
)
"""
_all_metric_readers_lock = Lock()
_all_metric_readers = set()
def __init__(
self,
metric_readers: Sequence[
"mysql.opentelemetry.sdk.metrics.export.MetricReader"
] = (),
resource: Resource = Resource.create({}),
shutdown_on_exit: bool = True,
views: Sequence["mysql.opentelemetry.sdk.metrics.view.View"] = (),
):
self._lock = Lock()
self._meter_lock = Lock()
self._atexit_handler = None
self._sdk_config = SdkConfiguration(
resource=resource,
metric_readers=metric_readers,
views=views,
)
self._measurement_consumer = SynchronousMeasurementConsumer(
sdk_config=self._sdk_config
)
if shutdown_on_exit:
self._atexit_handler = register(self.shutdown)
self._meters = {}
self._shutdown_once = Once()
self._shutdown = False
for metric_reader in self._sdk_config.metric_readers:
with self._all_metric_readers_lock:
if metric_reader in self._all_metric_readers:
raise Exception(
f"MetricReader {metric_reader} has been registered "
"already in other MeterProvider instance"
)
self._all_metric_readers.add(metric_reader)
metric_reader._set_collect_callback(self._measurement_consumer.collect)
def force_flush(self, timeout_millis: float = 10_000) -> bool:
deadline_ns = time_ns() + timeout_millis * 10**6
metric_reader_error = {}
for metric_reader in self._sdk_config.metric_readers:
current_ts = time_ns()
try:
if current_ts >= deadline_ns:
raise MetricsTimeoutError("Timed out while flushing metric readers")
metric_reader.force_flush(
timeout_millis=(deadline_ns - current_ts) / 10**6
)
# pylint: disable=broad-except
except Exception as error:
metric_reader_error[metric_reader] = error
if metric_reader_error:
metric_reader_error_string = "\n".join(
[
f"{metric_reader.__class__.__name__}: {repr(error)}"
for metric_reader, error in metric_reader_error.items()
]
)
raise Exception(
"MeterProvider.force_flush failed because the following "
"metric readers failed during collect:\n"
f"{metric_reader_error_string}"
)
return True
def shutdown(self, timeout_millis: float = 30_000):
deadline_ns = time_ns() + timeout_millis * 10**6
def _shutdown():
self._shutdown = True
did_shutdown = self._shutdown_once.do_once(_shutdown)
if not did_shutdown:
_logger.warning("shutdown can only be called once")
return
metric_reader_error = {}
for metric_reader in self._sdk_config.metric_readers:
current_ts = time_ns()
try:
if current_ts >= deadline_ns:
raise Exception("Didn't get to execute, deadline already exceeded")
metric_reader.shutdown(
timeout_millis=(deadline_ns - current_ts) / 10**6
)
# pylint: disable=broad-except
except Exception as error:
metric_reader_error[metric_reader] = error
if self._atexit_handler is not None:
unregister(self._atexit_handler)
self._atexit_handler = None
if metric_reader_error:
metric_reader_error_string = "\n".join(
[
f"{metric_reader.__class__.__name__}: {repr(error)}"
for metric_reader, error in metric_reader_error.items()
]
)
raise Exception(
(
"MeterProvider.shutdown failed because the following "
"metric readers failed during shutdown:\n"
f"{metric_reader_error_string}"
)
)
def get_meter(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> Meter:
if self._shutdown:
_logger.warning("A shutdown `MeterProvider` can not provide a `Meter`")
return NoOpMeter(name, version=version, schema_url=schema_url)
if not name:
_logger.warning("Meter name cannot be None or empty.")
return NoOpMeter(name, version=version, schema_url=schema_url)
info = InstrumentationScope(name, version, schema_url)
with self._meter_lock:
if not self._meters.get(info):
# FIXME #2558 pass SDKConfig object to meter so that the meter
# has access to views.
self._meters[info] = Meter(
info,
self._measurement_consumer,
)
return self._meters[info]