feat(pruning): add StatisticsSource trait with two-phase resolve/evaluate API#21157
adriangb wants to merge 3 commits into apache:main
Conversation
Introduces a new expression-based statistics API for pruning that separates async data resolution from sync predicate evaluation.

- `StatisticsSource` trait: accepts `&[Expr]`, returns `Vec<Option<ArrayRef>>`
- `ResolvedStatistics`: `HashMap<Expr, ArrayRef>` cache for pre-resolved stats
- `PruningPredicate::evaluate()`: sync evaluation against pre-resolved cache
- `PruningPredicate::all_required_expressions()`: exposes needed `Expr` list
- Blanket impl bridges existing `PruningStatistics` implementations
- `prune()` refactored to delegate through `resolve_all_sync` + `evaluate`

This enables async statistics sources (external metastores, runtime sampling) while keeping the evaluation path synchronous for use in `Stream::poll_next()` contexts like `EarlyStoppingStream`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix broken intra-doc links for `Expr`, `ResolvedStatistics`, `PruningPredicate`
- Replace deprecated `Expr::Wildcard` with `Expr::Literal` in count expressions
- Fix clippy: collapsible if, bool_assert_comparison, uninlined_format_args, cloned_ref_to_slice_refs
- Fix unused variable warning in test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@kumarUjjawal I'm curious if you'd be interested in this since you've been working on statistics?
Thanks @adriangb I will take a look |
Just leaving some preliminary thoughts, let me know what you think:
`PruningPredicate` uses `row_count` as the total row count in the `null_count != row_count` all-null guard, but `all_required_expressions()` encodes the row count as `count(*) FILTER (WHERE column IS NOT NULL)`.
A custom `StatisticsSource` can return the non-null count for that expression, in which case the all-null guard becomes wrong and can prune valid containers.
The current blanket impl hides this by mapping that expression to `row_counts()`, but external implementations are exposed to this contract mismatch.
We can either:
- make the row-count expression unambiguous for total rows, or
- clearly document/enforce that this specific filtered count expression is an alias for total row_counts.
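To make the mismatch concrete, here is a minimal std-only sketch (the `is_all_null` function and the numbers are illustrative, not DataFusion code) showing how resolving the row-count expression as a non-null count flips the guard:

```rust
// Illustrative model of the all-null guard: a container may be pruned for a
// predicate on `column` only when every value in it is null, i.e. when
// null_count == row_count (with row_count meaning *total* rows).
fn is_all_null(null_count: u64, row_count: u64) -> bool {
    null_count == row_count
}

fn main() {
    let total_rows = 100u64;
    let null_count = 50u64; // half the values are null, half are real data

    // Correct contract: row_count resolved as total rows -> container is kept.
    assert!(!is_all_null(null_count, total_rows));

    // Mismatch: a source answers `count(*) FILTER (WHERE column IS NOT NULL)`
    // literally and returns the non-null count (50). The guard now fires, and
    // a container holding 50 valid rows is wrongly pruned.
    let non_null_count = total_rows - null_count;
    assert!(is_all_null(null_count, non_null_count));
}
```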
```rust
let num_containers = source.num_containers();
let arrays = source.expression_statistics(expressions).await?;
let cache = expressions
    .iter()
    .zip(arrays)
    .filter_map(|(expr, arr)| arr.map(|a| (expr.clone(), a)))
    .collect();
```
I think this silently truncates on a length mismatch: if the source returns a wrong-length vector, `zip` drops the extra mappings and we continue with the null fallback.
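A self-contained sketch of the failure mode (std-only; `String` and `Option<i64>` stand in for `Expr` and `Option<ArrayRef>`, and `build_cache` is a hypothetical name for the zip/collect above):

```rust
use std::collections::HashMap;

// Models the cache-building code: zip stops at the shorter side, so a
// wrong-length statistics vector is swallowed rather than reported.
fn build_cache(expressions: &[String], arrays: Vec<Option<i64>>) -> HashMap<String, i64> {
    expressions
        .iter()
        .zip(arrays) // silently truncates on length mismatch
        .filter_map(|(expr, arr)| arr.map(|a| (expr.clone(), a)))
        .collect()
}

fn main() {
    let exprs = vec![
        "min(a)".to_string(),
        "max(a)".to_string(),
        "null_count(a)".to_string(),
    ];
    // A buggy source returns only two arrays for three expressions.
    let cache = build_cache(&exprs, vec![Some(1), Some(9)]);
    // The "null_count(a)" mapping is silently lost; evaluation would then
    // null-fill it and proceed without any error.
    assert_eq!(cache.len(), 2);
    assert!(!cache.contains_key("null_count(a)"));
}
```

Checking `arrays.len() == expressions.len()` before zipping (and returning an error on mismatch) would surface the bug instead of degrading to the null fallback.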
Summary
- `StatisticsSource` trait: an expression-based async statistics API that accepts `&[Expr]` and returns `Vec<Option<ArrayRef>>`
- `ResolvedStatistics`: a `HashMap<Expr, ArrayRef>` cache that separates async data resolution from sync predicate evaluation
- `PruningPredicate::evaluate()`: sync evaluation against the pre-resolved stats cache
- Blanket impl bridges existing `PruningStatistics` implementations automatically
- `prune()` refactored to delegate through `resolve_all_sync()` + `evaluate()`, validating the two-phase pattern end-to-end

Design
The core idea is a two-phase resolve/evaluate split:
`PruningPredicate::all_required_expressions()` exposes what stats are needed as `Vec<Expr>`. The caller passes these to `StatisticsSource::expression_statistics()`, which returns arrays packaged into a `ResolvedStatistics` cache. `PruningPredicate::evaluate(&ResolvedStatistics)` looks up each required expression in the cache, null-fills missing entries (conservative: won't prune), builds a `RecordBatch`, and evaluates the predicate.

This keeps the evaluation path synchronous for `Stream::poll_next()` contexts like `EarlyStoppingStream`, while allowing the resolution step to be async.

Future work
Struct field pruning (#21003)
Because `StatisticsSource` accepts arbitrary `Expr`, a custom implementation can handle expressions like `min(get_field(struct_col, 'field'))` by resolving nested Parquet column statistics directly. The blanket impl on `PruningStatistics` returns `None` for these (it only handles flat `Expr::Column` args), but a Parquet-aware `StatisticsSource` impl can override this. No further API changes needed: the expression language is already rich enough.

Async statistics sources
The async `StatisticsSource` trait enables use cases like querying an external metastore for statistics or sampling data at runtime. The two-phase pattern means callers resolve once (async) and evaluate many times (sync), which works well for dynamic filter scenarios where the predicate changes but the underlying data statistics don't.

Cardinality estimation
`StatisticsSource` could sit on `ExecutionPlan` nodes via a method like `partition_expression_statistics(&[Expr])`, delegating through `DataSourceExec` → `FileScanConfig` → `FileSource` → format-specific impl. This would enable queries like `approx_count_distinct(col)` for join optimization. There is work in progress to add NDV statistics to Parquet, but this could unlock things like extracting stats from sampled data.
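To sketch how such statistics might be consumed, here is a std-only toy (everything here is hypothetical: the `ExprStats` map keyed by an expression's display string stands in for resolved statistics, and `choose_build_side` stands in for an optimizer rule) showing NDV estimates driving a hash-join build-side choice:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for resolved per-partition expression statistics,
// keyed by the expression's display string.
type ExprStats = HashMap<String, u64>;

// Pick the hash-join build side from NDV estimates: build on the side with
// fewer distinct join keys. Unknown stats fall back to a default.
fn choose_build_side(left: &ExprStats, right: &ExprStats, key: &str) -> &'static str {
    let expr = format!("approx_count_distinct({key})");
    match (left.get(&expr), right.get(&expr)) {
        (Some(l), Some(r)) if l <= r => "left",
        (Some(_), Some(_)) => "right",
        _ => "left",
    }
}

fn main() {
    let left = HashMap::from([("approx_count_distinct(id)".to_string(), 1_000u64)]);
    let right = HashMap::from([("approx_count_distinct(id)".to_string(), 50u64)]);
    // The right side has far fewer distinct keys, so build there.
    assert_eq!(choose_build_side(&left, &right, "id"), "right");
}
```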
Test plan
- `datafusion-datasource-parquet` compiles unchanged

🤖 Generated with Claude Code