[python] Add AsyncStreamingTableScan and StreamReadBuilder for continuous streaming reads#7424
Merged

JingsongLi merged 4 commits into apache:master (Mar 14, 2026)

Conversation
…w_stream_read_builder() Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove unused fields: `_initialized`, `_diff_catch_up_used`, `_prefetch_hits`, `_prefetch_misses`
- Fix `stream()` return type annotation: `AsyncIterator` -> `AsyncGenerator`
- Log prefetch failures instead of silently swallowing exceptions
- Simplify `_create_catch_up_plan` to reuse `_create_initial_plan`
- Add comment clarifying that the diff catch-up block only runs once per `stream()` call
- Revert cosmetic import-style changes in file_store_table.py
- Merge duplicate `test_initial_scan_sets_next_snapshot_id` and `test_initial_scan_yields_plan` into a single `test_initial_scan`
- Remove `SnapshotManagerCacheTest` from streaming_table_scan_test.py (already covered in snapshot_manager_test.py)
- Remove `TestBucketFilteringLogic` from stream_read_builder_test.py (it tests caller convention, not implementation code)
- Move the `AsyncStreamingTableScan` import to module level in the test file
- Fix E303 extra blank line; remove stale TDD comment from docstring
- Replace `asyncio.new_event_loop()` with `asyncio.run()` in the catch-up test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The diff catch-up block was gating its yield on `catch_up_plan.splits()`, causing a timeout when the plan had 0 splits: `next_snapshot_id` was advanced past the latest snapshot, but no plan was yielded, leaving the follow-up loop polling for a snapshot that does not exist. Yield the catch-up plan unconditionally, matching the behavior of the initial scan, which always yields regardless of split count.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
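The fix described above can be illustrated with a minimal, self-contained sketch. The `Plan` dataclass and `stream()` generator here are hypothetical stand-ins for the paimon-python classes, reduced to just the yield decision at the heart of the bug:

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Plan:
    # Stand-in for the real Plan; holds the splits discovered by the scan.
    splits: list = field(default_factory=list)


async def stream(catch_up_plan: Plan):
    # Buggy version (for contrast): only yielded when the plan had splits,
    # so an empty catch-up plan advanced next_snapshot_id without emitting
    # anything, and the follow-up loop polled forever.
    #
    #   if catch_up_plan.splits:
    #       yield catch_up_plan
    #
    # Fixed version: yield unconditionally, matching the initial scan.
    yield catch_up_plan


async def collect():
    empty_catch_up = Plan(splits=[])
    return [plan async for plan in stream(empty_catch_up)]


plans = asyncio.run(collect())
```

Even with zero splits, the consumer now receives the catch-up plan, so it can observe that the scan has advanced rather than timing out.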
Contributor: +1
What does this PR do?
Adds continuous streaming read support to `paimon-python` via two new classes:

- `AsyncStreamingTableScan` — async generator that continuously polls for new snapshots and yields `Plan` objects as new data arrives. Python equivalent of Java's `DataTableStreamScan`.
- `StreamReadBuilder` — fluent builder (analogous to `ReadBuilder`) for configuring and constructing a streaming scan.
- `Table.new_stream_read_builder()` — entry point on the table interface.

Key features
- Follow-up `Plan`s are produced by `DeltaFollowUpScanner` (`changelog-producer` = none) or `ChangelogFollowUpScanner` (input/lookup/full-compaction)

Builder API
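To make the builder flow concrete, here is a runnable sketch of how a consumer might drive the API described above. The class and method bodies are hypothetical stand-ins (the real implementations live in paimon-python, and method names such as `new_stream_scan()` are assumptions); only the shape — builder produces scan, scan's `stream()` is an async generator of `Plan`s — comes from this PR:

```python
import asyncio


class Plan:
    """Stand-in for the scan plan; wraps the splits for one snapshot."""

    def __init__(self, splits):
        self._splits = splits

    def splits(self):
        return self._splits


class AsyncStreamingTableScan:
    """Sketch of the streaming scan: yields one Plan per discovered snapshot."""

    def __init__(self, snapshots):
        self._snapshots = snapshots

    async def stream(self):
        # The real scan polls the table for new snapshots indefinitely;
        # here a fixed list is used so the example terminates.
        for splits in self._snapshots:
            yield Plan(splits)


class StreamReadBuilder:
    """Sketch of the fluent builder that constructs a streaming scan."""

    def __init__(self, snapshots):
        self._snapshots = snapshots

    def new_stream_scan(self):
        return AsyncStreamingTableScan(self._snapshots)


async def consume(builder):
    scan = builder.new_stream_scan()
    seen = []
    async for plan in scan.stream():
        seen.append(plan.splits())
    return seen


# Three "snapshots": note the empty one is still yielded as a Plan,
# consistent with the catch-up fix in this PR.
builder = StreamReadBuilder([["split-0"], [], ["split-1", "split-2"]])
seen = asyncio.run(consume(builder))
```

The `async for` loop is the intended consumption pattern: the caller never blocks a thread waiting for new snapshots, and an empty plan still arrives so progress is observable.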
Part of streaming read PR stack
This is PR 3 of the streaming read stack. Prerequisites already merged:
Tracking issue: #7152
Test plan
🤖 Generated with Claude Code