feat: add Celery integration and improve PostHog client fork safety#464
Open
parinporecha wants to merge 3 commits into PostHog:master
Conversation
Force-pushed from 756a6a2 to e8eff44
Force-pushed from e8eff44 to 896dfa8
This is a comment left during a code review.
Path: posthog/integrations/celery.py
Line: 225-248
Comment:
**Context leak if `_on_task_prerun` raises after entering context**
If any code between line 233 (`context_manager.__enter__()`) and the end of the try block throws an exception, the `except` on line 247 swallows it but the context is never exited — it remains pushed onto the `contextvars` stack for the remainder of the thread's life. This corrupts context state for subsequent tasks in the same worker thread.
For example, if `self._apply_propagated_identity(request)` (line 239) or `self._capture_event(...)` (line 246) raises, the context will leak. Similarly, if `request` is `None`, the context manager is entered but never stored on `request._posthog_ctx`, so `_handle_task_end`'s finally block can never exit it.
Consider cleaning up the context in the `except` block:
```python
def _on_task_prerun(self, *args, **kwargs):
    context_manager = None
    try:
        task_id = kwargs.get("task_id")
        if not task_id:
            return
        sender = kwargs.get("sender")
        request = getattr(sender, "request", None)
        context_tags = self._extract_propagated_tags(request)
        task_properties = self._build_task_properties(
            sender=sender,
            task_id=task_id,
            state="started",
        )
        task_name = task_properties.get("celery_task_name")
        context_manager = contexts.new_context(
            fresh=True,
            capture_exceptions=False,
            client=self.client,
        )
        context_manager.__enter__()
        if request is not None:
            request._posthog_ctx = context_manager
            request._posthog_start = time.monotonic()
        self._apply_propagated_identity(request)
        merged_tags = {**task_properties, **context_tags}
        for key, value in merged_tags.items():
            contexts.tag(key, value)
        if self.capture_task_lifecycle_events and self._should_track(task_name, task_properties):
            self._capture_event("celery task started", properties=task_properties)
    except Exception:
        logger.exception("Failed to process Celery task_prerun")
        if context_manager is not None:
            try:
                context_manager.__exit__(None, None, None)
            except Exception:
                pass
```
How can I resolve this? If you propose a fix, please make it concise.
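The leak is easy to reproduce with a minimal stand-in for the entered context. In this sketch, `TrackedContext`, `unsafe_prerun`, and `safe_prerun` are hypothetical names for illustration; they mirror the shape of the handler above, not PostHog's actual `contexts.new_context`:

```python
# TrackedContext records whether __exit__ ever ran, so we can observe the leak.
class TrackedContext:
    def __init__(self):
        self.exited = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.exited = True
        return False


def unsafe_prerun(fail):
    """Mirrors the buggy flow: the except swallows the error, leaking the context."""
    cm = TrackedContext()
    try:
        cm.__enter__()
        if fail:
            raise RuntimeError("task setup failed")
    except Exception:
        pass  # swallowed — cm.__exit__ never runs, so the context leaks
    return cm


def safe_prerun(fail):
    """Mirrors the suggested fix: exit the entered context when setup fails."""
    cm = None
    try:
        cm = TrackedContext()
        cm.__enter__()
        if fail:
            raise RuntimeError("task setup failed")
    except Exception:
        if cm is not None:
            cm.__exit__(None, None, None)  # cleanup on failure
    return cm
```

Running `unsafe_prerun(True)` leaves `exited` as `False`, while `safe_prerun(True)` exits the context even though the error was swallowed; on the success path (`fail=False`) neither exits, since in the real flow that is deferred to task completion.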
---
This is a comment left during a code review.
Path: posthog/client.py
Line: 336-337
Comment:
**`register_at_fork` prevents Client garbage collection**
`os.register_at_fork` callbacks cannot be unregistered. The bound method `self._reinit_after_fork` holds a strong reference to `self`, which means:
1. Every `Client` instance registered here will never be garbage collected for the lifetime of the process.
2. If multiple `Client` instances are created (e.g., in tests, or per-request patterns), each fork will run *all* accumulated callbacks, including for defunct/shutdown clients.
A common mitigation is to use a `weakref` callback so the client can still be collected:
```python
import weakref

# in __init__, replace the current register_at_fork block with:
if hasattr(os, "register_at_fork"):
    weak_self = weakref.ref(self)

    def _reinit_child():
        client = weak_self()
        if client is not None:
            client._reinit_after_fork()

    os.register_at_fork(after_in_child=_reinit_child)
```
This way, if the Client is no longer referenced, the callback becomes a no-op rather than keeping the entire Client alive.
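The no-op behavior can be checked without actually forking. The `Client` class below is a minimal stand-in for the SDK client (only counting reinit calls), and `_reinit_child` has the same shape as the callback that would be passed to `os.register_at_fork`:

```python
import gc
import weakref


class Client:
    """Minimal stand-in for the SDK client; only counts reinit calls."""

    def __init__(self):
        self.reinit_calls = 0

    def _reinit_after_fork(self):
        self.reinit_calls += 1


client = Client()
weak_self = weakref.ref(client)


def _reinit_child():
    # Same shape as the weakref-based fork callback suggested above.
    c = weak_self()
    if c is not None:
        c._reinit_after_fork()


_reinit_child()                 # client still referenced: reinit runs
assert client.reinit_calls == 1

del client                      # drop the last strong reference
gc.collect()
_reinit_child()                 # client collected: callback is a no-op
assert weak_self() is None
```

With the bound-method version, `del client` would not free the instance, because the `register_at_fork` registry itself keeps it alive.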
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 84b314c
Summary
This PR:
- Adds `PosthogCeleryIntegration` to automatically capture Celery task lifecycle events and exceptions.
- Propagates PostHog context (`distinct_id`, `session_id`, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.
- Makes `Client` safer across process forks by reinitializing async consumers in child processes.

Context
I saw users asking for advice on how to use PostHog with Celery for error tracking in community questions and realized that there's currently no first-class way to instrument Celery workloads with PostHog.
That leaves a few gaps:
This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.
The integration takes inspiration from OpenTelemetry's Celery instrumentor; PostHog context propagation is achieved through task headers, mirroring Sentry's and Datadog's Celery integrations.
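As a rough sketch of what header-based propagation looks like: the producer serializes the current context into the task's headers, and the worker rebuilds it before running the task. The helper names and the `posthog_` header prefix here are illustrative assumptions, not the PR's actual implementation:

```python
# Hypothetical header prefix; the real integration may use different keys.
POSTHOG_HEADER_PREFIX = "posthog_"


def inject_context(headers, distinct_id=None, session_id=None, tags=None):
    """Producer side: copy the current PostHog context into task headers."""
    if distinct_id is not None:
        headers[POSTHOG_HEADER_PREFIX + "distinct_id"] = distinct_id
    if session_id is not None:
        headers[POSTHOG_HEADER_PREFIX + "session_id"] = session_id
    for key, value in (tags or {}).items():
        headers[POSTHOG_HEADER_PREFIX + "tag_" + key] = value
    return headers


def extract_context(headers):
    """Worker side: rebuild the propagated context from task headers."""
    ctx = {"distinct_id": None, "session_id": None, "tags": {}}
    for key, value in headers.items():
        if not key.startswith(POSTHOG_HEADER_PREFIX):
            continue
        short = key[len(POSTHOG_HEADER_PREFIX):]
        if short == "distinct_id":
            ctx["distinct_id"] = value
        elif short == "session_id":
            ctx["session_id"] = value
        elif short.startswith("tag_"):
            ctx["tags"][short[len("tag_"):]] = value
    return ctx
```

Because the context rides along in the message headers, the worker can restore the originating user/session without any shared state between producer and consumer processes.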
While testing this, I found a separate SDK issue: when a `Client` configured in async mode is inherited across a process fork, the child process can inherit a client whose consumer threads no longer exist. In practice, that means worker-side events may not be delivered reliably.

To make it safer, this PR also adds fork handling to `Client` by reinitializing its queue and consumers in the child process via `os.register_at_fork`. That said, the recommended setup for Celery remains to initialize a fresh `Client` and instrument `PosthogCeleryIntegration` inside each worker process, as shown in the example.

Changes
New: Celery Integration (`posthog/integrations/celery.py`)
- Subscribes to Celery signals (`task_prerun`, `task_success`, `task_failure`, etc.) to capture events like `celery task started`, `celery task success`, etc. Check the docstring in the integration module code for the complete list of supported events.
- `_on_before_task_publish`: injects the current PostHog context (distinct_id, session_id, tags) into task headers.
- `_on_task_prerun`: extracts headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.

Refactored: Client Fork Safety
`posthog/client.py`
- Adds a `_reinit_after_fork` method to reset the internal queue and spin up new consumers in a child process.
- Uses `os.register_at_fork` (on supported platforms) to automatically call this method, so that the SDK does not drop captured events when used in child processes.

Examples
- `examples/celery_integration.py`

Tests
- `posthog/test/integrations/test_celery_integration.py` covering:
- Tests in `posthog/test/test_client.py` for `_reinit_after_fork` to verify consumers are restarted correctly.

Screenshots (created through example script)
Celery task lifecycle events and captured Exception -

Celery task success event emitted from worker carrying correct distinct ID, session ID set in parent and context tags -

Captured exception -
