Skip to content
8 changes: 7 additions & 1 deletion .ai/skills/check-upstream/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all"
- Python API: `python/datafusion/functions.py` — each function wraps a call to `datafusion._internal.functions`
- Rust bindings: `crates/core/src/functions.rs` — `#[pyfunction]` definitions registered via `init_module()`

**Evaluated and not requiring separate Python exposure:**
- `get_field_path` — already covered by `get_field(expr, *names)`, which takes a
variadic field path and dispatches to the same underlying
`functions::core::get_field` UDF as the upstream `get_field_path` helper.

**How to check:**
1. Fetch the upstream scalar function documentation page
2. Compare against functions listed in `python/datafusion/functions.py` (check the `__all__` list and function definitions)
3. A function is covered if it exists in the Python API — it does NOT need a dedicated Rust `#[pyfunction]`. Many functions are aliases that reuse another function's Rust binding.
4. Only report functions that are missing from the Python `__all__` list / function definitions
4. Check against the "evaluated and not requiring exposure" list before flagging as a gap
5. Only report functions that are missing from the Python `__all__` list / function definitions

### 2. Aggregate Functions

Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

repos:
- repo: https://github.com/rhysd/actionlint
rev: v1.7.6
rev: v1.7.12
hooks:
- id: actionlint-docker
- repo: https://github.com/astral-sh/ruff-pre-commit
Expand Down
42 changes: 41 additions & 1 deletion crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::execution::TaskContextProvider;
use datafusion::execution::context::{
DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext,
};
Expand All @@ -44,6 +43,7 @@ use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, Unboun
use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::execution::{FunctionRegistry, TaskContextProvider};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
};
Expand Down Expand Up @@ -847,6 +847,13 @@ impl PySessionContext {
Ok(())
}

