Add V2 batch format with statistics collection#2886
Add V2 batch format with statistics collection#2886platinumhamburg wants to merge 3 commits intoapache:mainfrom
Conversation
af26717 to
836948c
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces a new V2 Arrow log record batch format that can embed per-column min/max + null-count statistics between the batch header and record payload, enabling future filter pushdown improvements (Issue #2885 / FIP-10).
Changes:
- Add statistics collection/serialization/parsing APIs and implementations (collector/writer/parser + batch-access API).
- Extend Arrow log batch building, reading, and projection logic to account for a V2 layout with an optional statistics section and a
StatisticsLengthheader field. - Add table-level configuration (
table.statistics.columns) utilities and validation, and wire statistics collection into the client write path.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-server/src/main/java/org/apache/fluss/server/kv/wal/ArrowWalBuilder.java | Adapts builder call sites to new statistics-capable Arrow batch builder signature. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Validates table statistics configuration on CREATE TABLE. |
| fluss-common/src/test/java/org/apache/fluss/record/TestData.java | Adds schemas/data for statistics-related tests. |
| fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java | Extends batch builder tests to cover V2 + statistics. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTestUtils.java | Adds reusable utilities for generating batches with statistics in tests. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTest.java | Adds end-to-end tests for statistics extraction/caching across batch impls. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsParserTest.java | Adds tests for statistics parsing/validation helpers. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsCollectorTest.java | Adds tests for collector behavior across types/nulls/mappings. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java | Adds V2 header/offset assertions and stats-aware offsets. |
| fluss-common/src/test/java/org/apache/fluss/config/StatisticsConfigUtilsTest.java | Adds tests for statistics config validation. |
| fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java | Adds isBinaryType helper used by stats config/mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java | Exposes RowType schema via getSchema() for stats wiring. |
| fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java | Adds AlignedRow.from(RowType, InternalRow) conversion helper. |
| fluss-common/src/main/java/org/apache/fluss/record/bytesview/MultiBytesView.java | Enhances builder API (addBytes(BytesView), isEmpty) and improves file-region merging. |
| fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java | Implements V2 batch building with optional embedded statistics + CRC handling changes. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsWriter.java | New: serializes statistics in a compact schema-aware format. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsParser.java | New: parses/validates statistics payloads from multiple memory sources. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsCollector.java | New: collects per-column min/max/null-counts during batch build. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatistics.java | New: statistics interface exposed from LogRecordBatch. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java | Adds V2 constants/layout helpers, statistics offsets, and header mutation helper. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java | Adds getStatistics(ReadContext) API to batches. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java | Updates projection to skip stats section for V2 and clear stats header fields. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java | Adds stats parsing and provides optional “trim stats” BytesView generation for V2. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java | New: zero-copy statistics view with full-schema wrappers + mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java | Updates record decoding offsets for V2 (stats-aware) and implements statistics parsing/caching. |
| fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java | Computes and caches stats column index mappings based on table config. |
| fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java | Adds statistics config accessors and enablement checks. |
| fluss-common/src/main/java/org/apache/fluss/config/StatisticsConfigUtils.java | New: validates table.statistics.columns against schema + supported types. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds table.statistics.columns option definition/documentation. |
| fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java | Updates tests for ArrowLogWriteBatch constructor signature. |
| fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java | Wires per-table statistics collection into batch creation when enabled. |
| fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java | Passes statistics collector into Arrow batch builder. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
Outdated
Show resolved
Hide resolved
fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
Outdated
Show resolved
Hide resolved
836948c to
51dd84c
Compare
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering. - Add LogRecordBatchStatistics and related classes for statistics collection - Add StatisticsConfigUtils for parsing table.statistics.columns configuration - Extend DefaultLogRecordBatch to support V2 format with statistics - Place statistics data between header and records with StatisticsLength field - Add comprehensive tests for statistics collection and parsing
51dd84c to
e595708
Compare
wuchong
left a comment
There was a problem hiding this comment.
Thanks @platinumhamburg , I left some comments.
fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/StatisticsConfigUtils.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/bytesview/MultiBytesView.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
…stics Config & API: - Change TABLE_STATISTICS_COLUMNS default from "*" to noDefaultValue() - Introduce StatisticsColumnsConfig with three-state mode (DISABLED/ALL/SPECIFIED) - Replace isBinaryType blacklist with isSupportedStatisticsType whitelist - Update config documentation with compatibility requirements Format versioning: - Renumber versions: V0=base, V1=statistics, V2=leaderEpoch(future) - Remove redundant STATISTICS_FLAG, use statisticsLength > 0 instead - Make statisticsLengthOffset/leaderEpochOffset throw for unsupported versions - Remove unsafe arrowChangeTypeOffset(magic) method Write path: - Serialize statistics directly to pagedOutputView (no temp byte[]) - Restore zero-copy CRC over contiguous memory segments - Fix estimatedSizeInBytes to use recordBatchHeaderSize(magic) Writer optimization: - Replace expensive estimatedSizeInBytes with cached heuristic estimate - Reuse AlignedRowWriter instance across writes - Extract backing byte[] directly from MemorySegment for OutputView writes Parser optimization: - Replace sequential MemorySegmentInputView reads with fixed-offset access - Add ByteBuffer.isDirect() check before wrapOffHeapMemory - Add skipBytes() to MemorySegmentInputView Correctness: - Remove redundant minSet/maxSet arrays in statistics collector - Throw UnsupportedOperationException for unsupported types in AlignedRow - Fix minValuesSize condition from <= 0 to == 0 Code cleanup: - Remove unused methods (getBytesView, createTrimmedBytesView, addBytes, etc.) - Add @nullable annotations on statisticsCollector and parseStatistics return - Add logging in statistics parsing catch blocks - Wrap clearStatisticsFromHeader with statisticsLength > 0 guard Tests & docs: - Add zero-rows and all-null column test cases - Update ConfigOptions docs with V1 format and compatibility notes
…stics Config & API: - Change TABLE_STATISTICS_COLUMNS default from "*" to noDefaultValue() - Introduce StatisticsColumnsConfig with three-state mode (DISABLED/ALL/SPECIFIED) - Replace isBinaryType blacklist with isSupportedStatisticsType whitelist - Update config documentation with compatibility requirements Format versioning: - Renumber versions: V0=base, V1=statistics, V2=leaderEpoch(future) - Remove redundant STATISTICS_FLAG, use statisticsLength > 0 instead - Make statisticsLengthOffset/leaderEpochOffset throw for unsupported versions - Remove unsafe arrowChangeTypeOffset(magic) method Write path: - Serialize statistics directly to pagedOutputView (no temp byte[]) - Restore zero-copy CRC over contiguous memory segments - Fix estimatedSizeInBytes to use recordBatchHeaderSize(magic) Writer optimization: - Replace expensive estimatedSizeInBytes with cached heuristic estimate - Reuse AlignedRowWriter instance across writes - Extract backing byte[] directly from MemorySegment for OutputView writes Parser optimization: - Replace sequential MemorySegmentInputView reads with fixed-offset access - Add ByteBuffer.isDirect() check before wrapOffHeapMemory - Add skipBytes() to MemorySegmentInputView Correctness: - Remove redundant minSet/maxSet arrays in statistics collector - Throw UnsupportedOperationException for unsupported types in AlignedRow - Fix minValuesSize condition from <= 0 to == 0 Code cleanup: - Remove unused methods (getBytesView, createTrimmedBytesView, addBytes, etc.) - Add @nullable annotations on statisticsCollector and parseStatistics return - Add logging in statistics parsing catch blocks - Wrap clearStatisticsFromHeader with statisticsLength > 0 guard Tests & docs: - Add zero-rows and all-null column test cases - Update ConfigOptions docs with V1 format and compatibility notes
8903de9 to
bd8123b
Compare
|
Thanks to @wuchong for the detailed and professional review comments. This PR indeed has many areas for improvement, and I have optimized and fixed it according to the feedback. Please continue reviewing when you have time. |
|
wuchong
left a comment
There was a problem hiding this comment.
@platinumhamburg I pushed a commit to address the comment , improve some code, and resolve the code coverage problem. Please take a look .
| @Override | ||
| public InternalRow getMinValues() { | ||
| if (minValuesSize == 0) { | ||
| return null; |
There was a problem hiding this comment.
Since we always serialize the max and min InternalRow for statistics, a minValuesSize of 0 is invalid and should be validated within the constructor. This approach simplifies the usage of getMinValues by eliminating the need to handle the nullability of the returned value.
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering.
Purpose
Linked issue: close #2885
Brief change log
Tests
API and Format
Documentation