From bb3ca3728e33c35575f6ede40f6a4d4384d65b2c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Jun 2026 12:44:59 -0600 Subject: [PATCH 1/2] fix: array expression audit follow-ups (#4503) Address the actionable array expression audit follow-ups from #4503: - ArrayReverse: report Incompatible for struct/map element types so they route through the JVM codegen dispatcher and stay native, instead of reporting Compatible while convert silently declined and fell back. - SortArray: accept any foldable boolean ascendingOrder (Spark 4.0+ widens it beyond a boolean Literal) and evaluate it at convert time. - ArrayJoin: fall back for non-default string collations, mirroring ArrayIntersect, so the limitation surfaces in EXPLAIN. - ArrayExcept: surface the native element-type restriction in getSupportLevel rather than only in convert. - Remove the dead ArrayCompact serde registration (RuntimeReplaceable in all supported versions, dispatched via ArrayFilter) and document ArrayAppend as reachable only on Spark 3.x. Item 2 (NaN/signed-zero gating for array_contains/distinct/union/max/min) is intentionally omitted: existing SQL tests show native DataFusion already canonicalizes NaN and signed zero to match Spark, so reporting Incompatible would force a needless fallback to the codegen dispatcher with no correctness benefit. Item 5 (array_union ordering) is tracked in #4681. 
--- .../apache/comet/serde/QueryPlanSerde.scala | 7 +- .../scala/org/apache/comet/serde/arrays.scala | 80 ++++++++++++++----- .../array/array_join_collation.sql | 30 +++++++ .../expressions/array/array_reverse.sql | 75 +++++++++++++++++ .../expressions/array/sort_array_foldable.sql | 41 ++++++++++ 5 files changed, 211 insertions(+), 22 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/array/array_join_collation.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/array/array_reverse.sql create mode 100644 spark/src/test/resources/sql-tests/expressions/array/sort_array_foldable.sql diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 143048fb44..2d98883971 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -51,8 +51,13 @@ import org.apache.comet.shims.{CometExprShim, CometTypeShim} object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { private[comet] val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + // ArrayAppend is a concrete expression only on Spark 3.x. Spark 4.0+ marks it + // RuntimeReplaceable and rewrites it to ArrayInsert before serde, so this entry is + // unreachable there and is kept solely for Spark 3.4/3.5. classOf[ArrayAppend] -> CometArrayAppend, - classOf[ArrayCompact] -> CometArrayCompact, + // ArrayCompact is RuntimeReplaceable in all supported Spark versions (rewritten to + // ArrayFilter(arr, IsNotNull(...))), so it never reaches serde directly; dispatch flows + // through CometArrayFilter -> CometArrayCompact instead. No direct registration here. 
classOf[ArrayContains] -> CometArrayContains, classOf[ArrayDistinct] -> CometScalarFunction("array_distinct"), classOf[ArrayExcept] -> CometArrayExcept, diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index eaecd1b49a..e569fafce9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -152,9 +152,11 @@ object CometSortArray extends CometExpressionSerde[SortArray] with CodegenDispat .strictFloatingPointReason(elementType, "Sorting on floating-point") .map(reason => Incompatible(Some(reason))) .getOrElse(expr.ascendingOrder match { - case Literal(_: Boolean, BooleanType) => Compatible() + // Spark 3.x requires a boolean Literal; Spark 4.0+ widens ascendingOrder to any + // foldable boolean. Accept both; convert evaluates the foldable expression. + case ao if ao.foldable && ao.dataType == BooleanType => Compatible() case other => - Unsupported(Some(s"ascendingOrder must be a boolean literal: $other")) + Unsupported(Some(s"ascendingOrder must be a foldable boolean: $other")) }) } } @@ -164,17 +166,13 @@ object CometSortArray extends CometExpressionSerde[SortArray] with CodegenDispat inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { val arrayExprProto = exprToProtoInternal(expr.base, inputs, binding) - val (sortDirectionExprProto, nullOrderingExprProto) = expr.ascendingOrder match { - case Literal(value: Boolean, BooleanType) => - val direction = if (value) "ASC" else "DESC" - val nullOrdering = if (value) "NULLS FIRST" else "NULLS LAST" - ( - exprToProtoInternal(Literal(direction), inputs, binding), - exprToProtoInternal(Literal(nullOrdering), inputs, binding)) - case _ => - // Unreachable: getSupportLevel gates a non-boolean-literal ascendingOrder. - (None, None) - } + // ascendingOrder is a foldable boolean (gated in getSupportLevel). 
Evaluate it; a null result + // unboxes to false, matching Spark's `right.eval().asInstanceOf[Boolean]`. + val ascending = expr.ascendingOrder.eval(EmptyRow).asInstanceOf[Boolean] + val direction = if (ascending) "ASC" else "DESC" + val nullOrdering = if (ascending) "NULLS FIRST" else "NULLS LAST" + val sortDirectionExprProto = exprToProtoInternal(Literal(direction), inputs, binding) + val nullOrderingExprProto = exprToProtoInternal(Literal(nullOrdering), inputs, binding) val sortArrayScalarExpr = scalarFunctionExprToProto( @@ -339,8 +337,18 @@ object CometArrayExcept override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) - override def getSupportLevel(expr: ArrayExcept): SupportLevel = Incompatible( - Some(incompatReason)) + override def getSupportLevel(expr: ArrayExcept): SupportLevel = { + // Surface the native element-type restriction in EXPLAIN. We report Incompatible (not + // Unsupported) for these types so the JVM codegen dispatcher still evaluates them natively + // under the default config; the convert-time guard below is only reached under + // allowIncompatible=true, where the native array_except cannot handle them. + expr.children.map(_.dataType).find(dt => !isTypeSupported(dt)) match { + case Some(dt) => + Incompatible( + Some(s"native array_except does not support element type $dt: $incompatReason")) + case None => Incompatible(Some(incompatReason)) + } + } @tailrec def isTypeSupported(dt: DataType): Boolean = { @@ -361,6 +369,9 @@ object CometArrayExcept expr: ArrayExcept, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { + // Defensive: only reached under allowIncompatible=true (the default-config Incompatible path + // routes through the codegen dispatcher before convert). Native array_except cannot handle + // these element types, so decline and let Spark evaluate. 
val inputTypes = expr.children.map(_.dataType).toSet for (dt <- inputTypes) { if (!isTypeSupported(dt)) { @@ -377,13 +388,31 @@ object CometArrayExcept } } -object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with CodegenDispatchFallback { +object CometArrayJoin + extends CometExpressionSerde[ArrayJoin] + with CometTypeShim + with CodegenDispatchFallback { private val incompatReason = "Null handling may differ from Spark" + private val unsupportedCollationReason = + "array_join on collated strings is not supported " + + "(https://github.com/apache/datafusion-comet/issues/2190)" + override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) - override def getSupportLevel(expr: ArrayJoin): SupportLevel = Incompatible(Some(incompatReason)) + override def getUnsupportedReasons(): Seq[String] = Seq(unsupportedCollationReason) + + override def getSupportLevel(expr: ArrayJoin): SupportLevel = { + // Spark 4.0 widens ArrayJoin's input to StringTypeWithCollation; the native array_to_string + // produces UTF8_BINARY semantics and does not propagate non-default collations, so surface + // that as a fallback reason in EXPLAIN, mirroring CometArrayIntersect. 
+ if (hasNonDefaultStringCollation(expr.array.dataType)) { + Unsupported(Some(unsupportedCollationReason)) + } else { + Incompatible(Some(incompatReason)) + } + } override def convert( expr: ArrayJoin, @@ -592,15 +621,21 @@ object CometGetArrayItem extends CometExpressionSerde[GetArrayItem] { } object CometArrayReverse extends CometExpressionSerde[Reverse] with ArraysBase { - val unsupportedReason = "reverse on array containing binary is not supported" + val unsupportedReason = + "native reverse does not support arrays whose element type contains binary, struct, or map" override def getIncompatibleReasons(): Seq[String] = Seq(unsupportedReason) override def getSupportLevel(expr: Reverse): SupportLevel = { - if (SupportLevel.containsType(expr.child.dataType, classOf[BinaryType])) { - Incompatible(Some(unsupportedReason)) - } else { + // Mirror the native impl's element-type support. Report Incompatible (not Unsupported) for + // element types the native array_reverse cannot handle so the expression routes through the + // JVM codegen dispatcher (via CometReverse, which mixes in CodegenDispatchFallback) instead + // of silently falling back to Spark. Previously StructType reported Compatible here while + // convert rejected it, so such arrays silently fell back. + if (isTypeSupported(expr.child.dataType)) { Compatible(None) + } else { + Incompatible(Some(unsupportedReason)) } } @@ -608,6 +643,9 @@ object CometArrayReverse extends CometExpressionSerde[Reverse] with ArraysBase { expr: Reverse, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { + // Defensive: only reached under allowIncompatible=true (the default-config Incompatible path + // routes through the codegen dispatcher before convert). Native array_reverse cannot handle + // these element types, so decline and let Spark evaluate. 
if (!isTypeSupported(expr.child.dataType)) { withFallbackReason(expr, s"child data type not supported: ${expr.child.dataType}") return None diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_join_collation.sql b/spark/src/test/resources/sql-tests/expressions/array/array_join_collation.sql new file mode 100644 index 0000000000..abe18b7e63 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/array/array_join_collation.sql @@ -0,0 +1,30 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.0 + +-- Spark 4.0+ widens ArrayJoin's input to collated strings. Comet's native array_to_string matches +-- raw UTF8_BINARY bytes and does not propagate non-default collations, so collated inputs fall +-- back to Spark. 
+ +-- collated array elements +query expect_fallback(array_join on collated strings is not supported) +SELECT array_join(array('a' COLLATE UTF8_LCASE, 'b' COLLATE UTF8_LCASE), ',') + +-- collated elements with null replacement +query expect_fallback(array_join on collated strings is not supported) +SELECT array_join(array('a' COLLATE UTF8_LCASE, NULL, 'c' COLLATE UTF8_LCASE), ',', 'NULL') diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_reverse.sql b/spark/src/test/resources/sql-tests/expressions/array/array_reverse.sql new file mode 100644 index 0000000000..9d365d75dd --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/array/array_reverse.sql @@ -0,0 +1,75 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+ +-- ===== INT arrays (native array_reverse) ===== + +statement +CREATE TABLE test_array_reverse_int(arr array<int>) USING parquet + +statement +INSERT INTO test_array_reverse_int VALUES + (array(1, 2, 3)), + (array(1, NULL, 3)), + (array()), + (NULL) + +query +SELECT reverse(arr) FROM test_array_reverse_int + +-- ===== STRING arrays ===== + +statement +CREATE TABLE test_array_reverse_string(arr array<string>) USING parquet + +statement +INSERT INTO test_array_reverse_string VALUES + (array('a', 'b', 'c')), + (array('a', NULL, 'c')), + (array()), + (NULL) + +query +SELECT reverse(arr) FROM test_array_reverse_string + +-- ===== STRUCT arrays ===== +-- Native array_reverse cannot handle struct element types, so getSupportLevel reports +-- Incompatible and Comet routes these through the JVM codegen dispatcher, staying native instead +-- of silently falling back to Spark. + +statement +CREATE TABLE test_array_reverse_struct(c1 struct<a:int,b:string>, c2 struct<a:int,b:string>) +USING parquet + +statement +INSERT INTO test_array_reverse_struct VALUES + (named_struct('a', 1, 'b', 'x'), named_struct('a', 2, 'b', 'y')), + (named_struct('a', 3, 'b', NULL), named_struct('a', 4, 'b', 'z')), + (NULL, named_struct('a', 5, 'b', 'w')) + +query +SELECT reverse(array(c1, c2)) FROM test_array_reverse_struct + +query +SELECT reverse(array(array(c1), array(c2))) FROM test_array_reverse_struct + +-- literal arguments (constant folding is disabled by the suite, so these run natively) +query +SELECT + reverse(array(1, 2, 3)), + reverse(array('a', 'b', 'c')), + reverse(array(named_struct('a', 1), named_struct('a', 2))), + reverse(cast(NULL as array<int>)) diff --git a/spark/src/test/resources/sql-tests/expressions/array/sort_array_foldable.sql b/spark/src/test/resources/sql-tests/expressions/array/sort_array_foldable.sql new file mode 100644 index 0000000000..e4d5c05d60 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/array/sort_array_foldable.sql @@ -0,0 +1,41 @@ +-- Licensed to the Apache Software Foundation (ASF) 
under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.0 + +-- Spark 4.0+ widens SortArray.ascendingOrder from a boolean Literal to any foldable boolean +-- expression. Spark 3.x rejects a non-Literal ascendingOrder at analysis, so this file is gated +-- to 4.0+. CometSqlFileTestSuite disables ConstantFolding, so the foldable casts below reach the +-- serde unfolded (as Cast nodes, not Literals) and exercise the convert-time evaluation path. 
+ +statement +CREATE TABLE test_sort_array_foldable(arr array<int>) USING parquet + +statement +INSERT INTO test_sort_array_foldable VALUES + (array(3, 1, 4, 1, 5)), + (array(3, NULL, 1, NULL, 2)), + (array()), + (NULL) + +-- foldable ascending (cast(1 as boolean) => true) +query +SELECT sort_array(arr, cast(1 as boolean)) FROM test_sort_array_foldable + +-- foldable descending (cast(0 as boolean) => false) +query +SELECT sort_array(arr, cast(0 as boolean)) FROM test_sort_array_foldable From 0571632923d778c73dc6b657052b029dcdfe935b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Jun 2026 12:52:06 -0600 Subject: [PATCH 2/2] refactor: simplify CometArrayExcept support-level and convert guards Collapse the dual-Incompatible match in getSupportLevel into a single Incompatible construction, and tighten the convert-time type guard to the same find idiom (dropping a needless toSet). Behavior preserving. --- .../scala/org/apache/comet/serde/arrays.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index e569fafce9..7de51732fc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -342,12 +342,11 @@ object CometArrayExcept // Unsupported) for these types so the JVM codegen dispatcher still evaluates them natively // under the default config; the convert-time guard below is only reached under // allowIncompatible=true, where the native array_except cannot handle them. 
- expr.children.map(_.dataType).find(dt => !isTypeSupported(dt)) match { - case Some(dt) => - Incompatible( - Some(s"native array_except does not support element type $dt: $incompatReason")) - case None => Incompatible(Some(incompatReason)) + val reason = expr.children.map(_.dataType).find(dt => !isTypeSupported(dt)) match { + case Some(dt) => s"native array_except does not support element type $dt: $incompatReason" + case None => incompatReason } + Incompatible(Some(reason)) } @tailrec @@ -372,12 +371,11 @@ object CometArrayExcept // Defensive: only reached under allowIncompatible=true (the default-config Incompatible path // routes through the codegen dispatcher before convert). Native array_except cannot handle // these element types, so decline and let Spark evaluate. - val inputTypes = expr.children.map(_.dataType).toSet - for (dt <- inputTypes) { - if (!isTypeSupported(dt)) { + expr.children.map(_.dataType).find(dt => !isTypeSupported(dt)) match { + case Some(dt) => withFallbackReason(expr, s"data type not supported: $dt") return None - } + case None => } val leftArrayExprProto = exprToProto(expr.left, inputs, binding) val rightArrayExprProto = exprToProto(expr.right, inputs, binding)