Skip to content

Commit

Permalink
Upgrading Pulsar Version to 3.3.2 (#180)
Browse files Browse the repository at this point in the history
* init

* adding jsonignore filter back

* updating exception

* adding bouncycastle ack

* adding back lines
  • Loading branch information
ericm-db authored Nov 8, 2024
1 parent 0c5130d commit e43e8a4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 13 deletions.
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

<!-- dependencies -->
<!-- latest version from apache pulsar -->
<pulsar.version>2.10.2</pulsar.version>
<pulsar.version>3.3.2</pulsar.version>
<scala.version>2.12.17</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<scalatest.version>3.2.14</scalatest.version>
Expand Down Expand Up @@ -153,6 +153,11 @@
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.72</version>
</dependency>

<!-- spark dependency -->

Expand Down Expand Up @@ -390,6 +395,7 @@
<include>org.bouncycastle*:*</include>
<include>org.lz4*:*</include>
<include>commons-io:commons-io:jar:*</include>
<include>io.opentelemetry:*</include> <!-- Add this -->
</includes>
</artifactSet>
<filters>
Expand All @@ -409,6 +415,10 @@
</filter>
</filters>
<relocations>
<relocation>
<pattern>io.opentelemetry</pattern>
<shadedPattern>org.apache.pulsar.shade.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@ import java.util.Locale

import scala.reflect._

import org.apache.pulsar.client.impl.conf.{
ClientConfigurationData,
ProducerConfigurationData,
ReaderConfigurationData
}
import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.pulsar.client.impl.conf.{ClientConfigurationData, ProducerConfigurationData, ReaderConfigurationData}

object PulsarConfigurationUtils {

Expand Down
10 changes: 7 additions & 3 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,10 @@ private[pulsar] case class PulsarHelper(
private def getTopics(topicsPattern: String): Seq[String] = {
val dest = TopicName.get(topicsPattern)
val allTopics: ju.List[String] = client.getLookup
.getTopicsUnderNamespace(dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL)
.get()
.getTopicsUnderNamespace(
// passing an empty topicsHash because we don't cache the GetTopicsResponse
dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL, topicsPattern, "")
.get().getTopics

val allNonPartitionedTopics: ju.List[String] = allTopics.asScala
.filter(t => !TopicName.get(t).isPartitioned)
Expand Down Expand Up @@ -345,7 +347,9 @@ private[pulsar] case class PulsarHelper(
while (waitList.nonEmpty) {
val topic = waitList.head
try {
client.getPartitionedTopicMetadata(topic).get()
// setting metadataAutoCreationEnabled to false, and useFallbackForNonPIP344Brokers
// to true to conform to non-breaking behavior.
client.getPartitionedTopicMetadata(topic, false, true).get()
waitList -= topic
} catch {
case NonFatal(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package org.apache.spark.sql.pulsar
import java.{util => ju}

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.internal.DefaultImplementation

import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
import org.apache.spark.sql.streaming.Trigger.{Once, ProcessingTime}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -48,10 +49,9 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
// Need to call latestOffsetForTopicPartition so the helper instantiates
// the admin
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, conf)
val e = intercept[RuntimeException] {
intercept[NotFoundException] {
admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt)
}
assert(e.getMessage.contains("Failed to load config into existing configuration data"))
}

test("Admit entry in the middle of the ledger") {
Expand Down

0 comments on commit e43e8a4

Please sign in to comment.