File: //var/opt/nydus/ops/mysql/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging import getLogger
from threading import Lock
from time import time_ns
from typing import Dict, List, Sequence
from mysql.opentelemetry.metrics import Instrument
from mysql.opentelemetry.sdk.metrics._internal.aggregation import (
Aggregation,
DefaultAggregation,
_Aggregation,
_SumAggregation,
)
from mysql.opentelemetry.sdk.metrics._internal.export import AggregationTemporality
from mysql.opentelemetry.sdk.metrics._internal.measurement import Measurement
from mysql.opentelemetry.sdk.metrics._internal.point import DataPointT
from mysql.opentelemetry.sdk.metrics._internal.view import View
_logger = getLogger(__name__)
class _ViewInstrumentMatch:
def __init__(
self,
view: View,
instrument: Instrument,
instrument_class_aggregation: Dict[type, Aggregation],
):
self._start_time_unix_nano = time_ns()
self._view = view
self._instrument = instrument
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
self._lock = Lock()
self._instrument_class_aggregation = instrument_class_aggregation
self._name = self._view._name or self._instrument.name
self._description = self._view._description or self._instrument.description
if not isinstance(self._view._aggregation, DefaultAggregation):
self._aggregation = self._view._aggregation._create_aggregation(
self._instrument, None, 0
)
else:
self._aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument, None, 0)
def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
# pylint: disable=protected-access
result = (
self._name == other._name
and self._instrument.unit == other._instrument.unit
# The aggregation class is being used here instead of data point
# type since they are functionally equivalent.
and self._aggregation.__class__ == other._aggregation.__class__
)
if isinstance(self._aggregation, _SumAggregation):
result = (
result
and self._aggregation._instrument_is_monotonic
== other._aggregation._instrument_is_monotonic
and self._aggregation._instrument_temporality
== other._aggregation._instrument_temporality
)
return result
# pylint: disable=protected-access
def consume_measurement(self, measurement: Measurement) -> None:
if self._view._attribute_keys is not None:
attributes = {}
for key, value in (measurement.attributes or {}).items():
if key in self._view._attribute_keys:
attributes[key] = value
elif measurement.attributes is not None:
attributes = measurement.attributes
else:
attributes = {}
aggr_key = frozenset(attributes.items())
if aggr_key not in self._attributes_aggregation:
with self._lock:
if aggr_key not in self._attributes_aggregation:
if not isinstance(self._view._aggregation, DefaultAggregation):
aggregation = self._view._aggregation._create_aggregation(
self._instrument,
attributes,
self._start_time_unix_nano,
)
else:
aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(
self._instrument,
attributes,
self._start_time_unix_nano,
)
self._attributes_aggregation[aggr_key] = aggregation
self._attributes_aggregation[aggr_key].aggregate(measurement)
def collect(
self,
aggregation_temporality: AggregationTemporality,
collection_start_nanos: int,
) -> Sequence[DataPointT]:
data_points: List[DataPointT] = []
with self._lock:
for aggregation in self._attributes_aggregation.values():
data_point = aggregation.collect(
aggregation_temporality, collection_start_nanos
)
if data_point is not None:
data_points.append(data_point)
return data_points