From 70b6fffd0433bf816e48174a41c1554b269d6f0c Mon Sep 17 00:00:00 2001 From: Nicolas Sterchele Date: Mon, 23 Mar 2026 09:02:19 +0100 Subject: [PATCH] fix(sdk): regenerate service.instance.id post-fork in MeterProvider and TracerProvider When gunicorn (or any prefork server) forks workers, all workers inherit the same Resource from the master process, including the same service.instance.id. The SDK already restarts background threads post-fork (PeriodicExportingMetricReader, BatchProcessor) but never updates the resource identity. This causes metric collisions in OTLP backends where multiple workers exporting with the same resource identity result in incorrect aggregation instead of correct summation. Register an os.register_at_fork(after_in_child=...) hook on both MeterProvider and TracerProvider that replaces service.instance.id with a fresh UUID in each forked worker, ensuring distinct resource identities without any user configuration. Resource.merge() is used so all other resource attributes are preserved. WeakMethod is used for the hook reference, consistent with the existing pattern in PeriodicExportingMetricReader and BatchProcessor. Fixes: https://github.com/open-telemetry/opentelemetry-python/issues/4390 Related: https://github.com/open-telemetry/opentelemetry-python/issues/3885 --- .../sdk/metrics/_internal/__init__.py | 24 ++++ .../src/opentelemetry/sdk/trace/__init__.py | 23 ++++ .../tests/metrics/test_meter_provider_fork.py | 126 ++++++++++++++++++ .../tests/trace/test_tracer_provider_fork.py | 116 ++++++++++++++++ 4 files changed, 289 insertions(+) create mode 100644 opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py create mode 100644 opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index a2adaa36a98..d8f36af0720 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import weakref from atexit import register, unregister from logging import getLogger from os import environ +from uuid import uuid4 from threading import Lock from time import time_ns from typing import Optional, Sequence @@ -456,6 +458,12 @@ def __init__( self._shutdown_once = Once() self._shutdown = False + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda + ) + for metric_reader in self._sdk_config.metric_readers: with self._all_metric_readers_lock: if metric_reader in self._all_metric_readers: @@ -471,6 +479,22 @@ def __init__( self._measurement_consumer.collect ) + def _at_fork_reinit(self) -> None: + """Update the resource with a new unique service.instance.id after a fork. + + When gunicorn (or any other prefork server) forks workers, all workers + inherit the same Resource, including the same service.instance.id. This + causes metric collisions in backends like Datadog where multiple workers + exporting with the same resource identity result in last-write-wins + instead of correct aggregation. + + This hook runs post-fork in each worker and replaces service.instance.id + with a fresh UUID, ensuring each worker is a distinct instance. + """ + self._sdk_config.resource = self._sdk_config.resource.merge( + Resource({"service.instance.id": str(uuid4())}) + ) + def force_flush(self, timeout_millis: float = 10_000) -> bool: deadline_ns = time_ns() + timeout_millis * 10**6 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index e0b639d81cf..d03a3bb607d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -24,6 +24,7 @@ import traceback import typing import weakref +from uuid import uuid4 from dataclasses import dataclass from functools import lru_cache from os import environ @@ -1366,6 +1367,28 @@ def __init__( _tracer_configurator or _default_tracer_configurator ) + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda + ) + + def _at_fork_reinit(self) -> None: + """Update the resource with a new unique service.instance.id after a fork. + + When gunicorn (or any other prefork server) forks workers, all workers + inherit the same Resource, including the same service.instance.id. This + causes metric collisions in backends like Datadog where multiple workers + exporting with the same resource identity result in last-write-wins + instead of correct aggregation. + + This hook runs post-fork in each worker and replaces service.instance.id + with a fresh UUID, ensuring each worker is a distinct instance. + """ + self._resource = self._resource.merge( + Resource({"service.instance.id": str(uuid4())}) + ) + def _set_tracer_configurator( self, *, tracer_configurator: _TracerConfiguratorT ): diff --git a/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py b/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py new file mode 100644 index 00000000000..41705546eba --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py @@ -0,0 +1,126 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +import multiprocessing +import os +import unittest +from platform import system + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource + +_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None + + +@unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", +) +class TestMeterProviderFork(unittest.TestCase): + def test_at_fork_reinit_changes_service_instance_id(self): + """_at_fork_reinit should assign a new service.instance.id.""" + resource = Resource({"service.instance.id": "original-id"}) + provider = MeterProvider(resource=resource) + + original_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + self.assertEqual(original_id, "original-id") + + provider._at_fork_reinit() + + new_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + self.assertNotEqual(new_id, "original-id") + self.assertIsNotNone(new_id) + + def test_at_fork_reinit_preserves_other_resource_attributes(self): + """_at_fork_reinit should not affect other resource attributes.""" + resource = Resource( + { + "service.name": "my-service", + "service.instance.id": "original-id", + "deployment.environment": "production", + } + ) + provider = MeterProvider(resource=resource) + + provider._at_fork_reinit() + + attrs = provider._sdk_config.resource.attributes + self.assertEqual(attrs.get("service.name"), "my-service") + self.assertEqual( + attrs.get("deployment.environment"), "production" + ) + + def test_fork_produces_unique_service_instance_ids(self): + """Each forked worker should get a distinct service.instance.id.""" + provider = MeterProvider() + + parent_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + + def child(conn): + child_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + process.start() + child_id = parent_conn.recv() + process.join() + + # Child should have a different service.instance.id than parent + self.assertNotEqual(parent_id, child_id) + self.assertIsNotNone(child_id) + + def test_multiple_forks_produce_unique_service_instance_ids(self): + """Each of N forked workers should have a distinct service.instance.id.""" + provider = MeterProvider() + + def child(conn): + child_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + ids = set() + processes = [] + conns = [] + + for _ in range(4): + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process( + target=child, args=(child_conn,) + ) + processes.append(process) + conns.append(parent_conn) + process.start() + + for conn in conns: + ids.add(conn.recv()) + + for process in processes: + process.join() + + # All 4 workers should have distinct IDs + self.assertEqual(len(ids), 4) diff --git a/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py b/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py new file mode 100644 index 00000000000..f9f228ec6ac --- /dev/null +++ b/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py @@ -0,0 +1,116 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +import multiprocessing +import os +import unittest +from platform import system + +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider + +_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None + + +@unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", +) +class TestTracerProviderFork(unittest.TestCase): + def test_at_fork_reinit_changes_service_instance_id(self): + """_at_fork_reinit should assign a new service.instance.id.""" + resource = Resource({"service.instance.id": "original-id"}) + provider = TracerProvider(resource=resource) + + original_id = provider._resource.attributes.get("service.instance.id") + self.assertEqual(original_id, "original-id") + + provider._at_fork_reinit() + + new_id = provider._resource.attributes.get("service.instance.id") + self.assertNotEqual(new_id, "original-id") + self.assertIsNotNone(new_id) + + def test_at_fork_reinit_preserves_other_resource_attributes(self): + """_at_fork_reinit should not affect other resource attributes.""" + resource = Resource( + { + "service.name": "my-service", + "service.instance.id": "original-id", + "deployment.environment": "production", + } + ) + provider = TracerProvider(resource=resource) + + provider._at_fork_reinit() + + attrs = provider._resource.attributes + self.assertEqual(attrs.get("service.name"), "my-service") + self.assertEqual(attrs.get("deployment.environment"), "production") + + def test_fork_produces_unique_service_instance_ids(self): + """Each forked worker should get a distinct service.instance.id.""" + provider = TracerProvider() + + parent_id = provider._resource.attributes.get("service.instance.id") + + def child(conn): + child_id = provider._resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + process.start() + child_id = parent_conn.recv() + process.join() + + self.assertNotEqual(parent_id, child_id) + self.assertIsNotNone(child_id) + + def test_multiple_forks_produce_unique_service_instance_ids(self): + """Each of N forked workers should have a distinct service.instance.id.""" + provider = TracerProvider() + + def child(conn): + child_id = provider._resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + ids = set() + processes = [] + conns = [] + + for _ in range(4): + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process( + target=child, args=(child_conn,) + ) + processes.append(process) + conns.append(parent_conn) + process.start() + + for conn in conns: + ids.add(conn.recv()) + + for process in processes: + process.join() + + self.assertEqual(len(ids), 4)