Skip to content

test: Add local worker execution and statistics testing infrastructure#6697

Draft
samstokes wants to merge 5 commits intomainfrom
claude/review-distributed-tests-L6JMj
Draft

test: Add local worker execution and statistics testing infrastructure#6697
samstokes wants to merge 5 commits intomainfrom
claude/review-distributed-tests-L6JMj

Conversation

@samstokes
Copy link
Copy Markdown
Contributor

Changes Made

This PR adds comprehensive testing infrastructure for the distributed execution pipeline, focusing on statistics collection and aggregation:

New Components

  1. LocalSwordfishWorker (src/daft-distributed/src/scheduling/local_worker.rs)

    • In-process task execution without requiring a Ray cluster
    • Implements the Worker and WorkerManager traits for local execution
    • Executes tasks via the local execution pipeline, producing genuine ExecutionStats
    • Enables testing of real statistics collection without distributed infrastructure
  2. Statistics Tests (src/daft-distributed/src/statistics/tests.rs)

    • End-to-end tests for statistics aggregation across pipeline nodes
    • test_sort_single_partition_stats: Validates Sort node correctly reports rows and duration
    • test_filter_stats_aggregation: Verifies Filter node selectivity calculation (60% for x > 2 on [1,2,3,4,5])
    • test_filter_stats_multi_partition: Tests stats aggregation across multiple partitions (3 partitions with 9 total rows, 3 passing filter)
    • Helper functions for building test pipelines (Sort, Filter) and extracting/running tasks
  3. Local Plan Execution API (src/daft-local-execution/src/run.rs)

    • New execute_local_plan() function for running plans in-process
    • Simplifies testing by avoiding full NativeExecutor machinery
    • Returns both output partitions and execution statistics

Modifications

  • src/daft-context/src/lib.rs: Implement get_context() for non-Python builds to support testing
  • src/daft-distributed/src/pipeline_node/mod.rs: Export filter and in_memory_source modules for test access
  • src/daft-distributed/src/scheduling/scheduler/scheduler_actor.rs: Add into_task() method for test extraction
  • src/daft-distributed/src/plan/runner.rs: Make PlanExecutionContext::new() public for testing
  • src/daft-distributed/src/scheduling/mod.rs: Conditionally export local_worker module for tests
  • src/daft-distributed/src/statistics/mod.rs: Add test module
  • src/daft-local-execution/src/lib.rs: Export execute_local_plan function
  • src/daft-distributed/Cargo.toml: Add dev dependencies (arrow, daft-core, daft-local-execution)

Test Coverage

The new tests validate:

  • Correct row counting and duration reporting for Sort operations
  • Filter selectivity calculation and row filtering accuracy
  • Statistics aggregation across multiple partitions
  • Integration between task execution, statistics collection, and manager export

All tests use the local worker infrastructure to execute real local plans and collect genuine execution statistics.

https://claude.ai/code/session_01FSdbkeidSkPZNWT3supLK9

Add tests that verify the full interaction between distributed plan
node task production, local execution stats emission, and centralized
aggregation through the StatisticsManager.

Key changes:
- Add `LocalSwordfishWorker`: a Worker implementation that executes
  SwordfishTasks in-process via the local execution pipeline, producing
  real ExecutionStats with genuine StatSnapshot data.
- Add `execute_local_plan()` public API to daft-local-execution for
  running a LocalPhysicalPlan and getting results + stats.
- Fix `get_context()` for non-Python builds to return a default
  DaftContext instead of panicking.
- Add `with_execution_stats()` to MockTaskBuilder for richer mock tasks.
- Make `PlanExecutionContext::new()` pub(crate) for test access.

Tests cover:
- Sort single-partition stats: verifies phase-based filtering where
  only FINAL_SORT_PHASE Default snapshots contribute row counts
- Filter stats with selectivity: verifies FilterSnapshot aggregation
  with correct rows_in/out and selectivity percentage
- Filter multi-partition: verifies correct aggregation across multiple
  tasks (partitions)

All tests use produce_tasks() on real DistributedPipelineNode trees,
ensuring the local plans come from actual pipeline node code rather
than being hand-constructed.

https://claude.ai/code/session_01FSdbkeidSkPZNWT3supLK9
@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 15, 2026

Rust Dependency Diff

Head: 291c5e52715f987f78c2e358c65192422af60e48 vs Base: f7d061a66f4213cb93316564df89fca7a9153e00.

OK: Within budget.

  • New Crates: 0
  • Removed Crates: 0

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 15, 2026

