Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Nov 16, 2024
1 parent 5bc4e15 commit bb98be2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1916,10 +1916,8 @@ protected boolean shouldProcessRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelo
}
}

if (!record.getTopicPartition()
.getPubSubTopic()
.getPubSubTopicType()
.equals(currentLeaderTopic.getPubSubTopicType())) {
if (!Utils.getLeaderTopicFromPubSubTopic(pubSubTopicRepository, record.getTopicPartition().getPubSubTopic())
.equals(currentLeaderTopic)) {
String errorMsg =
"Leader replica: {} received a pubsub message that doesn't belong to the leader topic. Leader topic: "
+ currentLeaderTopic + ", topic of incoming message: "
Expand Down Expand Up @@ -1978,7 +1976,8 @@ protected boolean shouldPersistRecord(
case LEADER:
PubSubTopic currentLeaderTopic =
partitionConsumptionState.getOffsetRecord().getLeaderTopic(pubSubTopicRepository);
if (!incomingMessageTopic.getPubSubTopicType().equals(currentLeaderTopic.getPubSubTopicType())) {
if (!Utils.getLeaderTopicFromPubSubTopic(pubSubTopicRepository, incomingMessageTopic)
.equals(currentLeaderTopic)) {
String errorMsg =
"Leader replica: {} received a pubsub message that doesn't belong to the leader topic. Leader topic: "
+ currentLeaderTopic + ", topic of incoming message: " + incomingMessageTopic.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.api.PubSubTopicType;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
Expand Down Expand Up @@ -941,4 +943,14 @@ public static boolean isSepTopicRegion(String region) {
return region.endsWith(SEPARATE_TOPIC_SUFFIX);
}

public static PubSubTopic getLeaderTopicFromPubSubTopic(
PubSubTopicRepository pubSubTopicRepository,
PubSubTopic pubSubTopic) {
if (pubSubTopic.getPubSubTopicType().equals(PubSubTopicType.REALTIME_TOPIC)
&& pubSubTopic.getName().endsWith(SEPARATE_TOPIC_SUFFIX)) {
return pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(pubSubTopic.getStoreName()));
}
return pubSubTopic;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory().getClass().getName())
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000)
.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, false);
.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true);
if (sslToKafka) {
serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name());
serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
Expand Down

0 comments on commit bb98be2

Please sign in to comment.