
[python] Add AsyncStreamingTableScan and StreamReadBuilder for continuous streaming reads#7424

Merged
JingsongLi merged 4 commits into apache:master from tub:python-streaming-2c-scan-and-builder on Mar 14, 2026

Conversation

@tub (Contributor) commented Mar 13, 2026

What does this PR do?

Adds continuous streaming read support to paimon-python via two new classes:

  • AsyncStreamingTableScan — async generator that continuously polls for new snapshots and yields Plan objects as new data arrives. Python equivalent of Java's DataTableStreamScan.
  • StreamReadBuilder — fluent builder (analogous to ReadBuilder) for configuring and constructing a streaming scan.
  • Table.new_stream_read_builder() — entry point on the table interface.

Key features

  • Initial scan: on first call, performs a full scan of the latest snapshot and yields a Plan
  • Delta / changelog follow-up: subsequent iterations yield only new data per snapshot, using DeltaFollowUpScanner (changelog-producer=none) or ChangelogFollowUpScanner (input/lookup/full-compaction)
  • Diff catch-up: when starting from a historical snapshot with a large gap to the latest, uses `IncrementalDiffScanner` to collapse the catch-up into a single efficient plan instead of replaying every intermediate snapshot
  • Batch lookahead: `SnapshotManager.find_next_scannable()` fetches a batch of snapshot IDs in parallel to skip non-scannable snapshots (e.g. compaction-only) with minimal S3 round-trips
  • Prefetching: optional background thread pre-fetches the next scannable snapshot while the caller processes the current plan
  • Bucket/shard filtering: `with_buckets([0,1])` / `with_bucket_filter(fn)` for parallel consumer sharding
  • Synchronous wrapper: `stream_sync()` for non-async usage
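The bucket/shard filtering feature above lends itself to a simple sharding pattern. The sketch below is illustrative only (the helper name and modulo scheme are assumptions, not the library's implementation): consumer i of n keeps buckets where `bucket % n == i`, so each bucket is read by exactly one consumer.

```python
# Illustrative sketch of a bucket filter like the one passed to
# with_bucket_filter(fn). Not the actual paimon-python implementation.
def make_bucket_filter(consumer_index: int, num_consumers: int):
    # Consumer i of n keeps buckets where bucket % n == i, so every
    # bucket is assigned to exactly one consumer.
    return lambda bucket: bucket % num_consumers == consumer_index

# Partition buckets 0..3 across two consumers:
filters = [make_bucket_filter(i, 2) for i in range(2)]
shards = [[b for b in range(4) if keep(b)] for keep in filters]
print(shards)  # [[0, 2], [1, 3]]
```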

Builder API

builder = (
    table.new_stream_read_builder()
    .with_filter(predicate)
    .with_projection(["col1", "col2"])
    .with_poll_interval_ms(500)
)
scan = builder.new_streaming_scan()
reader = builder.new_read()

async for plan in scan.stream():
    arrow_table = reader.to_arrow(plan.splits())
    process(arrow_table)
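For non-async callers there is `stream_sync()`. A minimal sketch of how such a wrapper can drive an async generator from synchronous code (the helper and the stand-in generator below are illustrative, not the actual paimon-python implementation):

```python
import asyncio

# Illustrative sketch: drive an async generator from synchronous code,
# roughly the way a stream_sync()-style wrapper might.
def drive_sync(agen):
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                # Pull one item from the async generator per iteration.
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()

async def fake_plans():  # stand-in for AsyncStreamingTableScan.stream()
    for snapshot_id in range(3):
        yield {"snapshot": snapshot_id}

plans = list(drive_sync(fake_plans()))
print(plans)  # [{'snapshot': 0}, {'snapshot': 1}, {'snapshot': 2}]
```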

Part of streaming read PR stack

This is PR 3 of the streaming read stack. Prerequisites already merged:

Tracking issue: #7152

Test plan

  • `pypaimon/tests/streaming_table_scan_test.py` — 12 unit tests covering initial scan, delta/changelog follow-up, diff catch-up, prefetch, bucket filtering, `stream_sync()`
  • `pypaimon/tests/stream_read_builder_test.py` — 8 unit tests covering builder configuration and scan/read construction
python -m pytest pypaimon/tests/streaming_table_scan_test.py pypaimon/tests/stream_read_builder_test.py -v

🤖 Generated with Claude Code

tub and others added 4 commits March 13, 2026 11:01
…w_stream_read_builder()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove unused fields: _initialized, _diff_catch_up_used, _prefetch_hits,
  _prefetch_misses
- Fix stream() return type annotation: AsyncIterator -> AsyncGenerator
- Log prefetch failures instead of silently swallowing exceptions
- Simplify _create_catch_up_plan to reuse _create_initial_plan
- Add comment clarifying diff catch-up block only runs once per stream() call
- Revert cosmetic import-style changes in file_store_table.py
- Merge duplicate test_initial_scan_sets_next_snapshot_id and
  test_initial_scan_yields_plan into single test_initial_scan
- Remove SnapshotManagerCacheTest from streaming_table_scan_test.py
  (already covered in snapshot_manager_test.py)
- Remove TestBucketFilteringLogic from stream_read_builder_test.py
  (tests caller convention, not implementation code)
- Move AsyncStreamingTableScan import to module level in test file
- Fix E303 extra blank line; remove stale TDD comment from docstring
- Replace asyncio.new_event_loop() with asyncio.run() in catch-up test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The diff catch-up block was gating its yield on `catch_up_plan.splits()`,
causing a timeout when the plan had 0 splits: next_snapshot_id was advanced
past the latest snapshot but no plan was yielded, leaving the follow-up loop
polling for a snapshot that doesn't exist.

Yield the catch-up plan unconditionally, matching the behavior of the initial
scan which always yields regardless of split count.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
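The failure mode fixed by that commit can be modeled in a few lines (a deliberately simplified sketch, not the real scan code):

```python
# Simplified model of the catch-up bug (illustrative, not the real code).
# The scan advances its cursor past the latest snapshot during catch-up;
# if the yield is gated on a non-empty split list, an empty catch-up plan
# moves the cursor without emitting anything, and the follow-up loop then
# polls forever for a snapshot that will never exist.
def catch_up(latest_snapshot_id, catch_up_splits, gate_on_splits):
    next_snapshot_id = latest_snapshot_id + 1  # cursor always advances
    emitted = []
    if catch_up_splits or not gate_on_splits:
        emitted.append(("catch_up_plan", catch_up_splits))
    return next_snapshot_id, emitted

# Buggy: cursor moved to 6, but nothing was yielded -> consumer stalls.
print(catch_up(5, [], gate_on_splits=True))   # (6, [])

# Fixed: the empty plan is yielded unconditionally, matching the
# initial scan, so the consumer proceeds.
print(catch_up(5, [], gate_on_splits=False))  # (6, [('catch_up_plan', [])])
```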
@JingsongLi (Contributor)

+1

@JingsongLi JingsongLi merged commit e63d9ab into apache:master Mar 14, 2026
5 checks passed