Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ import org.apache.comet.shims.{CometExprShim, CometTypeShim}
object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {

private[comet] val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
// ArrayAppend is a concrete expression only on Spark 3.x. Spark 4.0+ marks it
// RuntimeReplaceable and rewrites it to ArrayInsert before serde, so this entry is
// unreachable there and is kept solely for Spark 3.4/3.5.
classOf[ArrayAppend] -> CometArrayAppend,
classOf[ArrayCompact] -> CometArrayCompact,
// ArrayCompact is RuntimeReplaceable in all supported Spark versions (rewritten to
// ArrayFilter(arr, IsNotNull(...))), so it never reaches serde directly; dispatch flows
// through CometArrayFilter -> CometArrayCompact instead. No direct registration here.
classOf[ArrayContains] -> CometArrayContains,
classOf[ArrayDistinct] -> CometScalarFunction("array_distinct"),
classOf[ArrayExcept] -> CometArrayExcept,
Expand Down
86 changes: 61 additions & 25 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ object CometSortArray extends CometExpressionSerde[SortArray] with CodegenDispat
.strictFloatingPointReason(elementType, "Sorting on floating-point")
.map(reason => Incompatible(Some(reason)))
.getOrElse(expr.ascendingOrder match {
case Literal(_: Boolean, BooleanType) => Compatible()
// Spark 3.x requires a boolean Literal; Spark 4.0+ widens ascendingOrder to any
// foldable boolean. Accept both; convert evaluates the foldable expression.
case ao if ao.foldable && ao.dataType == BooleanType => Compatible()
case other =>
Unsupported(Some(s"ascendingOrder must be a boolean literal: $other"))
Unsupported(Some(s"ascendingOrder must be a foldable boolean: $other"))
})
}
}
Expand All @@ -164,17 +166,13 @@ object CometSortArray extends CometExpressionSerde[SortArray] with CodegenDispat
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val arrayExprProto = exprToProtoInternal(expr.base, inputs, binding)
val (sortDirectionExprProto, nullOrderingExprProto) = expr.ascendingOrder match {
case Literal(value: Boolean, BooleanType) =>
val direction = if (value) "ASC" else "DESC"
val nullOrdering = if (value) "NULLS FIRST" else "NULLS LAST"
(
exprToProtoInternal(Literal(direction), inputs, binding),
exprToProtoInternal(Literal(nullOrdering), inputs, binding))
case _ =>
// Unreachable: getSupportLevel gates a non-boolean-literal ascendingOrder.
(None, None)
}
// ascendingOrder is a foldable boolean (gated in getSupportLevel). Evaluate it; a null result
// unboxes to false, matching Spark's `right.eval().asInstanceOf[Boolean]`.
val ascending = expr.ascendingOrder.eval(EmptyRow).asInstanceOf[Boolean]
val direction = if (ascending) "ASC" else "DESC"
val nullOrdering = if (ascending) "NULLS FIRST" else "NULLS LAST"
val sortDirectionExprProto = exprToProtoInternal(Literal(direction), inputs, binding)
val nullOrderingExprProto = exprToProtoInternal(Literal(nullOrdering), inputs, binding)

val sortArrayScalarExpr =
scalarFunctionExprToProto(
Expand Down Expand Up @@ -339,8 +337,17 @@ object CometArrayExcept

override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)

override def getSupportLevel(expr: ArrayExcept): SupportLevel = Incompatible(
Some(incompatReason))
override def getSupportLevel(expr: ArrayExcept): SupportLevel = {
// Surface the native element-type restriction in EXPLAIN. We report Incompatible (not
// Unsupported) for these types so the JVM codegen dispatcher still evaluates them natively
// under the default config; the convert-time guard below is only reached under
// allowIncompatible=true, where the native array_except cannot handle them.
val reason = expr.children.map(_.dataType).find(dt => !isTypeSupported(dt)) match {
case Some(dt) => s"native array_except does not support element type $dt: $incompatReason"
case None => incompatReason
}
Incompatible(Some(reason))
}

@tailrec
def isTypeSupported(dt: DataType): Boolean = {
Expand All @@ -361,12 +368,14 @@ object CometArrayExcept
expr: ArrayExcept,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val inputTypes = expr.children.map(_.dataType).toSet
for (dt <- inputTypes) {
if (!isTypeSupported(dt)) {
// Defensive: only reached under allowIncompatible=true (the default-config Incompatible path
// routes through the codegen dispatcher before convert). Native array_except cannot handle
// these element types, so decline and let Spark evaluate.
expr.children.map(_.dataType).find(dt => !isTypeSupported(dt)) match {
case Some(dt) =>
withFallbackReason(expr, s"data type not supported: $dt")
return None
}
case None =>
}
val leftArrayExprProto = exprToProto(expr.left, inputs, binding)
val rightArrayExprProto = exprToProto(expr.right, inputs, binding)
Expand All @@ -377,13 +386,31 @@ object CometArrayExcept
}
}

