From 5826a53cfb4fddeb1413cc73f80e43529714c7e2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:42:40 -0700 Subject: [PATCH 01/11] refactor: add TableScanBuilder, deprecate TableScan::try_new `TableScan::try_new` takes five positional arguments and bare `TableScan { .. }` struct literals are scattered across the codebase, making both fragile to field additions. Introduce `TableScanBuilder` (with `From`, so an existing scan can be decomposed, tweaked, and rebuilt) and move the schema-derivation logic into `build()`. `TableScan::try_new` is now deprecated and delegates to the builder; all in-tree callers are migrated to the builder. Pure refactor, no behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr/src/logical_plan/builder.rs | 11 ++- datafusion/expr/src/logical_plan/mod.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 88 +++++++++++++++++-- .../optimizer/src/optimize_projections/mod.rs | 20 ++--- datafusion/proto/src/logical_plan/mod.rs | 15 +--- 5 files changed, 102 insertions(+), 36 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 8c033745786cd..7bc705e0f46b5 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -33,8 +33,8 @@ use crate::expr_rewriter::{ use crate::logical_plan::{ Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, - Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values, - Window, + Projection, Repartition, Sort, SubqueryAlias, TableScanBuilder, Union, Unnest, + Values, Window, }; use crate::select_expr::SelectExpr; use crate::utils::{ @@ -515,8 +515,11 @@ impl LogicalPlanBuilder { filters: Vec, fetch: Option, ) -> Result { - let table_scan = - TableScan::try_new(table_name, 
table_source, projection, filters, fetch)?; + let table_scan = TableScanBuilder::new(table_name, table_source) + .with_projection(projection) + .with_filters(filters) + .with_fetch(fetch) + .build()?; // Inline TableScan if table_scan.filters.is_empty() diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index c2b01868c97f3..5087b25178ab6 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -42,8 +42,8 @@ pub use plan::{ EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, - SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, - projection_schema, + SubqueryAlias, TableScan, TableScanBuilder, ToStringifiedPlan, Union, Unnest, Values, + Window, projection_schema, }; pub use statement::{ Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 2f1061c4382b3..fc94e0a33c6c7 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2867,6 +2867,7 @@ impl Hash for TableScan { impl TableScan { /// Initialize TableScan with appropriate schema from the given /// arguments. + #[deprecated(since = "54.0.0", note = "use `TableScanBuilder` instead")] pub fn try_new( table_name: impl Into, table_source: Arc, @@ -2874,14 +2875,79 @@ impl TableScan { filters: Vec, fetch: Option, ) -> Result { - let table_name = table_name.into(); + TableScanBuilder::new(table_name, table_source) + .with_projection(projection) + .with_filters(filters) + .with_fetch(fetch) + .build() + } +} + +/// Builder for [`TableScan`]. 
+/// +/// Prefer this over constructing a [`TableScan`] directly: it derives the +/// `projected_schema` from the source schema and projection, and is resilient +/// to new fields being added to [`TableScan`]. An existing scan can be turned +/// back into a builder with `TableScanBuilder::from(scan)`, tweaked, and +/// rebuilt with [`TableScanBuilder::build`]. +pub struct TableScanBuilder { + table_name: TableReference, + source: Arc, + projection: Option>, + filters: Vec, + fetch: Option, +} + +impl TableScanBuilder { + /// Create a new builder for a scan of `source` named `table_name`. + pub fn new( + table_name: impl Into, + source: Arc, + ) -> Self { + Self { + table_name: table_name.into(), + source, + projection: None, + filters: vec![], + fetch: None, + } + } + + /// Set the column projection (indices into the source schema). + pub fn with_projection(mut self, projection: Option>) -> Self { + self.projection = projection; + self + } + + /// Set the filter expressions offered to the table provider. + pub fn with_filters(mut self, filters: Vec) -> Self { + self.filters = filters; + self + } + + /// Set the maximum number of rows to read. + pub fn with_fetch(mut self, fetch: Option) -> Self { + self.fetch = fetch; + self + } + + /// Build the [`TableScan`], deriving its `projected_schema` from the + /// source schema and projection. 
+ pub fn build(self) -> Result { + let TableScanBuilder { + table_name, + source, + projection, + filters, + fetch, + } = self; if table_name.table().is_empty() { return plan_err!("table_name cannot be empty"); } - let schema = table_source.schema(); + let schema = source.schema(); let func_dependencies = FunctionalDependencies::new_from_constraints( - table_source.constraints(), + source.constraints(), schema.fields.len(), ); let projected_schema = projection @@ -2907,9 +2973,9 @@ impl TableScan { })?; let projected_schema = Arc::new(projected_schema); - Ok(Self { + Ok(TableScan { table_name, - source: table_source, + source, projection, projected_schema, filters, @@ -2918,6 +2984,18 @@ impl TableScan { } } +impl From for TableScanBuilder { + fn from(scan: TableScan) -> Self { + Self { + table_name: scan.table_name, + source: scan.source, + projection: scan.projection, + filters: scan.filters, + fetch: scan.fetch, + } + } +} + // Repartition the plan based on a partitioning scheme. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] pub struct Repartition { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 59109a822bdbe..b9f22a3f9e52d 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -29,8 +29,8 @@ use datafusion_common::{ }; use datafusion_expr::expr::Alias; use datafusion_expr::{ - Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window, - logical_plan::LogicalPlan, + Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScanBuilder, Unnest, + Window, logical_plan::LogicalPlan, }; use crate::optimize_projections::required_indices::RequiredIndices; @@ -269,23 +269,15 @@ fn optimize_projections( .transform_data(|plan| optimize_subqueries(plan, config)); } LogicalPlan::TableScan(table_scan) => { - let TableScan { - table_name, - source, - projection, - filters, - fetch, - 
projected_schema: _, - } = table_scan; - // Get indices referred to in the original (schema with all fields) // given projected indices. - let projection = match &projection { + let projection = match &table_scan.projection { Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - let new_scan = - TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; + let new_scan = TableScanBuilder::from(table_scan) + .with_projection(Some(projection)) + .build()?; return Transformed::yes(LogicalPlan::TableScan(new_scan)) .transform_data(|plan| optimize_subqueries(plan, config)); diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 542cae890d693..e3785326675c1 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -40,7 +40,7 @@ use datafusion_catalog::empty::EmptyTable; use datafusion_common::file_options::file_type::FileType; use datafusion_common::format::ExplainFormat; use datafusion_common::{ - NullEquality, Result, TableReference, ToDFSchema, assert_or_internal_err, context, + NullEquality, Result, TableReference, assert_or_internal_err, context, internal_datafusion_err, internal_err, not_impl_err, plan_err, }; use datafusion_datasource::file_format::FileFormat; @@ -66,7 +66,8 @@ use datafusion_expr::{ logical_plan::{ Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation, Extension, Join, Prepare, Projection, - Repartition, Sort, SubqueryAlias, TableScan, Values, Window, builder::project, + Repartition, Sort, SubqueryAlias, TableScan, TableScanBuilder, Values, Window, + builder::project, }, }; @@ -371,15 +372,7 @@ fn from_table_source( target: Arc, extension_codec: &dyn LogicalExtensionCodec, ) -> Result { - let projected_schema = target.schema().to_dfschema_ref()?; - let r = LogicalPlan::TableScan(TableScan { - table_name, - source: target, - 
projection: None, - projected_schema, - filters: vec![], - fetch: None, - }); + let r = LogicalPlan::TableScan(TableScanBuilder::new(table_name, target).build()?); LogicalPlanNode::try_from_logical_plan(&r, extension_codec) } From 702786635bfdc087497f7c75a12a504c5a019d78 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:44:47 -0700 Subject: [PATCH 02/11] feat: add StatisticsRequest type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `StatisticsRequest` to `datafusion-expr-common::statistics` — a small vocabulary for query-aware statistics: a caller can ask a provider for a specific statistic (Min/Max/NullCount/DistinctCount/Sum/ByteSize per column, plus RowCount and TotalByteSize) instead of for a dense `Statistics` covering every column. It derives `Ord` so requests can be held in a deduplicating, deterministically-ordered `BTreeSet` (see the next commit). It is intentionally just a vocabulary; nothing in DataFusion populates or consumes it yet. Re-exported via `datafusion_expr::statistics`. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr-common/src/statistics.rs | 41 ++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/datafusion/expr-common/src/statistics.rs b/datafusion/expr-common/src/statistics.rs index c94c181615aed..055edd8ba297c 100644 --- a/datafusion/expr-common/src/statistics.rs +++ b/datafusion/expr-common/src/statistics.rs @@ -1694,3 +1694,44 @@ mod tests { all_ops.into_iter().collect() } } + +// --------------------------------------------------------------------------- +// Query-aware statistics requests. +// +// A small extension to the existing `Statistics` model: instead of "give me +// everything you have for every column", a caller can ask for a specific list +// of stats by name. `StatisticsRequest` is just that vocabulary — DataFusion +// itself does not populate or consume it. 
It exists so a request can be +// threaded from a `TableScan` (see `TableScan::statistics_requests`) through +// `ScanArgs::statistics_requests` to a `TableProvider`, which is enough for a +// query-aware statistics feature to be implemented outside of DataFusion. +// --------------------------------------------------------------------------- + +use datafusion_common::Column; + +/// A statistic a caller would like a provider to supply, if it can do so +/// cheaply. +/// +/// Each variant maps onto a field of [`datafusion_common::Statistics`] / +/// [`datafusion_common::ColumnStatistics`], so a provider that already +/// populates one can answer the request trivially. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum StatisticsRequest { + /// Smallest non-null value of `column`. + Min(Column), + /// Largest non-null value of `column`. + Max(Column), + /// Number of NULLs in `column`. + NullCount(Column), + /// Number of distinct values in `column` (exact or estimated). + DistinctCount(Column), + /// Sum of values in `column` (numerics, widened per + /// `ColumnStatistics::sum_value`). + Sum(Column), + /// Encoded/output byte size of `column`. + ByteSize(Column), + /// Number of rows in the container (table / file). + RowCount, + /// Total byte size of the container's output. + TotalByteSize, +} From 45adad37f9b53788dc83d85c0ba03915c7b74299 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:50:34 -0700 Subject: [PATCH 03/11] feat: add TableScan::statistics_requests field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an advisory `statistics_requests: BTreeSet` field to `TableScan`. A custom optimizer rule can record the statistics the surrounding plan shape would benefit from (e.g. Min/Max for sort keys); the physical planner threads them into the table provider (next commit). 
A `BTreeSet` rather than a `Vec`: the optimizer runs rules to a fixpoint, so the request collection must be idempotent under re-derivation. With a set a rule simply `insert`s its requests — re-running is a no-op and composes with other rules — instead of every rule having to dedupe a `Vec` itself. `BTree` ordering also keeps plans deterministic. The field is empty by default and DataFusion's own rules never populate it. It can be set when building a scan via `TableScanBuilder::with_statistics_requests`, or mutated directly (it is `pub`, like `TableScan`'s other fields). `Debug`/`PartialEq`/`Eq`/`Hash`/ `PartialOrd` for `TableScan` are left unchanged — it is advisory metadata, not part of plan identity. `map_expressions` in `tree_node.rs` is rewritten to rebuild `TableScan` via `..scan` instead of an exhaustive destructure, so it carries this (and any future) field through untouched. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr/src/logical_plan/plan.rs | 33 ++++++++++++++++++- datafusion/expr/src/logical_plan/tree_node.rs | 24 +++++--------- datafusion/optimizer/src/push_down_filter.rs | 1 + 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index fc94e0a33c6c7..c0e02bb2f535e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -18,7 +18,7 @@ //! 
Logical plan types use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::{Arc, LazyLock}; @@ -50,6 +50,7 @@ use crate::{ WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed, }; +use crate::statistics::StatisticsRequest; use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; use datafusion_common::cse::{NormalizeEq, Normalizeable}; use datafusion_common::format::ExplainFormat; @@ -2793,6 +2794,19 @@ pub struct TableScan { pub filters: Vec, /// Optional number of rows to read pub fetch: Option, + /// Statistics the planner would like the provider to answer for this + /// scan, typically attached by a custom optimizer rule from the + /// surrounding plan shape (e.g. Min/Max for sort keys). Threaded into + /// the table provider via `ScanArgs::statistics_requests` at + /// physical-planning time. Advisory and empty by default; DataFusion's + /// own rules never populate it. + /// + /// A [`BTreeSet`], not a `Vec`: optimizer rules run to a fixpoint, so the + /// collection must be idempotent under re-derivation — a rule `insert`s + /// the requests it wants and re-running is a no-op (and composes with + /// other rules) without each rule having to dedupe. The ordering also + /// keeps the resulting plan deterministic. + pub statistics_requests: BTreeSet, } impl Debug for TableScan { @@ -2896,6 +2910,7 @@ pub struct TableScanBuilder { projection: Option>, filters: Vec, fetch: Option, + statistics_requests: BTreeSet, } impl TableScanBuilder { @@ -2910,6 +2925,7 @@ impl TableScanBuilder { projection: None, filters: vec![], fetch: None, + statistics_requests: BTreeSet::new(), } } @@ -2931,6 +2947,16 @@ impl TableScanBuilder { self } + /// Set the statistics requests for the scan. See + /// [`TableScan::statistics_requests`]. 
+ pub fn with_statistics_requests( + mut self, + statistics_requests: BTreeSet, + ) -> Self { + self.statistics_requests = statistics_requests; + self + } + /// Build the [`TableScan`], deriving its `projected_schema` from the /// source schema and projection. pub fn build(self) -> Result { @@ -2940,6 +2966,7 @@ impl TableScanBuilder { projection, filters, fetch, + statistics_requests, } = self; if table_name.table().is_empty() { @@ -2980,6 +3007,7 @@ impl TableScanBuilder { projected_schema, filters, fetch, + statistics_requests, }) } } @@ -2992,6 +3020,7 @@ impl From for TableScanBuilder { projection: scan.projection, filters: scan.filters, fetch: scan.fetch, + statistics_requests: scan.statistics_requests, } } } @@ -5230,6 +5259,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, + statistics_requests: BTreeSet::new(), })); let col = schema.field_names()[0].clone(); @@ -5260,6 +5290,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, + statistics_requests: BTreeSet::new(), })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 1f58de37e93b0..45af8eaa83130 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -608,23 +608,15 @@ impl LogicalPlan { Transformed::new(plan, exprs.transformed, exprs.tnr) } } - LogicalPlan::TableScan(TableScan { - table_name, - source, - projection, - projected_schema, - filters, - fetch, - }) => filters.map_elements(f)?.update_data(|filters| { - LogicalPlan::TableScan(TableScan { - table_name, - source, - projection, - projected_schema, - filters, - fetch, + LogicalPlan::TableScan(mut scan) => { + // Only `filters` carry expressions; take them out, rewrite, + // and rebuild with `..scan` so every other field (including + // any added later) is carried through untouched. 
+ let filters = std::mem::take(&mut scan.filters); + filters.map_elements(f)?.update_data(|filters| { + LogicalPlan::TableScan(TableScan { filters, ..scan }) }) - }), + } LogicalPlan::Distinct(Distinct::On(DistinctOn { on_expr, select_expr, diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 9c2ac07ff07d8..54878d2f542c0 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3119,6 +3119,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, + statistics_requests: std::collections::BTreeSet::new(), }); Ok(LogicalPlanBuilder::from(table_scan)) From d1852502415a0664b4a854b36335190b82ccb8e5 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:53:38 -0700 Subject: [PATCH 04/11] feat: thread statistics requests into ScanArgs Add a `statistics_requests` field to `ScanArgs` (with `with_statistics_requests` / `statistics_requests` accessors) and have the physical planner thread `TableScan::statistics_requests` into it. This completes the request-side path: a custom optimizer rule annotates `TableScan`, and the request reaches a custom `TableProvider` in `scan_with_args`. DataFusion's own providers ignore the field; the default `ScanArgs` value is an empty slice. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/catalog/src/table.rs | 23 +++++++++++++++++++++++ datafusion/core/src/physical_planner.rs | 6 +++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 5d1391bed1172..3ae0b2ed10a95 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use datafusion_common::{Constraints, Statistics, not_impl_err}; use datafusion_common::{Result, internal_err}; use datafusion_expr::Expr; +use datafusion_expr::statistics::StatisticsRequest; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ @@ -406,6 +407,7 @@ pub struct ScanArgs<'a> { filters: Option<&'a [Expr]>, projection: Option<&'a [usize]>, limit: Option, + statistics_requests: &'a [StatisticsRequest], } impl<'a> ScanArgs<'a> { @@ -467,6 +469,27 @@ impl<'a> ScanArgs<'a> { pub fn limit(&self) -> Option { self.limit } + + /// Set the statistics the caller would like the provider to answer for + /// this scan, if it can do so cheaply. + /// + /// Providers read these via [`Self::statistics_requests()`]; anything a + /// provider cannot answer cheaply it simply ignores. DataFusion's own + /// `TableProvider`s ignore this field — it exists so a request can be + /// threaded from a custom optimizer rule (which annotates + /// `TableScan::statistics_requests`) through to a custom provider. + pub fn with_statistics_requests( + mut self, + statistics_requests: &'a [StatisticsRequest], + ) -> Self { + self.statistics_requests = statistics_requests; + self + } + + /// Get the statistics requests for the scan. Empty if none were set. + pub fn statistics_requests(&self) -> &'a [StatisticsRequest] { + self.statistics_requests + } } /// Result of a table scan operation from [`TableProvider::scan_with_args`]. 
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ee97309c27aae..a00d07a09fd78 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -647,6 +647,7 @@ impl DefaultPhysicalPlanner { filters, fetch, projected_schema, + statistics_requests, .. } = scan; @@ -656,10 +657,13 @@ impl DefaultPhysicalPlanner { // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); let filters_vec = filters.into_iter().collect::>(); + let stats_requests = + statistics_requests.iter().cloned().collect::>(); let opts = ScanArgs::default() .with_projection(projection.as_deref()) .with_filters(Some(&filters_vec)) - .with_limit(*fetch); + .with_limit(*fetch) + .with_statistics_requests(&stats_requests); let res = source.scan_with_args(session_state, opts).await?; Arc::clone(res.plan()) } else { From cd2202e36b36eb5cb796faed6081b70498be83f0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 01:00:18 -0700 Subject: [PATCH 05/11] test: e2e statistics-request flow via a custom optimizer rule Add a self-contained integration test that plays both external roles: a custom `OptimizerRule` annotates each `TableScan` with `StatisticsRequest`s, and a custom `TableProvider` records the `ScanArgs::statistics_requests` it receives in `scan_with_args`. This demonstrates the request-side hooks are sufficient to build the feature entirely outside of DataFusion. A second test confirms that without such a rule the provider sees an empty request list. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/core/tests/user_defined/mod.rs | 4 + .../tests/user_defined/statistics_requests.rs | 216 ++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 datafusion/core/tests/user_defined/statistics_requests.rs diff --git a/datafusion/core/tests/user_defined/mod.rs b/datafusion/core/tests/user_defined/mod.rs index bc9949f5d681c..4dad3ec4577d9 100644 --- a/datafusion/core/tests/user_defined/mod.rs +++ b/datafusion/core/tests/user_defined/mod.rs @@ -41,3 +41,7 @@ mod relation_planner; /// Tests for insert operations mod insert_operation; + +/// Tests for `StatisticsRequest`s flowing from a custom optimizer rule +/// through the physical planner into a custom `TableProvider`. +mod statistics_requests; diff --git a/datafusion/core/tests/user_defined/statistics_requests.rs b/datafusion/core/tests/user_defined/statistics_requests.rs new file mode 100644 index 0000000000000..b95bbf06757d7 --- /dev/null +++ b/datafusion/core/tests/user_defined/statistics_requests.rs @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end test that a *custom* optimizer rule can annotate a +//! 
`TableScan` with `StatisticsRequest`s and have them reach a *custom* +//! `TableProvider`'s `scan_with_args`. +//! +//! DataFusion ships no rule that populates `TableScan::statistics_requests` +//! and no provider that consumes `ScanArgs::statistics_requests`. This test +//! plays both roles, demonstrating that the request-side hooks are +//! sufficient to build the whole feature outside of DataFusion. + +use std::sync::{Arc, Mutex}; + +use arrow::array::{Int64Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider}; +use datafusion::common::tree_node::Transformed; +use datafusion::common::{Column, Result}; +use datafusion::datasource::TableType; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::execution::context::SessionContext; +use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::logical_expr::statistics::StatisticsRequest; +use datafusion::logical_expr::{Expr, LogicalPlan}; +use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; +use datafusion::physical_plan::ExecutionPlan; + +/// A custom optimizer rule that annotates every `TableScan` with a +/// `RowCount` request plus a `Min` request for each of its columns. +/// +/// This stands in for whatever request-derivation logic an external +/// implementer would write (e.g. Min/Max for sort keys, DistinctCount for +/// join keys). Here it is intentionally trivial and deterministic. 
+#[derive(Debug)] +struct RequestColumnStatistics; + +impl OptimizerRule for RequestColumnStatistics { + fn name(&self) -> &str { + "test_request_column_statistics" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + let LogicalPlan::TableScan(mut scan) = plan else { + return Ok(Transformed::no(plan)); + }; + // Insert into the scan's existing request set. `BTreeSet::insert` + // reports whether the value was new, so the rule is idempotent — and + // composes with other rules' requests for free: re-inserting an + // existing request is a no-op, and we report `Transformed::yes` only + // when something was actually added, so the optimizer reaches a + // fixpoint without a manual "already visited" guard. + let mut changed = scan.statistics_requests.insert(StatisticsRequest::RowCount); + for field in scan.projected_schema.fields() { + let req = StatisticsRequest::Min(Column::new_unqualified(field.name())); + changed |= scan.statistics_requests.insert(req); + } + Ok(if changed { + Transformed::yes(LogicalPlan::TableScan(scan)) + } else { + Transformed::no(LogicalPlan::TableScan(scan)) + }) + } +} + +/// A `TableProvider` that records the `statistics_requests` it was asked +/// for, so the test can assert what reached it. +#[derive(Debug)] +struct RecordingTable { + schema: SchemaRef, + batch: RecordBatch, + last_requests: Arc>>, +} + +#[async_trait] +impl TableProvider for RecordingTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(MemorySourceConfig::try_new_exec( + &[vec![self.batch.clone()]], + Arc::clone(&self.schema), + projection.cloned(), + )?) 
+ } + + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result { + // Record what reached us, then delegate to `scan`. + *self.last_requests.lock().unwrap() = args.statistics_requests().to_vec(); + let plan = self + .scan( + state, + args.projection().map(|p| p.to_vec()).as_ref(), + args.filters().unwrap_or(&[]), + args.limit(), + ) + .await?; + Ok(ScanResult::new(plan)) + } +} + +fn make_table() -> (Arc, Arc>>) { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int64, false), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Int64Array::from(vec![10, 20, 30])), + ], + ) + .unwrap(); + let last_requests = Arc::new(Mutex::new(Vec::new())); + let provider = Arc::new(RecordingTable { + schema, + batch, + last_requests: Arc::clone(&last_requests), + }); + (provider, last_requests) +} + +#[tokio::test] +async fn custom_rule_requests_reach_custom_provider() -> Result<()> { + let (provider, last_requests) = make_table(); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_optimizer_rule(Arc::new(RequestColumnStatistics)) + .build(); + let ctx = SessionContext::new_with_state(state); + ctx.register_table("t", provider)?; + + ctx.sql("SELECT a, b FROM t").await?.collect().await?; + + let got = last_requests.lock().unwrap().clone(); + assert_eq!( + got.len(), + 3, + "expected RowCount + Min(a) + Min(b), got {got:?}" + ); + assert!( + got.contains(&StatisticsRequest::RowCount), + "expected RowCount, got {got:?}" + ); + assert!( + got.contains(&StatisticsRequest::Min(Column::new_unqualified("a"))), + "expected Min(a), got {got:?}" + ); + assert!( + got.contains(&StatisticsRequest::Min(Column::new_unqualified("b"))), + "expected Min(b), got {got:?}" + ); + Ok(()) +} + +#[tokio::test] +async fn no_requests_without_a_rule() -> Result<()> { + // Without a rule populating 
`TableScan::statistics_requests`, the + // provider sees an empty request list — stock DataFusion behavior. + let (provider, last_requests) = make_table(); + let ctx = SessionContext::new(); + ctx.register_table("t", provider)?; + + ctx.sql("SELECT a, b FROM t").await?.collect().await?; + + assert!( + last_requests.lock().unwrap().is_empty(), + "expected no requests without a custom rule" + ); + Ok(()) +} From a7c1f20c7b500cf5a8f3768d9c4c77c5e3327435 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 26 May 2026 13:18:19 -0500 Subject: [PATCH 06/11] Apply suggestions from code review Co-authored-by: Andrew Lamb --- datafusion/catalog/src/table.rs | 8 ++++++-- datafusion/expr/src/logical_plan/plan.rs | 11 ++--------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 3ae0b2ed10a95..33a125d9b74b4 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -470,8 +470,10 @@ impl<'a> ScanArgs<'a> { self.limit } - /// Set the statistics the caller would like the provider to answer for - /// this scan, if it can do so cheaply. + /// Specifies the statistics the caller may use when optimizing the query. + /// + /// This is intended for to allow the provider to cheaply provide statistics that may help + /// such as those it has in an in memory catalog or from some other metadata source. /// /// Providers read these via [`Self::statistics_requests()`]; anything a /// provider cannot answer cheaply it simply ignores. DataFusion's own @@ -487,6 +489,8 @@ impl<'a> ScanArgs<'a> { } /// Get the statistics requests for the scan. Empty if none were set. 
+ /// + /// See [`Self::with_statistics_requests`] for more details pub fn statistics_requests(&self) -> &'a [StatisticsRequest] { self.statistics_requests } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c0e02bb2f535e..97318dc4da633 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2796,16 +2796,9 @@ pub struct TableScan { pub fetch: Option<usize>, /// Statistics the planner would like the provider to answer for this /// scan, typically attached by a custom optimizer rule from the - /// surrounding plan shape (e.g. Min/Max for sort keys). Threaded into - /// the table provider via `ScanArgs::statistics_requests` at - /// physical-planning time. Advisory and empty by default; DataFusion's - /// own rules never populate it. + /// surrounding plan (e.g. Min/Max for sort keys). /// - /// A [`BTreeSet`], not a `Vec`: optimizer rules run to a fixpoint, so the - /// collection must be idempotent under re-derivation — a rule `insert`s - /// the requests it wants and re-running is a no-op (and composes with - /// other rules) without each rule having to dedupe. The ordering also - /// keeps the resulting plan deterministic. + /// A [`BTreeSet`], not a `Vec` to keep the resulting plan deterministic. pub statistics_requests: BTreeSet<StatisticsRequest>, } From 23d03a415ed7437b094d4560917da4a30f3bb44a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 26 May 2026 13:23:26 -0500 Subject: [PATCH 07/11] refactor: revert TableScan tree-node match to explicit destructuring Restores the explicit field destructuring for the LogicalPlan::TableScan arm of map_expressions instead of the `mut scan` + `..scan` shorthand. Explicit destructuring forces a compile error if a new field carrying expressions is added to TableScan, so this callsite cannot be silently missed.
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr/src/logical_plan/tree_node.rs | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 45af8eaa83130..801caddcd089a 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -608,15 +608,25 @@ impl LogicalPlan { Transformed::new(plan, exprs.transformed, exprs.tnr) } } - LogicalPlan::TableScan(mut scan) => { - // Only `filters` carry expressions; take them out, rewrite, - // and rebuild with `..scan` so every other field (including - // any added later) is carried through untouched. - let filters = std::mem::take(&mut scan.filters); - filters.map_elements(f)?.update_data(|filters| { - LogicalPlan::TableScan(TableScan { filters, ..scan }) + LogicalPlan::TableScan(TableScan { + table_name, + source, + projection, + projected_schema, + filters, + fetch, + statistics_requests, + }) => filters.map_elements(f)?.update_data(|filters| { + LogicalPlan::TableScan(TableScan { + table_name, + source, + projection, + projected_schema, + filters, + fetch, + statistics_requests, }) - } + }), LogicalPlan::Distinct(Distinct::On(DistinctOn { on_expr, select_expr, From 3c29d2ebae5435a86e63b2b4d401fa2a452f2bb9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 26 May 2026 13:23:43 -0500 Subject: [PATCH 08/11] docs: clarify "Provider" as "TableProvider" in with_statistics_requests Resolves an ambiguity raised in review: the doc referred to "the provider"/"Providers" without saying which provider. Use `TableProvider` consistently, and fix a typo ("for to allow") and trailing whitespace. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/catalog/src/table.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 33a125d9b74b4..c6468fd5ad131 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -472,14 +472,15 @@ impl<'a> ScanArgs<'a> { /// Specifies the statistics the caller may use when optimizing the query. /// - /// This is intended for to allow the provider to cheaply provide statistics that may help - /// such as those it has in an in memory catalog or from some other metadata source. + /// This is intended to allow the `TableProvider` to cheaply provide + /// statistics that may help, such as those it has in an in-memory catalog + /// or from some other metadata source. /// - /// Providers read these via [`Self::statistics_requests()`]; anything a - /// provider cannot answer cheaply it simply ignores. DataFusion's own - /// `TableProvider`s ignore this field — it exists so a request can be + /// `TableProvider`s read these via [`Self::statistics_requests()`]; anything + /// a `TableProvider` cannot answer cheaply it simply ignores. DataFusion's + /// own `TableProvider`s ignore this field — it exists so a request can be /// threaded from a custom optimizer rule (which annotates - /// `TableScan::statistics_requests`) through to a custom provider. + /// `TableScan::statistics_requests`) through to a custom `TableProvider`. pub fn with_statistics_requests( mut self, statistics_requests: &'a [StatisticsRequest], From 6f3cfc94ea040835cd39657040f37d723996dcbb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 26 May 2026 13:24:37 -0500 Subject: [PATCH 09/11] docs: fold StatisticsRequest overview comment into its doc comment The "Query-aware statistics requests" explanation lived in a `//` block comment above the enum, where it is invisible in rustdoc. 
Move it into the enum's `///` doc comment so it is discoverable. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr-common/src/statistics.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/datafusion/expr-common/src/statistics.rs b/datafusion/expr-common/src/statistics.rs index 055edd8ba297c..18a19b1c83883 100644 --- a/datafusion/expr-common/src/statistics.rs +++ b/datafusion/expr-common/src/statistics.rs @@ -1695,23 +1695,19 @@ mod tests { } } -// --------------------------------------------------------------------------- -// Query-aware statistics requests. -// -// A small extension to the existing `Statistics` model: instead of "give me -// everything you have for every column", a caller can ask for a specific list -// of stats by name. `StatisticsRequest` is just that vocabulary — DataFusion -// itself does not populate or consume it. It exists so a request can be -// threaded from a `TableScan` (see `TableScan::statistics_requests`) through -// `ScanArgs::statistics_requests` to a `TableProvider`, which is enough for a -// query-aware statistics feature to be implemented outside of DataFusion. -// --------------------------------------------------------------------------- - use datafusion_common::Column; /// A statistic a caller would like a provider to supply, if it can do so /// cheaply. /// +/// A small, query-aware extension to the existing `Statistics` model: instead +/// of "give me everything you have for every column", a caller can ask for a +/// specific list of stats by name. `StatisticsRequest` is just that vocabulary +/// — DataFusion itself does not populate or consume it. It exists so a request +/// can be threaded from a `TableScan` (see `TableScan::statistics_requests`) +/// through `ScanArgs::statistics_requests` to a `TableProvider`, which is enough +/// for a query-aware statistics feature to be implemented outside of DataFusion. 
+/// /// Each variant maps onto a field of [`datafusion_common::Statistics`] / /// [`datafusion_common::ColumnStatistics`], so a provider that already /// populates one can answer the request trivially. From c147c379f04e9bb3e7bafd2ce25170e8c54914d7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 26 May 2026 13:28:22 -0500 Subject: [PATCH 10/11] refactor: hold Arc in StatisticsRequest per-column variants A Column owns its relation/name strings, so cloning a StatisticsRequest cloned those strings. The requests live in a BTreeSet on TableScan that is cloned with the plan throughout optimization, so wrap the per-column payload in Arc to make those clones cheap. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/user_defined/statistics_requests.rs | 11 ++++++++--- datafusion/expr-common/src/statistics.rs | 19 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/user_defined/statistics_requests.rs b/datafusion/core/tests/user_defined/statistics_requests.rs index b95bbf06757d7..64c6676c23746 100644 --- a/datafusion/core/tests/user_defined/statistics_requests.rs +++ b/datafusion/core/tests/user_defined/statistics_requests.rs @@ -79,7 +79,8 @@ impl OptimizerRule for RequestColumnStatistics { // fixpoint without a manual "already visited" guard. 
let mut changed = scan.statistics_requests.insert(StatisticsRequest::RowCount); for field in scan.projected_schema.fields() { - let req = StatisticsRequest::Min(Column::new_unqualified(field.name())); + let req = + StatisticsRequest::Min(Arc::new(Column::new_unqualified(field.name()))); changed |= scan.statistics_requests.insert(req); } Ok(if changed { @@ -188,11 +189,15 @@ async fn custom_rule_requests_reach_custom_provider() -> Result<()> { "expected RowCount, got {got:?}" ); assert!( - got.contains(&StatisticsRequest::Min(Column::new_unqualified("a"))), + got.contains(&StatisticsRequest::Min(Arc::new(Column::new_unqualified( + "a" + )))), "expected Min(a), got {got:?}" ); assert!( - got.contains(&StatisticsRequest::Min(Column::new_unqualified("b"))), + got.contains(&StatisticsRequest::Min(Arc::new(Column::new_unqualified( + "b" + )))), "expected Min(b), got {got:?}" ); Ok(()) diff --git a/datafusion/expr-common/src/statistics.rs b/datafusion/expr-common/src/statistics.rs index 18a19b1c83883..034358b043135 100644 --- a/datafusion/expr-common/src/statistics.rs +++ b/datafusion/expr-common/src/statistics.rs @@ -1695,6 +1695,8 @@ mod tests { } } +use std::sync::Arc; + use datafusion_common::Column; /// A statistic a caller would like a provider to supply, if it can do so @@ -1711,21 +1713,26 @@ use datafusion_common::Column; /// Each variant maps onto a field of [`datafusion_common::Statistics`] / /// [`datafusion_common::ColumnStatistics`], so a provider that already /// populates one can answer the request trivially. +/// +/// The per-column variants hold an `Arc<Column>` rather than an owned +/// [`Column`] (which carries owned strings) so cloning a request — and the +/// `BTreeSet<StatisticsRequest>` stored on `TableScan`, which is cloned with +/// the plan during optimization — stays cheap. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum StatisticsRequest { /// Smallest non-null value of `column`. - Min(Column), + Min(Arc<Column>), /// Largest non-null value of `column`.
- Max(Column), + Max(Arc<Column>), /// Number of NULLs in `column`. - NullCount(Column), + NullCount(Arc<Column>), /// Number of distinct values in `column` (exact or estimated). - DistinctCount(Column), + DistinctCount(Arc<Column>), /// Sum of values in `column` (numerics, widened per /// `ColumnStatistics::sum_value`). - Sum(Column), + Sum(Arc<Column>), /// Encoded/output byte size of `column`. - ByteSize(Column), + ByteSize(Arc<Column>), /// Number of rows in the container (table / file). RowCount, /// Total byte size of the container's output. From 12963f395a08c630cc9e416a04175176d67fb410 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 26 May 2026 13:28:32 -0500 Subject: [PATCH 11/11] style: strip trailing whitespace in TableScan doc comment Leftover trailing whitespace from a previously applied review suggestion; removed by cargo fmt. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 97318dc4da633..e7e03bcac5150 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2796,7 +2796,7 @@ pub struct TableScan { pub fetch: Option<usize>, /// Statistics the planner would like the provider to answer for this /// scan, typically attached by a custom optimizer rule from the - /// surrounding plan (e.g. Min/Max for sort keys). + /// surrounding plan (e.g. Min/Max for sort keys). /// /// A [`BTreeSet`], not a `Vec` to keep the resulting plan deterministic. pub statistics_requests: BTreeSet<StatisticsRequest>,