feat: enable pickling for Python aggregate and window UDFs #1545
Extends the PythonLogicalCodec / PythonPhysicalCodec inline encoding
introduced for scalar UDFs to also cover Python-defined aggregate and
window UDFs. The cloudpickle tuple shape per family is:

  DFPYUDA (agg)     (name, accumulator_factory, input_schema_bytes,
                     return_schema_bytes, state_schema_bytes,
                     volatility_str)
  DFPYUDW (window)  (name, evaluator_factory, input_schema_bytes,
                     return_schema_bytes, volatility_str)

Same wire-framing as scalar (family magic + version byte + cloudpickle
blob), same schema serde (arrow-rs native IPC), same cached cloudpickle
handle. The agg state schema is encoded as a full IPC schema so the
post-decode UDF reports the same names + nullability + metadata as the
sender — relevant for accumulators whose StateFieldsArgs consumers key
off names rather than positional DataType.
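
The framing described above can be sketched in Python. This is only an illustration under stated assumptions: the stdlib `pickle` module stands in for cloudpickle, the version byte value `1` is a guess (the commit message does not state it), the schema bytes are placeholders rather than real Arrow IPC, and `frame` / `unframe` are hypothetical helper names, not functions from the codec.

```python
import pickle  # stand-in for cloudpickle in this sketch

AGG_MAGIC = b"DFPYUDA"     # family prefix named in the commit message
WINDOW_MAGIC = b"DFPYUDW"  # family prefix named in the commit message
WIRE_VERSION = 1           # assumed value; the real version byte is not stated


def frame(magic: bytes, payload: tuple) -> bytes:
    """Prefix a pickled tuple with the family magic and a one-byte version."""
    return magic + bytes([WIRE_VERSION]) + pickle.dumps(payload)


def unframe(magic: bytes, buf: bytes):
    """Return the decoded tuple, or None if buf belongs to another family."""
    if len(buf) <= len(magic) or not buf.startswith(magic):
        return None
    version = buf[len(magic)]
    if version != WIRE_VERSION:
        raise ValueError(f"unsupported wire version {version}")
    return pickle.loads(buf[len(magic) + 1:])


# Aggregate tuple shape from the commit message. `list` is a placeholder
# factory and the three byte strings are placeholders for IPC schemas.
agg_payload = (
    "my_count", list, b"<input ipc>", b"<return ipc>", b"<state ipc>", "immutable",
)
wire = frame(AGG_MAGIC, agg_payload)
decoded = unframe(AGG_MAGIC, wire)
```

Checking the family magic first lets a decoder decline payloads from another family (returning `None` here) instead of trying to unpickle bytes it does not own.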
Required restructuring two existing UDF impls so the codec can grab
the Python callable directly:
* udaf.rs: replaces create_udaf + AccumulatorFactoryFunction closure
with a named PythonFunctionAggregateUDF that stores the Py<PyAny>
accumulator factory. Synthesizes state_{i} field names when the
Python constructor passes only Vec<DataType>; from_parts preserves
the full state schema on the decode side.
* udwf.rs: renames MultiColumnWindowUDF -> PythonFunctionWindowUDF,
drops the PartitionEvaluatorFactory PtrEq wrapper, stores the
Py<PyAny> evaluator directly. PartialEq and Hash get the same
pointer-identity fast path + debug-log exception handling already
on PythonFunctionScalarUDF.
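
The "pointer-identity fast path + debug-log exception handling" idea can be pictured with a small Python analogue. `PyCallableUdf` is a hypothetical class for illustration only; the real implementations are Rust `PartialEq` / `Hash` impls, and the fallback choices below (treat a raising `__eq__` as "not equal", hash by name when the callable is unhashable) are assumptions, not behavior confirmed by this PR.

```python
import logging

logger = logging.getLogger(__name__)


class PyCallableUdf:
    """Hypothetical stand-in for a UDF wrapper that stores a Python callable."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def __eq__(self, other):
        if not isinstance(other, PyCallableUdf):
            return NotImplemented
        if self.name != other.name:
            return False
        # Fast path: the very same Python object is trivially equal.
        if self.func is other.func:
            return True
        # Slow path: defer to Python's ==. If the user's __eq__ raises,
        # log at debug level and report "not equal" (assumed fallback).
        try:
            return bool(self.func == other.func)
        except Exception:
            logger.debug("__eq__ raised while comparing callables", exc_info=True)
            return False

    def __hash__(self):
        # If the callable is unhashable, log and hash by name only
        # (assumed fallback).
        try:
            return hash((self.name, self.func))
        except Exception:
            logger.debug("__hash__ raised for callable", exc_info=True)
            return hash(self.name)
```

The identity check avoids calling into user-defined `__eq__` at all in the common case where both sides wrap the same object.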
User-facing surface:
* AggregateUDF.name and WindowUDF.name properties (parallel to the
ScalarUDF.name shipped in PR 1).
* Existing UDAF/UDWF construction paths are unchanged.
The per-session with_python_udf_inlining toggle, sender-side context,
strict refusal, and user-guide docs land in PRs 3-4 of this series.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Re-export `to_rust_accumulator`, `to_rust_partition_evaluator`, and `PythonFunctionWindowUDF` (with a `MultiColumnWindowUDF` alias) by promoting `udaf` and `udwf` to `pub mod` so prior downstream Rust consumers keep their API surface after the inline-encoding refactor.

Adds an end-to-end window UDF pickle round-trip test that runs the decoded evaluator over a real session, mirroring the aggregate test.

Documents the cloudpickle-based shipping behavior of Python aggregate and window UDFs in the user-guide aggregations and windows pages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Extends the expression serialization/pickling codec so expressions that reference Python aggregate UDFs (UDAFs) and Python window UDFs (UDWFs) can round-trip via pickle.dumps(expr) / Expr.to_bytes() by inlining the Python factory callables (cloudpickled) into the wire payload, analogous to the existing scalar UDF path.
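
As a rough illustration of why the factory callable has to be inlined by value, the sketch below shows the stdlib pickler refusing a locally defined callable. The stdlib `pickle` module serializes functions by module and qualified name only, which is the limitation cloudpickle works around. `make_factory` is a hypothetical helper, not code from this PR.

```python
import pickle


def make_factory():
    # A locally defined callable, similar in spirit to an accumulator
    # factory written inline in a notebook or script.
    return lambda: 0


factory = make_factory()

try:
    pickle.dumps(factory)
    by_reference_ok = True
except (pickle.PicklingError, AttributeError):
    # The stdlib pickler records only a module + qualified name, and a
    # local lambda cannot be looked up by that name.
    by_reference_ok = False
```

The exact exception type differs between Python versions, which is why the sketch catches both.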
Changes:
- Add new codec "family" payloads for Python UDAFs (DFPYUDA) and UDWFs (DFPYUDW) and implement encode/decode for both logical + physical codecs.
- Refactor Rust UDAF/UDWF implementations to retain the underlying Python factory callable so the codec can downcast and inline it.
- Add/extend tests and user docs describing aggregate/window UDF serialization and behavior across processes.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| python/tests/test_pickle_expr.py | Adds round-trip + execution tests for pickling/bytes serialization of Python UDAFs/UDWFs. |
| python/datafusion/user_defined.py | Adds .name properties for AggregateUDF and WindowUDF wrappers (consistent with ScalarUDF). |
| python/datafusion/ipc.py | Updates IPC docs to reflect inline shipping for scalar/aggregate/window Python UDFs. |
| python/datafusion/expr.py | Updates Expr serialization/pickle documentation to include aggregate/window UDFs. |
| docs/source/user-guide/common-operations/windows.rst | Documents how to define UDWFs and how they serialize inline. |
| docs/source/user-guide/common-operations/aggregations.rst | Documents how to define UDAFs and how they serialize inline. |
| crates/core/src/udwf.rs | Replaces the prior erased window-UDF impl with a Python-callable-retaining impl to support inlining; adds getter. |
| crates/core/src/udaf.rs | Introduces a Python-callable-retaining aggregate-UDF impl to support inlining; adds getter. |
| crates/core/src/lib.rs | Makes udaf/udwf Rust modules public. |
| crates/core/src/codec.rs | Implements new inline wire families + encode/decode helpers for Python UDAFs/UDWFs and updates wire-header tests. |
    for s in states:
        self._count += s[0].as_py()

    decoded = pickle.loads(pickle.dumps(e))  # noqa: S301

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3, 4, 5]})

    /// return type, state types schema, volatility).
    pub(crate) const PY_AGG_UDF_FAMILY: &[u8] = b"DFPYUDA";

    /// Family prefix for an inlined Python window UDF
    /// (cloudpickled tuple of name, evaluator factory, input schema,
    /// return type, volatility).

    // Shared Python aggregate UDF encode / decode helpers
    //
    // Cloudpickle tuple shape: `(name, accumulator_factory, input_schema_bytes,
    // return_type_bytes, state_schema_bytes, volatility_str)`. The accumulator

     #[allow(clippy::borrow_deref_ref)]
    -mod udaf;
    +pub mod udaf;
     #[allow(clippy::borrow_deref_ref)]
     mod udf;
     pub mod udtf;
    -mod udwf;
    +pub mod udwf;

mailmindlin left a comment
Looks good, contingent on the Copilot stuff.
    // Python callable that produces a new evaluator instance per partition.
    // =============================================================================

    pub(crate) fn try_encode_python_window_udf(node: &WindowUDF, buf: &mut Vec<u8>) -> Result<bool> {

Is there a reason why you call these udaf/udwf in some places but all the methods are agg_udf/window_udf?
Yes. Some were based on hand-written code and some the agent generated mostly from scratch. I didn't care enough at the time to make the naming consistent, but I'll update it.
- Fix CountAcc.merge in pickle test: sum over states[0] (partition counts), not over the list of state fields. The prior implementation only added partition 0's count when merging across partitions.
- Drive test_agg_udf_evaluates_after_roundtrip with a two-batch DataFrame so merge actually runs and the round-tripped state-field schema is exercised end-to-end.
- Correct PY_AGG_UDF_FAMILY / PY_WINDOW_UDF_FAMILY doc comments and the aggregate block comment to reference "return schema bytes" rather than "return type" / "return_type_bytes" so the docs match the actual on-wire layout.
- Keep `udaf` and `udwf` modules private (matching `udf`) and selectively re-export the helpers downstream Rust consumers rely on (`to_rust_accumulator`, `to_rust_partition_evaluator`, `PythonFunctionWindowUDF`, `MultiColumnWindowUDF`) instead of exposing the whole module surface.
- Rename codec helpers `*_agg_udf` -> `*_udaf` and `*_window_udf` -> `*_udwf` for naming consistency with the Python public aliases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
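
The `CountAcc.merge` fix in the first bullet can be reproduced with plain Python lists. This is a simplified sketch: lists stand in for the pyarrow arrays the real accumulator receives (so there are no `.as_py()` calls), and `merge_buggy` / `merge_fixed` are hypothetical free functions rather than the test's actual methods.

```python
# Each element of `states` stands in for one state-field array; element i
# of that array is the partial state contributed by partition i.
states = [[3, 2]]  # one state field (the count), two partitions


def merge_buggy(states):
    total = 0
    for s in states:   # iterates state fields, not partitions
        total += s[0]  # only ever reads partition 0's count
    return total


def merge_fixed(states):
    # Sum the count field across every partition.
    return sum(states[0])
```

With a single partition the two versions agree, which is why the test needed a two-batch DataFrame to exercise the difference.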
Which issue does this PR close?
Addresses part of #1517
This is PR 2 of 4. The four PRs stack sequentially; subsequent PRs target this branch's tip until it merges.
Follow up PRs:
Rationale for this change
PR 1 covered round-trip encoding and decoding of scalar UDFs. The same problem applies to expressions that reference Python aggregate and window UDFs: their accumulator / partition-evaluator factory is a Python callable, so a receiver that only has the UDF name cannot reconstruct one. This PR extends the inline-encoding mechanism so the natural `pickle.dumps(expr)` pattern also works for expressions referencing Python UDAFs and UDWFs.

What changes are included in this PR?
Codec extension is a straight parallel of the scalar path. New wire-format families:
- `DFPYUDA`: `(name, accumulator_factory, input_schema_bytes, return_schema_bytes, state_schema_bytes, volatility_str)`
- `DFPYUDW`: `(name, evaluator_factory, input_schema_bytes, return_schema_bytes, volatility_str)`

Are there any user-facing changes?
The `MultiColumnWindowUDF` rename is a Rust-side breaking change, so adding `api change` even though no Python-facing API breaks.