object CometArrayJoin extends CometExpressionSerde[ArrayJoin] with CodegenDispatchFallback {
object CometArrayJoin
extends CometExpressionSerde[ArrayJoin]
with CometTypeShim
with CodegenDispatchFallback {

private val incompatReason = "Null handling may differ from Spark"

private val unsupportedCollationReason =
"array_join on collated strings is not supported " +
"(https://github.com/apache/datafusion-comet/issues/2190)"

override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)

override def getSupportLevel(expr: ArrayJoin): SupportLevel = Incompatible(Some(incompatReason))
override def getUnsupportedReasons(): Seq[String] = Seq(unsupportedCollationReason)

override def getSupportLevel(expr: ArrayJoin): SupportLevel = {
// Spark 4.0 widens ArrayJoin's input to StringTypeWithCollation; the native array_to_string
// produces UTF8_BINARY semantics and does not propagate non-default collations, so surface
// that as a fallback reason in EXPLAIN, mirroring CometArrayIntersect.
if (hasNonDefaultStringCollation(expr.array.dataType)) {
Unsupported(Some(unsupportedCollationReason))
} else {
Incompatible(Some(incompatReason))
}
}

override def convert(
expr: ArrayJoin,
Expand Down Expand Up @@ -592,22 +619,31 @@ object CometGetArrayItem extends CometExpressionSerde[GetArrayItem] {
}

object CometArrayReverse extends CometExpressionSerde[Reverse] with ArraysBase {
val unsupportedReason = "reverse on array containing binary is not supported"
val unsupportedReason =
"native reverse does not support arrays whose element type contains binary, struct, or map"

override def getIncompatibleReasons(): Seq[String] = Seq(unsupportedReason)

override def getSupportLevel(expr: Reverse): SupportLevel = {
if (SupportLevel.containsType(expr.child.dataType, classOf[BinaryType])) {
Incompatible(Some(unsupportedReason))
} else {
// Mirror the native impl's element-type support. Report Incompatible (not Unsupported) for
// element types the native array_reverse cannot handle so the expression routes through the
// JVM codegen dispatcher (via CometReverse, which mixes in CodegenDispatchFallback) instead
// of silently falling back to Spark. Previously StructType reported Compatible here while
// convert rejected it, so such arrays silently fell back.
if (isTypeSupported(expr.child.dataType)) {
Compatible(None)
} else {
Incompatible(Some(unsupportedReason))
}
}

override def convert(
expr: Reverse,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
// Defensive: only reached under allowIncompatible=true (the default-config Incompatible path
// routes through the codegen dispatcher before convert). Native array_reverse cannot handle
// these element types, so decline and let Spark evaluate.
if (!isTypeSupported(expr.child.dataType)) {
withFallbackReason(expr, s"child data type not supported: ${expr.child.dataType}")
return None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- MinSparkVersion: 4.0

-- Spark 4.0+ widens ArrayJoin's input to collated strings. Comet's native array_to_string matches
-- raw UTF8_BINARY bytes and does not propagate non-default collations, so collated inputs fall
-- back to Spark.

-- collated array elements
query expect_fallback(array_join on collated strings is not supported)
SELECT array_join(array('a' COLLATE UTF8_LCASE, 'b' COLLATE UTF8_LCASE), ',')

-- collated elements with null replacement
query expect_fallback(array_join on collated strings is not supported)
SELECT array_join(array('a' COLLATE UTF8_LCASE, NULL, 'c' COLLATE UTF8_LCASE), ',', 'NULL')
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- ===== INT arrays (native array_reverse) =====

statement
CREATE TABLE test_array_reverse_int(arr array<int>) USING parquet

statement
INSERT INTO test_array_reverse_int VALUES
(array(1, 2, 3)),
(array(1, NULL, 3)),
(array()),
(NULL)

query
SELECT reverse(arr) FROM test_array_reverse_int

-- ===== STRING arrays =====

statement
CREATE TABLE test_array_reverse_string(arr array<string>) USING parquet

statement
INSERT INTO test_array_reverse_string VALUES
(array('a', 'b', 'c')),
(array('a', NULL, 'c')),
(array()),
(NULL)

query
SELECT reverse(arr) FROM test_array_reverse_string

-- ===== STRUCT arrays =====
-- Native array_reverse cannot handle struct element types, so getSupportLevel reports
-- Incompatible and Comet routes these through the JVM codegen dispatcher, staying native instead
-- of silently falling back to Spark.

statement
CREATE TABLE test_array_reverse_struct(c1 struct<a:int, b:string>, c2 struct<a:int, b:string>)
USING parquet

statement
INSERT INTO test_array_reverse_struct VALUES
(named_struct('a', 1, 'b', 'x'), named_struct('a', 2, 'b', 'y')),
(named_struct('a', 3, 'b', NULL), named_struct('a', 4, 'b', 'z')),
(NULL, named_struct('a', 5, 'b', 'w'))

query
SELECT reverse(array(c1, c2)) FROM test_array_reverse_struct

query
SELECT reverse(array(array(c1), array(c2))) FROM test_array_reverse_struct

-- literal arguments (constant folding is disabled by the suite, so these run natively)
query
SELECT
reverse(array(1, 2, 3)),
reverse(array('a', 'b', 'c')),
reverse(array(named_struct('a', 1), named_struct('a', 2))),
reverse(cast(NULL as array<int>))
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- MinSparkVersion: 4.0

-- Spark 4.0+ widens SortArray.ascendingOrder from a boolean Literal to any foldable boolean
-- expression. Spark 3.x rejects a non-Literal ascendingOrder at analysis, so this file is gated
-- to 4.0+. CometSqlFileTestSuite disables ConstantFolding, so the foldable casts below reach the
-- serde unfolded (as Cast nodes, not Literals) and exercise the convert-time evaluation path.

statement
CREATE TABLE test_sort_array_foldable(arr array<int>) USING parquet

statement
INSERT INTO test_sort_array_foldable VALUES
(array(3, 1, 4, 1, 5)),
(array(3, NULL, 1, NULL, 2)),
(array()),
(NULL)

-- foldable ascending (cast(1 as boolean) => true)
query
SELECT sort_array(arr, cast(1 as boolean)) FROM test_sort_array_foldable

-- foldable descending (cast(0 as boolean) => false)
query
SELECT sort_array(arr, cast(0 as boolean)) FROM test_sort_array_foldable
Loading