File: //var/opt/nydus/ops/mysql/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging import getLogger
from threading import RLock
from time import time_ns
from typing import Dict, List
from mysql.opentelemetry.metrics import (
Asynchronous,
Counter,
Instrument,
ObservableCounter,
)
from mysql.opentelemetry.sdk.metrics._internal._view_instrument_match import (
_ViewInstrumentMatch,
)
from mysql.opentelemetry.sdk.metrics._internal.aggregation import (
Aggregation,
ExplicitBucketHistogramAggregation,
_DropAggregation,
_ExplicitBucketHistogramAggregation,
_ExponentialBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from mysql.opentelemetry.sdk.metrics._internal.export import AggregationTemporality
from mysql.opentelemetry.sdk.metrics._internal.measurement import Measurement
from mysql.opentelemetry.sdk.metrics._internal.point import (
ExponentialHistogram,
Gauge,
Histogram,
Metric,
MetricsData,
ResourceMetrics,
ScopeMetrics,
Sum,
)
from mysql.opentelemetry.sdk.metrics._internal.sdk_configuration import SdkConfiguration
from mysql.opentelemetry.sdk.metrics._internal.view import View
from mysql.opentelemetry.sdk.util.instrumentation import InstrumentationScope
# Module-level logger for SDK-internal diagnostics (view conflicts, etc.).
_logger = getLogger(__name__)
# Fallback view applied when none of the configured views matches an
# instrument (see MetricReaderStorage._get_or_init_view_instrument_match).
_DEFAULT_VIEW = View(instrument_name="")
class MetricReaderStorage:
    """The SDK's storage for a given reader.

    Maintains, per instrument, the list of ``_ViewInstrumentMatch`` objects
    produced by the configured views, routes incoming measurements to those
    matches, and assembles the aggregated data into a ``MetricsData`` when
    the reader collects.
    """

    def __init__(
        self,
        sdk_config: SdkConfiguration,
        instrument_class_temporality: Dict[type, AggregationTemporality],
        instrument_class_aggregation: Dict[type, Aggregation],
    ) -> None:
        """
        Args:
            sdk_config: Shared SDK configuration (resource, views, ...).
            instrument_class_temporality: The reader's preferred aggregation
                temporality, keyed by instrument class.
            instrument_class_aggregation: The reader's preferred aggregation,
                keyed by instrument class.
        """
        # Guards initialization of _instrument_view_instrument_matches and
        # serializes collect() against new-instrument registration.
        self._lock = RLock()
        self._sdk_config = sdk_config
        # Lazily populated; once an instrument's entry is created it is
        # never replaced (see _get_or_init_view_instrument_match).
        self._instrument_view_instrument_matches: Dict[
            Instrument, List[_ViewInstrumentMatch]
        ] = {}
        self._instrument_class_temporality = instrument_class_temporality
        self._instrument_class_aggregation = instrument_class_aggregation

    def _get_or_init_view_instrument_match(
        self, instrument: Instrument
    ) -> List[_ViewInstrumentMatch]:
        """Return the view matches for ``instrument``, creating them once.

        Uses double-checked locking: the first lookup runs without the lock
        (the mapping for an instrument never changes once set), and the lock
        is only taken to initialize a missing entry.
        """
        # Optimistically get the relevant views for the given instrument. Once
        # set for a given instrument, the mapping will never change
        if instrument in self._instrument_view_instrument_matches:
            return self._instrument_view_instrument_matches[instrument]

        with self._lock:
            # double check if it was set before we held the lock
            if instrument in self._instrument_view_instrument_matches:
                return self._instrument_view_instrument_matches[instrument]

            # not present, hold the lock and add a new mapping
            view_instrument_matches = []

            self._handle_view_instrument_match(
                instrument, view_instrument_matches
            )

            # if no view targeted the instrument, use the default
            if not view_instrument_matches:
                view_instrument_matches.append(
                    _ViewInstrumentMatch(
                        view=_DEFAULT_VIEW,
                        instrument=instrument,
                        instrument_class_aggregation=(
                            self._instrument_class_aggregation
                        ),
                    )
                )
            self._instrument_view_instrument_matches[
                instrument
            ] = view_instrument_matches

            return view_instrument_matches

    def consume_measurement(self, measurement: Measurement) -> None:
        """Route ``measurement`` into every view match of its instrument."""
        for view_instrument_match in self._get_or_init_view_instrument_match(
            measurement.instrument
        ):
            view_instrument_match.consume_measurement(measurement)

    def collect(self) -> MetricsData:
        """Snapshot all aggregations into a single ``MetricsData``.

        Returns:
            A ``MetricsData`` with one ``ResourceMetrics`` containing one
            ``ScopeMetrics`` per instrumentation scope that produced metrics.
        """
        # Use a list instead of yielding to prevent a slow reader from holding
        # SDK locks

        # While holding the lock, new _ViewInstrumentMatch can't be added from
        # another thread (so we are sure we collect all existing view).
        # However, instruments can still send measurements that will make it
        # into the individual aggregations; collection will acquire those locks
        # iteratively to keep locking as fine-grained as possible. One side
        # effect is that end times can be slightly skewed among the metric
        # streams produced by the SDK, but we still align the output timestamps
        # for a single instrument.

        collection_start_nanos = time_ns()

        with self._lock:
            instrumentation_scope_scope_metrics: (
                Dict[InstrumentationScope, ScopeMetrics]
            ) = {}

            for (
                instrument,
                view_instrument_matches,
            ) in self._instrument_view_instrument_matches.items():
                aggregation_temporality = self._instrument_class_temporality[
                    instrument.__class__
                ]

                metrics: List[Metric] = []

                for view_instrument_match in view_instrument_matches:
                    # Map the internal aggregation type to the corresponding
                    # data-point container for the exported metric.
                    if isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _SumAggregation,
                    ):
                        data = Sum(
                            aggregation_temporality=aggregation_temporality,
                            data_points=view_instrument_match.collect(
                                aggregation_temporality, collection_start_nanos
                            ),
                            is_monotonic=isinstance(
                                instrument, (Counter, ObservableCounter)
                            ),
                        )
                    elif isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _LastValueAggregation,
                    ):
                        data = Gauge(
                            data_points=view_instrument_match.collect(
                                aggregation_temporality, collection_start_nanos
                            )
                        )
                    elif isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _ExplicitBucketHistogramAggregation,
                    ):
                        data = Histogram(
                            data_points=view_instrument_match.collect(
                                aggregation_temporality, collection_start_nanos
                            ),
                            aggregation_temporality=aggregation_temporality,
                        )
                    elif isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _DropAggregation,
                    ):
                        continue
                    elif isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _ExponentialBucketHistogramAggregation,
                    ):
                        data = ExponentialHistogram(
                            data_points=view_instrument_match.collect(
                                aggregation_temporality, collection_start_nanos
                            ),
                            aggregation_temporality=aggregation_temporality,
                        )
                    else:
                        # Without this branch an unrecognized aggregation
                        # type would leave ``data`` unbound (first iteration)
                        # or stale from the previous iteration, silently
                        # emitting a Metric with the wrong data.
                        _logger.warning(
                            "Unsupported aggregation type %s, ignoring it",
                            # pylint: disable=protected-access
                            type(view_instrument_match._aggregation).__name__,
                        )
                        continue

                    metrics.append(
                        Metric(
                            # pylint: disable=protected-access
                            name=view_instrument_match._name,
                            description=view_instrument_match._description,
                            unit=view_instrument_match._instrument.unit,
                            data=data,
                        )
                    )

                # Only emit a ScopeMetrics entry when this instrument actually
                # produced metrics (e.g. all-dropped views yield none).
                if metrics:
                    if instrument.instrumentation_scope not in (
                        instrumentation_scope_scope_metrics
                    ):
                        instrumentation_scope_scope_metrics[
                            instrument.instrumentation_scope
                        ] = ScopeMetrics(
                            scope=instrument.instrumentation_scope,
                            metrics=metrics,
                            schema_url=instrument.instrumentation_scope.schema_url,
                        )
                    else:
                        instrumentation_scope_scope_metrics[
                            instrument.instrumentation_scope
                        ].metrics.extend(metrics)

            return MetricsData(
                resource_metrics=[
                    ResourceMetrics(
                        resource=self._sdk_config.resource,
                        scope_metrics=list(
                            instrumentation_scope_scope_metrics.values()
                        ),
                        schema_url=self._sdk_config.resource.schema_url,
                    )
                ]
            )

    def _handle_view_instrument_match(
        self,
        instrument: Instrument,
        view_instrument_matches: List["_ViewInstrumentMatch"],
    ) -> None:
        """Append a ``_ViewInstrumentMatch`` for every view that matches
        ``instrument``, warning about conflicting metric identities.
        """
        for view in self._sdk_config.views:
            # pylint: disable=protected-access
            if not view._match(instrument):
                continue

            if not self._check_view_instrument_compatibility(view, instrument):
                continue

            new_view_instrument_match = _ViewInstrumentMatch(
                view=view,
                instrument=instrument,
                instrument_class_aggregation=(
                    self._instrument_class_aggregation
                ),
            )

            # A conflict is only warned about, not prevented: both streams
            # are still produced.
            for (
                existing_view_instrument_matches
            ) in self._instrument_view_instrument_matches.values():
                for (
                    existing_view_instrument_match
                ) in existing_view_instrument_matches:
                    if existing_view_instrument_match.conflicts(
                        new_view_instrument_match
                    ):
                        _logger.warning(
                            "Views %s and %s will cause conflicting "
                            "metrics identities",
                            existing_view_instrument_match._view,
                            new_view_instrument_match._view,
                        )

            view_instrument_matches.append(new_view_instrument_match)

    @staticmethod
    def _check_view_instrument_compatibility(
        view: View, instrument: Instrument
    ) -> bool:
        """
        Checks if a view and an instrument are compatible.

        Returns `true` if they are compatible and a `_ViewInstrumentMatch`
        object should be created, `false` otherwise.
        """

        result = True

        # An asynchronous instrument cannot be meaningfully bucketed into an
        # explicit-bucket histogram, so such a view is rejected.
        # pylint: disable=protected-access
        if isinstance(instrument, Asynchronous) and isinstance(
            view._aggregation, ExplicitBucketHistogramAggregation
        ):
            _logger.warning(
                "View %s and instrument %s will produce "
                "semantic errors when matched, the view "
                "has not been applied.",
                view,
                instrument,
            )
            result = False

        return result