diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index a2adaa36a98..d8f36af0720 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import weakref from atexit import register, unregister from logging import getLogger from os import environ +from uuid import uuid4 from threading import Lock from time import time_ns from typing import Optional, Sequence @@ -456,6 +458,12 @@ def __init__( self._shutdown_once = Once() self._shutdown = False + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda + ) + for metric_reader in self._sdk_config.metric_readers: with self._all_metric_readers_lock: if metric_reader in self._all_metric_readers: @@ -471,6 +479,22 @@ def __init__( self._measurement_consumer.collect ) + def _at_fork_reinit(self) -> None: + """Update the resource with a new unique service.instance.id after a fork. + + When gunicorn (or any other prefork server) forks workers, all workers + inherit the same Resource, including the same service.instance.id. This + causes metric collisions in backends like Datadog where multiple workers + exporting with the same resource identity result in last-write-wins + instead of correct aggregation. + + This hook runs post-fork in each worker and replaces service.instance.id + with a fresh UUID, ensuring each worker is a distinct instance. + """ + self._sdk_config.resource = self._sdk_config.resource.merge( + Resource({"service.instance.id": str(uuid4())}) + ) + def force_flush(self, timeout_millis: float = 10_000) -> bool: deadline_ns = time_ns() + timeout_millis * 10**6 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index e0b639d81cf..d03a3bb607d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -24,6 +24,7 @@ import traceback import typing import weakref +from uuid import uuid4 from dataclasses import dataclass from functools import lru_cache from os import environ @@ -1366,6 +1367,28 @@ def __init__( _tracer_configurator or _default_tracer_configurator ) + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda + ) + + def _at_fork_reinit(self) -> None: + """Update the resource with a new unique service.instance.id after a fork. + + When gunicorn (or any other prefork server) forks workers, all workers + inherit the same Resource, including the same service.instance.id. This + causes metric collisions in backends like Datadog where multiple workers + exporting with the same resource identity result in last-write-wins + instead of correct aggregation. + + This hook runs post-fork in each worker and replaces service.instance.id + with a fresh UUID, ensuring each worker is a distinct instance. + """ + self._resource = self._resource.merge( + Resource({"service.instance.id": str(uuid4())}) + ) + def _set_tracer_configurator( self, *, tracer_configurator: _TracerConfiguratorT ): diff --git a/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py b/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py new file mode 100644 index 00000000000..41705546eba --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py @@ -0,0 +1,126 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +import multiprocessing +import os +import unittest +from platform import system + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource + +_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None + + +@unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", +) +class TestMeterProviderFork(unittest.TestCase): + def test_at_fork_reinit_changes_service_instance_id(self): + """_at_fork_reinit should assign a new service.instance.id.""" + resource = Resource({"service.instance.id": "original-id"}) + provider = MeterProvider(resource=resource) + + original_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + self.assertEqual(original_id, "original-id") + + provider._at_fork_reinit() + + new_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + self.assertNotEqual(new_id, "original-id") + self.assertIsNotNone(new_id) + + def test_at_fork_reinit_preserves_other_resource_attributes(self): + """_at_fork_reinit should not affect other resource attributes.""" + resource = Resource( + { + "service.name": "my-service", + "service.instance.id": "original-id", + "deployment.environment": "production", + } + ) + provider = MeterProvider(resource=resource) + + provider._at_fork_reinit() + + attrs = provider._sdk_config.resource.attributes + self.assertEqual(attrs.get("service.name"), "my-service") + self.assertEqual( + attrs.get("deployment.environment"), "production" + ) + + def test_fork_produces_unique_service_instance_ids(self): + """Each forked worker should get a distinct service.instance.id.""" + provider = MeterProvider() + + parent_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + + def child(conn): + child_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + process.start() + child_id = parent_conn.recv() + process.join() + + # Child should have a different service.instance.id than parent + self.assertNotEqual(parent_id, child_id) + self.assertIsNotNone(child_id) + + def test_multiple_forks_produce_unique_service_instance_ids(self): + """Each of N forked workers should have a distinct service.instance.id.""" + provider = MeterProvider() + + def child(conn): + child_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + ids = set() + processes = [] + conns = [] + + for _ in range(4): + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process( + target=child, args=(child_conn,) + ) + processes.append(process) + conns.append(parent_conn) + process.start() + + for conn in conns: + ids.add(conn.recv()) + + for process in processes: + process.join() + + # All 4 workers should have distinct IDs + self.assertEqual(len(ids), 4) diff --git a/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py b/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py new file mode 100644 index 00000000000..f9f228ec6ac --- /dev/null +++ b/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py @@ -0,0 +1,116 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +import multiprocessing +import os +import unittest +from platform import system + +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider + +_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None + + +@unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", +) +class TestTracerProviderFork(unittest.TestCase): + def test_at_fork_reinit_changes_service_instance_id(self): + """_at_fork_reinit should assign a new service.instance.id.""" + resource = Resource({"service.instance.id": "original-id"}) + provider = TracerProvider(resource=resource) + + original_id = provider._resource.attributes.get("service.instance.id") + self.assertEqual(original_id, "original-id") + + provider._at_fork_reinit() + + new_id = provider._resource.attributes.get("service.instance.id") + self.assertNotEqual(new_id, "original-id") + self.assertIsNotNone(new_id) + + def test_at_fork_reinit_preserves_other_resource_attributes(self): + """_at_fork_reinit should not affect other resource attributes.""" + resource = Resource( + { + "service.name": "my-service", + "service.instance.id": "original-id", + "deployment.environment": "production", + } + ) + provider = TracerProvider(resource=resource) + + provider._at_fork_reinit() + + attrs = provider._resource.attributes + self.assertEqual(attrs.get("service.name"), "my-service") + self.assertEqual(attrs.get("deployment.environment"), "production") + + def test_fork_produces_unique_service_instance_ids(self): + """Each forked worker should get a distinct service.instance.id.""" + provider = TracerProvider() + + parent_id = provider._resource.attributes.get("service.instance.id") + + def child(conn): + child_id = provider._resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + process.start() + child_id = parent_conn.recv() + process.join() + + self.assertNotEqual(parent_id, child_id) + self.assertIsNotNone(child_id) + + def test_multiple_forks_produce_unique_service_instance_ids(self): + """Each of N forked workers should have a distinct service.instance.id.""" + provider = TracerProvider() + + def child(conn): + child_id = provider._resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + ids = set() + processes = [] + conns = [] + + for _ in range(4): + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process( + target=child, args=(child_conn,) + ) + processes.append(process) + conns.append(parent_conn) + process.start() + + for conn in conns: + ids.add(conn.recv()) + + for process in processes: + process.join() + + self.assertEqual(len(ids), 4)