Codecov Report

❌ Patch coverage is 88.43750% with 74 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.62%. Comparing base (d48924c) to head (28fb0de).
⚠️ Report is 10 commits behind head on main.

Files with missing lines Patch % Lines
...rc/daft-distributed/src/scheduling/local_worker.rs 75.67% 45 Missing ⚠️
src/daft-distributed/src/pipeline_node/sort.rs 70.37% 8 Missing ⚠️
src/daft-local-execution/src/run.rs 87.69% 8 Missing ⚠️
src/daft-distributed/src/statistics/tests.rs 97.74% 7 Missing ⚠️
src/daft-distributed/src/scheduling/task.rs 55.55% 4 Missing ⚠️
src/daft-local-execution/src/shuffle_metadata.rs 84.61% 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #6697      +/-   ##
==========================================
+ Coverage   75.02%   75.62%   +0.59%     
==========================================
  Files        1067     1081      +14     
  Lines      147155   149729    +2574     
==========================================
+ Hits       110408   113227    +2819     
+ Misses      36747    36502     -245     
Files with missing lines Coverage Δ
src/daft-context/src/lib.rs 88.72% <100.00%> (+2.09%) ⬆️
src/daft-distributed/src/pipeline_node/mod.rs 77.89% <ø> (+19.67%) ⬆️
src/daft-distributed/src/plan/runner.rs 29.05% <100.00%> (+23.07%) ⬆️
...ibuted/src/scheduling/scheduler/scheduler_actor.rs 92.77% <100.00%> (+2.72%) ⬆️
src/daft-distributed/src/statistics/mod.rs 94.18% <ø> (+48.83%) ⬆️
src/daft-local-execution/src/lib.rs 78.35% <ø> (+1.49%) ⬆️
src/daft-local-execution/src/sinks/repartition.rs 42.04% <100.00%> (+42.04%) ⬆️
src/daft-local-execution/src/shuffle_metadata.rs 28.20% <84.61%> (+28.20%) ⬆️
src/daft-distributed/src/scheduling/task.rs 84.49% <55.55%> (+14.87%) ⬆️
src/daft-distributed/src/statistics/tests.rs 97.74% <97.74%> (ø)
... and 3 more

... and 97 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Extend the distributed stats integration tests with a multi-partition
sort test that exercises all three sort phases (sample, repartition,
final-sort) through real execution, verifying that SortStats correctly
filters stats by phase to avoid double-counting.

To enable this without Python/Ray:
- Implement pure-Rust get_partition_boundaries_from_samples using
  MicroPartition::sort() + quantiles() (non-Python cfg)
- Fix RepartitionSink non-Python finalize to produce partitions with
  data instead of panicking
- Add ShufflePartitionMetadata.data field for non-Python mode to carry
  actual partition data (replacing ray.put() object refs)
- Update execute_local_plan to return LocalPlanOutput enum handling
  both regular partitions and shuffle output
- Update LocalSwordfishWorker to convert shuffle output into
  TaskOutput::ShuffleWrite with proper ShufflePartitionRef::Ray entries

The test creates a 3-partition sort pipeline, calls produce_tasks()
which drives the sort node's real execution_loop through all phases,
and verifies the aggregated stats show rows only from the final-sort
phase while duration spans all phases.

https://claude.ai/code/session_01FSdbkeidSkPZNWT3supLK9
@samstokes samstokes changed the title Add local worker execution and statistics testing infrastructure test: Add local worker execution and statistics testing infrastructure Apr 16, 2026
@github-actions github-actions bot added the test label Apr 16, 2026
claude added 3 commits April 16, 2026 23:28
…exports

Revert filter, in_memory_source, and sort modules to private visibility.
Instead, add #[cfg(test)] re-exports of FilterNode, InMemorySourceNode,
and SortNode from the pipeline_node module so test code can access them
without widening the production API surface.

Also remove unused imports from the test file.

https://claude.ai/code/session_01FSdbkeidSkPZNWT3supLK9
Add comments to all #[cfg(not(feature = "python"))] paths that were
added or modified to clarify they are only exercised by Rust-only
test runs (cargo test). Production always enables the python feature.

Affected paths:
- daft-context get_context() non-Python impl
- sort get_partition_boundaries_from_samples non-Python impl
- RepartitionSink non-Python finalize path
- ShufflePartitionMetadata.data field
- execute_local_plan / LocalPlanOutput

https://claude.ai/code/session_01FSdbkeidSkPZNWT3supLK9
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants