diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 41f547a43b698..2b3d76eb0c2c3 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -184,6 +184,11 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.avro.functions$"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions$"), + + // SPARK-49434: Move aggregators to sql/api + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.javalang.typed"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed$"), ) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++ loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++ loggingExcludes("org.apache.spark.sql.SparkSession#Builder") diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java b/sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java similarity index 88% rename from sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java rename to sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java index e1e4ba4c8e0dc..91a1231ec0303 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java +++ b/sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java @@ -19,13 +19,13 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.TypedColumn; -import org.apache.spark.sql.execution.aggregate.TypedAverage; -import org.apache.spark.sql.execution.aggregate.TypedCount; -import org.apache.spark.sql.execution.aggregate.TypedSumDouble; -import org.apache.spark.sql.execution.aggregate.TypedSumLong; +import org.apache.spark.sql.internal.TypedAverage; +import org.apache.spark.sql.internal.TypedCount; +import org.apache.spark.sql.internal.TypedSumDouble; +import org.apache.spark.sql.internal.TypedSumLong; /** - * Type-safe functions available for {@link org.apache.spark.sql.Dataset} operations in Java. + * Type-safe functions available for {@link org.apache.spark.sql.api.Dataset} operations in Java. * * Scala users should use {@link org.apache.spark.sql.expressions.scalalang.typed}. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala b/sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala similarity index 82% rename from sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala rename to sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala index 192b5bf65c4c5..9d98d1a98b00d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala @@ -18,19 +18,17 @@ package org.apache.spark.sql.expressions import org.apache.spark.SparkException -import org.apache.spark.sql.Encoder -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, ProductEncoder} +import org.apache.spark.sql.{Encoder, Encoders} /** * An aggregator that uses a single associative and commutative reduce function. This reduce - * function can be used to go through all input values and reduces them to a single value. - * If there is no input, a null value is returned. + * function can be used to go through all input values and reduces them to a single value. If + * there is no input, a null value is returned. * * This class currently assumes there is at least one input row. */ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T) - extends Aggregator[T, (Boolean, T), T] { + extends Aggregator[T, (Boolean, T), T] { @transient private val encoder = implicitly[Encoder[T]] @@ -47,10 +45,8 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T) override def zero: (Boolean, T) = (false, _zero.asInstanceOf[T]) - override def bufferEncoder: Encoder[(Boolean, T)] = { - ProductEncoder.tuple(Seq(PrimitiveBooleanEncoder, encoder.asInstanceOf[AgnosticEncoder[T]])) - .asInstanceOf[Encoder[(Boolean, T)]] - } + override def bufferEncoder: Encoder[(Boolean, T)] = + Encoders.tuple(Encoders.scalaBoolean, encoder) override def outputEncoder: Encoder[T] = encoder diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala b/sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala similarity index 94% rename from sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala rename to sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala index 8d17edd42442e..9ea3ab8cd4e1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.expressions.scalalang -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.aggregate._ +import org.apache.spark.sql.TypedColumn +import org.apache.spark.sql.internal.{TypedAverage, TypedCount, TypedSumDouble, TypedSumLong} /** * Type-safe functions available for `Dataset` operations in Scala. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala similarity index 81% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala rename to sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala index b6550bf3e4aac..aabb3a6f00fd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala @@ -15,26 +15,24 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.aggregate +package org.apache.spark.sql.internal import org.apache.spark.api.java.function.MapFunction -import org.apache.spark.sql.{Encoder, TypedColumn} -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.{Encoder, Encoders, TypedColumn} import org.apache.spark.sql.expressions.Aggregator //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines internal implementations for aggregators. //////////////////////////////////////////////////////////////////////////////////////////////////// - class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] { override def zero: Double = 0.0 override def reduce(b: Double, a: IN): Double = b + f(a) override def merge(b1: Double, b2: Double): Double = b1 + b2 override def finish(reduction: Double): Double = reduction - override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]() - override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]() + override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble + override def outputEncoder: Encoder[Double] = Encoders.scalaDouble // Java api support def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double]) @@ -44,15 +42,14 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou } } - class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] { override def zero: Long = 0L override def reduce(b: Long, a: IN): Long = b + f(a) override def merge(b1: Long, b2: Long): Long = b1 + b2 override def finish(reduction: Long): Long = reduction - override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]() - override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]() + override def bufferEncoder: Encoder[Long] = Encoders.scalaLong + override def outputEncoder: Encoder[Long] = Encoders.scalaLong // Java api support def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long]) @@ -62,7 +59,6 @@ class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] { } } - class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] { override def zero: Long = 0 override def reduce(b: Long, a: IN): Long = { @@ -71,8 +67,8 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] { override def merge(b1: Long, b2: Long): Long = b1 + b2 override def finish(reduction: Long): Long = reduction - override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]() - override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]() + override def bufferEncoder: Encoder[Long] = Encoders.scalaLong + override def outputEncoder: Encoder[Long] = Encoders.scalaLong // Java api support def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any]) @@ -81,7 +77,6 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] { } } - class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] { override def zero: (Double, Long) = (0.0, 0L) override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2) @@ -90,8 +85,10 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long (b1._1 + b2._1, b1._2 + b2._2) } - override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]() - override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]() + override def bufferEncoder: Encoder[(Double, Long)] = + Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong) + + override def outputEncoder: Encoder[Double] = Encoders.scalaDouble // Java api support def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala index c993aa8e52031..76fcdfc380950 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala @@ -324,7 +324,7 @@ class ColumnNodeToExpressionConverterSuite extends SparkFunSuite { a.asInstanceOf[AgnosticEncoder[Any]] test("udf") { - val int2LongSum = new aggregate.TypedSumLong[Int]((i: Int) => i.toLong) + val int2LongSum = new TypedSumLong[Int]((i: Int) => i.toLong) val bufferEncoder = encoderFor(int2LongSum.bufferEncoder) val outputEncoder = encoderFor(int2LongSum.outputEncoder) val bufferAttrs = bufferEncoder.namedExpressions.map {