Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,28 @@ object CometCreateArray extends CometExpressionSerde[CreateArray] {
return exprToProtoInternal(emptyArrayLiteral, inputs, binding)
}

// DataFusion's `make_array` asserts strict element-type equality in
// `MutableArrayData::with_capacities` and panics on a mismatch. Spark's CreateArray is more
// permissive: its type coercion compares element types with `sameType`, which ignores
// nullability, so children that share a surface type but differ only in nested field
// nullability get no unifying cast. DataFusion tolerates container nullability differences
// (an `ArrayType.containsNull` / `MapType.valueContainsNull` mismatch is coerced), but NOT a
// struct field's nullability -- `array(struct(a not null), struct(a nullable))` panics inside
// `make_array_inner`. Decline only those cases (i.e. children that still differ after
// normalizing container nullability) so Spark's evaluator handles them.
//
// TODO: remove this decline once apache/datafusion#22366 lands; the upstream fix widens the
// element type via nullability-OR-merge and casts each child before MutableArrayData.
val normalizedTypes = children.map(c => normalizeContainerNullability(c.dataType))
if (normalizedTypes.distinct.size > 1) {
withFallbackReason(
expr,
"CreateArray children have mismatched data types: " +
children.map(_.dataType).distinct.mkString(", "),
children: _*)
return None
}

val childExprs = children.map(exprToProtoInternal(_, inputs, binding))

if (childExprs.forall(_.isDefined)) {
Expand All @@ -517,6 +539,26 @@ object CometCreateArray extends CometExpressionSerde[CreateArray] {
None
}
}

/**
* Rewrites a type so that container nullability (`ArrayType.containsNull`,
* `MapType.valueContainsNull`) is forced to `true` everywhere, while struct field nullability
* is left intact. Two CreateArray children whose types differ ONLY in container nullability are
* tolerated by DataFusion's `make_array` (coerced), so they normalize equal here; a difference
* in a struct field's nullability survives normalization and triggers the decline above.
*/
private def normalizeContainerNullability(dt: DataType): DataType = dt match {
case ArrayType(elementType, _) =>
ArrayType(normalizeContainerNullability(elementType), containsNull = true)
case MapType(keyType, valueType, _) =>
MapType(
normalizeContainerNullability(keyType),
normalizeContainerNullability(valueType),
valueContainsNull = true)
case StructType(fields) =>
StructType(fields.map(f => f.copy(dataType = normalizeContainerNullability(f.dataType))))
case other => other
}
}

object CometGetArrayItem extends CometExpressionSerde[GetArrayItem] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,4 +1113,43 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
}
}

test("array of structs with nullability-divergent children") {
// Spark's type coercion compares element types with `sameType`, which ignores nullability,
// so two struct children that differ ONLY in a nested field's nullability get no unifying
// cast -- CreateArray keeps children of different StructTypes. DataFusion's make_array asserts
// strict element-type equality (down to nested nullability) and panics on the mismatch. Comet
// must decline this CreateArray so Spark's evaluator handles it.
withParquetTable((0 until 5).map(i => (i, i.toLong)), "tbl") {
val df = spark
.table("tbl")
.select(
array(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test covers arrays and structs, but does not cover maps. Could you add a map test as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a map test, array of maps with nullability-divergent struct values, which wraps the same nested-nullability divergence in a MapType value. That covers normalizeContainerNullability's
MapType branch. The struct field nullability difference survives container-nullability normalization, so CreateArray still declines, mirroring the struct case. Both tests pass under Spark 4.1.

// ct is NOT NULL (literal)
struct(col("_1").as("id"), lit("a").as("ct")),
// ct is NULLABLE (when without otherwise) -- same type, different nullability
struct(col("_1").as("id"), when(col("_1") === 0, lit("b")).as("ct"))).as("arr"))
checkSparkAnswerAndFallbackReason(df, "CreateArray children have mismatched data types")
}
}

test("array of maps with nullability-divergent struct values") {
// Same nested-nullability divergence as the struct case, but wrapped in a MapType value so we
// exercise normalizeContainerNullability's MapType branch: the two map children share a surface
// type and differ only in a nested struct field's nullability, so they survive container
// (`MapType.valueContainsNull`) normalization as distinct types and CreateArray must still
// decline -- DataFusion's make_array would otherwise panic on the struct-field mismatch.
withParquetTable((0 until 5).map(i => (i, i.toLong)), "tbl") {
val df = spark
.table("tbl")
.select(
array(
// map value struct has ct NOT NULL (literal)
map(lit("k"), struct(col("_1").as("id"), lit("a").as("ct"))),
// map value struct has ct NULLABLE -- same type, different nested nullability
map(lit("k"), struct(col("_1").as("id"), when(col("_1") === 0, lit("b")).as("ct"))))
.as("arr"))
checkSparkAnswerAndFallbackReason(df, "CreateArray children have mismatched data types")
}
}
}