Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,32 @@ private[codegen] object CometBatchKernelCodegenOutput {
*
* Scalars emit `perRow` only. Complex types emit both. Inner setup bubbles up so deep child
* casts land at the batch prelude.
*
* `nested` distinguishes the root output vector from a child of a List / Map / Struct.
* `allocateOutput` pre-sizes the root to exactly `numRows` and the kernel writes one scalar per
* row, so the root's fixed-width `set` is always in bounds. A child's element count is instead
* the data-dependent sum of per-row collection sizes, which `numRows` does not bound. We cannot
* pre-size the child either: each row's `ArrayData` / `MapData` is produced by Spark's
* generated `ev.code` inside the write loop, so the total is unknown until we have already
* evaluated every row (counting it first would mean evaluating the tree twice). Nested
* fixed-width writes therefore grow on demand with `setSafe`; the String / Binary / Decimal
* branches already do, for the same reason.
*/
private def emitWrite(
targetVec: String,
idx: String,
source: String,
dataType: DataType,
ctx: CodegenContext): OutputEmit = dataType match {
ctx: CodegenContext,
nested: Boolean = false): OutputEmit = dataType match {
case BooleanType =>
OutputEmit("", s"$targetVec.set($idx, $source ? 1 : 0);")
val set = if (nested) "setSafe" else "set"
OutputEmit("", s"$targetVec.$set($idx, $source ? 1 : 0);")
case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | DateType |
TimestampType | TimestampNTZType =>
// Spark codegen emits the matching primitive Java type; Arrow `set` overloads accept it.
OutputEmit("", s"$targetVec.set($idx, $source);")
val set = if (nested) "setSafe" else "set"
OutputEmit("", s"$targetVec.$set($idx, $source);")
case dt: DecimalType =>
// DecimalOutputShortFastPath: precision <= 18 fits in a signed long, so pass the unscaled
// value to `setSafe(int, long)` and skip the BigDecimal allocation.
Expand Down Expand Up @@ -250,7 +263,8 @@ private[codegen] object CometBatchKernelCodegenOutput {
val childIdx = ctx.freshName("cidx")
val jVar = ctx.freshName("j")
val elemSource = emitSpecializedGetterExpr(arrVar, jVar, elementType)
val inner = emitWrite(childVar, s"$childIdx + $jVar", elemSource, elementType, ctx)
val inner =
emitWrite(childVar, s"$childIdx + $jVar", elemSource, elementType, ctx, nested = true)
val setup =
(s"$childClass $childVar = ($childClass) $targetVec.getDataVector();" +:
Seq(inner.setup).filter(_.nonEmpty)).mkString("\n")
Expand Down Expand Up @@ -285,7 +299,11 @@ private[codegen] object CometBatchKernelCodegenOutput {
val childDecl =
s"$childClass $childVar = ($childClass) $targetVec.getChildByOrdinal($fi);"
val fieldSource = emitSpecializedGetterExpr(rowVar, fi.toString, field.dataType)
val inner = emitWrite(childVar, idx, fieldSource, field.dataType, ctx)
// Struct fields are co-indexed with the struct (written at the same `idx`), so a field is
// nested exactly when the struct is: top-level struct fields land at the row index and are
// pre-sized to numRows (bare `set` is in bounds); a struct nested in an array/map inherits
// that parent's cumulative, unbounded index and needs `setSafe`.
val inner = emitWrite(childVar, idx, fieldSource, field.dataType, ctx, nested = nested)
val write =
if (!field.nullable) {
inner.perRow
Expand Down Expand Up @@ -327,8 +345,10 @@ private[codegen] object CometBatchKernelCodegenOutput {
val valClass = outputVectorClass(mt.valueType)
val keySrcExpr = emitSpecializedGetterExpr(keyArr, jVar, mt.keyType)
val valSrcExpr = emitSpecializedGetterExpr(valArr, jVar, mt.valueType)
val keyEmit = emitWrite(keyVar, s"$childIdx + $jVar", keySrcExpr, mt.keyType, ctx)
val valEmit = emitWrite(valVar, s"$childIdx + $jVar", valSrcExpr, mt.valueType, ctx)
val keyEmit =
emitWrite(keyVar, s"$childIdx + $jVar", keySrcExpr, mt.keyType, ctx, nested = true)
val valEmit =
emitWrite(valVar, s"$childIdx + $jVar", valSrcExpr, mt.valueType, ctx, nested = true)
val setup =
(Seq(
s"$structClass $entriesVar = ($structClass) $targetVec.getDataVector();",
Expand Down
26 changes: 26 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometCodegenAssertions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
package org.apache.comet

import org.apache.arrow.vector.ValueVector
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType

import org.apache.comet.codegen.CometBatchKernelCodegen
import org.apache.comet.udf.codegen.CometScalaUDFCodegen
import org.apache.comet.vector.CometVector

/**
* Shared assertions for the codegen-dispatcher test suites. Mix in alongside `CometTestBase`.
Expand Down Expand Up @@ -79,4 +82,27 @@ trait CometCodegenAssertions {
s"expected kernel signature $expectedNames -> $output; " +
s"cache had ${sigs.map { case (c, d) => (c.map(_.getSimpleName), d) }}")
}

/**
* Compiles `expr` (no input columns), runs one batch of `numRows`, and hands the output
* `CometVector` to `read`. Every row evaluates to the same value (the expression has no input),
* which still exercises the cross-row cumulative child index of the collection output writer:
* the child of a List / Map grows by each row's element count, so a batch of N rows drives the
* accumulation that a single row cannot. Drives the writer directly, without a query plan, so
* it reaches complex-output expressions the serde does not route through dispatch today. The
* vector is closed after `read` returns, so `read` must fully materialize what it needs.
*/
protected def runKernel[T](expr: Expression, numRows: Int)(read: CometVector => T): T = {
val kernel = CometBatchKernelCodegen.compile(expr, IndexedSeq.empty).newInstance()
val field = CometBatchKernelCodegen.toFfiArrowField("out", expr.dataType, nullable = true)
val out = CometBatchKernelCodegen.allocateOutput(field, numRows, 0)
try {
kernel.init(0)
kernel.process(Array.empty[ValueVector], out, numRows)
out.setValueCount(numRows)
read(CometVector.getVector(out, null))
} finally {
out.close()
}
}
}
110 changes: 110 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometCodegenFuzzSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ import scala.util.Random
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.codegen.CometBatchKernelCodegen
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
import org.apache.comet.vector.CometVector

/**
* Randomized end-to-end tests for the Arrow-direct codegen dispatcher: schema-driven coverage of
Expand Down Expand Up @@ -406,4 +412,108 @@ class CometCodegenFuzzSuite
}
}
}

/**
* Randomized output-writer coverage (#4539). Generates a random nested output type and a random
* catalyst value of that type, wraps it in a `Literal`, and drives it through the kernel output
* writer with [[runKernel]]. Reading the Arrow output back must reproduce the value.
*
* Random Array / Map sizes mean each collection's child vector fills at a cumulative index that
* `numRows` does not bound, so the writer must grow the child with `setSafe` (the #4539 fix). A
* multi-row batch additionally exercises the cumulative index across rows. The root is always a
* collection so the nested-write path always runs. The generated value is its own oracle:
* `CatalystTypeConverters.convertToScala` materializes both the value and the Arrow readback
* (both expose the catalyst ArrayData / MapData / InternalRow interface) and the two must
* compare equal.
*/
private val outputLeafTypes: Seq[DataType] =
Seq(IntegerType, LongType, DoubleType, BooleanType, StringType, DecimalType(10, 2))

private def randomLeafType(r: Random): DataType =
outputLeafTypes(r.nextInt(outputLeafTypes.size))

/** Random nested type, biased toward leaves as depth runs out. Map keys are always leaves. */
private def randomOutputType(r: Random, depth: Int): DataType =
if (depth <= 0 || r.nextDouble() < 0.4) randomLeafType(r)
else
r.nextInt(3) match {
case 0 => ArrayType(randomOutputType(r, depth - 1), containsNull = true)
case 1 =>
MapType(randomLeafType(r), randomOutputType(r, depth - 1), valueContainsNull = true)
case _ =>
StructType((0 to r.nextInt(2)).map(i =>
StructField(s"f$i", randomOutputType(r, depth - 1), nullable = true)))
}

private def randomLeafValue(r: Random, dt: DataType): Any = dt match {
case IntegerType => r.nextInt()
case LongType => r.nextLong()
case DoubleType => r.nextDouble()
case BooleanType => r.nextBoolean()
case StringType => UTF8String.fromString(s"s${r.nextInt(1000000)}")
case d: DecimalType => Decimal((r.nextInt(2000000) - 1000000).toLong, d.precision, d.scale)
case other => throw new IllegalArgumentException(s"unexpected leaf type $other")
}

/** Random catalyst value of `dt`; `nullable` permits an occasional null element / field. */
private def randomOutputValue(r: Random, dt: DataType, nullable: Boolean): Any = {
if (nullable && r.nextDouble() < 0.2) null
else
dt match {
case ArrayType(e, containsNull) =>
val n = r.nextInt(40)
new GenericArrayData(
(0 until n).map(_ => randomOutputValue(r, e, containsNull)).toArray[Any])
case MapType(k, v, valueContainsNull) =>
// Dedup by materialized key so the map round-trips 1:1 (Spark map keys are distinct).
val entries = scala.collection.mutable.LinkedHashMap.empty[Any, Any]
(0 until r.nextInt(20)).foreach { _ =>
val key = randomOutputValue(r, k, nullable = false)
entries.getOrElseUpdate(key, randomOutputValue(r, v, valueContainsNull))
}
new ArrayBasedMapData(
new GenericArrayData(entries.keys.toArray[Any]),
new GenericArrayData(entries.values.toArray[Any]))
case st: StructType =>
new GenericInternalRow(
st.fields.map(f => randomOutputValue(r, f.dataType, f.nullable)).toArray[Any])
case leaf => randomLeafValue(r, leaf)
}
}

/** Reads the root collection value of `vec` at `row` as a catalyst ArrayData / MapData. */
private def readRoot(vec: CometVector, dt: DataType, row: Int): Any = dt match {
case _: ArrayType => vec.getArray(row)
case _: MapType => vec.getMap(row)
case other => throw new IllegalArgumentException(s"unexpected root type $other")
}

test("randomized dynamically-sized collection output round-trips through the writer (#4539)") {
val r = new Random(42)
val numRows = 4 // > 1 so the child's cumulative index accumulates across rows
// canHandle may reject a generated type (e.g. the maxFields gate on a wide nesting); count
// the ones we actually drove through the writer to guard against a vacuous run.
val exercised = (0 until 300).count { _ =>
// Root is always a collection so the nested-child write path runs every iteration.
val dt =
if (r.nextBoolean()) ArrayType(randomOutputType(r, 2), containsNull = true)
else MapType(randomLeafType(r), randomOutputType(r, 2), valueContainsNull = true)
val value = randomOutputValue(r, dt, nullable = false)
val expr = Literal(value, dt)
val handled = CometBatchKernelCodegen.canHandle(expr).isEmpty
if (handled) {
val expected = CatalystTypeConverters.convertToScala(value, dt)
runKernel(expr, numRows) { vec =>
(0 until numRows).foreach { row =>
val actual = CatalystTypeConverters.convertToScala(readRoot(vec, dt, row), dt)
assert(
actual === expected,
s"row $row mismatch for output type $dt\n expected=$expected\n actual=$actual")
}
}
}
handled
}
assert(exercised > 0, "every generated type was rejected by canHandle (of 300 generated)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,46 @@ class CometCodegenSourceSuite extends AnyFunSuite {
}
}

test("nested fixed-width map children grow with setSafe, not set (#4539)") {
// Map<Int, Int> output: both key and value are fixed-width children of the entries struct.
// Their element count is the data-dependent sum of per-row map sizes, not bounded by numRows,
// and is unknown until the write loop has evaluated each row, so the writes must use `setSafe`
// to grow on demand. A bare `set` throws once a row's entries exceed the child's initial
// capacity (issue #4539: the literal map's third key overflowed the pre-sized IntVector).
val expr = CreateMap(
Seq(
Literal(1, IntegerType),
Literal(10, IntegerType),
Literal(2, IntegerType),
Literal(20, IntegerType)))
val src = CometBatchKernelCodegen.generateSource(expr, IndexedSeq.empty).body
assert(
src.contains(".setSafe("),
s"expected setSafe for nested fixed-width writes; got:\n$src")
// `.set(` is a bare fixed-width write; `setSafe(` / `setNull(` / `setIndexDefined(` do not
// match this literal. There must be none into the nested children.
assert(
!src.contains(".set("),
s"expected no bare fixed-width set into map children; got:\n$src")
}

test("top-level scalar output keeps the pre-sized set fast path") {
// The root output vector is pre-sized to numRows and written once per row, so it uses the
// bare `set` fast path rather than paying for setSafe's per-write capacity check. This pins
// the boundary the #4539 fix draws: setSafe is for nested children only.
val expr = Add(BoundReference(0, IntegerType, nullable = false), Literal(1, IntegerType))
val intSpec = ArrowColumnSpec(
CometBatchKernelCodegen.vectorClassBySimpleName("IntVector"),
nullable = false)
val src = CometBatchKernelCodegen.generateSource(expr, IndexedSeq(intSpec)).body
assert(
src.contains("output.set("),
s"expected bare set for the pre-sized root output; got:\n$src")
assert(
!src.contains(".setSafe("),
s"expected no setSafe for a scalar root output; got:\n$src")
}

test("ArrayType output elides isNullAt on the element loop when containsNull is false") {
// CreateArray over only-non-null Literals produces ArrayType(elementType, containsNull=false).
// The element write should drop the `arr.isNullAt(j)` guard at source level rather than
Expand Down
Loading
Loading