Investigate adopting DataFusion's allocator-level memory accounting to replace manual memory tuning #4576

@andygrove

What is the problem the feature request solves?

Comet's native memory accounting relies on DataFusion's MemoryPool, which is voluntary: it only counts bytes that an operator explicitly reserves via reserve() / try_grow(). Allocations that bypass the pool (Arrow buffers built during predicate evaluation, intermediate scratch in joins, allocations inside expression kernels) are invisible to the accounting. Comet's CometUnifiedMemoryPool forwards these reservations across JNI to Spark's TaskMemoryManager, so the undercount propagates up to Spark as well.
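
To make the undercount concrete, here is a minimal std-only sketch. `VoluntaryPool` and `run_operator` are hypothetical stand-ins, not DataFusion's `MemoryPool` trait or any Comet type; the point is only that a buffer the operator never reports is invisible to the pool.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a voluntary pool: it only knows about bytes
/// that callers explicitly reserve.
pub struct VoluntaryPool {
    limit: usize,
    reserved: AtomicUsize,
}

impl VoluntaryPool {
    pub fn new(limit: usize) -> Self {
        VoluntaryPool { limit, reserved: AtomicUsize::new(0) }
    }

    /// Reserve `bytes`, failing if the pool limit would be exceeded.
    pub fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let mut cur = self.reserved.load(Ordering::Relaxed);
        loop {
            let new = match cur.checked_add(bytes) {
                Some(n) if n <= self.limit => n,
                _ => return Err(format!("cannot reserve {} bytes", bytes)),
            };
            match self.reserved.compare_exchange_weak(
                cur, new, Ordering::Relaxed, Ordering::Relaxed,
            ) {
                Ok(_) => return Ok(()),
                Err(actual) => cur = actual,
            }
        }
    }

    pub fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }
}

/// An "operator" that reserves its output buffer but not its scratch buffer.
/// Returns (bytes the pool knows about, bytes actually held).
pub fn run_operator(pool: &VoluntaryPool) -> (usize, usize) {
    let output = vec![0u8; 1024];
    pool.try_grow(output.len()).expect("within limit");
    let scratch = vec![0u8; 4096]; // allocated, never reported to the pool
    (pool.reserved(), output.len() + scratch.len())
}
```

With a 1 MiB pool, `run_operator` reports 1024 reserved bytes while 5120 bytes are actually held; the 4096-byte gap is what never reaches Spark.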

This is a known, documented limitation. From the tuning guide:

Comet's memory accounting isn't 100% accurate and this can result in Comet using more memory than it reserves, leading to out-of-memory exceptions. To work around this issue, it is possible to set spark.comet.exec.memoryPool.fraction to a value less than 1.0...

spark.comet.exec.memoryPool.fraction is effectively a manual fudge factor that compensates for an unknown undercount margin, tuned per workload. We would like to move away from this manual and unreliable accounting toward something backed by real, measured memory usage.

The Comet roadmap already calls this out:

Improving memory accounting, reservation strategies, and spill integration will reduce out-of-memory errors and allow Comet to make better use of available resources, especially in multi-query and multi-task environments.

Describe the potential solution

Two complementary mechanisms, both backed by real allocator stats rather than voluntary reservations:

1. Online accounting / observability

DataFusion PR apache/datafusion#22626 introduces allocator-level accounting that measures actual heap usage independent of voluntary pool reservations. Comet already does the offline version of exactly this: native/common/src/bin/analyze_trace.rs "compares jemalloc usage against the sum of per-thread Comet memory pool reservations [and] reports any points where jemalloc exceeds the total pool size." The proposal is to make this an online signal so the divergence can drive behavior (and ultimately retire the memoryPool.fraction heuristic).
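
As a rough sketch of what the online signal could compute per sample (the `Sample` and `Divergence` types are hypothetical and are not taken from analyze_trace.rs or the DataFusion PR):

```rust
/// One point-in-time sample. All fields are in bytes.
#[derive(Debug, Clone, Copy)]
pub struct Sample {
    /// Bytes the allocator reports as in use (e.g. a jemalloc stat).
    pub allocated: u64,
    /// Sum of all voluntary pool reservations at the same instant.
    pub reserved: u64,
    /// Configured total pool size.
    pub pool_size: u64,
}

#[derive(Debug, PartialEq)]
pub struct Divergence {
    /// Bytes in use that no reservation accounts for.
    pub unaccounted: u64,
    /// True when real usage exceeds the whole pool, which is the condition
    /// the offline trace analysis reports.
    pub exceeds_pool: bool,
}

pub fn divergence(s: Sample) -> Divergence {
    Divergence {
        unaccounted: s.allocated.saturating_sub(s.reserved),
        exceeds_pool: s.allocated > s.pool_size,
    }
}
```

How the result would drive behavior (logging, shrinking the effective pool, triggering spill) is left open here.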

2. A process-level RSS circuit breaker (OomGuard)

Wrap the global allocator to maintain a running estimate of memory headroom, periodically resynced from real resident memory. The proposed design has a few notable properties:

  • A global-allocator wrapper tracks alloc/dealloc deltas. Each allocation debits and each free credits a running balance. To avoid contention, the balance is kept per-thread and only settled into a shared atomic bank when per-thread drift crosses a threshold (e.g. 64 KB), so the common path is a thread-local add with no atomics.
  • The fast path is free when disarmed. A single relaxed atomic load gates the whole mechanism, so until the guard is explicitly armed (and on any unstamped thread) the wrapper costs effectively nothing versus the inner allocator.
  • Ground truth comes from the allocator's resident stat, not the running tally. A periodic poll resyncs the bank to threshold - resident each tick (jemalloc's resident, refreshed via its epoch). The per-alloc tracking only estimates drift between ticks; the poll drags it back to truth so dealloc-credit slop cannot accumulate.
  • Only "stamped" worker threads enforce. Query-worker threads are marked at runtime construction (on_thread_start); control-plane and the poll task itself stay unstamped so they never self-panic and can keep running to recover the balance after a query unwinds.
  • Enforcement is a typed panic caught at the task boundary. When a stamped thread allocates while over threshold, it panics with a typed payload carrying the overdraft amount. The panic is caught at the task boundary and converted into a recoverable ResourcesExhausted error rather than letting the process die. Credits (frees) never fire the kill, and a re-entrant panic during unwinding is suppressed to avoid a double-fault.
  • The threshold is derived from the memory budget, typically a high-water fraction (e.g. ~95%) of the container/cgroup memory limit.
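
The bookkeeping described above can be sketched without installing a real global allocator. Everything here is illustrative: `Bank` and `ThreadLedger` are hypothetical names, `ThreadLedger` stands in for what would be thread-local state inside the allocator wrapper, and checking for overdraft only at settle time is an assumption about where enforcement would sit.

```rust
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};

/// Per-thread drift is settled into the shared bank once it reaches this size.
pub const SETTLE_BYTES: i64 = 64 * 1024;

/// Shared state: an armed flag plus the remaining headroom in bytes.
pub struct Bank {
    armed: AtomicBool,
    headroom: AtomicI64,
}

impl Bank {
    pub fn new() -> Self {
        Bank { armed: AtomicBool::new(false), headroom: AtomicI64::new(0) }
    }

    /// Poll tick: overwrite the estimate with ground truth.
    pub fn resync(&self, threshold: i64, resident: i64) {
        self.headroom.store(threshold - resident, Ordering::Relaxed);
    }

    pub fn arm(&self, threshold: i64, resident: i64) {
        self.resync(threshold, resident);
        self.armed.store(true, Ordering::Release);
    }

    pub fn headroom(&self) -> i64 {
        self.headroom.load(Ordering::Relaxed)
    }
}

/// Stand-in for the per-thread state of the allocator wrapper.
pub struct ThreadLedger {
    stamped: bool,
    drift: i64,
}

impl ThreadLedger {
    pub fn new(stamped: bool) -> Self {
        ThreadLedger { stamped, drift: 0 }
    }

    /// Debit an allocation. Returns Err(overdraft) if settling shows the
    /// process is over threshold.
    pub fn on_alloc(&mut self, bank: &Bank, size: usize) -> Result<(), i64> {
        if !bank.armed.load(Ordering::Relaxed) || !self.stamped {
            return Ok(()); // disarmed or unstamped: no tracking at all
        }
        self.drift -= size as i64;
        if self.drift <= -SETTLE_BYTES {
            let after = self.settle(bank);
            if after < 0 {
                return Err(-after);
            }
        }
        Ok(())
    }

    /// Credit a free. Never fails.
    pub fn on_dealloc(&mut self, bank: &Bank, size: usize) {
        if !bank.armed.load(Ordering::Relaxed) || !self.stamped {
            return;
        }
        self.drift += size as i64;
        if self.drift >= SETTLE_BYTES {
            self.settle(bank);
        }
    }

    /// Move the local drift into the bank and return the new headroom.
    fn settle(&mut self, bank: &Bank) -> i64 {
        let delta = std::mem::replace(&mut self.drift, 0);
        bank.headroom.fetch_add(delta, Ordering::Relaxed) + delta
    }
}
```

In a real wrapper the `Err` branch is where the typed panic would be raised, and `resync` would be called from the unstamped poll task with the allocator's resident stat.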

The value for Comet: today, when Comet undercounts and the executor exceeds its container memory budget, YARN/k8s SIGKILLs the entire executor JVM, taking down all tasks and cached data and forcing stage retries. A guard that instead fails a single Comet task with a retriable exception is a strict improvement over a whole-executor kill. This is a safety net, complementary to graceful spill (see below), not a replacement for it.
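
A minimal sketch of the "typed panic caught at the task boundary" idea, using only the standard library. `Overdraft`, `TaskError`, `guard_trip`, and `run_task` are hypothetical names; in Comet the conversion would happen in the existing JNI error path rather than in a standalone helper like this.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Typed panic payload carrying how far over threshold the process was.
#[derive(Debug)]
pub struct Overdraft {
    pub bytes: u64,
}

#[derive(Debug, PartialEq)]
pub enum TaskError {
    ResourcesExhausted(String),
}

/// Called by the guard when a stamped thread allocates while over threshold.
pub fn guard_trip(bytes: u64) {
    // Do not panic again while already unwinding; that would abort the process.
    if std::thread::panicking() {
        return;
    }
    panic::panic_any(Overdraft { bytes });
}

/// Task boundary: turn an Overdraft panic into a recoverable error and let
/// every other panic keep propagating.
pub fn run_task<T>(task: impl FnOnce() -> T) -> Result<T, TaskError> {
    match panic::catch_unwind(AssertUnwindSafe(task)) {
        Ok(value) => Ok(value),
        Err(payload) => match payload.downcast::<Overdraft>() {
            Ok(o) => Err(TaskError::ResourcesExhausted(format!(
                "native memory guard tripped: {} bytes over threshold",
                o.bytes
            ))),
            Err(other) => panic::resume_unwind(other),
        },
    }
}
```

A task that trips the guard comes back as `Err(TaskError::ResourcesExhausted(..))` while the process keeps running. The default panic hook still prints a message to stderr unless it is replaced.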

Additional context

Comet already ships nearly every building block this needs, which makes a prototype low-cost:

  • jemalloc stats are already a dependency and already used. tikv-jemallocator (an optional global allocator) and tikv-jemalloc-ctl (with the stats feature) are already optional dependencies (native/core/Cargo.toml:41-43, native/core/src/lib.rs:44-88), and log_jemalloc_usage() already reads epoch/stats (native/core/src/execution/jni_api.rs:124). mimalloc is the other allocator option (it exposes mi_process_info for the equivalent stats).
  • The offline gap analysis already exists (native/common/src/bin/analyze_trace.rs); this proposal is the online version of it.
  • The panic-across-JNI hazard is already handled. Every JNI entry uses try_unwrap_or_throw, and the streaming task is wrapped in catch_unwind (native/core/src/execution/jni_api.rs:777-784). The guard panic would unwind only within the native call and be caught there, then thrown as a Spark exception — ideally one Spark treats as a retriable task failure.
  • There is a clean stamp hook point. Comet builds its own multi-thread tokio runtime in build_runtime (native/core/src/execution/jni_api.rs:192-209), so worker threads can be stamped via on_thread_start.
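
For illustration, stamping can be as small as a thread-local flag set when a worker thread starts. This sketch uses std::thread so it stays self-contained; in Comet the call to the hypothetical `stamp_current_thread` would go in the `on_thread_start` callback of the tokio runtime builder instead of in `spawn_worker`.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // False by default, so control-plane threads and the poll task are
    // never subject to enforcement.
    static STAMPED: Cell<bool> = Cell::new(false);
}

/// Mark the calling thread as a query worker.
pub fn stamp_current_thread() {
    STAMPED.with(|s| s.set(true));
}

/// What the allocator wrapper would check before enforcing.
pub fn is_stamped() -> bool {
    STAMPED.with(|s| s.get())
}

/// Hypothetical helper: spawn a worker that stamps itself before running `f`.
pub fn spawn_worker<T: Send + 'static>(
    f: impl FnOnce() -> T + Send + 'static,
) -> thread::JoinHandle<T> {
    thread::spawn(move || {
        stamp_current_thread();
        f()
    })
}
```

Only threads that went through `spawn_worker` see `is_stamped()` return true; the spawning thread and any other thread stay unstamped.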

Open questions specific to running inside a Spark executor JVM (rather than a single-process service):

  • jemalloc resident covers only native allocations, not the JVM heap. It reflects the native side only, which is arguably desirable here — the thing that unexpectedly blows the off-heap/container budget is native allocation — but the threshold must be derived from Comet's allotted native budget (off-heap size × fraction, or memoryOverhead), not the pod cgroup limit, and the container sizing must still account for JVM heap + native together.
  • Multi-task granularity. A Spark executor runs many tasks concurrently, while the guard tracks a single process-wide balance. A panic kills whichever stamped thread allocates while over-threshold, not necessarily the task that is the memory hog. Any task failing relieves pressure, but fairness vs. Comet's per-task TASK_SHARED_MEMORY_POOLS granularity needs thought.
  • Relationship to spill. This guard is a last-resort circuit breaker. The graceful path is spill, which depends on closing the native-spill-from-JVM gap (NativeMemoryConsumer.spill() currently returns 0). The two are complementary.
  • mimalloc vs jemalloc default. The stats path is wired for jemalloc; if mimalloc is the default build, the guard would either require the jemalloc feature or use mimalloc's process-info stats.
  • API stability of the upstream PR. #22626 predates significant churn in DataFusion's memory subsystem; verify the current API surface before depending on it.
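
On the first open question, one possible way to derive the threshold is to apply the high-water fraction to Comet's native budget rather than to the cgroup limit. The function and parameter names below are hypothetical, and combining the pool fraction with the high-water mark in this way is an assumption, not a settled design.

```rust
/// Guard threshold in bytes.
///
/// `off_heap_bytes`: the executor's off-heap allocation.
/// `pool_fraction_pct`: share of off-heap memory given to Comet, in percent.
/// `high_water_pct`: point at which the guard trips, in percent of the budget.
pub fn guard_threshold(off_heap_bytes: u64, pool_fraction_pct: u64, high_water_pct: u64) -> u64 {
    // Integer arithmetic in u128 avoids both overflow and float rounding.
    let budget = off_heap_bytes as u128 * pool_fraction_pct as u128 / 100;
    (budget * high_water_pct as u128 / 100) as u64
}
```

For example, 4,000,000,000 bytes of off-heap memory with an 80% pool fraction gives a 3,200,000,000-byte native budget, and a 95% high-water mark puts the threshold at 3,040,000,000 bytes.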