pub fn read_batches(
&self,
batches: PyArrowType<Vec<RecordBatch>>,
) -> PyDataFusionResult<PyDataFrame> {
Ok(PyDataFrame::new(self.ctx.read_batches(batches.0)?))
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (name, path, table_partition_cols=vec![],
parquet_pruning=true,
Expand Down Expand Up @@ -1065,6 +1072,39 @@ impl PySessionContext {
self.ctx.deregister_udwf(name);
}

pub fn udf(&self, name: &str) -> PyDataFusionResult<PyScalarUDF> {
let function = (*self.ctx.udf(name)?).clone();
Ok(PyScalarUDF { function })
}

pub fn udaf(&self, name: &str) -> PyDataFusionResult<PyAggregateUDF> {
let function = (*self.ctx.udaf(name)?).clone();
Ok(PyAggregateUDF { function })
}

pub fn udwf(&self, name: &str) -> PyDataFusionResult<PyWindowUDF> {
let function = (*self.ctx.udwf(name)?).clone();
Ok(PyWindowUDF { function })
}

pub fn udfs(&self) -> Vec<String> {
let mut names: Vec<String> = self.ctx.udfs().into_iter().collect();
names.sort();
names
}

pub fn udafs(&self) -> Vec<String> {
let mut names: Vec<String> = self.ctx.udafs().into_iter().collect();
names.sort();
names
}

pub fn udwfs(&self) -> Vec<String> {
let mut names: Vec<String> = self.ctx.udwfs().into_iter().collect();
names.sort();
names
}

#[pyo3(signature = (name="datafusion"))]
pub fn catalog(&self, py: Python, name: &str) -> PyResult<Py<PyAny>> {
let catalog = self.ctx.catalog(name).ok_or(PyKeyError::new_err(format!(
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,10 @@ expr_fn!(union_tag, arg1);
expr_fn!(random);

#[pyfunction]
fn get_field(expr: PyExpr, name: PyExpr) -> PyExpr {
functions::core::get_field()
.call(vec![expr.into(), name.into()])
.into()
fn get_field(expr: PyExpr, names: Vec<PyExpr>) -> PyExpr {
let mut args = vec![expr.into()];
args.extend(names.into_iter().map(Into::into));
functions::core::get_field().call(args).into()
}

#[pyfunction]
Expand Down
5 changes: 2 additions & 3 deletions examples/datafusion-ffi-example/src/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

use std::sync::Arc;

use datafusion_catalog::{TableFunctionImpl, TableProvider};
use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider};
use datafusion_common::error::Result as DataFusionResult;
use datafusion_expr::Expr;
use datafusion_ffi::udtf::FFI_TableFunction;
use datafusion_python_util::ffi_logical_codec_from_pycapsule;
use pyo3::types::PyCapsule;
Expand Down Expand Up @@ -59,7 +58,7 @@ impl MyTableFunction {
}

impl TableFunctionImpl for MyTableFunction {
fn call(&self, _args: &[Expr]) -> DataFusionResult<Arc<dyn TableProvider>> {
fn call_with_args(&self, _args: TableFunctionArgs) -> DataFusionResult<Arc<dyn TableProvider>> {
let provider = MyTableProvider::new(4, 3, 2).create_table()?;
Ok(Arc::new(provider))
}
Expand Down
204 changes: 200 additions & 4 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,16 @@

import pandas as pd
import polars as pl # type: ignore[import]
from _typeshed import CapsuleType as _PyCapsule

from datafusion.catalog import CatalogProvider, Table
from datafusion.common import DFSchema
from datafusion.expr import Expr, SortKey
from datafusion.plan import ExecutionPlan, LogicalPlan
from datafusion.user_defined import (
AggregateUDF,
LogicalExtensionCodecExportable,
PhysicalExtensionCodecExportable,
ScalarUDF,
TableFunction,
WindowUDF,
Expand Down Expand Up @@ -959,6 +962,45 @@ def register_record_batches(
"""
self.ctx.register_record_batches(name, partitions)

def read_batch(self, batch: pa.RecordBatch) -> DataFrame:
"""Return a :py:class:`~datafusion.DataFrame` reading a single batch.

Convenience wrapper around :py:meth:`read_batches` for the single-batch
case. Unlike :py:meth:`register_batch`, this does not register the
batch as a named table; it returns an anonymous
:py:class:`~datafusion.DataFrame` directly.

Args:
batch: Record batch to wrap as a DataFrame.

Examples:
>>> ctx = dfn.SessionContext()
>>> batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
>>> ctx.read_batch(batch).to_pydict()
{'a': [1, 2, 3]}
"""
return self.read_batches([batch])

def read_batches(self, batches: list[pa.RecordBatch]) -> DataFrame:
"""Return a :py:class:`~datafusion.DataFrame` reading the given batches.

All batches must share the same schema. Unlike
:py:meth:`register_record_batches`, this does not register the batches
as a named table; it returns an anonymous
:py:class:`~datafusion.DataFrame` directly.

Args:
batches: Record batches to wrap as a DataFrame.

Examples:
>>> ctx = dfn.SessionContext()
>>> b1 = pa.RecordBatch.from_pydict({"a": [1, 2]})
>>> b2 = pa.RecordBatch.from_pydict({"a": [3, 4]})
>>> ctx.read_batches([b1, b2]).to_pydict()
{'a': [1, 2, 3, 4]}
"""
return DataFrame(self.ctx.read_batches(batches))

def register_parquet(
self,
name: str,
Expand Down Expand Up @@ -1268,6 +1310,152 @@ def deregister_udwf(self, name: str) -> None:
"""
self.ctx.deregister_udwf(name)

def udf(self, name: str) -> ScalarUDF:
"""Look up a registered scalar UDF by name.

Returns the same :py:class:`~datafusion.user_defined.ScalarUDF`
wrapper that :py:meth:`register_udf` accepts, so it can be invoked
as an expression in the DataFrame API or re-registered into a
different :py:class:`SessionContext`. Built-in scalar functions
from the session's function registry are also looked up.

Args:
name: Name of the registered scalar UDF.

Raises:
Exception: If no scalar UDF is registered under ``name``.

Examples:
Register a UDF, then look it up by name and use it in the
DataFrame API:

>>> ctx = dfn.SessionContext()
>>> nullcheck = dfn.udf(
... lambda x: x.is_null(),
... [pa.int64()],
... pa.bool_(),
... volatility="immutable",
... name="nullcheck",
... )
>>> ctx.register_udf(nullcheck)
>>> fn = ctx.udf("nullcheck")
>>> df = ctx.from_pydict({"a": [1, None, 3]})
>>> df.select(fn(col("a")).alias("is_null")).to_pydict()
{'is_null': [False, True, False]}

Late-binding: the function name can come from configuration
rather than an imported symbol, which is useful when the set
of UDFs is plugin-driven or chosen at runtime:

>>> config = {"null_check": "nullcheck"}
>>> fn = ctx.udf(config["null_check"])
>>> df.select(fn(col("a")).alias("is_null")).to_pydict()
{'is_null': [False, True, False]}
"""
from datafusion.user_defined import ScalarUDF as _ScalarUDF # noqa: PLC0415

wrapper = _ScalarUDF.__new__(_ScalarUDF)
wrapper._udf = self.ctx.udf(name)
return wrapper

def udaf(self, name: str) -> AggregateUDF:
"""Look up a registered aggregate UDF by name.

Returns the same :py:class:`~datafusion.user_defined.AggregateUDF`
wrapper that :py:meth:`register_udaf` accepts. Built-in aggregate
functions such as ``sum`` or ``avg`` are also discoverable through
this lookup. See :py:meth:`udf` for a worked late-binding example;
the pattern is identical for aggregates.

Args:
name: Name of the registered aggregate UDF.

Raises:
Exception: If no aggregate UDF is registered under ``name``.

Examples:
Look up a built-in aggregate by name and use it in
:py:meth:`~datafusion.DataFrame.aggregate`:

>>> ctx = dfn.SessionContext()
>>> sum_fn = ctx.udaf("sum")
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> df.aggregate([], [sum_fn(col("a")).alias("total")]).to_pydict()
{'total': [6]}
"""
from datafusion.user_defined import ( # noqa: PLC0415
AggregateUDF as _AggregateUDF,
)

wrapper = _AggregateUDF.__new__(_AggregateUDF)
wrapper._udaf = self.ctx.udaf(name)
return wrapper

def udwf(self, name: str) -> WindowUDF:
"""Look up a registered window UDF by name.

Returns the same :py:class:`~datafusion.user_defined.WindowUDF`
wrapper that :py:meth:`register_udwf` accepts. Built-in window
functions such as ``row_number`` or ``rank`` are also discoverable
through this lookup. See :py:meth:`udf` for a worked late-binding
example; the pattern is identical for window functions.

Args:
name: Name of the registered window UDF.

Raises:
Exception: If no window UDF is registered under ``name``.

Examples:
Look up a built-in window function by name and use it in
``select``:

>>> ctx = dfn.SessionContext()
>>> rn = ctx.udwf("row_number")
>>> df = ctx.from_pydict({"a": [10, 20, 30]})
>>> df.select(col("a"), rn().alias("rn")).to_pydict()
{'a': [10, 20, 30], 'rn': [1, 2, 3]}
"""
from datafusion.user_defined import WindowUDF as _WindowUDF # noqa: PLC0415

wrapper = _WindowUDF.__new__(_WindowUDF)
wrapper._udwf = self.ctx.udwf(name)
return wrapper

def udfs(self) -> list[str]:
"""Return the sorted names of all registered scalar UDFs.

Includes both user-registered and built-in scalar functions. Pair
with :py:meth:`udf` to drive discovery, validation, or config-based
dispatch.

Examples:
>>> ctx = dfn.SessionContext()
>>> "abs" in ctx.udfs()
True
"""
return self.ctx.udfs()

def udafs(self) -> list[str]:
"""Return the sorted names of all registered aggregate UDFs.

Examples:
>>> ctx = dfn.SessionContext()
>>> "sum" in ctx.udafs()
True
"""
return self.ctx.udafs()

def udwfs(self) -> list[str]:
"""Return the sorted names of all registered window UDFs.

Examples:
>>> ctx = dfn.SessionContext()
>>> "row_number" in ctx.udwfs()
True
"""
return self.ctx.udwfs()

def catalog(self, name: str = "datafusion") -> Catalog:
"""Retrieve a catalog by name."""
return Catalog(self.ctx.catalog(name))
Expand Down Expand Up @@ -1744,11 +1932,15 @@ def __datafusion_logical_extension_codec__(self) -> Any:
"""Access the PyCapsule FFI_LogicalExtensionCodec."""
return self.ctx.__datafusion_logical_extension_codec__()

def with_logical_extension_codec(self, codec: Any) -> SessionContext:
def with_logical_extension_codec(
self, codec: LogicalExtensionCodecExportable | _PyCapsule
) -> SessionContext:
"""Create a new session context with specified codec.

This only supports codecs that have been implemented using the
FFI interface.
FFI interface. ``codec`` must either be a raw ``FFI_LogicalExtensionCodec``
``PyCapsule`` or an object exposing
``__datafusion_logical_extension_codec__``.
"""
new_internal = self.ctx.with_logical_extension_codec(codec)
new = SessionContext.__new__(SessionContext)
Expand All @@ -1759,11 +1951,15 @@ def __datafusion_physical_extension_codec__(self) -> Any:
"""Access the PyCapsule FFI_PhysicalExtensionCodec."""
return self.ctx.__datafusion_physical_extension_codec__()

def with_physical_extension_codec(self, codec: Any) -> SessionContext:
def with_physical_extension_codec(
self, codec: PhysicalExtensionCodecExportable | _PyCapsule
) -> SessionContext:
"""Create a new session context with the specified physical codec.

This only supports codecs that have been implemented using the
FFI interface.
FFI interface. ``codec`` must either be a raw
``FFI_PhysicalExtensionCodec`` ``PyCapsule`` or an object exposing
``__datafusion_physical_extension_codec__``.
"""
new_internal = self.ctx.with_physical_extension_codec(codec)
new = SessionContext.__new__(SessionContext)
Expand Down
Loading
Loading