Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import weakref
from atexit import register, unregister
from logging import getLogger
from os import environ
from uuid import uuid4
from threading import Lock
from time import time_ns
from typing import Optional, Sequence
Expand Down Expand Up @@ -456,6 +458,12 @@ def __init__(
self._shutdown_once = Once()
self._shutdown = False

if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(
after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda
)

for metric_reader in self._sdk_config.metric_readers:
with self._all_metric_readers_lock:
if metric_reader in self._all_metric_readers:
Expand All @@ -471,6 +479,22 @@ def __init__(
self._measurement_consumer.collect
)

def _at_fork_reinit(self) -> None:
    """Give this provider's resource a fresh ``service.instance.id`` post-fork.

    Prefork servers such as gunicorn fork worker processes that all inherit
    the parent's Resource — including an identical ``service.instance.id``.
    Backends that key metrics on resource identity (e.g. Datadog) then see
    several writers for one logical instance and apply last-write-wins
    instead of aggregating correctly.

    Registered via ``os.register_at_fork`` so that each worker replaces the
    inherited id with a newly generated UUID right after the fork, making
    every worker a distinct instance.
    """
    # merge() preserves all other attributes; only the instance id changes.
    fresh_identity = Resource({"service.instance.id": str(uuid4())})
    self._sdk_config.resource = self._sdk_config.resource.merge(fresh_identity)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
deadline_ns = time_ns() + timeout_millis * 10**6

Expand Down
23 changes: 23 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import traceback
import typing
import weakref
from uuid import uuid4
from dataclasses import dataclass
from functools import lru_cache
from os import environ
Expand Down Expand Up @@ -1366,6 +1367,28 @@ def __init__(
_tracer_configurator or _default_tracer_configurator
)

if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(
after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda
)

def _at_fork_reinit(self) -> None:
    """Update the resource with a new unique service.instance.id after a fork.

    When gunicorn (or any other prefork server) forks workers, all workers
    inherit the same Resource, including the same service.instance.id. Spans
    exported by different workers then carry an identical resource identity,
    so backends cannot tell which worker instance actually produced them.

    This hook runs post-fork in each worker and replaces service.instance.id
    with a fresh UUID, ensuring each worker is a distinct instance.
    """
    # Resource.merge keeps every existing attribute and overrides only the id.
    self._resource = self._resource.merge(
        Resource({"service.instance.id": str(uuid4())})
    )

def _set_tracer_configurator(
self, *, tracer_configurator: _TracerConfiguratorT
):
Expand Down
126 changes: 126 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=protected-access

import multiprocessing
import os
import unittest
from platform import system

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource

_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None


@unittest.skipUnless(
    hasattr(os, "fork"),
    "needs *nix",
)
class TestMeterProviderFork(unittest.TestCase):
    """Post-fork resource re-identification for MeterProvider."""

    def test_at_fork_reinit_changes_service_instance_id(self):
        """A manual _at_fork_reinit call must replace service.instance.id."""
        provider = MeterProvider(
            resource=Resource({"service.instance.id": "original-id"})
        )
        before = provider._sdk_config.resource.attributes.get(
            "service.instance.id"
        )
        self.assertEqual(before, "original-id")

        provider._at_fork_reinit()

        after = provider._sdk_config.resource.attributes.get(
            "service.instance.id"
        )
        self.assertNotEqual(after, "original-id")
        self.assertIsNotNone(after)

    def test_at_fork_reinit_preserves_other_resource_attributes(self):
        """Only service.instance.id may change; all other attributes stay."""
        provider = MeterProvider(
            resource=Resource(
                {
                    "service.name": "my-service",
                    "service.instance.id": "original-id",
                    "deployment.environment": "production",
                }
            )
        )

        provider._at_fork_reinit()

        attrs = provider._sdk_config.resource.attributes
        self.assertEqual(attrs.get("service.name"), "my-service")
        self.assertEqual(attrs.get("deployment.environment"), "production")

    def test_fork_produces_unique_service_instance_ids(self):
        """A single forked child must not share the parent's instance id."""
        provider = MeterProvider()
        parent_id = provider._sdk_config.resource.attributes.get(
            "service.instance.id"
        )

        def worker(pipe):
            # Runs in the child: report the post-fork instance id.
            pipe.send(
                provider._sdk_config.resource.attributes.get(
                    "service.instance.id"
                )
            )
            pipe.close()

        recv_end, send_end = _fork_ctx.Pipe()
        proc = _fork_ctx.Process(target=worker, args=(send_end,))
        proc.start()
        forked_id = recv_end.recv()
        proc.join()

        self.assertNotEqual(parent_id, forked_id)
        self.assertIsNotNone(forked_id)

    def test_multiple_forks_produce_unique_service_instance_ids(self):
        """Four forked children must report four distinct instance ids."""
        provider = MeterProvider()

        def worker(pipe):
            pipe.send(
                provider._sdk_config.resource.attributes.get(
                    "service.instance.id"
                )
            )
            pipe.close()

        workers = []
        receivers = []
        for _ in range(4):
            recv_end, send_end = _fork_ctx.Pipe()
            proc = _fork_ctx.Process(target=worker, args=(send_end,))
            workers.append(proc)
            receivers.append(recv_end)
            proc.start()

        seen = {pipe.recv() for pipe in receivers}

        for proc in workers:
            proc.join()

        # Distinct ids collapse the set only if a collision occurred.
        self.assertEqual(len(seen), 4)
116 changes: 116 additions & 0 deletions opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=protected-access

import multiprocessing
import os
import unittest
from platform import system

from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider

_fork_ctx = multiprocessing.get_context("fork") if system() != "Windows" else None


@unittest.skipUnless(
    hasattr(os, "fork"),
    "needs *nix",
)
class TestTracerProviderFork(unittest.TestCase):
    """Post-fork resource re-identification for TracerProvider."""

    def test_at_fork_reinit_changes_service_instance_id(self):
        """A manual _at_fork_reinit call must replace service.instance.id."""
        provider = TracerProvider(
            resource=Resource({"service.instance.id": "original-id"})
        )
        before = provider._resource.attributes.get("service.instance.id")
        self.assertEqual(before, "original-id")

        provider._at_fork_reinit()

        after = provider._resource.attributes.get("service.instance.id")
        self.assertNotEqual(after, "original-id")
        self.assertIsNotNone(after)

    def test_at_fork_reinit_preserves_other_resource_attributes(self):
        """Only service.instance.id may change; all other attributes stay."""
        provider = TracerProvider(
            resource=Resource(
                {
                    "service.name": "my-service",
                    "service.instance.id": "original-id",
                    "deployment.environment": "production",
                }
            )
        )

        provider._at_fork_reinit()

        attrs = provider._resource.attributes
        self.assertEqual(attrs.get("service.name"), "my-service")
        self.assertEqual(attrs.get("deployment.environment"), "production")

    def test_fork_produces_unique_service_instance_ids(self):
        """A single forked child must not share the parent's instance id."""
        provider = TracerProvider()
        parent_id = provider._resource.attributes.get("service.instance.id")

        def worker(pipe):
            # Runs in the child: report the post-fork instance id.
            pipe.send(
                provider._resource.attributes.get("service.instance.id")
            )
            pipe.close()

        recv_end, send_end = _fork_ctx.Pipe()
        proc = _fork_ctx.Process(target=worker, args=(send_end,))
        proc.start()
        forked_id = recv_end.recv()
        proc.join()

        self.assertNotEqual(parent_id, forked_id)
        self.assertIsNotNone(forked_id)

    def test_multiple_forks_produce_unique_service_instance_ids(self):
        """Four forked children must report four distinct instance ids."""
        provider = TracerProvider()

        def worker(pipe):
            pipe.send(
                provider._resource.attributes.get("service.instance.id")
            )
            pipe.close()

        workers = []
        receivers = []
        for _ in range(4):
            recv_end, send_end = _fork_ctx.Pipe()
            proc = _fork_ctx.Process(target=worker, args=(send_end,))
            workers.append(proc)
            receivers.append(recv_end)
            proc.start()

        seen = {pipe.recv() for pipe in receivers}

        for proc in workers:
            proc.join()

        # Distinct ids collapse the set only if a collision occurred.
        self.assertEqual(len(seen), 4)