diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index a2adaa36a98..e6bd610b0c0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -440,11 +440,12 @@ def __init__( ) ), resource=resource, - metric_readers=metric_readers, views=views, ) + self._metric_readers = metric_readers self._measurement_consumer = SynchronousMeasurementConsumer( - sdk_config=self._sdk_config + sdk_config=self._sdk_config, + metric_readers=metric_readers, ) disabled = environ.get(OTEL_SDK_DISABLED, "") self._disabled = disabled.lower().strip() == "true" @@ -456,7 +457,7 @@ def __init__( self._shutdown_once = Once() self._shutdown = False - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: with self._all_metric_readers_lock: if metric_reader in self._all_metric_readers: # pylint: disable=broad-exception-raised @@ -476,7 +477,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -521,7 +522,7 @@ def _shutdown(): metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -588,3 +589,33 @@ def get_meter( self._measurement_consumer, ) return self._meters[info] + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader" + ) -> None: + with self._all_metric_readers_lock: + if metric_reader in self._all_metric_readers: + raise ValueError( + f"MetricReader {metric_reader} has been registered already!" + ) + self._measurement_consumer.add_metric_reader(metric_reader) + # pylint: disable-next=protected-access + metric_reader._set_collect_callback( + self._measurement_consumer.collect + ) + self._all_metric_readers.add(metric_reader) + + def remove_metric_reader( + self, + metric_reader: "opentelemetry.sdk.metrics.export.MetricReader", + ) -> None: + with self._all_metric_readers_lock: + if metric_reader not in self._all_metric_readers: + raise ValueError( + f"MetricReader {metric_reader} has not been registered!" + ) + self._measurement_consumer.remove_metric_reader(metric_reader) + # pylint: disable-next=protected-access + metric_reader._set_collect_callback(None) + metric_reader.shutdown() + self._all_metric_readers.remove(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 43ebc3d345e..5e63ad23551 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -14,15 +14,15 @@ # pylint: disable=unused-import +import weakref from abc import ABC, abstractmethod from threading import Lock from time import time_ns -from typing import List, Mapping, Optional +from typing import Iterable, List, Mapping, Optional # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics import opentelemetry.sdk.metrics._internal.instrument -import opentelemetry.sdk.metrics._internal.sdk_configuration from opentelemetry.metrics._internal.instrument import CallbackOptions from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError from opentelemetry.sdk.metrics._internal.measurement import Measurement @@ -59,10 +59,10 @@ class SynchronousMeasurementConsumer(MeasurementConsumer): def __init__( self, sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration", + metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"], ) -> None: self._lock = Lock() self._sdk_config = sdk_config - # should never be mutated self._reader_storages: Mapping[ "opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage ] = { @@ -71,7 +71,7 @@ def __init__( reader._instrument_class_temporality, reader._instrument_class_aggregation, ) - for reader in sdk_config.metric_readers + for reader in metric_readers } self._async_instruments: List[ "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" @@ -86,7 +86,9 @@ def consume_measurement(self, measurement: Measurement) -> None: measurement.context, ) ) - for reader_storage in self._reader_storages.values(): + with self._lock: + reader_storages = weakref.WeakSet(self._reader_storages.values()) + for reader_storage in reader_storages: reader_storage.consume_measurement( measurement, should_sample_exemplar ) @@ -143,3 +145,23 @@ def collect( result = self._reader_storages[metric_reader].collect() return result + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Registers a new metric reader.""" + with self._lock: + self._reader_storages[metric_reader] = MetricReaderStorage( + self._sdk_config, + # pylint: disable-next=protected-access + metric_reader._instrument_class_temporality, + # pylint: disable-next=protected-access + metric_reader._instrument_class_aggregation, + ) + + def remove_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Unregisters the given metric reader.""" + with self._lock: + self._reader_storages.pop(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py index 3d88facb0c3..f87da33ce35 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py @@ -26,5 +26,4 @@ class SdkConfiguration: exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter" resource: "opentelemetry.sdk.resources.Resource" - metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"] views: Sequence["opentelemetry.sdk.metrics.View"] diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 22abfbd3cfe..521b2203543 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -14,6 +14,7 @@ # pylint: disable=invalid-name,no-self-use +from threading import Event, Thread from time import sleep from unittest import TestCase from unittest.mock import MagicMock, Mock, patch @@ -34,7 +35,8 @@ class TestSynchronousMeasurementConsumer(TestCase): def test_parent(self, _): self.assertIsInstance( - SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer + SynchronousMeasurementConsumer(MagicMock(), metric_readers=()), + MeasurementConsumer, ) def test_creates_metric_reader_storages(self, MockMetricReaderStorage): @@ -44,9 +46,9 @@ def test_creates_metric_reader_storages(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5) @@ -61,9 +63,9 @@ def test_measurements_passed_to_each_reader_storage( SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) measurement_mock = Mock() consumer.consume_measurement(measurement_mock) @@ -83,9 +85,9 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks): rs_mock.collect.assert_not_called() @@ -102,9 +104,9 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) async_instrument_mocks = [MagicMock() for _ in range(5)] for i_mock in async_instrument_mocks: @@ -133,9 +135,9 @@ def test_collect_timeout(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) def sleep_1(*args, **kwargs): @@ -166,9 +168,9 @@ def test_collect_deadline( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) def sleep_1(*args, **kwargs): @@ -192,3 +194,89 @@ def sleep_1(*args, **kwargs): callback_options_time_call, 10000, ) + + +class TestSynchronousMeasurementConsumerConcurrency(TestCase): + def test_concurrent_changes_to_metric_readers(self): + timeout = 1 + failure = None + iteration_started = Event() + mutation_done = Event() + iteration_timeout_error = "Timed out waiting for iteration to start" + mutation_timeout_error = "Timed out waiting for mutation to be done" + + consumer = SynchronousMeasurementConsumer( + SdkConfiguration( + exemplar_filter=MagicMock(), + resource=MagicMock(), + views=MagicMock(), + ), + metric_readers=[MagicMock()], + ) + + def _hooked_iter(iterable): + nonlocal failure + + iterable = iter(iterable) + iteration_started.set() + if not mutation_done.wait(timeout): + failure = mutation_timeout_error + yield next(iterable, None) + yield from iterable + + class HookedDict(dict): + def __iter__(self): + return _hooked_iter(super().__iter__()) + + def keys(self): + return _hooked_iter(super().keys()) + + def values(self): + return _hooked_iter(super().values()) + + def items(self): + return _hooked_iter(super().items()) + + with patch.object( + # pylint: disable-next=protected-access + consumer, "_reader_storages", HookedDict(consumer._reader_storages) + ): + + def mutate(): + """Directly mutate _reader_storages after iteration starts""" + nonlocal failure + if not iteration_started.wait(timeout): + failure = iteration_timeout_error + #pylint: disable-next=protected-access + consumer._reader_storages.clear() + + # Verify that test setup works (direct mutation with no synchronization fails) + with self.assertRaises(RuntimeError) as cm: + t = Thread(target=mutate) + t.start() + try: + consumer.consume_measurement(MagicMock()) + finally: + t.join() + self.assertEqual( + "dictionary changed size during iteration", str(cm.exception) + ) + + def add_and_remove_readers(): + """Modifies _reader_storages after iteration starts""" + nonlocal failure + if not iteration_started.wait(timeout): + failure = iteration_timeout_error + reader = MagicMock() + consumer.add_metric_reader(reader) + consumer.remove_metric_reader(reader) + + # Verify the API calls do not attempt concurrent modification of reader storages + t = Thread(target=add_and_remove_readers) + t.start() + try: + consumer.add_metric_reader(MagicMock()) + consumer.consume_measurement(MagicMock()) + finally: + t.join() + self.assertEqual(mutation_timeout_error, failure) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index ec1456ae84c..5f23d6e7472 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -79,7 +79,6 @@ def test_creates_view_instrument_matches( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -146,7 +145,6 @@ def test_forwards_calls_to_view_instrument_match( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -256,7 +254,6 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1,), ), MagicMock( @@ -283,7 +280,6 @@ def test_race_collect_with_new_instruments(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(View(instrument_name="test"),), ), MagicMock( @@ -329,7 +325,6 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(), ), MagicMock( @@ -365,7 +360,6 @@ def test_drop_aggregation(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="name", aggregation=DropAggregation() @@ -393,7 +387,6 @@ def test_same_collection_start(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(View(instrument_name="name"),), ), MagicMock( @@ -440,7 +433,6 @@ def test_conflicting_view_configuration(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="observable_counter", @@ -489,7 +481,6 @@ def test_view_instrument_match_conflict_0(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -547,7 +538,6 @@ def test_view_instrument_match_conflict_1(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -616,7 +606,6 @@ def test_view_instrument_match_conflict_2(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="foo"), View(instrument_name="bar"), @@ -669,7 +658,6 @@ def test_view_instrument_match_conflict_3(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -720,7 +708,6 @@ def test_view_instrument_match_conflict_4(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -767,7 +754,6 @@ def test_view_instrument_match_conflict_5(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -822,7 +808,6 @@ def test_view_instrument_match_conflict_6(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter", name="foo"), View(instrument_name="histogram", name="foo"), @@ -877,7 +862,6 @@ def test_view_instrument_match_conflict_7(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -932,7 +916,6 @@ def test_view_instrument_match_conflict_8(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="up_down_counter", name="foo"), View( diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 0dc6d4ddf08..1fa9a3098c6 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -13,10 +13,9 @@ # limitations under the License. # pylint: disable=protected-access,no-self-use - import weakref from collections.abc import Callable -from logging import WARNING +from logging import DEBUG, WARNING from threading import Lock from time import sleep from typing import Any, Iterable, Sequence @@ -38,6 +37,7 @@ ) from opentelemetry.sdk.metrics._internal import SynchronousMeasurementConsumer from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, Metric, MetricExporter, MetricExportResult, @@ -428,6 +428,35 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer): sync_consumer_instance.consume_measurement.assert_called() + def test_addition_of_metric_reader(self): + logger_name = "opentelemetry.sdk.metrics._internal.export" + + reader = InMemoryMetricReader() + meter_provider = MeterProvider() + meter = meter_provider.get_meter(__name__) + counter = meter.create_counter("counter") + counter.add(1) + # Suppress warnings for calling collect on an unregistered metric reader + with self.assertLogs(logger_name, DEBUG): + self.assertIsNone(reader.get_metrics_data()) + + meter_provider.add_metric_reader(reader) + counter.add(1) + self.assertIsNotNone(reader.get_metrics_data()) + + with self.assertRaises(ValueError) as cm: + meter_provider.add_metric_reader(reader) + self.assertIn("has been registered already!", str(cm.exception)) + + meter_provider.remove_metric_reader(reader) + counter.add(1) + with self.assertLogs(logger_name, DEBUG): + self.assertIsNone(reader.get_metrics_data()) + + with self.assertRaises(ValueError) as cm: + meter_provider.remove_metric_reader(reader) + self.assertIn("has not been registered!", str(cm.exception)) + class TestMeterConcurrency(ConcurrencyTestBase, TestCase): def test_create_instrument_concurrency(self): diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 38d36758f39..63a5edfed83 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -82,7 +82,6 @@ def setUpClass(cls): cls.sdk_configuration = SdkConfiguration( exemplar_filter=Mock(), resource=cls.mock_resource, - metric_readers=[], views=[], ) diff --git a/opentelemetry-sdk/tests/test_configurator.py b/opentelemetry-sdk/tests/test_configurator.py index 333494df746..073f054aa45 100644 --- a/opentelemetry-sdk/tests/test_configurator.py +++ b/opentelemetry-sdk/tests/test_configurator.py @@ -1187,7 +1187,7 @@ def test_metrics_init_exporter(self): provider._sdk_config.resource.attributes.get("service.name"), "otlp-service", ) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReader) self.assertIsInstance(reader.exporter, DummyOTLPMetricExporter) @@ -1200,7 +1200,7 @@ def test_metrics_init_pull_exporter(self): self.assertEqual(self.set_provider_mock.call_count, 1) provider = self.set_provider_mock.call_args[0][0] self.assertIsInstance(provider, DummyMeterProvider) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReaderPullExporter) def test_metrics_init_exporter_uses_exporter_args_map(self): @@ -1214,7 +1214,7 @@ def test_metrics_init_exporter_uses_exporter_args_map(self): }, ) provider = self.set_provider_mock.call_args[0][0] - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader.exporter.compression, "gzip")