Skip to content

feat: add Comet CachedBatchSerializer for native in-memory cache#4569

Draft
andygrove wants to merge 20 commits into
apache:mainfrom
andygrove:comet-cached-batch-serializer
Draft

feat: add Comet CachedBatchSerializer for native in-memory cache#4569
andygrove wants to merge 20 commits into
apache:mainfrom
andygrove:comet-cached-batch-serializer

Conversation

@andygrove
Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #.

Rationale for this change

When a DataFrame or table is cached (df.cache() / CACHE TABLE), Spark's DefaultCachedBatchSerializer stores each column in Spark's compressed columnar format. Comet does not treat InMemoryTableScanExec as native, so it inserts a CometSparkToColumnarExec above it and pays a JVM-to-Arrow conversion on every read of the cached data:

cached (compressed) -> decompress to Spark ColumnarBatch -> convert to Arrow -> native

That conversion runs on every scan, which undercuts the benefit of caching for native pipelines. This PR lets Comet store the cache as compressed Arrow IPC once, at cache-build time, so repeated scans feed native execution directly with no per-read conversion. This is the same approach used by other columnar Spark accelerators.

What changes are included in this PR?

A new CometCachedBatchSerializer (plugged into Spark's spark.sql.cache.serializer) that:

  • Encodes each cached batch to compressed Arrow IPC (reusing Comet's existing serializeBatches/decodeBatches), storing the bytes plus a Spark-format per-column stats row.
  • Extends Spark's SimpleMetricsCachedBatchSerializer, so batch-level partition pruning (buildFilter) works using the computed min/max/null/count stats.
  • Decodes back to CometVector-backed ColumnarBatch on read, with column pruning and an InternalRow fallback for non-Comet consumers.
  • Delegates transparently to Spark's DefaultCachedBatchSerializer for schemas it does not support (nested/complex types), so it is a safe drop-in.

Supporting changes:

  • CometSparkToColumnarExec gains a passthrough fast-path: batches whose columns are already CometVector are forwarded without a re-copy (with a numPassthroughBatches metric).
  • CometDriverPlugin installs the serializer at startup when the new spark.comet.cache.serializer.enabled config (default off) is set, respecting any user-provided serializer. The Spark property is a static config, so it must be set before the session is created.
  • New config spark.comet.cache.serializer.enabled (default off).

Supported flat types: boolean, integral, floating point, decimal, string, binary, date, timestamp, timestamp_ntz. Nested types delegate. Off by default.

How are these changes tested?

New tests:

  • CometCachedBatchSerializerSuite: stats-row layout; build path (compressed IPC + stats); decode round-trip; column pruning; the columnar read path (identity and pruned projections); the CometSparkToColumnarExec passthrough metric; a regression test that string min/max stats survive encoding (they are copied off the Arrow buffer); and end-to-end tests for cached-vs-uncached correctness, filtered pruning on numeric and string columns, MEMORY_AND_DISK spill, array-type delegation, and timestamp_ntz value round-tripping.
  • CometPluginsSuite: the driver plugin installs the serializer only when enabled and never overrides a user-provided non-default serializer.

Verified compiling and passing on Spark 3.4, 3.5, and 4.x profiles.

andygrove added 20 commits June 2, 2026 08:13
Add TimestampNTZType alongside TimestampType in isCometType, readValue,
CometCacheColumnStats.ordered, and CometCacheColumnStats.compare so that
timestamp-without-timezone columns are cached natively.

Remove the unnecessary .asInstanceOf[CachedBatch] cast from encode (Iterator
covariance makes it redundant), drop the dead val attrs = schema aliases in
both build methods, and add a lifecycle comment on encodeBytes documenting
that stats must be computed before serialization clears the VectorSchemaRoot.

Strengthen the build test to use coalesce(1) for deterministic batch count
(exactly 3 batches for 250 rows at batch size 100) and assert real stat
values: minimum lowerBound of the id column is 0 and nullCount is 0.
Add a test for convertCachedBatchToColumnarBatch exercising both the
identity-projection passthrough (full columns) and pruned projection.
Improve selectedIndices to throw IllegalStateException with a diagnostic
message instead of a raw map key lookup. Update the isIdentityProjection
comment to state both conditions explicitly.
…hout copy

When CometSparkToColumnarExec receives batches that are already Arrow
(all columns are CometVector, e.g. from CometCachedBatchSerializer),
skip the columnarBatchToArrowBatchIter re-copy and pass the batch
through directly. Adds a numPassthroughBatches metric to track when
the fast-path fires.
Wire COMET_CACHE_SERIALIZER_ENABLED to CometDriverPlugin.init so the
static spark.sql.cache.serializer config is set on the SparkConf at
SparkContext startup. A user-provided non-default serializer is
respected and not overridden.
…g bounds

In CometCachedBatchSerializer.readValue, the StringType case previously
returned col.getUTF8String(r) directly, which is a view into the Arrow
value buffer (backed by UTF8String.fromAddress). The stats accumulator
stored these views as lowerBound/upperBound, then encodeBytes called
serializeBatches which clears the VectorSchemaRoot and releases those
Arrow buffers. The stored string bounds then dangled, causing
SimpleMetricsCachedBatchSerializer.buildFilter to prune batches using
garbage stats, resulting in missing rows on filtered cached string scans.

Fix by calling .copy() to materialize a heap copy before the buffer is
freed. Add regression tests: one verifying stats survive encode, one
verifying an equality filter on a cached string column returns correct rows.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant