
Commit

add testing
chaoqin-li1123 committed Jul 31, 2023
1 parent 49ed6ea commit ad7cdac
Showing 6 changed files with 223 additions and 275 deletions.
10 changes: 9 additions & 1 deletion pom.xml
@@ -72,7 +72,7 @@
<scalatest.version>3.2.14</scalatest.version>
<spark.version>3.2.2</spark.version>
<commons-io.version>2.11.0</commons-io.version>
<testcontainers.version>1.17.6</testcontainers.version>
<testcontainers.version>1.18.3</testcontainers.version>

<!-- plugin dependencies -->
<maven.version>3.5.4</maven.version>
@@ -201,6 +201,14 @@
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>

<!-- Pulsar testing environment -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
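The build now pulls in Testcontainers' Pulsar module, so the suites can run against a real broker in Docker rather than an externally provisioned cluster. Below is a minimal sketch, not part of this commit, of how a ScalaTest fixture might start such a container and expose its URLs; the trait name, image tag, and helper names are illustrative assumptions.

import org.scalatest.{BeforeAndAfterAll, Suite}
import org.testcontainers.containers.PulsarContainer
import org.testcontainers.utility.DockerImageName

// Hypothetical fixture (not the project's actual PulsarTest trait): starts a
// throwaway Pulsar broker once per suite and exposes the URLs the connector needs.
trait PulsarContainerFixture extends BeforeAndAfterAll { this: Suite =>
  // The image tag is an example; pin it to whatever broker version the build targets.
  protected val pulsarContainer =
    new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.10.2"))

  // The broker URL feeds ServiceUrlOptionKey; the HTTP URL is the admin endpoint.
  protected def serviceUrl: String = pulsarContainer.getPulsarBrokerUrl
  protected def adminUrl: String = pulsarContainer.getHttpServiceUrl

  override def beforeAll(): Unit = {
    pulsarContainer.start()
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    try super.afterAll()
    finally pulsarContainer.stop()
  }
}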
(File header not captured in this extraction; the following hunks are from the Pulsar micro-batch source test suite.)
@@ -20,8 +20,8 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.pulsar.PulsarOptions.{ADMIN_URL_OPTION_KEY, SERVICE_URL_OPTION_KEY, TOPIC_PATTERN}
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.pulsar.PulsarOptions.{ServiceUrlOptionKey, TopicPattern}
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.util.Utils

class PulsarMicroBatchV1SourceSuite extends PulsarMicroBatchSourceSuiteBase {
@@ -38,9 +38,8 @@ class PulsarMicroBatchV1SourceSuite extends PulsarMicroBatchSourceSuiteBase {

val pulsar = spark.readStream
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TOPIC_PATTERN, s"$topic.*")
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicPattern, s"$topic.*")
.load()

testStream(pulsar)(
@@ -61,9 +60,8 @@ class PulsarMicroBatchV2SourceSuite extends PulsarMicroBatchSourceSuiteBase {

val pulsar = spark.readStream
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TOPIC_PATTERN, s"$topic.*")
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicPattern, s"$topic.*")
.load()

testStream(pulsar)(
@@ -87,9 +85,8 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

val reader = spark.readStream
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TOPIC_SINGLE, topic)
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicSingle, topic)

testStream(reader.load)(makeSureGetOffsetCalled, StopStream, StartStream(), StopStream)
}
@@ -101,9 +98,8 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

val pulsar = spark.readStream
.format("pulsar")
.option(TOPIC_SINGLE, topic)
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TopicSingle, topic)
.option(ServiceUrlOptionKey, serviceUrl)
.load()
.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
@@ -122,18 +118,18 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
}

test("subscribing topic by pattern with topic deletions") {
sparkContext.setLogLevel("INFO")
val topicPrefix = newTopic()
val topic = topicPrefix + "-seems"
val topic2 = topicPrefix + "-bad"
createTopic(topic, partitions = 5)
createNonPartitionedTopic(topic)
sendMessages(topic, Array("-1"))
require(getLatestOffsets(Set(topic)).size === 5)
require(getLatestOffsets(Set(topic)).size === 1)

val reader = spark.readStream
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TOPIC_PATTERN, s"$topicPrefix-.*")
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicPattern, s"$topicPrefix-.*")
.option("failOnDataLoss", "false")

val pulsar = reader
@@ -167,9 +163,8 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

val reader = spark.readStream
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TOPIC_PATTERN, s"$topicPrefix-.*")
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicPattern, s"$topicPrefix-.*")
.option("failOnDataLoss", "true")
.option("startingOffsets", "earliest")

@@ -191,20 +186,20 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
// has seen it.
StopStream,
// Recreate `topic2` and wait until it's available
WithOffsetSync(topic2),
TopicRecreation(topic2),
StartStream(),
ExpectFailure[SparkException](e => {
assert(e.getMessage.contains("Potential Data Loss"))
assert(e.getCause.getMessage.contains("Potential Data Loss"))
})
)
}

case class WithOffsetSync(topic2: String) extends ExternalAction {
case class TopicRecreation(topic2: String) extends ExternalAction {
override def runAction(): Unit = {
deleteTopic(topic2)
createNonPartitionedTopic(topic2)
val mid = sendMessages(topic2, Array("6")).head._2
waitUntilOffsetAppears(topic2, mid)
val mid = sendMessages(topic2, Array("6", "7", "8")).last._2
// waitUntilOffsetAppears(topic2, mid)
}
}
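TopicRecreation relies on suite helpers (deleteTopic, createNonPartitionedTopic, sendMessages) whose implementations are not shown in this diff. The sketch below is only an illustration of how such helpers can be written against the Pulsar admin and client APIs; it is not the suite's actual code, and the class name and URLs are placeholders.

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.{MessageId, PulsarClient, Schema}

// Illustrative helpers (an assumption, not the shared test fixture): adminUrl and
// serviceUrl would come from the Pulsar test container.
class TopicHelpers(adminUrl: String, serviceUrl: String) extends AutoCloseable {
  private val admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()
  private val client = PulsarClient.builder().serviceUrl(serviceUrl).build()

  def createNonPartitionedTopic(topic: String): Unit =
    admin.topics().createNonPartitionedTopic(topic)

  def deleteTopic(topic: String): Unit =
    admin.topics().delete(topic)

  // Returns (payload, MessageId) pairs, matching how the tests read `.last._2`.
  def sendMessages(topic: String, messages: Array[String]): Seq[(String, MessageId)] = {
    val producer = client.newProducer(Schema.STRING).topic(topic).create()
    try messages.toSeq.map(m => m -> producer.send(m))
    finally producer.close()
  }

  override def close(): Unit = {
    client.close()
    admin.close()
  }
}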

@@ -215,10 +210,9 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

val pulsar = spark.readStream
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(STARTING_OFFSETS_OPTION_KEY, "earliest")
.option(TOPIC_SINGLE, topic)
.option(ServiceUrlOptionKey, serviceUrl)
.option(StartingOffsetsOptionKey, "earliest")
.option(TopicSingle, topic)
.load()

val windowedAggregation = pulsar
@@ -254,11 +248,10 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

val reader = spark.readStream
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TOPIC_SINGLE, topic)
.option(STARTING_OFFSETS_OPTION_KEY, "earliest")
.option(POLL_TIMEOUT_MS, "1000")
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicSingle, topic)
.option(StartingOffsetsOptionKey, "earliest")
.option(PollTimeoutMS, "1000")
.option("failOnDataLoss", "false")
val pulsar = reader
.load()
@@ -297,9 +290,8 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

val pulsar = spark.readStream
.format("pulsar")
.option(TOPIC_SINGLE, topic)
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TopicSingle, topic)
.option(ServiceUrlOptionKey, serviceUrl)
.load()

val values = pulsar
73 changes: 34 additions & 39 deletions src/test/scala/org/apache/spark/sql/pulsar/PulsarSinkSuite.scala
@@ -16,23 +16,19 @@ package org.apache.spark.sql.pulsar
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Date, Locale}

import scala.reflect.ClassTag

import org.scalatest.time.SpanSugar._

import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.common.naming.TopicName
import org.apache.pulsar.common.schema.SchemaInfo

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.spark.sql._

class PulsarSinkSuite extends StreamTest with SharedSQLContext with PulsarTest {
class PulsarSinkSuite extends StreamTest with PulsarTest with SharedSparkSession {
import PulsarOptions._
import SchemaData._
import testImplicits._
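With the admin URL option gone, a sink test only needs the broker service URL. As a reference, here is a minimal, spark-shell-style batch write/read round trip through the "pulsar" format; it is not taken from this commit, and it assumes a reachable broker at serviceUrl, the connector on the classpath, and that the snippet sits where the package-private PulsarOptions constants are visible (for example the org.apache.spark.sql.pulsar test package).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.pulsar.PulsarOptions._

// Sketch of the round trip batchCheck exercises; serviceUrl and the topic are placeholders.
val spark = SparkSession.builder().master("local[2]").appName("pulsar-roundtrip").getOrCreate()
import spark.implicits._

val serviceUrl = "pulsar://localhost:6650" // e.g. PulsarContainer.getPulsarBrokerUrl
val topic = "roundtrip-topic"

// The sink picks up the payload from the `value` column.
Seq("1", "2", "3").toDF("value")
  .write
  .format("pulsar")
  .option(ServiceUrlOptionKey, serviceUrl)
  .option(TopicSingle, topic)
  .save()

// Batch read of everything currently in the topic, decoding the payload back to strings.
val roundTrip = spark.read
  .format("pulsar")
  .option(ServiceUrlOptionKey, serviceUrl)
  .option(TopicSingle, topic)
  .option(StartingOffsetsOptionKey, "earliest")
  .option(EndingOffsetsOptionKey, "latest")
  .load()
  .selectExpr("CAST(value AS STRING) value")

roundTrip.show()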
@@ -47,7 +43,6 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
df.write
.format("pulsar")
.option(ServiceUrlOptionKey, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TopicSingle, topic)
.save()

@@ -57,10 +52,10 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
}

private def batchCheck[T: ClassTag](
schemaInfo: SchemaInfo,
data: Seq[T],
encoder: Encoder[T],
str: T => String) = {
schemaInfo: SchemaInfo,
data: Seq[T],
encoder: Encoder[T],
str: T => String) = {
val topic = newTopic()

val df = if (str == null) {
@@ -72,7 +67,6 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
df.write
.format("pulsar")
.option(ServiceUrlOptionKey, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TopicSingle, topic)
.save()

@@ -144,7 +138,6 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
df.write
.format("pulsar")
.option(ServiceUrlOptionKey, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TopicSingle, topic)
.save()

@@ -159,7 +152,6 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
df.write
.format("pulsar")
.option(ServiceUrlOptionKey, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.save()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic option required"))
@@ -203,7 +195,8 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
withTopic = None,
withOutputMode = Some(OutputMode.Append))(withSelectExpr = s"'$topic' as __topic", "value")

val reader = createPulsarReader(topic)
// reader needs to be created on demand because start and end offsets are resolved when dataframes are created.
def reader = createPulsarReader(topic)
.selectExpr("CAST(__key as STRING) __key", "CAST(value as STRING) value")
.selectExpr("CAST(__key as INT) __key", "CAST(value as INT) value")
.as[(Option[Int], Int)]
@@ -237,7 +230,8 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
"CAST(count as STRING) value"
)

val reader = createPulsarReader(topic)
// reader needs to be created on demand because start and end offsets are resolved when dataframes are created.
def reader = createPulsarReader(topic)
.selectExpr("CAST(__key as STRING) __key", "CAST(value as STRING) value")
.selectExpr("CAST(__key as INT) __key", "CAST(value as INT) value")
.as[(Int, Int)]
@@ -278,7 +272,8 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
"CAST(value as STRING) __key",
"CAST(count as STRING) value")

val reader = createPulsarReader(topic)
// reader needs to be created on demand because start and end offsets are resolved when dataframes are created.
def reader = createPulsarReader(topic)
.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.selectExpr("CAST(__key AS INT)", "CAST(value AS INT)")
.as[(Int, Int)]
@@ -300,10 +295,10 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
}

private def streamCheck[T: ClassTag](
schemaInfo: SchemaInfo,
data: Seq[T],
encoder: Encoder[T],
str: T => String) = {
schemaInfo: SchemaInfo,
data: Seq[T],
encoder: Encoder[T],
str: T => String) = {
implicit val enc = encoder
val input = MemoryStream[T]
val topic = newTopic()
@@ -501,9 +496,8 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
} toDF ("key", "value")
df.write
.format("pulsar")
.option(SERVICE_URL_OPTION_KEY, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(TOPIC_SINGLE, topic)
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicSingle, topic)
.option("pulsar.producer.blockIfQueueFull", "true")
.option("pulsar.producer.maxPendingMessages", "100000")
.option("pulsar.producer.maxPendingMessagesAcrossPartitions", "5000000")
@@ -530,23 +524,25 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
"pulsar.producer.sendTimeoutMs" -> "30000")
)(withSelectExpr = s"'$topic' as __topic", "value")

val reader = createPulsarReader(topic)
.selectExpr("CAST(__key as STRING) __key", "CAST(value as STRING) value")
.selectExpr("CAST(__key as INT) __key", "CAST(value as INT) value")
.as[(Option[Int], Int)]
.map(_._2)
// reader needs to be created on demand because start and end offsets are resolved when dataframes are created.
def reader()= createPulsarReader(topic).selectExpr("CAST(value as STRING) value")
.selectExpr("CAST(value as INT) value").as[Int]


try {
input.addData("1", "2", "3", "4", "5")

failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
checkDatasetUnorderly(reader(), 1, 2, 3, 4, 5)


input.addData("6", "7", "8", "9", "10")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
checkDatasetUnorderly(reader(), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
} finally {
writer.stop()
}
@@ -589,19 +585,18 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
spark.read
.format("pulsar")
.option(ServiceUrlOptionKey, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.option(StartingOffsetsOptionKey, "earliest")
.option(EndingOffsetsOptionKey, "latest")
.option(TopicSingle, topic)
.load()
}

private def createPulsarWriter(
input: DataFrame,
withTopic: Option[String] = None,
withOutputMode: Option[OutputMode] = None,
withOptions: Map[String, String] = Map[String, String]())(
withSelectExpr: String*): StreamingQuery = {
input: DataFrame,
withTopic: Option[String] = None,
withOutputMode: Option[OutputMode] = None,
withOptions: Map[String, String] = Map[String, String]())(
withSelectExpr: String*): StreamingQuery = {
var stream: DataStreamWriter[Row] = null
withTempDir { checkpointDir =>
var df = input.toDF()
@@ -612,11 +607,11 @@ class PulsarSinkSuite extends StreamTest with SharedSQLContext with {
.format("pulsar")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.option(ServiceUrlOptionKey, serviceUrl)
.option(ADMIN_URL_OPTION_KEY, adminUrl)
.queryName("pulsarStream")
withTopic.foreach(stream.option(TopicSingle, _))
withOutputMode.foreach(stream.outputMode(_))
withOptions.foreach(opt => stream.option(opt._1, opt._2))
stream.options(withOptions)
// withOptions.foreach(opt => stream.option(opt._1, opt._2))
}
stream.start()
}
(Diffs for the remaining changed files were not loaded.)
