From 9aaf1302a22fd18cc1e37773c3f9af2e2e21ea3e Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Tue, 24 Mar 2026 19:32:28 +0200 Subject: [PATCH 1/7] fix(asyncio): fix error propagation in async request path Ensure `PubNubAsyncioException` always carries a valid `PNStatus` with error data instead of `None`. fix(asyncio): fix `PubNubAsyncioException.__str__` crash Handle cases where status or `error_data` is `None` instead of raising `AttributeError`. fix(event-engine): fix error type checks in effects Match `PubNubAsyncioException` which is what `request_future` actually returns on failure. fix(event-engine): fix give-up logic for unlimited retries Handle `-1 (unlimited)` correctly since `attempts > -1` was always `true`, causing immediate give-up. fix(event-engine): initialize heartbeat max retry attempts Use delay class defaults instead of config value which could be `None` causing `TypeError` on comparison. fix(event-engine): add missing return after `heartbeat` give-up Prevent falling through to start a heartbeat after deciding to give up. fix(request-handlers): use explicit `httpx.Timeout` object Set all four timeout fields explicitly instead of a 2-tuple that left write and pool unset. fix(request-handlers): enforce wall-clock deadline to survive system sleep On macOS and Linux, `time.monotonic()` does not advance during system sleep, causing socket and `asyncio` timeouts (310s subscribe) to stall for hours of wall-clock time. Add `time.time()`-based deadline checks that detect sleep and cancel stale requests within ~5s of wake. fix(asyncio): replace `asyncio.wait_for` with wall-clock-aware loop Use `asyncio.wait()` with periodic `time.time()` checks instead of a single monotonic-based `wait_for()`, yielding to the event loop between checks. 
fix(native-threads): add `WallClockDeadlineWatchdog` Persistent single daemon thread monitors `time.time()` every 5s and closes the `httpx` session when the wall-clock deadline passes, interrupting the blocking socket read. Tracks deadlines per calling thread so concurrent requests (e.g., subscribe + publish) don't interfere. Only armed for long-timeout requests (>30s). Session is recreated for subsequent requests test(wall-clock-deadline): add unit tests for sleep detection Cover both `asyncio` and threads paths: simulated clock jumps, normal passthrough, clean watchdog shutdown, per-thread deadline isolation, concurrent request independence, cleanup, and exception propagation. test(native-threads): add try/finally cleanup to subscribe tests Ensure `pubnub.stop()` always runs to prevent non-daemon threads from blocking process exit. test(native-threads): fix flaky where_now and here_now tests Enable presence heartbeat and use unique channel names so presence registers on the server. test(file-upload): fix shared state leak in file upload tests Restore `cipher_key` after use in `send_file` and pass it explicitly to `download_file`. test(message-actions): use unique channel names Avoid collisions with stale data from prior test runs. 
--- pubnub/event_engine/effects.py | 65 ++- pubnub/exceptions.py | 4 +- pubnub/pubnub.py | 12 +- pubnub/pubnub_asyncio.py | 11 +- pubnub/request_handlers/async_httpx.py | 44 +- pubnub/request_handlers/httpx.py | 133 +++++- .../asyncio/test_message_actions.py | 4 +- .../asyncio/test_retry_policies.py | 152 +++++++ .../native_sync/test_file_upload.py | 6 +- .../native_sync/test_message_actions.py | 4 +- .../native_threads/test_heartbeat.py | 105 +++-- .../native_threads/test_here_now.py | 94 ++--- .../native_threads/test_subscribe.py | 237 +++++------ .../native_threads/test_where_now.py | 106 +++-- tests/unit/test_wall_clock_deadline.py | 377 ++++++++++++++++++ 15 files changed, 1046 insertions(+), 308 deletions(-) create mode 100644 tests/integrational/asyncio/test_retry_policies.py create mode 100644 tests/unit/test_wall_clock_deadline.py diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index d7c0b28d..32c8cb30 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -6,7 +6,7 @@ from pubnub.endpoints.presence.leave import Leave from pubnub.endpoints.pubsub.subscribe import Subscribe from pubnub.enums import PNReconnectionPolicy -from pubnub.exceptions import PubNubException +from pubnub.exceptions import PubNubAsyncioException, PubNubException from pubnub.features import feature_enabled from pubnub.models.server.subscribe import SubscribeMessage from pubnub.pubnub import PubNub @@ -80,9 +80,10 @@ async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0 request.timetoken(0) response = await request.future() - if isinstance(response, Exception): + if isinstance(response, PubNubAsyncioException): self.logger.warning(f'Handshake failed: {str(response)}') - handshake_failure = events.HandshakeFailureEvent(response, 1, timetoken=timetoken) + reason = response.status.error_data if response.status and response.status.error_data else str(response) + handshake_failure = 
events.HandshakeFailureEvent(reason, 1, timetoken=timetoken) self.event_engine.trigger(handshake_failure) elif response.status.error: self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}') @@ -184,8 +185,15 @@ def calculate_reconnection_delay(self, attempts): return delay + def _should_give_up(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.NONE: + return True + if self.max_retry_attempts == -1: + return False + return attempts > self.max_retry_attempts + def run(self): - if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: + if self._should_give_up(self.invocation.attempts): self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) else: attempts = self.invocation.attempts @@ -214,9 +222,10 @@ async def delayed_reconnect_async(self, delay, attempt): response = await request.future() - if isinstance(response, PubNubException): + if isinstance(response, PubNubAsyncioException): self.logger.warning(f'Reconnect failed: {str(response)}') - self.failure(str(response), attempt, self.get_timetoken()) + reason = response.status.error_data if response.status and response.status.error_data else str(response) + self.failure(reason, attempt, self.get_timetoken()) elif response.status.error: self.logger.warning(f'Reconnect failed: {response.status.error_data.__dict__}') @@ -302,10 +311,11 @@ async def heartbeat(self, channels, groups, stop_event): response = await request.future() - if isinstance(response, PubNubException): + if isinstance(response, PubNubAsyncioException): self.logger.warning(f'Heartbeat failed: {str(response)}') + reason = response.status.error_data if response.status and response.status.error_data else str(response) self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, - reason=response.status.error_data, attempt=1)) + reason=reason, attempt=1)) elif response.status and response.status.error: 
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}') self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, @@ -345,8 +355,10 @@ async def leave(self, channels, groups, stop_event): leave_request = Leave(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) leave = await leave_request.future() - if leave.status.error: - self.logger.warning(f'Heartbeat failed: {leave.status.error_data.__dict__}') + if isinstance(leave, PubNubAsyncioException): + self.logger.warning(f'Leave failed: {str(leave)}') + elif leave.status and leave.status.error: + self.logger.warning(f'Leave failed: {leave.status.error_data.__dict__}') class HeartbeatDelayedEffect(Effect): @@ -354,9 +366,25 @@ def __init__(self, pubnub_instance, event_engine_instance, invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy - self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries self.interval = pubnub_instance.config.reconnection_interval + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + self.max_retry_attempts = ExponentialDelay.MAX_RETRIES + elif self.reconnection_policy is PNReconnectionPolicy.LINEAR: + self.max_retry_attempts = LinearDelay.MAX_RETRIES + else: + self.max_retry_attempts = 0 + + if pubnub_instance.config.maximum_reconnection_retries is not None: + self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries + + def _should_give_up(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.NONE: + return True + if self.max_retry_attempts == -1: + return False + return attempts > self.max_retry_attempts + def calculate_reconnection_delay(self, attempts): if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: delay = 
ExponentialDelay.calculate(attempts) @@ -368,11 +396,12 @@ def calculate_reconnection_delay(self, attempts): return delay def run(self): - if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: + if self._should_give_up(self.invocation.attempts): self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels, groups=self.invocation.groups, reason=self.invocation.reason, attempt=self.invocation.attempts)) + return if hasattr(self.pubnub, 'event_loop'): self.stop_event = self.get_new_stop_event() @@ -380,11 +409,6 @@ def run(self): attempt=self.invocation.attempts, stop_event=self.stop_event)) async def heartbeat(self, channels, groups, attempt, stop_event): - if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: - self.event_engine.trigger(events.HeartbeatGiveUpEvent(channels=self.invocation.channels, - groups=self.invocation.groups, - reason=self.invocation.reason, - attempt=self.invocation.attempts)) channels = list(filter(lambda ch: not ch.endswith('-pnpres'), self.invocation.channels)) groups = list(filter(lambda gr: not gr.endswith('-pnpres'), self.invocation.groups)) @@ -395,12 +419,13 @@ async def heartbeat(self, channels, groups, attempt, stop_event): await asyncio.sleep(delay) response = await request.future() - if isinstance(response, PubNubException): + if isinstance(response, PubNubAsyncioException): self.logger.warning(f'Heartbeat failed: {str(response)}') + reason = response.status.error_data if response.status and response.status.error_data else str(response) self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, - reason=response.status.error_data, + reason=reason, attempt=attempt)) - elif response.status.error: + elif response.status and response.status.error: self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}') 
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, reason=response.status.error_data, diff --git a/pubnub/exceptions.py b/pubnub/exceptions.py index 73c2d308..b6b2d610 100644 --- a/pubnub/exceptions.py +++ b/pubnub/exceptions.py @@ -43,7 +43,9 @@ def __init__(self, result, status): self.status = status def __str__(self): - return str(self.status.error_data.exception) + if self.status and hasattr(self.status, 'error_data') and self.status.error_data: + return str(self.status.error_data.exception) + return f"PubNubAsyncioException(result={self.result}, status={self.status})" @staticmethod def is_error(): diff --git a/pubnub/pubnub.py b/pubnub/pubnub.py index c44e48fc..525f9116 100644 --- a/pubnub/pubnub.py +++ b/pubnub/pubnub.py @@ -218,10 +218,14 @@ def stop(self): Raises: Exception: If subscription manager is not enabled """ - if self._subscription_manager is not None: - self._subscription_manager.stop() - else: - raise Exception("Subscription manager is not enabled for this instance") + try: + if self._subscription_manager is not None: + self._subscription_manager.stop() + else: + raise Exception("Subscription manager is not enabled for this instance") + finally: + if hasattr(self._request_handler, 'close'): + self._request_handler.close() def request_deferred(self, options_func): raise NotImplementedError diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index df1cfda2..6e374c0c 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -62,6 +62,7 @@ async def main(): from pubnub.event_engine.models import events, states from pubnub.models.consumer.common import PNStatus +from pubnub.models.consumer.pn_error_data import PNErrorData from pubnub.dtos import SubscribeOperation, UnsubscribeOperation from pubnub.event_engine.statemachine import StateMachine from pubnub.endpoints.presence.heartbeat import Heartbeat @@ -234,9 +235,17 @@ async def request_future(self, options_func, cancellation_event): 
res = await self._request_handler.async_request(options_func, cancellation_event) return res except PubNubException as e: + if e.status is not None: + status = e.status + else: + status = PNStatus() + status.category = PNStatusCategory.PNBadRequestCategory + status.error = True + status.error_data = PNErrorData(str(e), e) + status.status_code = e._status_code if e._status_code != 0 else None return PubNubAsyncioException( result=None, - status=e.status + status=status ) except asyncio.TimeoutError: return PubNubAsyncioException( diff --git a/pubnub/request_handlers/async_httpx.py b/pubnub/request_handlers/async_httpx.py index acaf574f..492fe4b6 100644 --- a/pubnub/request_handlers/async_httpx.py +++ b/pubnub/request_handlers/async_httpx.py @@ -1,6 +1,7 @@ from asyncio import Event import asyncio import logging +import time import httpx import json # noqa # pylint: disable=W0611 import urllib @@ -56,6 +57,39 @@ def sync_request(self, **_): def threaded_request(self, **_): raise NotImplementedError("threaded_request is not implemented for asyncio handler") + WALL_CLOCK_CHECK_INTERVAL = 5.0 + + async def _request_with_wall_clock_deadline(self, request_arguments, timeout): + """Execute an HTTP request with wall-clock deadline enforcement. + + On macOS and Linux, time.monotonic() (and thus asyncio timeouts, socket timeouts) + does not advance during system sleep. A 310-second subscribe timeout can take hours + of wall-clock time if the machine sleeps. This method uses time.time() (wall clock) + to enforce the deadline regardless of sleep, while yielding to the event loop between checks. 
+ """ + if timeout is None: + return await self._session.request(**request_arguments) + + wall_deadline = time.time() + timeout + request_task = asyncio.ensure_future(self._session.request(**request_arguments)) + + try: + while True: + remaining = wall_deadline - time.time() + if remaining <= 0: + request_task.cancel() + raise asyncio.TimeoutError("Wall-clock deadline exceeded") + + done, _ = await asyncio.wait( + {request_task}, + timeout=min(self.WALL_CLOCK_CHECK_INTERVAL, remaining) + ) + if done: + return request_task.result() + except BaseException: + request_task.cancel() + raise + async def async_request(self, options_func, cancellation_event): """ Query string should be provided as a manually serialized and encoded string. @@ -103,7 +137,10 @@ async def async_request(self, options_func, cancellation_event): 'headers': request_headers, 'url': full_url, 'follow_redirects': options.allow_redirects, - 'timeout': (options.connect_timeout, options.request_timeout), + 'timeout': httpx.Timeout(connect=options.connect_timeout, + read=options.request_timeout, + write=options.connect_timeout, + pool=options.connect_timeout), } if options.is_post() or options.is_patch(): request_arguments['content'] = options.data @@ -112,9 +149,8 @@ async def async_request(self, options_func, cancellation_event): try: if not self._session: await self.create_session() - response = await asyncio.wait_for( - self._session.request(**request_arguments), - options.request_timeout + response = await self._request_with_wall_clock_deadline( + request_arguments, options.request_timeout ) except (asyncio.TimeoutError, asyncio.CancelledError): raise diff --git a/pubnub/request_handlers/httpx.py b/pubnub/request_handlers/httpx.py index dc743383..0a7c9bad 100644 --- a/pubnub/request_handlers/httpx.py +++ b/pubnub/request_handlers/httpx.py @@ -1,4 +1,5 @@ import logging +import time import threading import httpx import json # noqa # pylint: disable=W0611 @@ -23,15 +24,118 @@ logger = 
logging.getLogger("pubnub") +class WallClockDeadlineWatchdog: + """Persistent single-thread watchdog that enforces wall-clock deadlines on HTTP requests. + + On macOS and Linux, socket timeouts use monotonic time (mach_absolute_time / CLOCK_MONOTONIC) + which doesn't advance during system sleep. A 310-second subscribe timeout can take hours of + wall-clock time if the machine sleeps. This watchdog periodically checks time.time() (wall clock) + and closes the HTTP session when the deadline passes, causing the blocking socket read to fail. + + A single daemon thread is reused across requests to avoid thread creation overhead on + high-frequency subscribe loops (where messages arrive rapidly and each long-poll returns quickly). + Deadlines are tracked per calling thread, so concurrent requests (e.g., subscribe + publish + on different threads) don't interfere with each other. + """ + + CHECK_INTERVAL = 5.0 + + def __init__(self): + self._lock = threading.Lock() + self._deadlines = {} + self._triggered_threads = set() + self._wake = threading.Event() + self._stop = threading.Event() + self._thread = None + + def _ensure_started(self): + if self._thread is None or not self._thread.is_alive(): + self._thread = threading.Thread(target=self._run, daemon=True, + name="pubnub-wall-clock-watchdog") + self._thread.start() + + def set_deadline(self, session, deadline): + """Arm the watchdog for the calling thread's request.""" + thread_id = threading.get_ident() + with self._lock: + self._deadlines[thread_id] = (session, deadline) + self._triggered_threads.discard(thread_id) + self._ensure_started() + self._wake.set() + + def clear_deadline(self): + """Disarm the watchdog for the calling thread's request.""" + thread_id = threading.get_ident() + with self._lock: + self._deadlines.pop(thread_id, None) + self._triggered_threads.discard(thread_id) + + @property + def triggered(self): + """Check if the watchdog triggered for the calling thread.""" + return threading.get_ident() in 
self._triggered_threads + + def stop(self): + """Stop the watchdog thread permanently (call on handler cleanup).""" + self._stop.set() + self._wake.set() + + def _run(self): + while not self._stop.is_set(): + with self._lock: + has_deadlines = bool(self._deadlines) + if has_deadlines: + # Find the earliest deadline across all tracked threads + earliest_tid = None + earliest_deadline = float('inf') + earliest_session = None + for tid, (session, deadline) in self._deadlines.items(): + if deadline < earliest_deadline: + earliest_deadline = deadline + earliest_tid = tid + earliest_session = session + + if not has_deadlines: + # No active deadlines — idle until set_deadline() or stop() + self._wake.wait() + self._wake.clear() + continue + + remaining = earliest_deadline - time.time() + if remaining <= 0: + with self._lock: + current = self._deadlines.get(earliest_tid) + if current is None or current[1] != earliest_deadline: + continue + self._triggered_threads.add(earliest_tid) + self._deadlines.pop(earliest_tid, None) + logger.debug("Wall-clock deadline exceeded, closing session transport") + try: + earliest_session.close() + except Exception as e: + logger.debug(f"Error closing session: {e}") + continue + + # Sleep until next check, new deadline, or stop + self._wake.wait(timeout=min(self.CHECK_INTERVAL, remaining)) + self._wake.clear() + + class HttpxRequestHandler(BaseRequestHandler): """ PubNub Python SDK synchronous requests handler based on `httpx` HTTP library. 
""" ENDPOINT_THREAD_COUNTER: int = 0 def __init__(self, pubnub): self.session = httpx.Client() + self._watchdog = WallClockDeadlineWatchdog() self.pubnub = pubnub + def close(self): + """Clean up resources: stop the watchdog thread and close the HTTP session.""" + self._watchdog.stop() + self.session.close() + async def async_request(self, options_func, cancellation_event): raise NotImplementedError("async_request is not implemented for synchronous handler") @@ -242,7 +346,10 @@ def _invoke_request(self, p_options, e_options, base_origin): "method": e_options.method_string, "headers": request_headers, "url": httpx.URL(url, query=e_options.query_string.encode("utf-8")), - "timeout": (e_options.connect_timeout, e_options.request_timeout), + "timeout": httpx.Timeout(connect=e_options.connect_timeout, + read=e_options.request_timeout, + write=e_options.connect_timeout, + pool=e_options.connect_timeout), "follow_redirects": e_options.allow_redirects } @@ -265,6 +372,14 @@ def _invoke_request(self, p_options, e_options, base_origin): e_options.path, e_options.query_string))) + # Wall-clock deadline: only for long-timeout requests (e.g., subscribe long-poll) + # where system sleep can cause monotonic-based socket timeouts to stall for hours. + # Short requests (publish, history, etc.) don't need this. 
+ use_watchdog = e_options.request_timeout is not None and e_options.request_timeout > 30 + + if use_watchdog: + self._watchdog.set_deadline(self.session, time.time() + e_options.request_timeout) + try: res = self.session.request(**args) # Safely access response text - read content first for streaming responses @@ -278,6 +393,12 @@ def _invoke_request(self, p_options, e_options, base_origin): logger.debug("GOT response (content access failed: %s)" % str(e)) except httpx.ConnectError as e: + if use_watchdog and self._watchdog.triggered: + self.session = httpx.Client() + raise PubNubException( + pn_error=PNERR_CLIENT_TIMEOUT, + errormsg="Wall-clock deadline exceeded (system sleep detected)" + ) raise PubNubException( pn_error=PNERR_CONNECTION_ERROR, errormsg=str(e) @@ -299,10 +420,20 @@ def _invoke_request(self, p_options, e_options, base_origin): status_code=e.response.status_code ) except Exception as e: + if use_watchdog and self._watchdog.triggered: + self.session = httpx.Client() + raise PubNubException( + pn_error=PNERR_CLIENT_TIMEOUT, + errormsg="Wall-clock deadline exceeded (system sleep detected)" + ) raise PubNubException( pn_error=PNERR_UNKNOWN_ERROR, errormsg=str(e) ) + finally: + if use_watchdog: + self._watchdog.clear_deadline() + return res diff --git a/tests/integrational/asyncio/test_message_actions.py b/tests/integrational/asyncio/test_message_actions.py index b7f4856a..660f39ca 100644 --- a/tests/integrational/asyncio/test_message_actions.py +++ b/tests/integrational/asyncio/test_message_actions.py @@ -2,12 +2,12 @@ from pubnub.models.consumer.message_actions import PNMessageAction from pubnub.pubnub_asyncio import PubNubAsyncio -from tests.helper import pnconf_env_copy +from tests.helper import pnconf_env_copy, gen_channel class TestMessageActions(unittest.IsolatedAsyncioTestCase): pubnub: PubNubAsyncio = None - channel = "test_message_actions" + channel = gen_channel("test_message_actions") message_timetoken = None action_value_1 = "hello" diff 
--git a/tests/integrational/asyncio/test_retry_policies.py b/tests/integrational/asyncio/test_retry_policies.py new file mode 100644 index 00000000..e636eb74 --- /dev/null +++ b/tests/integrational/asyncio/test_retry_policies.py @@ -0,0 +1,152 @@ +import logging +import asyncio +import pytest +import pubnub as pn + +from unittest.mock import patch +from pubnub.enums import PNReconnectionPolicy +from pubnub.managers import LinearDelay, ExponentialDelay +from pubnub.pubnub_asyncio import PubNubAsyncio, SubscribeListener + +from tests.helper import pnconf_env_copy + + +pn.set_stream_logger('pubnub', logging.DEBUG) + +CONF = dict(enable_subscribe=True, origin='127.0.0.1', ssl=False, connect_timeout=1, + enable_presence_heartbeat=False) + + +@pytest.mark.asyncio +async def test_subscribe_retry_policy_none(): + ch = "test-subscribe-asyncio-retry-policy-none" + pubnub = PubNubAsyncio(pnconf_env_copy(**CONF, + reconnect_policy=PNReconnectionPolicy.NONE)) + listener = SubscribeListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + await asyncio.wait_for(listener.wait_for_disconnect(), timeout=10) + finally: + pubnub.unsubscribe_all() + await pubnub.stop() + + +@pytest.mark.asyncio +async def test_subscribe_retry_policy_linear(): + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-asyncio-retry-policy-linear" + pubnub = PubNubAsyncio(pnconf_env_copy(**CONF, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = SubscribeListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + await asyncio.wait_for(listener.wait_for_disconnect(), timeout=30) + finally: + pubnub.unsubscribe_all() + await pubnub.stop() + + assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + + +@pytest.mark.asyncio +async def test_subscribe_retry_policy_exponential(): + def 
mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-asyncio-retry-policy-exponential" + pubnub = PubNubAsyncio(pnconf_env_copy(**CONF, + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL)) + listener = SubscribeListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + await asyncio.wait_for(listener.wait_for_disconnect(), timeout=30) + finally: + pubnub.unsubscribe_all() + await pubnub.stop() + + assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + + +@pytest.mark.asyncio +async def test_subscribe_retry_policy_linear_with_max_retries(): + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-asyncio-retry-policy-linear-max" + pubnub = PubNubAsyncio(pnconf_env_copy(**CONF, + maximum_reconnection_retries=3, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = SubscribeListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + await asyncio.wait_for(listener.wait_for_disconnect(), timeout=30) + finally: + pubnub.unsubscribe_all() + await pubnub.stop() + + assert calculate_mock.call_count == 3 + + +@pytest.mark.asyncio +async def test_subscribe_retry_policy_exponential_with_max_retries(): + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-asyncio-retry-policy-exponential-max" + pubnub = PubNubAsyncio(pnconf_env_copy(**CONF, + maximum_reconnection_retries=3, + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL)) + listener = SubscribeListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + await asyncio.wait_for(listener.wait_for_disconnect(), timeout=30) + finally: + 
pubnub.unsubscribe_all() + await pubnub.stop() + + assert calculate_mock.call_count == 3 + + +@pytest.mark.asyncio +async def test_subscribe_retry_policy_linear_with_custom_interval(): + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-asyncio-retry-policy-linear-interval" + pubnub = PubNubAsyncio(pnconf_env_copy(**CONF, + maximum_reconnection_retries=3, reconnection_interval=0.2, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = SubscribeListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + await asyncio.wait_for(listener.wait_for_disconnect(), timeout=30) + finally: + pubnub.unsubscribe_all() + await pubnub.stop() + + assert calculate_mock.call_count == 0 diff --git a/tests/integrational/native_sync/test_file_upload.py b/tests/integrational/native_sync/test_file_upload.py index a594f134..0ac8330c 100644 --- a/tests/integrational/native_sync/test_file_upload.py +++ b/tests/integrational/native_sync/test_file_upload.py @@ -24,6 +24,7 @@ def send_file(file_for_upload, cipher_key=None, pass_binary=False, timetoken_override=None, pubnub_instance=None): if not pubnub_instance: pubnub_instance = pubnub + original_cipher_key = pubnub_instance.config.cipher_key if cipher_key: pubnub_instance.config.cipher_key = cipher_key @@ -44,6 +45,8 @@ def send_file(file_for_upload, cipher_key=None, pass_binary=False, timetoken_ove envelope = send_file_endpoint.sync() + pubnub_instance.config.cipher_key = original_cipher_key + assert isinstance(envelope.result, PNSendFileResult) assert envelope.result.name assert envelope.result.timestamp @@ -136,7 +139,8 @@ def test_send_and_download_encrypted_file(file_for_upload, file_upload_test_data download_envelope = pubnub.download_file() \ .channel(CHANNEL) \ .file_id(envelope.result.file_id) \ - .file_name(envelope.result.name).sync() + 
.file_name(envelope.result.name) \ + .cipher_key(cipher_key).sync() assert isinstance(download_envelope.result, PNDownloadFileResult) data = download_envelope.result.data diff --git a/tests/integrational/native_sync/test_message_actions.py b/tests/integrational/native_sync/test_message_actions.py index 669cf566..26da0e15 100644 --- a/tests/integrational/native_sync/test_message_actions.py +++ b/tests/integrational/native_sync/test_message_actions.py @@ -1,12 +1,12 @@ import unittest from pubnub.models.consumer.message_actions import PNMessageAction from pubnub.pubnub import PubNub -from tests.helper import pnconf_env_copy +from tests.helper import pnconf_env_copy, gen_channel class TestMessageActions(unittest.TestCase): pubnub: PubNub = None - channel = "test_message_actions" + channel = gen_channel("test_message_actions") message_timetoken = None action_value_1 = "hello" diff --git a/tests/integrational/native_threads/test_heartbeat.py b/tests/integrational/native_threads/test_heartbeat.py index 351a3833..f37d0f3b 100644 --- a/tests/integrational/native_threads/test_heartbeat.py +++ b/tests/integrational/native_threads/test_heartbeat.py @@ -12,66 +12,65 @@ # TODO: add a success heartbeat test -messenger_config = pnconf_sub_copy() -messenger_config.set_presence_timeout(8) -messenger_config.uuid = helper.gen_channel("messenger") - -listener_config = pnconf_sub_copy() -listener_config.uuid = helper.gen_channel("listener") - - class TestPubNubHeartbeat(unittest.TestCase): def test_timeout_event_on_broken_heartbeat(self): ch = helper.gen_channel("heartbeat-test") + messenger_config = pnconf_sub_copy() + messenger_config.set_presence_timeout(8) + messenger_config.uuid = helper.gen_channel("messenger") + messenger_config.daemon = True + + listener_config = pnconf_sub_copy() + listener_config.uuid = helper.gen_channel("listener") + listener_config.daemon = True + pubnub = PubNub(messenger_config) pubnub_listener = PubNub(listener_config) - pubnub.config.uuid = 
helper.gen_channel("messenger") - pubnub_listener.config.uuid = helper.gen_channel("listener") - callback_presence = SubscribeListener() callback_messages = SubscribeListener() - # - connect to :ch-pnpres - pubnub_listener.add_listener(callback_presence) - pubnub_listener.subscribe().channels(ch).with_presence().execute() - callback_presence.wait_for_connect() - - presence_message = callback_presence.wait_for_presence_on(ch) - assert ch == presence_message.channel - assert 'join' == presence_message.event - assert pubnub_listener.uuid == presence_message.uuid - - # - connect to :ch - pubnub.add_listener(callback_messages) - pubnub.subscribe().channels(ch).execute() - callback_messages.wait_for_connect() - - prs_envelope = callback_presence.wait_for_presence_on(ch) - assert ch == prs_envelope.channel - assert 'join' == prs_envelope.event - assert pubnub.uuid == prs_envelope.uuid - - # wait for one heartbeat call - time.sleep(6) - - # - break messenger heartbeat loop - pubnub._subscription_manager._stop_heartbeat_timer() - - # - assert for timeout - presence_message = callback_presence.wait_for_presence_on(ch) - assert ch == presence_message.channel - assert 'timeout' == presence_message.event - assert pubnub.uuid == presence_message.uuid - - pubnub.unsubscribe().channels(ch).execute() - callback_messages.wait_for_disconnect() - - # - disconnect from :ch-pnpres - pubnub_listener.unsubscribe_all() - callback_presence.wait_for_disconnect() - - pubnub.stop() - pubnub_listener.stop() - time.sleep(1) + try: + # - connect to :ch-pnpres + pubnub_listener.add_listener(callback_presence) + pubnub_listener.subscribe().channels(ch).with_presence().execute() + callback_presence.wait_for_connect() + + presence_message = callback_presence.wait_for_presence_on(ch) + assert ch == presence_message.channel + assert 'join' == presence_message.event + assert pubnub_listener.uuid == presence_message.uuid + + # - connect to :ch + pubnub.add_listener(callback_messages) + 
pubnub.subscribe().channels(ch).execute() + callback_messages.wait_for_connect() + + prs_envelope = callback_presence.wait_for_presence_on(ch) + assert ch == prs_envelope.channel + assert 'join' == prs_envelope.event + assert pubnub.uuid == prs_envelope.uuid + + # wait for one heartbeat call + time.sleep(6) + + # - break messenger heartbeat loop + pubnub._subscription_manager._stop_heartbeat_timer() + + # - assert for timeout + presence_message = callback_presence.wait_for_presence_on(ch) + assert ch == presence_message.channel + assert 'timeout' == presence_message.event + assert pubnub.uuid == presence_message.uuid + + pubnub.unsubscribe().channels(ch).execute() + callback_messages.wait_for_disconnect() + + # - disconnect from :ch-pnpres + pubnub_listener.unsubscribe_all() + callback_presence.wait_for_disconnect() + finally: + pubnub.stop() + pubnub_listener.stop() + time.sleep(1) diff --git a/tests/integrational/native_threads/test_here_now.py b/tests/integrational/native_threads/test_here_now.py index 97536b82..48dd3d17 100644 --- a/tests/integrational/native_threads/test_here_now.py +++ b/tests/integrational/native_threads/test_here_now.py @@ -22,72 +22,76 @@ def callback(self, response, status): self.event.set() def test_single_channel(self): - pubnub = PubNub(pnconf_sub_copy()) + config = pnconf_sub_copy() + config.daemon = True + pn = PubNub(config) ch = helper.gen_channel("herenow-asyncio-channel") uuid = helper.gen_channel("herenow-asyncio-uuid") - pubnub.config.uuid = uuid + pn.config.uuid = uuid subscribe_listener = SubscribeListener() here_now_listener = NonSubscribeListener() - pubnub.add_listener(subscribe_listener) - pubnub.subscribe().channels(ch).execute() + pn.add_listener(subscribe_listener) + pn.subscribe().channels(ch).execute() - subscribe_listener.wait_for_connect() + try: + subscribe_listener.wait_for_connect() + time.sleep(2) - time.sleep(2) + pn.here_now() \ + .channels(ch) \ + .include_uuids(True) \ + 
.pn_async(here_now_listener.callback) - pubnub.here_now() \ - .channels(ch) \ - .include_uuids(True) \ - .pn_async(here_now_listener.callback) + if here_now_listener.pn_await() is False: + self.fail("HereNow operation timeout") - if here_now_listener.pn_await() is False: - self.fail("HereNow operation timeout") + result = here_now_listener.result + channels = result.channels - result = here_now_listener.result - channels = result.channels + assert len(channels) == 1 + assert channels[0].occupancy == 1 + assert channels[0].occupants[0].uuid == pn.uuid - assert len(channels) == 1 - assert channels[0].occupancy == 1 - assert channels[0].occupants[0].uuid == pubnub.uuid - - pubnub.unsubscribe().channels(ch).execute() - subscribe_listener.wait_for_disconnect() - - pubnub.stop() + pn.unsubscribe().channels(ch).execute() + subscribe_listener.wait_for_disconnect() + finally: + pn.stop() def test_multiple_channels(self): - pubnub = PubNub(pnconf_sub_copy()) + config = pnconf_sub_copy() + config.daemon = True + pn = PubNub(config) ch1 = helper.gen_channel("here-now-native-sync-ch1") ch2 = helper.gen_channel("here-now-native-sync-ch2") - pubnub.config.uuid = "here-now-native-sync-uuid" + pn.config.uuid = "here-now-native-sync-uuid" subscribe_listener = SubscribeListener() here_now_listener = NonSubscribeListener() - pubnub.add_listener(subscribe_listener) - pubnub.subscribe().channels([ch1, ch2]).execute() - - subscribe_listener.wait_for_connect() - - time.sleep(5) + pn.add_listener(subscribe_listener) + pn.subscribe().channels([ch1, ch2]).execute() - pubnub.here_now() \ - .channels([ch1, ch2]) \ - .pn_async(here_now_listener.callback) + try: + subscribe_listener.wait_for_connect() + time.sleep(5) - if here_now_listener.pn_await() is False: - self.fail("HereNow operation timeout") + pn.here_now() \ + .channels([ch1, ch2]) \ + .pn_async(here_now_listener.callback) - result = here_now_listener.result - channels = result.channels + if here_now_listener.pn_await() is False: + 
self.fail("HereNow operation timeout") - assert len(channels) == 2 - assert channels[0].occupancy == 1 - assert channels[0].occupants[0].uuid == pubnub.uuid - assert channels[1].occupancy == 1 - assert channels[1].occupants[0].uuid == pubnub.uuid + result = here_now_listener.result + channels = result.channels - pubnub.unsubscribe().channels([ch1, ch2]).execute() - subscribe_listener.wait_for_disconnect() + assert len(channels) == 2 + assert channels[0].occupancy == 1 + assert channels[0].occupants[0].uuid == pn.uuid + assert channels[1].occupancy == 1 + assert channels[1].occupants[0].uuid == pn.uuid - pubnub.stop() + pn.unsubscribe().channels([ch1, ch2]).execute() + subscribe_listener.wait_for_disconnect() + finally: + pn.stop() diff --git a/tests/integrational/native_threads/test_subscribe.py b/tests/integrational/native_threads/test_subscribe.py index f74ce481..40ce232c 100644 --- a/tests/integrational/native_threads/test_subscribe.py +++ b/tests/integrational/native_threads/test_subscribe.py @@ -152,39 +152,40 @@ def test_cg_subscribe_unsubscribe(self): callback_messages = SubscribeListener() cg_operation = NonSubscribeListener() - pubnub.add_channel_to_channel_group()\ - .channel_group(gr)\ - .channels(ch)\ - .pn_async(cg_operation.callback) - result = cg_operation.await_result(1) - if result is None: - self.fail("Add channel to channel group operation timeout or failed") - if cg_operation.status is not None and cg_operation.status.is_error(): - self.fail(f"Add channel to channel group operation failed with error: {cg_operation.status}") - assert isinstance(result, PNChannelGroupsAddChannelResult) - time.sleep(1) - - pubnub.add_listener(callback_messages) - pubnub.subscribe().channel_groups(gr).execute() - callback_messages.wait_for_connect() - - pubnub.unsubscribe().channel_groups(gr).execute() - callback_messages.wait_for_disconnect() - - # Create a new listener for the remove operation to avoid potential race conditions - cg_remove_operation = 
NonSubscribeListener() - pubnub.remove_channel_from_channel_group()\ - .channel_group(gr)\ - .channels(ch)\ - .pn_async(cg_remove_operation.callback) - result = cg_remove_operation.await_result(1) - if result is None: - self.fail("Remove channel from channel group operation timeout or failed") - if cg_remove_operation.status is not None and cg_remove_operation.status.is_error(): - self.fail(f"Remove channel from channel group operation failed with error: {cg_remove_operation.status}") - assert isinstance(result, PNChannelGroupsRemoveChannelResult) - - pubnub.stop() + try: + pubnub.add_channel_to_channel_group()\ + .channel_group(gr)\ + .channels(ch)\ + .pn_async(cg_operation.callback) + result = cg_operation.await_result(1) + if result is None: + self.fail("Add channel to channel group operation timeout or failed") + if cg_operation.status is not None and cg_operation.status.is_error(): + self.fail(f"Add channel to channel group operation failed with error: {cg_operation.status}") + assert isinstance(result, PNChannelGroupsAddChannelResult) + time.sleep(1) + + pubnub.add_listener(callback_messages) + pubnub.subscribe().channel_groups(gr).execute() + callback_messages.wait_for_connect() + + pubnub.unsubscribe().channel_groups(gr).execute() + callback_messages.wait_for_disconnect() + + # Create a new listener for the remove operation to avoid potential race conditions + cg_remove_operation = NonSubscribeListener() + pubnub.remove_channel_from_channel_group()\ + .channel_group(gr)\ + .channels(ch)\ + .pn_async(cg_remove_operation.callback) + result = cg_remove_operation.await_result(1) + if result is None: + self.fail("Remove channel from channel group operation timeout or failed") + if cg_remove_operation.status is not None and cg_remove_operation.status.is_error(): + self.fail(f"Remove channel from channel group operation failed with error: {cg_remove_operation.status}") + assert isinstance(result, PNChannelGroupsRemoveChannelResult) + finally: + pubnub.stop() def 
test_subscribe_cg_publish_unsubscribe(self): ch = "test-subscribe-unsubscribe-channel" @@ -195,52 +196,53 @@ def test_subscribe_cg_publish_unsubscribe(self): callback_messages = SubscribeListener() non_subscribe_listener = NonSubscribeListener() - pubnub.add_channel_to_channel_group() \ - .channel_group(gr) \ - .channels(ch) \ - .pn_async(non_subscribe_listener.callback) - result = non_subscribe_listener.await_result_and_reset(1) - if result is None: - self.fail("Add channel to channel group operation timeout or failed") - if non_subscribe_listener.status is not None and non_subscribe_listener.status.is_error(): - self.fail(f"Add channel to channel group operation failed with error: {non_subscribe_listener.status}") - assert isinstance(result, PNChannelGroupsAddChannelResult) - non_subscribe_listener.reset() - time.sleep(1) - - pubnub.add_listener(callback_messages) - pubnub.subscribe().channel_groups(gr).execute() - callback_messages.wait_for_connect() - - pubnub.publish().message(message).channel(ch).pn_async(non_subscribe_listener.callback) - result = non_subscribe_listener.await_result_and_reset(10) - if result is None: - print(f"Debug: non_subscribe_listener.status = {non_subscribe_listener.status}") - if non_subscribe_listener.status is not None: - print(f"Debug: status.is_error() = {non_subscribe_listener.status.is_error()}") - print(f"Debug: status.category = {non_subscribe_listener.status.category}") - print(f"Debug: status.error_data = {non_subscribe_listener.status.error_data}") - self.fail("Publish operation timeout or failed") - if non_subscribe_listener.status is not None and non_subscribe_listener.status.is_error(): - self.fail(f"Publish operation failed with error: {non_subscribe_listener.status}") - assert isinstance(result, PNPublishResult) - assert result.timetoken > 0 - - pubnub.unsubscribe().channel_groups(gr).execute() - callback_messages.wait_for_disconnect() - - pubnub.remove_channel_from_channel_group() \ - .channel_group(gr) \ - 
.channels(ch) \ - .pn_async(non_subscribe_listener.callback) - result = non_subscribe_listener.await_result_and_reset(1) - if result is None: - self.fail("Remove channel from channel group operation timeout or failed") - if non_subscribe_listener.status is not None and non_subscribe_listener.status.is_error(): - self.fail(f"Remove channel from channel group operation failed with error: {non_subscribe_listener.status}") - assert isinstance(result, PNChannelGroupsRemoveChannelResult) - - pubnub.stop() + try: + pubnub.add_channel_to_channel_group() \ + .channel_group(gr) \ + .channels(ch) \ + .pn_async(non_subscribe_listener.callback) + result = non_subscribe_listener.await_result_and_reset(1) + if result is None: + self.fail("Add channel to channel group operation timeout or failed") + if non_subscribe_listener.status is not None and non_subscribe_listener.status.is_error(): + self.fail(f"Add channel to channel group operation failed with error: {non_subscribe_listener.status}") + assert isinstance(result, PNChannelGroupsAddChannelResult) + non_subscribe_listener.reset() + time.sleep(1) + + pubnub.add_listener(callback_messages) + pubnub.subscribe().channel_groups(gr).execute() + callback_messages.wait_for_connect() + + pubnub.publish().message(message).channel(ch).pn_async(non_subscribe_listener.callback) + result = non_subscribe_listener.await_result_and_reset(10) + if result is None: + print(f"Debug: non_subscribe_listener.status = {non_subscribe_listener.status}") + if non_subscribe_listener.status is not None: + print(f"Debug: status.is_error() = {non_subscribe_listener.status.is_error()}") + print(f"Debug: status.category = {non_subscribe_listener.status.category}") + print(f"Debug: status.error_data = {non_subscribe_listener.status.error_data}") + self.fail("Publish operation timeout or failed") + if non_subscribe_listener.status is not None and non_subscribe_listener.status.is_error(): + self.fail(f"Publish operation failed with error: 
{non_subscribe_listener.status}") + assert isinstance(result, PNPublishResult) + assert result.timetoken > 0 + + pubnub.unsubscribe().channel_groups(gr).execute() + callback_messages.wait_for_disconnect() + + pubnub.remove_channel_from_channel_group() \ + .channel_group(gr) \ + .channels(ch) \ + .pn_async(non_subscribe_listener.callback) + result = non_subscribe_listener.await_result_and_reset(1) + if result is None: + self.fail("Remove channel from channel group operation timeout or failed") + if non_subscribe_listener.status is not None and non_subscribe_listener.status.is_error(): + self.fail(f"Remove channel from channel group operation failed with error: {non_subscribe_listener.status}") + assert isinstance(result, PNChannelGroupsRemoveChannelResult) + finally: + pubnub.stop() def test_subscribe_cg_join_leave(self): ch = helper.gen_channel("test-subscribe-unsubscribe-channel") @@ -250,55 +252,56 @@ def test_subscribe_cg_join_leave(self): callback_messages = SubscribeListener() callback_presence = SubscribeListener() - result = pubnub.add_channel_to_channel_group() \ - .channel_group(gr) \ - .channels(ch) \ - .sync() + try: + result = pubnub.add_channel_to_channel_group() \ + .channel_group(gr) \ + .channels(ch) \ + .sync() - assert isinstance(result.result, PNChannelGroupsAddChannelResult) - time.sleep(1) + assert isinstance(result.result, PNChannelGroupsAddChannelResult) + time.sleep(1) - pubnub.config.uuid = helper.gen_channel("messenger") - pubnub_listener.config.uuid = helper.gen_channel("listener") + pubnub.config.uuid = helper.gen_channel("messenger") + pubnub_listener.config.uuid = helper.gen_channel("listener") - pubnub.add_listener(callback_messages) - pubnub_listener.add_listener(callback_presence) - - pubnub_listener.subscribe().channel_groups(gr).with_presence().execute() - callback_presence.wait_for_connect() + pubnub.add_listener(callback_messages) + pubnub_listener.add_listener(callback_presence) - envelope = 
callback_presence.wait_for_presence_on(ch) - assert envelope.channel == ch - assert envelope.event == 'join' - assert envelope.uuid == pubnub_listener.uuid + pubnub_listener.subscribe().channel_groups(gr).with_presence().execute() + callback_presence.wait_for_connect() - pubnub.subscribe().channel_groups(gr).execute() - callback_messages.wait_for_connect() + envelope = callback_presence.wait_for_presence_on(ch) + assert envelope.channel == ch + assert envelope.event == 'join' + assert envelope.uuid == pubnub_listener.uuid - envelope = callback_presence.wait_for_presence_on(ch) - assert envelope.channel == ch - assert envelope.event == 'join' - assert envelope.uuid == pubnub.uuid + pubnub.subscribe().channel_groups(gr).execute() + callback_messages.wait_for_connect() - pubnub.unsubscribe().channel_groups(gr).execute() - callback_messages.wait_for_disconnect() + envelope = callback_presence.wait_for_presence_on(ch) + assert envelope.channel == ch + assert envelope.event == 'join' + assert envelope.uuid == pubnub.uuid - envelope = callback_presence.wait_for_presence_on(ch) - assert envelope.channel == ch - assert envelope.event == 'leave' - assert envelope.uuid == pubnub.uuid + pubnub.unsubscribe().channel_groups(gr).execute() + callback_messages.wait_for_disconnect() - pubnub_listener.unsubscribe().channel_groups(gr).execute() - callback_presence.wait_for_disconnect() + envelope = callback_presence.wait_for_presence_on(ch) + assert envelope.channel == ch + assert envelope.event == 'leave' + assert envelope.uuid == pubnub.uuid - result = pubnub.remove_channel_from_channel_group() \ - .channel_group(gr) \ - .channels(ch) \ - .sync() - assert isinstance(result.result, PNChannelGroupsRemoveChannelResult) + pubnub_listener.unsubscribe().channel_groups(gr).execute() + callback_presence.wait_for_disconnect() - pubnub.stop() - pubnub_listener.stop() + result = pubnub.remove_channel_from_channel_group() \ + .channel_group(gr) \ + .channels(ch) \ + .sync() + assert 
isinstance(result.result, PNChannelGroupsRemoveChannelResult) + finally: + pubnub.stop() + pubnub_listener.stop() def test_subscribe_pub_unencrypted_unsubscribe(self): ch = helper.gen_channel("test-subscribe-pub-unencrypted-unsubscribe") diff --git a/tests/integrational/native_threads/test_where_now.py b/tests/integrational/native_threads/test_where_now.py index 218bf7d6..6ba776b3 100644 --- a/tests/integrational/native_threads/test_where_now.py +++ b/tests/integrational/native_threads/test_where_now.py @@ -1,90 +1,82 @@ import unittest import logging import pubnub -import threading import time from pubnub.pubnub import PubNub, SubscribeListener, NonSubscribeListener +from tests import helper from tests.helper import pnconf_env_copy pubnub.set_stream_logger('pubnub', logging.DEBUG) class TestPubNubState(unittest.TestCase): - def setUp(self): - self.event = threading.Event() - - def callback(self, response, status): - self.response = response - self.status = status - self.event.set() - - # for subscribe we don't use VCR due to it's limitations with longpolling def test_single_channel(self): - print('test_single_channel') - pubnub = PubNub(pnconf_env_copy(enable_subscribe=True)) - ch = "wherenow-asyncio-channel" - uuid = "wherenow-asyncio-uuid" - pubnub.config.uuid = uuid + ch = helper.gen_channel("wherenow-native-ch") + uuid = helper.gen_channel("wherenow-native-uuid") + config = pnconf_env_copy(enable_subscribe=True, daemon=True, enable_presence_heartbeat=True) + config.uuid = uuid + pn = PubNub(config) subscribe_listener = SubscribeListener() where_now_listener = NonSubscribeListener() - pubnub.add_listener(subscribe_listener) - pubnub.subscribe().channels(ch).execute() - subscribe_listener.wait_for_connect() + pn.add_listener(subscribe_listener) + pn.subscribe().channels(ch).execute() - # the delay is needed for the server side to propagate presence - time.sleep(3) - pubnub.where_now() \ - .uuid(uuid) \ - .pn_async(where_now_listener.callback) + try: + 
subscribe_listener.wait_for_connect() + time.sleep(5) - if where_now_listener.pn_await() is False: - self.fail("WhereNow operation timeout") + pn.where_now() \ + .uuid(uuid) \ + .pn_async(where_now_listener.callback) - result = where_now_listener.result - channels = result.channels + if where_now_listener.pn_await() is False: + self.fail("WhereNow operation timeout") - assert len(channels) == 1 - assert channels[0] == ch + result = where_now_listener.result + channels = result.channels - pubnub.unsubscribe().channels(ch).execute() - subscribe_listener.wait_for_disconnect() + assert len(channels) == 1 + assert channels[0] == ch - pubnub.stop() + pn.unsubscribe().channels(ch).execute() + subscribe_listener.wait_for_disconnect() + finally: + pn.stop() - # for subscribe we don't use VCR due to it's limitations with longpolling def test_multiple_channels(self): - pubnub = PubNub(pnconf_env_copy(enable_subscribe=True)) - ch1 = "state-native-sync-ch-1" - ch2 = "state-native-sync-ch-2" - pubnub.config.uuid = "state-native-sync-uuid" - uuid = pubnub.config.uuid + ch1 = helper.gen_channel("state-native-sync-ch-1") + ch2 = helper.gen_channel("state-native-sync-ch-2") + uuid = helper.gen_channel("state-native-sync-uuid") + config = pnconf_env_copy(enable_subscribe=True, daemon=True, enable_presence_heartbeat=True) + config.uuid = uuid + pn = PubNub(config) subscribe_listener = SubscribeListener() where_now_listener = NonSubscribeListener() - pubnub.add_listener(subscribe_listener) - pubnub.subscribe().channels([ch1, ch2]).execute() - - subscribe_listener.wait_for_connect() + pn.add_listener(subscribe_listener) + pn.subscribe().channels([ch1, ch2]).execute() - # the delay is needed for the server side to propagate presence - time.sleep(3) - pubnub.where_now() \ - .uuid(uuid) \ - .pn_async(where_now_listener.callback) + try: + subscribe_listener.wait_for_connect() + time.sleep(5) - if where_now_listener.pn_await() is False: - self.fail("WhereNow operation timeout") + 
pn.where_now() \ + .uuid(uuid) \ + .pn_async(where_now_listener.callback) - result = where_now_listener.result - channels = result.channels + if where_now_listener.pn_await() is False: + self.fail("WhereNow operation timeout") - assert len(channels) == 2 - assert ch1 in channels - assert ch2 in channels + result = where_now_listener.result + channels = result.channels - pubnub.unsubscribe().channels([ch1, ch2]).execute() - subscribe_listener.wait_for_disconnect() + assert len(channels) == 2 + assert ch1 in channels + assert ch2 in channels - pubnub.stop() + pn.unsubscribe().channels([ch1, ch2]).execute() + subscribe_listener.wait_for_disconnect() + finally: + pn.stop() diff --git a/tests/unit/test_wall_clock_deadline.py b/tests/unit/test_wall_clock_deadline.py new file mode 100644 index 00000000..45449495 --- /dev/null +++ b/tests/unit/test_wall_clock_deadline.py @@ -0,0 +1,377 @@ +"""Tests for wall-clock deadline detection in request handlers. + +On macOS and Linux, time.monotonic() does not advance during system sleep, +causing socket timeouts (which use monotonic time) to take much longer than +expected in wall-clock time. These tests verify that the wall-clock deadline +mechanism detects this and cancels requests promptly. 
+""" +import asyncio +import threading +import time +import unittest +from unittest.mock import patch, MagicMock + +import httpx + +from pubnub.request_handlers.httpx import WallClockDeadlineWatchdog + + +class TestWallClockDeadlineWatchdog(unittest.TestCase): + """Tests for the persistent per-thread WallClockDeadlineWatchdog.""" + + def _make_watchdog(self, check_interval=0.1): + watchdog = WallClockDeadlineWatchdog() + watchdog.CHECK_INTERVAL = check_interval + return watchdog + + def test_watchdog_does_not_trigger_before_deadline(self): + """Watchdog should not trigger when deadline hasn't passed.""" + session = MagicMock(spec=httpx.Client) + watchdog = self._make_watchdog() + + watchdog.set_deadline(session, time.time() + 60) + time.sleep(0.3) + watchdog.clear_deadline() + watchdog.stop() + + self.assertFalse(watchdog.triggered) + session.close.assert_not_called() + + def test_watchdog_triggers_when_deadline_passed(self): + """Watchdog should trigger when wall-clock deadline has already passed.""" + session = MagicMock(spec=httpx.Client) + watchdog = self._make_watchdog() + + watchdog.set_deadline(session, time.time() - 1) + time.sleep(0.5) + watchdog.stop() + + self.assertTrue(watchdog.triggered) + session.close.assert_called_once() + + def test_watchdog_detects_simulated_sleep(self): + """Simulate system sleep by making time.time() jump forward.""" + session = MagicMock(spec=httpx.Client) + real_time = time.time + + start_wall = real_time() + deadline = start_wall + 10 + + call_count = [0] + + def mock_time(): + call_count[0] += 1 + if call_count[0] <= 2: + return real_time() + else: + return start_wall + 60 + + watchdog = self._make_watchdog() + + with patch('pubnub.request_handlers.httpx.time') as mock_time_module: + mock_time_module.time = mock_time + watchdog.set_deadline(session, deadline) + time.sleep(0.5) + + watchdog.stop() + + self.assertTrue(watchdog.triggered) + session.close.assert_called_once() + + def test_watchdog_reuse_across_requests(self): + 
"""Watchdog thread should be reused across multiple set/clear cycles.""" + session = MagicMock(spec=httpx.Client) + watchdog = self._make_watchdog() + + # First request + watchdog.set_deadline(session, time.time() + 60) + time.sleep(0.05) + thread_id_1 = watchdog._thread.ident + watchdog.clear_deadline() + + # Second request + watchdog.set_deadline(session, time.time() + 60) + time.sleep(0.05) + thread_id_2 = watchdog._thread.ident + watchdog.clear_deadline() + + watchdog.stop() + + self.assertEqual(thread_id_1, thread_id_2) + self.assertFalse(watchdog.triggered) + + def test_watchdog_resets_triggered_on_new_deadline(self): + """set_deadline() should reset the triggered flag for the calling thread.""" + session = MagicMock(spec=httpx.Client) + watchdog = self._make_watchdog() + + # Trigger the watchdog + watchdog.set_deadline(session, time.time() - 1) + time.sleep(0.3) + self.assertTrue(watchdog.triggered) + + # New deadline resets triggered + watchdog.set_deadline(session, time.time() + 60) + self.assertFalse(watchdog.triggered) + + watchdog.clear_deadline() + watchdog.stop() + + def test_watchdog_clear_prevents_trigger(self): + """clear_deadline() before deadline passes should prevent triggering.""" + session = MagicMock(spec=httpx.Client) + watchdog = self._make_watchdog(check_interval=0.5) + + watchdog.set_deadline(session, time.time() + 0.3) + watchdog.clear_deadline() + time.sleep(0.8) + + watchdog.stop() + + self.assertFalse(watchdog.triggered) + session.close.assert_not_called() + + def test_watchdog_thread_is_daemon(self): + """Watchdog thread must be daemon so it doesn't prevent process exit.""" + watchdog = self._make_watchdog() + session = MagicMock(spec=httpx.Client) + + watchdog.set_deadline(session, time.time() + 300) + self.assertTrue(watchdog._thread.daemon) + + watchdog.clear_deadline() + watchdog.stop() + + def test_watchdog_handles_session_close_exception(self): + """Watchdog should handle exceptions from session.close() gracefully.""" + session 
= MagicMock(spec=httpx.Client) + session.close.side_effect = RuntimeError("Already closed") + watchdog = self._make_watchdog() + + watchdog.set_deadline(session, time.time() - 1) + time.sleep(0.5) + watchdog.stop() + + self.assertTrue(watchdog.triggered) + + def test_watchdog_thread_exits_on_stop(self): + """Watchdog thread should exit after stop() is called.""" + watchdog = self._make_watchdog() + session = MagicMock(spec=httpx.Client) + + watchdog.set_deadline(session, time.time() + 300) + time.sleep(0.05) + watchdog.stop() + time.sleep(0.3) + + self.assertFalse(watchdog._thread.is_alive()) + + def test_watchdog_no_thread_until_needed(self): + """Thread should not be created until set_deadline() is called.""" + watchdog = self._make_watchdog() + self.assertIsNone(watchdog._thread) + + session = MagicMock(spec=httpx.Client) + watchdog.set_deadline(session, time.time() + 60) + self.assertIsNotNone(watchdog._thread) + + watchdog.clear_deadline() + watchdog.stop() + + def test_concurrent_requests_independent_deadlines(self): + """Deadlines from different threads should not interfere with each other.""" + session = MagicMock(spec=httpx.Client) + watchdog = self._make_watchdog() + + thread1_triggered = [None] + thread2_triggered = [None] + + def thread1_work(): + # Long deadline — should NOT trigger + watchdog.set_deadline(session, time.time() + 60) + time.sleep(0.5) + thread1_triggered[0] = watchdog.triggered + watchdog.clear_deadline() + + def thread2_work(): + # Already-passed deadline — SHOULD trigger + time.sleep(0.05) # Start slightly after thread1 + watchdog.set_deadline(session, time.time() - 1) + time.sleep(0.4) + thread2_triggered[0] = watchdog.triggered + watchdog.clear_deadline() + + t1 = threading.Thread(target=thread1_work) + t2 = threading.Thread(target=thread2_work) + t1.start() + t2.start() + t1.join() + t2.join() + + watchdog.stop() + + self.assertFalse(thread1_triggered[0], "Thread 1 (long deadline) should not be triggered") + 
self.assertTrue(thread2_triggered[0], "Thread 2 (past deadline) should be triggered") + + def test_clear_from_one_thread_does_not_affect_another(self): + """clear_deadline() from thread A should not clear thread B's deadline.""" + session = MagicMock(spec=httpx.Client) + watchdog = self._make_watchdog() + + barrier = threading.Barrier(2) + thread_b_triggered = [None] + + def thread_a(): + watchdog.set_deadline(session, time.time() + 60) + barrier.wait() # Sync with thread B + watchdog.clear_deadline() + + def thread_b(): + watchdog.set_deadline(session, time.time() - 1) + barrier.wait() # Sync with thread A + time.sleep(0.3) # Wait for watchdog to process + thread_b_triggered[0] = watchdog.triggered + watchdog.clear_deadline() + + ta = threading.Thread(target=thread_a) + tb = threading.Thread(target=thread_b) + ta.start() + tb.start() + ta.join() + tb.join() + + watchdog.stop() + + self.assertTrue(thread_b_triggered[0], "Thread B's expired deadline should still trigger") + + +class TestAsyncWallClockDeadline(unittest.TestCase): + """Tests for the asyncio wall-clock deadline in AsyncHttpxRequestHandler.""" + + def test_async_deadline_cancels_on_simulated_sleep(self): + """Async request should be cancelled when wall-clock deadline passes.""" + from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler + + handler = AsyncHttpxRequestHandler.__new__(AsyncHttpxRequestHandler) + handler._session = MagicMock() + + async def never_completes(**kwargs): + await asyncio.sleep(3600) + + handler._session.request = never_completes + + async def run_test(): + real_time = time.time + start = real_time() + call_count = [0] + + def mock_time(): + call_count[0] += 1 + if call_count[0] <= 3: + return real_time() + return start + 60 + + with patch('pubnub.request_handlers.async_httpx.time') as mock_time_module: + mock_time_module.time = mock_time + original = AsyncHttpxRequestHandler.WALL_CLOCK_CHECK_INTERVAL + AsyncHttpxRequestHandler.WALL_CLOCK_CHECK_INTERVAL = 0.1 + 
try: + with self.assertRaises(asyncio.TimeoutError): + await handler._request_with_wall_clock_deadline( + {"method": "GET", "url": "http://test"}, + timeout=10 + ) + finally: + AsyncHttpxRequestHandler.WALL_CLOCK_CHECK_INTERVAL = original + + asyncio.run(run_test()) + + def test_async_deadline_passes_through_normal_response(self): + """Normal responses should pass through unaffected.""" + from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler + + handler = AsyncHttpxRequestHandler.__new__(AsyncHttpxRequestHandler) + handler._session = MagicMock() + + mock_response = MagicMock() + + async def quick_response(**kwargs): + return mock_response + + handler._session.request = quick_response + + async def run_test(): + result = await handler._request_with_wall_clock_deadline( + {"method": "GET", "url": "http://test"}, + timeout=10 + ) + self.assertEqual(result, mock_response) + + asyncio.run(run_test()) + + def test_async_deadline_none_timeout_skips_watchdog(self): + """When timeout is None, should call request directly without watchdog.""" + from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler + + handler = AsyncHttpxRequestHandler.__new__(AsyncHttpxRequestHandler) + handler._session = MagicMock() + + mock_response = MagicMock() + + async def quick_response(**kwargs): + return mock_response + + handler._session.request = quick_response + + async def run_test(): + result = await handler._request_with_wall_clock_deadline( + {"method": "GET", "url": "http://test"}, + timeout=None + ) + self.assertEqual(result, mock_response) + + asyncio.run(run_test()) + + def test_async_deadline_propagates_request_exception(self): + """Exceptions from the HTTP request should propagate normally.""" + from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler + + handler = AsyncHttpxRequestHandler.__new__(AsyncHttpxRequestHandler) + handler._session = MagicMock() + + async def failing_request(**kwargs): + raise 
httpx.ConnectError("Connection refused") + + handler._session.request = failing_request + + async def run_test(): + with self.assertRaises(httpx.ConnectError): + await handler._request_with_wall_clock_deadline( + {"method": "GET", "url": "http://test"}, + timeout=10 + ) + + asyncio.run(run_test()) + + +class TestHttpxRequestHandlerCleanup(unittest.TestCase): + """Tests for proper cleanup of HttpxRequestHandler.""" + + def test_close_stops_watchdog(self): + """HttpxRequestHandler.close() should stop the watchdog thread.""" + from pubnub.request_handlers.httpx import HttpxRequestHandler + from tests.helper import pnconf_copy + + handler = HttpxRequestHandler(MagicMock()) + # Start the watchdog by setting a deadline + handler._watchdog.set_deadline(handler.session, time.time() + 300) + self.assertIsNotNone(handler._watchdog._thread) + + handler.close() + time.sleep(0.2) + + self.assertTrue(handler._watchdog._stop.is_set()) + + +if __name__ == '__main__': + unittest.main() From 7b640c98188abb3f2fef74a2b2d9312219f3ff79 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 25 Mar 2026 01:08:16 +0200 Subject: [PATCH 2/7] refactor(lint): address lint issues --- pubnub/request_handlers/async_httpx.py | 9 +++++---- pubnub/request_handlers/httpx.py | 9 +++++---- tests/integrational/native_threads/test_subscribe.py | 8 ++++++-- tests/unit/test_wall_clock_deadline.py | 1 - 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pubnub/request_handlers/async_httpx.py b/pubnub/request_handlers/async_httpx.py index 492fe4b6..4fe735a8 100644 --- a/pubnub/request_handlers/async_httpx.py +++ b/pubnub/request_handlers/async_httpx.py @@ -137,10 +137,11 @@ async def async_request(self, options_func, cancellation_event): 'headers': request_headers, 'url': full_url, 'follow_redirects': options.allow_redirects, - 'timeout': httpx.Timeout(connect=options.connect_timeout, - read=options.request_timeout, - write=options.connect_timeout, - pool=options.connect_timeout), + 
'timeout': httpx.Timeout( + connect=options.connect_timeout, + read=options.request_timeout, + write=options.connect_timeout, + pool=options.connect_timeout), } if options.is_post() or options.is_patch(): request_arguments['content'] = options.data diff --git a/pubnub/request_handlers/httpx.py b/pubnub/request_handlers/httpx.py index 0a7c9bad..f80bcce6 100644 --- a/pubnub/request_handlers/httpx.py +++ b/pubnub/request_handlers/httpx.py @@ -346,10 +346,11 @@ def _invoke_request(self, p_options, e_options, base_origin): "method": e_options.method_string, "headers": request_headers, "url": httpx.URL(url, query=e_options.query_string.encode("utf-8")), - "timeout": httpx.Timeout(connect=e_options.connect_timeout, - read=e_options.request_timeout, - write=e_options.connect_timeout, - pool=e_options.connect_timeout), + "timeout": httpx.Timeout( + connect=e_options.connect_timeout, + read=e_options.request_timeout, + write=e_options.connect_timeout, + pool=e_options.connect_timeout), "follow_redirects": e_options.allow_redirects } diff --git a/tests/integrational/native_threads/test_subscribe.py b/tests/integrational/native_threads/test_subscribe.py index 40ce232c..d12c9333 100644 --- a/tests/integrational/native_threads/test_subscribe.py +++ b/tests/integrational/native_threads/test_subscribe.py @@ -182,7 +182,9 @@ def test_cg_subscribe_unsubscribe(self): if result is None: self.fail("Remove channel from channel group operation timeout or failed") if cg_remove_operation.status is not None and cg_remove_operation.status.is_error(): - self.fail(f"Remove channel from channel group operation failed with error: {cg_remove_operation.status}") + self.fail( + f"Remove channel from channel group failed: {cg_remove_operation.status}" + ) assert isinstance(result, PNChannelGroupsRemoveChannelResult) finally: pubnub.stop() @@ -239,7 +241,9 @@ def test_subscribe_cg_publish_unsubscribe(self): if result is None: self.fail("Remove channel from channel group operation timeout or failed") 
if non_subscribe_listener.status is not None and non_subscribe_listener.status.is_error(): - self.fail(f"Remove channel from channel group operation failed with error: {non_subscribe_listener.status}") + self.fail( + f"Remove channel from channel group failed: {non_subscribe_listener.status}" + ) assert isinstance(result, PNChannelGroupsRemoveChannelResult) finally: pubnub.stop() diff --git a/tests/unit/test_wall_clock_deadline.py b/tests/unit/test_wall_clock_deadline.py index 45449495..9465218b 100644 --- a/tests/unit/test_wall_clock_deadline.py +++ b/tests/unit/test_wall_clock_deadline.py @@ -360,7 +360,6 @@ class TestHttpxRequestHandlerCleanup(unittest.TestCase): def test_close_stops_watchdog(self): """HttpxRequestHandler.close() should stop the watchdog thread.""" from pubnub.request_handlers.httpx import HttpxRequestHandler - from tests.helper import pnconf_copy handler = HttpxRequestHandler(MagicMock()) # Start the watchdog by setting a deadline From 47d270df66bf3c9a6e433da39050e22253dafcd5 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 25 Mar 2026 16:40:35 +0200 Subject: [PATCH 3/7] fix(native-threads): recreate `httpx` session after watchdog closes it during sleep Session was never recreated after the wall-clock watchdog closed it, causing all reconnection `/time/0` calls to fail permanently with "client has been closed". fix(native-threads): use socket.shutdown to immediately unblock reads on macOS On macOS, `socket.close()` from another thread does not interrupt a blocked `recv()`. Use `socket.shutdown(SHUT_RDWR)` on the raw TCP socket to unblock within seconds instead of `~25` minutes. fix(reconnect): classify wall-clock sleep timeouts as unexpected disconnect Sleep-induced timeouts were mapped to `PNTimeoutCategory` which triggers an immediate silent restart, bypassing the reconnection manager. Map them to `PNUnexpectedDisconnectCategory` so all paths use the configured retry policy (`exponential`/`linear`). 
build(deps): pin `httpx<1.0` for internal attribute stability The socket shutdown fix accesses `httpcore` private attributes to reach the raw TCP socket. Pin upper bound to prevent silent breakage on major version changes; access is wrapped in try/except fallback. test(wall-clock-deadline): add tests for sleep/wake reconnection fixes Cover session recreation, `PNERR_CONNECTION_ERROR` mapping, `WallClockTimeoutError` classification, socket shutdown attribute path, and graceful degradation when `httpcore` internals change. --- pubnub/pubnub_asyncio.py | 17 +- pubnub/request_handlers/async_httpx.py | 7 +- pubnub/request_handlers/httpx.py | 59 +++- setup.py | 2 +- tests/unit/test_wall_clock_deadline.py | 373 +++++++++++++++++++++++++ 5 files changed, 448 insertions(+), 10 deletions(-) diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 6e374c0c..62dd8a57 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -70,13 +70,13 @@ async def main(): from pubnub.endpoints.pubsub.subscribe import Subscribe from pubnub.pubnub_core import PubNubCore from pubnub.request_handlers.base import BaseRequestHandler -from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler +from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler, WallClockTimeoutError from pubnub.workers import SubscribeMessageWorker from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager from pubnub import utils from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy from pubnub.callbacks import SubscribeCallback, ReconnectionCallback -from pubnub.errors import PNERR_REQUEST_CANCELLED, PNERR_CLIENT_TIMEOUT +from pubnub.errors import PNERR_REQUEST_CANCELLED, PNERR_CLIENT_TIMEOUT, PNERR_CONNECTION_ERROR from pubnub.exceptions import PubNubAsyncioException, PubNubException # flake8: noqa @@ -247,6 +247,19 @@ async def request_future(self, options_func, 
cancellation_event): result=None, status=status ) + except WallClockTimeoutError: + return PubNubAsyncioException( + result=None, + status=options_func().create_status( + PNStatusCategory.PNUnexpectedDisconnectCategory, + None, + None, + exception=PubNubException( + pn_error=PNERR_CONNECTION_ERROR, + errormsg="Wall-clock deadline exceeded (system sleep detected)" + ) + ) + ) except asyncio.TimeoutError: return PubNubAsyncioException( result=None, diff --git a/pubnub/request_handlers/async_httpx.py b/pubnub/request_handlers/async_httpx.py index 4fe735a8..03278a1f 100644 --- a/pubnub/request_handlers/async_httpx.py +++ b/pubnub/request_handlers/async_httpx.py @@ -17,6 +17,11 @@ logger = logging.getLogger("pubnub") +class WallClockTimeoutError(asyncio.TimeoutError): + """Raised when a wall-clock deadline is exceeded, typically due to system sleep.""" + pass + + class PubNubAsyncHTTPTransport(httpx.AsyncHTTPTransport): is_closed = False @@ -78,7 +83,7 @@ async def _request_with_wall_clock_deadline(self, request_arguments, timeout): remaining = wall_deadline - time.time() if remaining <= 0: request_task.cancel() - raise asyncio.TimeoutError("Wall-clock deadline exceeded") + raise WallClockTimeoutError("Wall-clock deadline exceeded (system sleep detected)") done, _ = await asyncio.wait( {request_task}, diff --git a/pubnub/request_handlers/httpx.py b/pubnub/request_handlers/httpx.py index f80bcce6..cef48756 100644 --- a/pubnub/request_handlers/httpx.py +++ b/pubnub/request_handlers/httpx.py @@ -1,4 +1,5 @@ import logging +import socket import time import threading import httpx @@ -80,6 +81,45 @@ def stop(self): self._stop.set() self._wake.set() + @staticmethod + def _force_shutdown_connections(session): + """Force-interrupt blocked socket reads by shutting down raw TCP sockets. + + On macOS (BSD), socket.close() from another thread does NOT interrupt a blocked + recv(). Only socket.shutdown(SHUT_RDWR) reliably unblocks it across platforms. 
+ + We access httpcore internals to reach the raw socket: + session._transport._pool._connections[i]._connection._network_stream._sock + This is wrapped in try/except so version changes in httpcore degrade gracefully + to session.close() behavior. + + Tested with: httpx 0.28.1, httpcore 1.0.9 + """ + try: + transport = getattr(session, '_transport', None) + pool = getattr(transport, '_pool', None) if transport else None + connections = getattr(pool, '_connections', []) if pool else [] + for conn in list(connections): + try: + inner = getattr(conn, '_connection', None) + if inner is None: + continue + stream = getattr(inner, '_network_stream', None) + if stream is None: + continue + sock = getattr(stream, '_sock', None) + if sock is None: + continue + sock.shutdown(socket.SHUT_RDWR) + except (OSError, Exception): + pass + except Exception: + pass + try: + session.close() + except Exception as e: + logger.debug(f"Error closing session: {e}") + def _run(self): while not self._stop.is_set(): with self._lock: @@ -110,10 +150,7 @@ def _run(self): self._triggered_threads.add(earliest_tid) self._deadlines.pop(earliest_tid, None) logger.debug("Wall-clock deadline exceeded, closing session transport") - try: - earliest_session.close() - except Exception as e: - logger.debug(f"Error closing session: {e}") + self._force_shutdown_connections(earliest_session) continue # Sleep until next check, new deadline, or stop @@ -332,6 +369,10 @@ def _invoke_request(self, p_options, e_options, base_origin): assert isinstance(p_options, PlatformOptions) assert isinstance(e_options, RequestOptions) + if self.session.is_closed: + logger.debug("HTTP session was closed (e.g. 
by wall-clock watchdog), recreating") + self.session = httpx.Client() + if base_origin: url = p_options.pn_config.scheme() + "://" + base_origin + e_options.path else: @@ -397,7 +438,7 @@ def _invoke_request(self, p_options, e_options, base_origin): if use_watchdog and self._watchdog.triggered: self.session = httpx.Client() raise PubNubException( - pn_error=PNERR_CLIENT_TIMEOUT, + pn_error=PNERR_CONNECTION_ERROR, errormsg="Wall-clock deadline exceeded (system sleep detected)" ) raise PubNubException( @@ -405,6 +446,12 @@ def _invoke_request(self, p_options, e_options, base_origin): errormsg=str(e) ) except httpx.TimeoutException as e: + if use_watchdog and self._watchdog.triggered: + self.session = httpx.Client() + raise PubNubException( + pn_error=PNERR_CONNECTION_ERROR, + errormsg="Wall-clock deadline exceeded (system sleep detected)" + ) raise PubNubException( pn_error=PNERR_CLIENT_TIMEOUT, errormsg=str(e) @@ -424,7 +471,7 @@ def _invoke_request(self, p_options, e_options, base_origin): if use_watchdog and self._watchdog.triggered: self.session = httpx.Client() raise PubNubException( - pn_error=PNERR_CLIENT_TIMEOUT, + pn_error=PNERR_CONNECTION_ERROR, errormsg="Wall-clock deadline exceeded (system sleep detected)" ) raise PubNubException( diff --git a/setup.py b/setup.py index d765acb9..2caf9a55 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ python_requires='>=3.9', install_requires=[ 'pycryptodomex>=3.3', - 'httpx>=0.28', + 'httpx>=0.28,<1.0', 'h2>=4.1', 'requests>=2.32.2', 'aiohttp>3.10.11', diff --git a/tests/unit/test_wall_clock_deadline.py b/tests/unit/test_wall_clock_deadline.py index 9465218b..76abf3b3 100644 --- a/tests/unit/test_wall_clock_deadline.py +++ b/tests/unit/test_wall_clock_deadline.py @@ -372,5 +372,378 @@ def test_close_stops_watchdog(self): self.assertTrue(handler._watchdog._stop.is_set()) +class TestHttpxSessionRecreation(unittest.TestCase): + """Tests for automatic session recreation after watchdog closes the session. 
+ + When the WallClockDeadlineWatchdog closes the httpx.Client during system sleep, + subsequent requests (e.g., reconnection /time/0 calls) must detect the closed + session and recreate it. Without this, native threads reconnection never succeeds. + """ + + def _make_handler(self): + from pubnub.request_handlers.httpx import HttpxRequestHandler + pubnub_mock = MagicMock() + pubnub_mock.config.origin = 'ps.pndsn.com' + pubnub_mock.config.scheme.return_value = 'https' + handler = HttpxRequestHandler(pubnub_mock) + return handler + + def test_closed_session_is_recreated_on_next_request(self): + """After session.close(), the next _invoke_request should recreate the session.""" + handler = self._make_handler() + original_session = handler.session + + # Simulate what the watchdog does + handler.session.close() + self.assertTrue(handler.session.is_closed) + + # Build minimal request options + from pubnub.structures import RequestOptions, PlatformOptions + p_options = MagicMock(spec=PlatformOptions) + p_options.pn_config = MagicMock() + p_options.pn_config.scheme.return_value = 'https' + p_options.headers = {} + + e_options = MagicMock(spec=RequestOptions) + e_options.method_string = 'GET' + e_options.path = '/time/0' + e_options.query_string = 'pnsdk=test' + e_options.request_headers = None + e_options.connect_timeout = 10 + e_options.request_timeout = 10 + e_options.is_post.return_value = False + e_options.is_patch.return_value = False + e_options.allow_redirects = True + e_options.use_base_path = True + + # The request will fail (no real server), but session should be recreated first + try: + handler._invoke_request(p_options, e_options, 'ps.pndsn.com') + except Exception: + pass + + self.assertFalse(handler.session.is_closed) + self.assertIsNot(handler.session, original_session) + + handler.close() + + def test_open_session_is_not_recreated(self): + """An open session should not be replaced.""" + handler = self._make_handler() + original_session = handler.session + + 
self.assertFalse(handler.session.is_closed) + + from pubnub.structures import RequestOptions, PlatformOptions + p_options = MagicMock(spec=PlatformOptions) + p_options.pn_config = MagicMock() + p_options.pn_config.scheme.return_value = 'https' + p_options.headers = {} + + e_options = MagicMock(spec=RequestOptions) + e_options.method_string = 'GET' + e_options.path = '/time/0' + e_options.query_string = 'pnsdk=test' + e_options.request_headers = None + e_options.connect_timeout = 10 + e_options.request_timeout = 10 + e_options.is_post.return_value = False + e_options.is_patch.return_value = False + e_options.allow_redirects = True + e_options.use_base_path = True + + try: + handler._invoke_request(p_options, e_options, 'ps.pndsn.com') + except Exception: + pass + + self.assertIs(handler.session, original_session) + handler.close() + + def test_watchdog_trigger_with_timeout_exception_recreates_session(self): + """When watchdog triggers and the request gets TimeoutException, session should be recreated.""" + handler = self._make_handler() + original_session = handler.session + + # Set up watchdog as triggered + handler._watchdog.set_deadline(handler.session, time.time() - 1) + time.sleep(0.3) + self.assertTrue(handler.session.is_closed) + + from pubnub.structures import RequestOptions, PlatformOptions + p_options = MagicMock(spec=PlatformOptions) + p_options.pn_config = MagicMock() + p_options.pn_config.scheme.return_value = 'https' + p_options.headers = {} + + e_options = MagicMock(spec=RequestOptions) + e_options.method_string = 'GET' + e_options.path = '/v2/subscribe/demo/test/0' + e_options.query_string = 'tt=0&pnsdk=test' + e_options.request_headers = None + e_options.connect_timeout = 10 + e_options.request_timeout = 310 # > 30, triggers watchdog usage + e_options.is_post.return_value = False + e_options.is_patch.return_value = False + e_options.allow_redirects = True + e_options.use_base_path = True + + try: + handler._invoke_request(p_options, e_options, 
'ps.pndsn.com') + except Exception: + pass + + # Session should have been recreated (either by is_closed check or watchdog trigger handler) + self.assertFalse(handler.session.is_closed) + self.assertIsNot(handler.session, original_session) + + handler._watchdog.stop() + handler.close() + + def test_reconnection_time_request_works_after_session_close(self): + """Simulates the reconnection manager's /time/0 call after watchdog closed the session. + + This is the exact scenario that caused native threads to never reconnect: + 1. Watchdog closes session during subscribe long-poll + 2. Reconnection manager starts making /time/0 calls + 3. /time/0 calls should detect closed session, recreate it, and eventually succeed + """ + handler = self._make_handler() + + # Step 1: Watchdog closes the session + handler.session.close() + self.assertTrue(handler.session.is_closed) + + from pubnub.structures import RequestOptions, PlatformOptions + p_options = MagicMock(spec=PlatformOptions) + p_options.pn_config = MagicMock() + p_options.pn_config.scheme.return_value = 'https' + p_options.headers = {} + + # Step 2: /time/0 request (short timeout, no watchdog) + e_options = MagicMock(spec=RequestOptions) + e_options.method_string = 'GET' + e_options.path = '/time/0' + e_options.query_string = 'pnsdk=test' + e_options.request_headers = None + e_options.connect_timeout = 10 + e_options.request_timeout = 10 # Short timeout, use_watchdog=False + e_options.is_post.return_value = False + e_options.is_patch.return_value = False + e_options.allow_redirects = True + e_options.use_base_path = True + + # Step 3: The request may fail (no real server) but should NOT fail due to closed session + try: + handler._invoke_request(p_options, e_options, 'ps.pndsn.com') + except Exception as e: + # Should be a connection error, NOT "Cannot send a request, as the client has been closed" + self.assertNotIn("client has been closed", str(e).lower()) + + self.assertFalse(handler.session.is_closed) + 
handler.close() + + +class TestWallClockErrorCategory(unittest.TestCase): + """Tests that wall-clock sleep timeouts produce PNUnexpectedDisconnectCategory, + not PNTimeoutCategory — so they route through the reconnection manager with + configured retry delays instead of an immediate silent restart. + """ + + def test_watchdog_triggered_produces_connection_error(self): + """When watchdog triggers during a request, the exception should use PNERR_CONNECTION_ERROR + which maps to PNUnexpectedDisconnectCategory in _build_envelope.""" + from pubnub.request_handlers.httpx import HttpxRequestHandler + from pubnub.errors import PNERR_CONNECTION_ERROR + from pubnub.exceptions import PubNubException + from pubnub.structures import RequestOptions, PlatformOptions + + handler = HttpxRequestHandler(MagicMock()) + + # Simulate: session is open, but request fails with TimeoutException + # while watchdog is in triggered state (watchdog fired DURING the request) + mock_session = MagicMock(spec=httpx.Client) + mock_session.is_closed = False + mock_session.request.side_effect = httpx.ReadTimeout("timed out") + handler.session = mock_session + + # Mock watchdog.triggered to return True (simulates watchdog firing during request) + type(handler._watchdog).triggered = property(lambda self: True) + + p_options = MagicMock(spec=PlatformOptions) + p_options.pn_config = MagicMock() + p_options.pn_config.scheme.return_value = 'https' + p_options.headers = {} + + e_options = MagicMock(spec=RequestOptions) + e_options.method_string = 'GET' + e_options.path = '/v2/subscribe/demo/test/0' + e_options.query_string = 'tt=0&pnsdk=test' + e_options.request_headers = None + e_options.connect_timeout = 10 + e_options.request_timeout = 310 # > 30, so use_watchdog=True + e_options.is_post.return_value = False + e_options.is_patch.return_value = False + e_options.allow_redirects = True + e_options.use_base_path = True + + with self.assertRaises(PubNubException) as ctx: + handler._invoke_request(p_options, 
e_options, 'ps.pndsn.com') + + self.assertEqual(ctx.exception._pn_error, PNERR_CONNECTION_ERROR) + self.assertIn("Wall-clock deadline exceeded", ctx.exception._errormsg) + # Session should have been recreated + self.assertIsNot(handler.session, mock_session) + + handler._watchdog.stop() + handler.close() + + def test_async_wall_clock_timeout_raises_wall_clock_error(self): + """_request_with_wall_clock_deadline should raise WallClockTimeoutError (not generic TimeoutError).""" + from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler, WallClockTimeoutError + + handler = AsyncHttpxRequestHandler.__new__(AsyncHttpxRequestHandler) + handler._session = MagicMock() + + async def never_completes(**kwargs): + await asyncio.sleep(3600) + + handler._session.request = never_completes + + async def run_test(): + real_time = time.time + start = real_time() + call_count = [0] + + def mock_time(): + call_count[0] += 1 + if call_count[0] <= 3: + return real_time() + return start + 60 + + with patch('pubnub.request_handlers.async_httpx.time') as mock_time_module: + mock_time_module.time = mock_time + original = AsyncHttpxRequestHandler.WALL_CLOCK_CHECK_INTERVAL + AsyncHttpxRequestHandler.WALL_CLOCK_CHECK_INTERVAL = 0.1 + try: + with self.assertRaises(WallClockTimeoutError): + await handler._request_with_wall_clock_deadline( + {"method": "GET", "url": "http://test"}, + timeout=10 + ) + finally: + AsyncHttpxRequestHandler.WALL_CLOCK_CHECK_INTERVAL = original + + asyncio.run(run_test()) + + def test_async_wall_clock_timeout_maps_to_unexpected_disconnect(self): + """WallClockTimeoutError should produce PNUnexpectedDisconnectCategory in request_future.""" + from pubnub.request_handlers.async_httpx import WallClockTimeoutError + from pubnub.pubnub_asyncio import PubNubAsyncio + from pubnub.pnconfiguration import PNConfiguration + from pubnub.enums import PNStatusCategory + + config = PNConfiguration() + config.subscribe_key = 'demo' + config.publish_key = 'demo' + 
config.user_id = 'test' + + async def run_test(): + pubnub = PubNubAsyncio(config) + + async def mock_async_request(options_func, cancellation_event): + raise WallClockTimeoutError("Wall-clock deadline exceeded") + + pubnub._request_handler.async_request = mock_async_request + + try: + # Build an options_func the same way endpoints do + time_endpoint = pubnub.time() + + def options_func(): + time_endpoint.validate_params() + return time_endpoint.options() + + result = await pubnub.request_future(options_func, None) + self.assertTrue(result.is_error()) + self.assertEqual( + result.status.category, + PNStatusCategory.PNUnexpectedDisconnectCategory + ) + finally: + await pubnub.stop() + + asyncio.run(run_test()) + + +class TestForceShutdownConnections(unittest.TestCase): + """Tests for _force_shutdown_connections which uses socket.shutdown(SHUT_RDWR) + to interrupt blocked reads on macOS.""" + + def test_shutdown_calls_socket_shutdown(self): + """Verify the correct httpcore attribute path is traversed to reach the socket.""" + from pubnub.request_handlers.httpx import WallClockDeadlineWatchdog + + mock_sock = MagicMock() + mock_stream = MagicMock() + mock_stream._sock = mock_sock + + mock_inner_conn = MagicMock() + mock_inner_conn._network_stream = mock_stream + + mock_conn = MagicMock() + mock_conn._connection = mock_inner_conn + + mock_pool = MagicMock() + mock_pool._connections = [mock_conn] + + mock_transport = MagicMock() + mock_transport._pool = mock_pool + + mock_session = MagicMock() + mock_session._transport = mock_transport + + WallClockDeadlineWatchdog._force_shutdown_connections(mock_session) + + import socket as sock_module + mock_sock.shutdown.assert_called_once_with(sock_module.SHUT_RDWR) + mock_session.close.assert_called_once() + + def test_shutdown_degrades_gracefully_on_missing_attrs(self): + """If httpcore internals change, should not raise — just fall back to session.close().""" + from pubnub.request_handlers.httpx import WallClockDeadlineWatchdog 
+ + mock_session = MagicMock() + del mock_session._transport # Simulate missing attribute + + # Should not raise + WallClockDeadlineWatchdog._force_shutdown_connections(mock_session) + mock_session.close.assert_called_once() + + def test_shutdown_with_real_httpx_client(self): + """Verify the attribute path works with a real httpx.Client after a request.""" + from pubnub.request_handlers.httpx import WallClockDeadlineWatchdog + + client = httpx.Client() + try: + client.get('https://ps.pndsn.com/time/0') + except Exception: + client.close() + return # Skip if network unavailable + + # Verify the path exists + pool = client._transport._pool + self.assertTrue(len(pool._connections) > 0) + conn = pool._connections[0] + inner = conn._connection + stream = inner._network_stream + sock = stream._sock + self.assertTrue(hasattr(sock, 'shutdown')) + + # Now test the method + WallClockDeadlineWatchdog._force_shutdown_connections(client) + self.assertTrue(client.is_closed) + + if __name__ == '__main__': unittest.main() From 7456b2e5d82f12362cad15f7898039b5a8fd0f5a Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 25 Mar 2026 16:50:35 +0200 Subject: [PATCH 4/7] refactor(lint): address Codacy try-except-pass warning in watchdog --- pubnub/request_handlers/httpx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubnub/request_handlers/httpx.py b/pubnub/request_handlers/httpx.py index cef48756..7e28a248 100644 --- a/pubnub/request_handlers/httpx.py +++ b/pubnub/request_handlers/httpx.py @@ -113,8 +113,8 @@ def _force_shutdown_connections(session): sock.shutdown(socket.SHUT_RDWR) except (OSError, Exception): pass - except Exception: - pass + except Exception as e: + logger.debug(f"Error iterating connection pool: {e}") try: session.close() except Exception as e: From eb1fdebe5f6367712d397e76201e50171fe8263b Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 25 Mar 2026 17:39:17 +0200 Subject: [PATCH 5/7] fix(ci): unpin Python patch versions in 
test matrix Python 3.13.2's `VERIFY_X509_STRICT` rejects some certifi CA certs, breaking `httpx.Client()` init. Using minor-only versions lets setup-python resolve to latest patches automatically. --- .github/workflows/run-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 7070c6a9..8a5641c8 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -26,7 +26,7 @@ jobs: max-parallel: 1 fail-fast: true matrix: - python: [3.9.21, 3.10.16, 3.11.11, 3.12.9, 3.13.2] + python: ["3.9", "3.10", "3.11", "3.12", "3.13"] timeout-minutes: 5 steps: - name: Checkout repository From 0f60b572b652ae130d1bfa53355b4c8de351d318 Mon Sep 17 00:00:00 2001 From: PubNub Release Bot <120067856+pubnub-release-bot@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:31:26 +0000 Subject: [PATCH 6/7] PubNub SDK 10.6.2 release. --- .pubnub.yml | 41 +++++++++++++++++++++++++++++++++++++---- CHANGELOG.md | 22 ++++++++++++++++++++++ setup.py | 2 +- 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/.pubnub.yml b/.pubnub.yml index c69a03b9..8ef81553 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,5 +1,5 @@ name: python -version: 10.6.1 +version: 10.6.2 schema: 1 scm: github.com/pubnub/python sdks: @@ -18,7 +18,7 @@ sdks: distributions: - distribution-type: library distribution-repository: package - package-name: pubnub-10.6.1 + package-name: pubnub-10.6.2 location: https://pypi.org/project/pubnub/ supported-platforms: supported-operating-systems: @@ -94,8 +94,8 @@ sdks: - distribution-type: library distribution-repository: git release - package-name: pubnub-10.6.1 - location: https://github.com/pubnub/python/releases/download/10.6.1/pubnub-10.6.1.tar.gz + package-name: pubnub-10.6.2 + location: https://github.com/pubnub/python/releases/download/10.6.2/pubnub-10.6.2.tar.gz supported-platforms: supported-operating-systems: Linux: @@ -169,6 +169,39 @@ sdks: 
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md is-required: Required changelog: + - date: 2026-03-26 + version: 10.6.2 + changes: + - type: bug + text: "Ensure `PubNubAsyncioException` always carries a valid `PNStatus` with error data instead of `None`." + - type: bug + text: "Handle cases where status or `error_data` is `None` instead of raising `AttributeError`." + - type: bug + text: "Match `PubNubAsyncioException` which is what `request_future` actually returns on failure." + - type: bug + text: "Handle `-1 (unlimited)` correctly since `attempts > -1` was always `true`, causing immediate give-up." + - type: bug + text: "Use delay class defaults instead of config value which could be `None` causing `TypeError` on comparison." + - type: bug + text: "Prevent falling through to start a heartbeat after deciding to give up." + - type: bug + text: "Set all four timeout fields explicitly instead of a 2-tuple that left write and pool unset." + - type: bug + text: "On macOS and Linux, `time.monotonic()` does not advance during system sleep, causing socket and `asyncio` timeouts (310s subscribe) to stall for hours of wall-clock time. Add `time.time()`-based deadline checks that detect sleep and cancel stale requests within ~5s of wake." + - type: bug + text: "Use `asyncio.wait()` with periodic `time.time()` checks instead of a single monotonic-based `wait_for()`, yielding to the event loop between checks." + - type: bug + text: "Persistent single daemon thread monitors `time.time()` every 5s and closes the `httpx` session when the wall-clock deadline passes, interrupting the blocking socket read. Tracks deadlines per calling thread so concurrent requests (e.g., subscribe + publish) don't interfere. Only armed for long-timeout requests (>30s). Session is recreated for subsequent requests." 
+ - type: improvement + text: "Cover both `asyncio` and threads paths: simulated clock jumps, normal passthrough, clean watchdog shutdown, per-thread deadline isolation, concurrent request independence, cleanup, and exception propagation." + - type: improvement + text: "Ensure `pubnub.stop()` always runs to prevent non-daemon threads from blocking process exit." + - type: improvement + text: "Enable presence heartbeat and use unique channel names so presence registers on the server." + - type: improvement + text: "Restore `cipher_key` after use in `send_file` and pass it explicitly to `download_file`." + - type: improvement + text: "Avoid collisions with stale data from prior test runs." - date: 2026-02-10 version: 10.6.1 changes: diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ed85b60..78c64d26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,25 @@ +## 10.6.2 +March 26 2026 + +#### Fixed +- Ensure `PubNubAsyncioException` always carries a valid `PNStatus` with error data instead of `None`. +- Handle cases where status or `error_data` is `None` instead of raising `AttributeError`. +- Match `PubNubAsyncioException` which is what `request_future` actually returns on failure. +- Handle `-1 (unlimited)` correctly since `attempts > -1` was always `true`, causing immediate give-up. +- Use delay class defaults instead of config value which could be `None` causing `TypeError` on comparison. +- Prevent falling through to start a heartbeat after deciding to give up. +- Set all four timeout fields explicitly instead of a 2-tuple that left write and pool unset. +- On macOS and Linux, `time.monotonic()` does not advance during system sleep, causing socket and `asyncio` timeouts (310s subscribe) to stall for hours of wall-clock time. Add `time.time()`-based deadline checks that detect sleep and cancel stale requests within ~5s of wake. 
+- Use `asyncio.wait()` with periodic `time.time()` checks instead of a single monotonic-based `wait_for()`, yielding to the event loop between checks. +- Persistent single daemon thread monitors `time.time()` every 5s and closes the `httpx` session when the wall-clock deadline passes, interrupting the blocking socket read. Tracks deadlines per calling thread so concurrent requests (e.g., subscribe + publish) don't interfere. Only armed for long-timeout requests (>30s). Session is recreated for subsequent requests. + +#### Modified +- Cover both `asyncio` and threads paths: simulated clock jumps, normal passthrough, clean watchdog shutdown, per-thread deadline isolation, concurrent request independence, cleanup, and exception propagation. +- Ensure `pubnub.stop()` always runs to prevent non-daemon threads from blocking process exit. +- Enable presence heartbeat and use unique channel names so presence registers on the server. +- Restore `cipher_key` after use in `send_file` and pass it explicitly to `download_file`. +- Avoid collisions with stale data from prior test runs. + ## 10.6.1 February 10 2026 diff --git a/setup.py b/setup.py index 2caf9a55..6eaf9382 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='pubnub', - version='10.6.1', + version='10.6.2', description='PubNub Real-time push service in the cloud', author='PubNub', author_email='support@pubnub.com', From eeabc681576c0b589a9c90af1e34dd5d723eb7b4 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Thu, 26 Mar 2026 18:02:27 +0200 Subject: [PATCH 7/7] fix(subscribe): lazy session init and silent restart on watchdog timeout Defer `httpx.Client()` creation to first use and map watchdog timeout to `PNTimeoutCategory` for silent subscribe restart instead of announcing unexpected disconnect. 
--- pubnub/pubnub_asyncio.py | 6 +-- pubnub/request_handlers/httpx.py | 32 +++++++------ tests/unit/test_wall_clock_deadline.py | 65 +++++++++++++------------- 3 files changed, 55 insertions(+), 48 deletions(-) diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 62dd8a57..511d865e 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -76,7 +76,7 @@ async def main(): from pubnub import utils from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy from pubnub.callbacks import SubscribeCallback, ReconnectionCallback -from pubnub.errors import PNERR_REQUEST_CANCELLED, PNERR_CLIENT_TIMEOUT, PNERR_CONNECTION_ERROR +from pubnub.errors import PNERR_REQUEST_CANCELLED, PNERR_CLIENT_TIMEOUT from pubnub.exceptions import PubNubAsyncioException, PubNubException # flake8: noqa @@ -251,11 +251,11 @@ async def request_future(self, options_func, cancellation_event): return PubNubAsyncioException( result=None, status=options_func().create_status( - PNStatusCategory.PNUnexpectedDisconnectCategory, + PNStatusCategory.PNTimeoutCategory, None, None, exception=PubNubException( - pn_error=PNERR_CONNECTION_ERROR, + pn_error=PNERR_CLIENT_TIMEOUT, errormsg="Wall-clock deadline exceeded (system sleep detected)" ) ) diff --git a/pubnub/request_handlers/httpx.py b/pubnub/request_handlers/httpx.py index 7e28a248..d22e0ee2 100644 --- a/pubnub/request_handlers/httpx.py +++ b/pubnub/request_handlers/httpx.py @@ -163,15 +163,26 @@ class HttpxRequestHandler(BaseRequestHandler): ENDPOINT_THREAD_COUNTER: int = 0 def __init__(self, pubnub): - self.session = httpx.Client() + self._session = None + self._session_lock = threading.Lock() self._watchdog = WallClockDeadlineWatchdog() self.pubnub = pubnub + def _ensure_session(self): + """Return the current httpx.Client, creating one if needed. 
Thread-safe.""" + with self._session_lock: + if self._session is None or self._session.is_closed: + logger.debug("Creating new HTTP session") + self._session = httpx.Client() + return self._session + def close(self): """Clean up resources: stop the watchdog thread and close the HTTP session.""" self._watchdog.stop() - self.session.close() + with self._session_lock: + if self._session is not None: + self._session.close() async def async_request(self, options_func, cancellation_event): raise NotImplementedError("async_request is not implemented for synchronous handler") @@ -369,9 +380,7 @@ def _invoke_request(self, p_options, e_options, base_origin): assert isinstance(p_options, PlatformOptions) assert isinstance(e_options, RequestOptions) - if self.session.is_closed: - logger.debug("HTTP session was closed (e.g. by wall-clock watchdog), recreating") - self.session = httpx.Client() + session = self._ensure_session() if base_origin: url = p_options.pn_config.scheme() + "://" + base_origin + e_options.path @@ -420,10 +429,10 @@ def _invoke_request(self, p_options, e_options, base_origin): use_watchdog = e_options.request_timeout is not None and e_options.request_timeout > 30 if use_watchdog: - self._watchdog.set_deadline(self.session, time.time() + e_options.request_timeout) + self._watchdog.set_deadline(session, time.time() + e_options.request_timeout) try: - res = self.session.request(**args) + res = session.request(**args) # Safely access response text - read content first for streaming responses try: logger.debug("GOT %s" % res.text) @@ -436,9 +445,8 @@ def _invoke_request(self, p_options, e_options, base_origin): except httpx.ConnectError as e: if use_watchdog and self._watchdog.triggered: - self.session = httpx.Client() raise PubNubException( - pn_error=PNERR_CONNECTION_ERROR, + pn_error=PNERR_CLIENT_TIMEOUT, errormsg="Wall-clock deadline exceeded (system sleep detected)" ) raise PubNubException( @@ -447,9 +455,8 @@ def _invoke_request(self, p_options, e_options, 
base_origin): ) except httpx.TimeoutException as e: if use_watchdog and self._watchdog.triggered: - self.session = httpx.Client() raise PubNubException( - pn_error=PNERR_CONNECTION_ERROR, + pn_error=PNERR_CLIENT_TIMEOUT, errormsg="Wall-clock deadline exceeded (system sleep detected)" ) raise PubNubException( @@ -469,9 +476,8 @@ def _invoke_request(self, p_options, e_options, base_origin): ) except Exception as e: if use_watchdog and self._watchdog.triggered: - self.session = httpx.Client() raise PubNubException( - pn_error=PNERR_CONNECTION_ERROR, + pn_error=PNERR_CLIENT_TIMEOUT, errormsg="Wall-clock deadline exceeded (system sleep detected)" ) raise PubNubException( diff --git a/tests/unit/test_wall_clock_deadline.py b/tests/unit/test_wall_clock_deadline.py index 76abf3b3..2ab0eed0 100644 --- a/tests/unit/test_wall_clock_deadline.py +++ b/tests/unit/test_wall_clock_deadline.py @@ -362,8 +362,9 @@ def test_close_stops_watchdog(self): from pubnub.request_handlers.httpx import HttpxRequestHandler handler = HttpxRequestHandler(MagicMock()) + session = handler._ensure_session() # Start the watchdog by setting a deadline - handler._watchdog.set_deadline(handler.session, time.time() + 300) + handler._watchdog.set_deadline(session, time.time() + 300) self.assertIsNotNone(handler._watchdog._thread) handler.close() @@ -391,11 +392,11 @@ def _make_handler(self): def test_closed_session_is_recreated_on_next_request(self): """After session.close(), the next _invoke_request should recreate the session.""" handler = self._make_handler() - original_session = handler.session + original_session = handler._ensure_session() # Simulate what the watchdog does - handler.session.close() - self.assertTrue(handler.session.is_closed) + original_session.close() + self.assertTrue(handler._session.is_closed) # Build minimal request options from pubnub.structures import RequestOptions, PlatformOptions @@ -422,17 +423,17 @@ def test_closed_session_is_recreated_on_next_request(self): except 
Exception: pass - self.assertFalse(handler.session.is_closed) - self.assertIsNot(handler.session, original_session) + self.assertFalse(handler._session.is_closed) + self.assertIsNot(handler._session, original_session) handler.close() def test_open_session_is_not_recreated(self): """An open session should not be replaced.""" handler = self._make_handler() - original_session = handler.session + original_session = handler._ensure_session() - self.assertFalse(handler.session.is_closed) + self.assertFalse(handler._session.is_closed) from pubnub.structures import RequestOptions, PlatformOptions p_options = MagicMock(spec=PlatformOptions) @@ -457,18 +458,18 @@ def test_open_session_is_not_recreated(self): except Exception: pass - self.assertIs(handler.session, original_session) + self.assertIs(handler._session, original_session) handler.close() def test_watchdog_trigger_with_timeout_exception_recreates_session(self): """When watchdog triggers and the request gets TimeoutException, session should be recreated.""" handler = self._make_handler() - original_session = handler.session + original_session = handler._ensure_session() # Set up watchdog as triggered - handler._watchdog.set_deadline(handler.session, time.time() - 1) + handler._watchdog.set_deadline(original_session, time.time() - 1) time.sleep(0.3) - self.assertTrue(handler.session.is_closed) + self.assertTrue(original_session.is_closed) from pubnub.structures import RequestOptions, PlatformOptions p_options = MagicMock(spec=PlatformOptions) @@ -493,9 +494,9 @@ def test_watchdog_trigger_with_timeout_exception_recreates_session(self): except Exception: pass - # Session should have been recreated (either by is_closed check or watchdog trigger handler) - self.assertFalse(handler.session.is_closed) - self.assertIsNot(handler.session, original_session) + # Session should have been recreated by _ensure_session() since original was closed + self.assertFalse(handler._session.is_closed) + self.assertIsNot(handler._session, 
original_session) handler._watchdog.stop() handler.close() @@ -509,10 +510,11 @@ def test_reconnection_time_request_works_after_session_close(self): 3. /time/0 calls should detect closed session, recreate it, and eventually succeed """ handler = self._make_handler() + original_session = handler._ensure_session() # Step 1: Watchdog closes the session - handler.session.close() - self.assertTrue(handler.session.is_closed) + original_session.close() + self.assertTrue(handler._session.is_closed) from pubnub.structures import RequestOptions, PlatformOptions p_options = MagicMock(spec=PlatformOptions) @@ -540,21 +542,22 @@ def test_reconnection_time_request_works_after_session_close(self): # Should be a connection error, NOT "Cannot send a request, as the client has been closed" self.assertNotIn("client has been closed", str(e).lower()) - self.assertFalse(handler.session.is_closed) + self.assertFalse(handler._session.is_closed) handler.close() class TestWallClockErrorCategory(unittest.TestCase): - """Tests that wall-clock sleep timeouts produce PNUnexpectedDisconnectCategory, - not PNTimeoutCategory — so they route through the reconnection manager with - configured retry delays instead of an immediate silent restart. + """Tests that wall-clock sleep timeouts produce PNTimeoutCategory, + so they silently restart the subscribe loop. If the network is actually + down, the next request will fail with a real connection error that routes + through the reconnection manager with configured retry delays. 
""" - def test_watchdog_triggered_produces_connection_error(self): - """When watchdog triggers during a request, the exception should use PNERR_CONNECTION_ERROR - which maps to PNUnexpectedDisconnectCategory in _build_envelope.""" + def test_watchdog_triggered_produces_timeout_error(self): + """When watchdog triggers during a request, the exception should use PNERR_CLIENT_TIMEOUT + which maps to PNTimeoutCategory in _build_envelope.""" from pubnub.request_handlers.httpx import HttpxRequestHandler - from pubnub.errors import PNERR_CONNECTION_ERROR + from pubnub.errors import PNERR_CLIENT_TIMEOUT from pubnub.exceptions import PubNubException from pubnub.structures import RequestOptions, PlatformOptions @@ -565,7 +568,7 @@ def test_watchdog_triggered_produces_connection_error(self): mock_session = MagicMock(spec=httpx.Client) mock_session.is_closed = False mock_session.request.side_effect = httpx.ReadTimeout("timed out") - handler.session = mock_session + handler._session = mock_session # Mock watchdog.triggered to return True (simulates watchdog firing during request) type(handler._watchdog).triggered = property(lambda self: True) @@ -590,10 +593,8 @@ def test_watchdog_triggered_produces_connection_error(self): with self.assertRaises(PubNubException) as ctx: handler._invoke_request(p_options, e_options, 'ps.pndsn.com') - self.assertEqual(ctx.exception._pn_error, PNERR_CONNECTION_ERROR) + self.assertEqual(ctx.exception._pn_error, PNERR_CLIENT_TIMEOUT) self.assertIn("Wall-clock deadline exceeded", ctx.exception._errormsg) - # Session should have been recreated - self.assertIsNot(handler.session, mock_session) handler._watchdog.stop() handler.close() @@ -636,8 +637,8 @@ def mock_time(): asyncio.run(run_test()) - def test_async_wall_clock_timeout_maps_to_unexpected_disconnect(self): - """WallClockTimeoutError should produce PNUnexpectedDisconnectCategory in request_future.""" + def test_async_wall_clock_timeout_maps_to_timeout_category(self): + """WallClockTimeoutError 
should produce PNTimeoutCategory in request_future.""" from pubnub.request_handlers.async_httpx import WallClockTimeoutError from pubnub.pubnub_asyncio import PubNubAsyncio from pubnub.pnconfiguration import PNConfiguration @@ -668,7 +669,7 @@ def options_func(): self.assertTrue(result.is_error()) self.assertEqual( result.status.category, - PNStatusCategory.PNUnexpectedDisconnectCategory + PNStatusCategory.PNTimeoutCategory ) finally: await pubnub.stop()