
feat(pruning): add StatisticsSource trait with two-phase resolve/evaluate API#21157

Draft
adriangb wants to merge 3 commits into apache:main from pydantic:stats-api

Conversation


@adriangb adriangb commented Mar 25, 2026

Summary

  • Introduces StatisticsSource trait: an expression-based async statistics API that accepts &[Expr] and returns Vec<Option<ArrayRef>>
  • Adds ResolvedStatistics: a HashMap<Expr, ArrayRef> cache that separates async data resolution from sync predicate evaluation
  • Adds PruningPredicate::evaluate(): sync evaluation against pre-resolved stats cache
  • Blanket impl bridges all existing PruningStatistics implementations automatically
  • Refactors prune() to delegate through resolve_all_sync() + evaluate(), validating the two-phase pattern end-to-end
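The pieces above fit together roughly as follows. This is a simplified, synchronous sketch using stand-in types (`Expr` as `String`, `ArrayRef` as `Vec<Option<i64>>`) rather than the real DataFusion/Arrow types; only the names follow the PR description, and `FixedSource`/`demo` are illustrative:

```rust
use std::collections::HashMap;

type Expr = String;               // stands in for datafusion_expr::Expr
type ArrayRef = Vec<Option<i64>>; // stands in for arrow's ArrayRef

/// One statistics array per requested expression; None means
/// "no statistics available for this expression".
trait StatisticsSource {
    fn num_containers(&self) -> usize;
    // Async in the PR; shown sync here to keep the sketch small.
    fn expression_statistics(&self, exprs: &[Expr]) -> Vec<Option<ArrayRef>>;
}

/// Cache mapping each resolved expression to its statistics array.
struct ResolvedStatistics {
    cache: HashMap<Expr, ArrayRef>,
}

impl ResolvedStatistics {
    /// Resolve phase: ask the source once, cache whatever it can answer.
    fn resolve(source: &dyn StatisticsSource, exprs: &[Expr]) -> Self {
        let arrays = source.expression_statistics(exprs);
        let cache = exprs
            .iter()
            .zip(arrays)
            .filter_map(|(e, a)| a.map(|arr| (e.clone(), arr)))
            .collect();
        ResolvedStatistics { cache }
    }

    /// Evaluate-phase lookup: a miss means "statistic unknown".
    fn get(&self, expr: &Expr) -> Option<&ArrayRef> {
        self.cache.get(expr)
    }
}

/// Toy source that only knows min statistics for column `a`.
struct FixedSource;

impl StatisticsSource for FixedSource {
    fn num_containers(&self) -> usize {
        2
    }
    fn expression_statistics(&self, exprs: &[Expr]) -> Vec<Option<ArrayRef>> {
        exprs
            .iter()
            .map(|e| {
                if e.as_str() == "min(a)" {
                    Some(vec![Some(1), Some(5)])
                } else {
                    None
                }
            })
            .collect()
    }
}

/// Returns (known?, known?) for the two requested expressions.
fn demo() -> (bool, bool) {
    let exprs = vec!["min(a)".to_string(), "max(b)".to_string()];
    let stats = ResolvedStatistics::resolve(&FixedSource, &exprs);
    (stats.get(&exprs[0]).is_some(), stats.get(&exprs[1]).is_some())
}
```

Expressions the source cannot answer simply never enter the cache; the evaluate phase then null-fills them, so the container is conservatively kept.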

Design

The core idea is a two-phase resolve/evaluate split:

  1. Resolve (async): PruningPredicate::all_required_expressions() exposes what stats are needed as Vec<Expr>. The caller passes these to StatisticsSource::expression_statistics(), which returns arrays packaged into a ResolvedStatistics cache.
  2. Evaluate (sync): PruningPredicate::evaluate(&ResolvedStatistics) looks up each required expression in the cache, null-fills missing entries (conservative — won't prune), builds a RecordBatch, and evaluates the predicate.

This keeps the evaluation path synchronous for Stream::poll_next() contexts like EarlyStoppingStream, while allowing the resolution step to be async.
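The conservative null-fill behavior on the evaluate side can be modeled in a few lines. This is a simplified sketch: plain `Option<i64>` values stand in for Arrow statistics arrays, and a fixed `min <= threshold` check stands in for the real rewritten predicate:

```rust
/// Per-container keep/prune decision for a stand-in predicate of the
/// form `min_stat <= threshold`. A `None` statistic (a cache miss that
/// was null-filled) evaluates to "unknown", which keeps the container.
fn keep_containers(min_stats: &[Option<i64>], threshold: i64) -> Vec<bool> {
    min_stats
        .iter()
        .map(|s| match s {
            // Known statistic: prune only when provably no row can match.
            Some(min) => *min <= threshold,
            // Missing statistic: cannot prove anything, so keep.
            None => true,
        })
        .collect()
}
```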

Future work

Struct field pruning (#21003)

Because StatisticsSource accepts arbitrary Expr, a custom implementation can handle expressions like min(get_field(struct_col, 'field')) by resolving nested Parquet column statistics directly. The blanket impl on PruningStatistics returns None for these (it only handles flat Expr::Column args), but a Parquet-aware StatisticsSource impl can override this. No further API changes needed — the expression language is already rich enough.
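The split in behavior can be illustrated with a toy expression model (a stand-in `StatExpr` enum, not the real `Expr`): the blanket impl only recognizes flat column references, while a format-aware source could also answer the nested-field shape:

```rust
/// Toy stand-in for the statistics expressions a source might receive.
enum StatExpr {
    /// e.g. min(col) — a flat column reference.
    MinColumn(String),
    /// e.g. min(get_field(col, 'field')) — a nested struct field.
    MinField(String, String),
}

/// Mimics the blanket impl on PruningStatistics: it resolves only flat
/// column expressions and returns "unknown" for nested fields, which a
/// Parquet-aware StatisticsSource could instead resolve directly.
fn blanket_resolves(expr: &StatExpr) -> bool {
    matches!(expr, StatExpr::MinColumn(_))
}
```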

Async statistics sources

The async StatisticsSource trait enables use cases like querying an external metastore for statistics or sampling data at runtime. The two-phase pattern means callers resolve once (async) and evaluate many times (sync), which works well for dynamic filter scenarios where the predicate changes but the underlying data statistics don't.

Cardinality estimation

StatisticsSource could sit on ExecutionPlan nodes via a method like partition_expression_statistics(&[Expr]), delegating through DataSourceExec → FileScanConfig → FileSource → format-specific impl. This would enable queries like approx_count_distinct(col) for join optimization.

There is work in progress to add NDV statistics to Parquet, but this could also unlock things like extracting statistics from sampled data.

Test plan

  • All 82 existing pruning tests pass unchanged
  • 16 new tests covering: resolve helpers (min/max/count/InList/NOT IN), ResolvedStatistics cache, evaluate-matches-prune equivalence, missing cache entries → conservative keep
  • Zero clippy warnings
  • datafusion-datasource-parquet compiles unchanged

🤖 Generated with Claude Code

@github-actions github-actions bot added the common Related to common crate label Mar 26, 2026
adriangb and others added 3 commits March 26, 2026 15:32
…uate API

Introduces a new expression-based statistics API for pruning that
separates async data resolution from sync predicate evaluation.

- StatisticsSource trait: accepts &[Expr], returns Vec<Option<ArrayRef>>
- ResolvedStatistics: HashMap<Expr, ArrayRef> cache for pre-resolved stats
- PruningPredicate::evaluate(): sync evaluation against pre-resolved cache
- PruningPredicate::all_required_expressions(): exposes needed Expr list
- Blanket impl bridges existing PruningStatistics implementations
- prune() refactored to delegate through resolve_all_sync + evaluate

This enables async statistics sources (external metastores, runtime
sampling) while keeping the evaluation path synchronous for use in
Stream::poll_next() contexts like EarlyStoppingStream.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix broken intra-doc links for Expr, ResolvedStatistics, PruningPredicate
- Replace deprecated Expr::Wildcard with Expr::Literal in count expressions
- Fix clippy: collapsible if, bool_assert_comparison, uninlined_format_args,
  cloned_ref_to_slice_refs
- Fix unused variable warning in test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@adriangb
Contributor Author

@kumarUjjawal I'm curious if you'd be interested in this since you've been working on statistics?

@kumarUjjawal
Contributor

@kumarUjjawal I'm curious if you'd be interested in this since you've been working on statistics?

Thanks @adriangb I will take a look


@kumarUjjawal kumarUjjawal left a comment


Just leaving some preliminary thoughts, let me know what you think:

PruningPredicate uses row_count as the total row count in the null_count != row_count (all-null) guard, but all_required_expressions() encodes the row count as count(*) FILTER (WHERE column IS NOT NULL).

A custom StatisticsSource can return the non-null count for that expression, in which case the all-null guard becomes wrong and can prune valid containers.
The current blanket impl hides this by mapping that expression to row_counts(), but external implementations are exposed to this contract mismatch.

We can either:

  • make the row-count expression unambiguous for total rows, or
  • clearly document/enforce that this specific filtered count expression is an alias for total row_counts.
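To make the mismatch concrete, here is a stand-in model of the guard (plain integers instead of Arrow count arrays; `is_all_null` and `demo_mismatch` are illustrative names, not PR code):

```rust
/// The all-null guard: a container is all null only when every row is
/// null, i.e. null_count equals the *total* row count.
fn is_all_null(null_count: i64, total_rows: i64) -> bool {
    null_count == total_rows
}

/// A container with 10 rows, 5 of them null.
/// Returns (guard with total rows, guard with the non-null count).
fn demo_mismatch() -> (bool, bool) {
    let total_rows = 10;
    let null_count = 5;
    // Correct contract: compare against total rows -> not all null.
    let with_total = is_all_null(null_count, total_rows);
    // Mismatched contract: a source that answers the filtered-count
    // expression with the non-null count (total - nulls) makes the
    // guard wrongly conclude the container is all null.
    let non_null_count = total_rows - null_count;
    let with_filtered = is_all_null(null_count, non_null_count);
    (with_total, with_filtered)
}
```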

Comment on lines +157 to +163
```rust
let num_containers = source.num_containers();
let arrays = source.expression_statistics(expressions).await?;
let cache = expressions
    .iter()
    .zip(arrays)
    .filter_map(|(expr, arr)| arr.map(|a| (expr.clone(), a)))
    .collect();
```

I think this silently truncates on a length mismatch: if expression_statistics returns a wrong-length vector, zip stops at the shorter side, so we silently lose mappings and continue with the null fallback.
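One possible fix, sketched as a hypothetical helper (`checked_pairs` is not from the PR): validate the lengths before zipping, since `Iterator::zip` silently stops at the shorter iterator:

```rust
/// Pair each expression with its returned statistics array, failing
/// fast on a length mismatch instead of silently dropping entries.
/// Entries whose statistics are `None` are still filtered out, matching
/// the null-fallback behavior of the cache.
fn checked_pairs<E: Clone, A>(
    exprs: &[E],
    arrays: Vec<Option<A>>,
) -> Result<Vec<(E, A)>, String> {
    if exprs.len() != arrays.len() {
        return Err(format!(
            "expected {} statistics arrays, got {}",
            exprs.len(),
            arrays.len()
        ));
    }
    Ok(exprs
        .iter()
        .zip(arrays)
        .filter_map(|(e, a)| a.map(|arr| (e.clone(), arr)))
        .collect())
}
```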

