From 81a9a527b353c07db0743f970edb009a929888fe Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Wed, 29 Nov 2023 14:56:54 +0800 Subject: [PATCH] [fix][broker] Fixed getting incorrect KeyValue schema version (#21632) --- .../admin/impl/SchemasResourceBase.java | 9 ++++-- .../service/schema/SchemaServiceTest.java | 29 +++++++++++++++++++ .../pulsar/client/impl/HttpLookupService.java | 21 +++++--------- .../client/impl/schema/SchemaUtils.java | 3 +- 4 files changed, 45 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index 0bab772044a6d..1992ea7e47799 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -161,10 +162,14 @@ public CompletableFuture getVersionBySchemaAsync(PostSchemaPayload payload return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA) .thenCompose(__ -> { String schemaId = getSchemaId(); + final SchemaType schemaType = SchemaType.valueOf(payload.getType()); + byte[] data = payload.getSchema().getBytes(StandardCharsets.UTF_8); + if (schemaType.getValue() == SchemaType.KEY_VALUE.getValue()) { + data = SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(data); + } return pulsar().getSchemaRegistryService() .findSchemaVersion(schemaId, - SchemaData.builder().data(payload.getSchema().getBytes(StandardCharsets.UTF_8)) - .isDeleted(false).timestamp(clock.millis()) + SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis()) .type(SchemaType.valueOf(payload.getType())) .user(defaultIfEmpty(clientAppId(), "")) .props(payload.getProperties()).build()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index c7e30d5c3fc37..2bdb24dceebd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -40,16 +40,21 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.LongSchemaVersion; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -93,6 +98,7 @@ protected void setup() throws Exception { Map checkMap = new HashMap<>(); checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck()); schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock, null); + setupDefaultTenantAndNamespace(); } @AfterMethod(alwaysRun = true) @@ -385,4 +391,27 @@ private static SchemaData getSchemaData(String schemaJson) { private SchemaVersion version(long version) { return new LongSchemaVersion(version); } + + @Test + public void testKeyValueSchema() throws Exception { + final String topicName = "persistent://public/default/testKeyValueSchema"; + admin.topics().createNonPartitionedTopic(BrokerTestUtil.newUniqueName(topicName)); + + final SchemaInfo schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo( + "keyValue", + SchemaInfo.builder().type(SchemaType.STRING).schema(new byte[0]) + .build(), + SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0]) + .build(), KeyValueEncodingType.SEPARATED); + admin.schemas().createSchema(topicName, schemaInfo); + + final SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName); + Assert.assertEquals(schemaInfoWithVersion.getVersion(), 0); + + final Long version1 = admin.schemas().getVersionBySchema(topicName, schemaInfo); + Assert.assertEquals(version1, 0); + + final Long version2 = admin.schemas().getVersionBySchema(topicName, schemaInfoWithVersion.getSchemaInfo()); + Assert.assertEquals(version2, 0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 7969ce402363f..e33efabcc9e0e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import io.netty.channel.EventLoopGroup; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; @@ -178,18 +177,14 @@ public CompletableFuture> getSchema(TopicName topicName, by } httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> { if (response.getType() == SchemaType.KEY_VALUE) { - try { - SchemaData data = SchemaData - .builder() - .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema( - response.getData().getBytes(StandardCharsets.UTF_8))) - .type(response.getType()) - .props(response.getProperties()) - .build(); - future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data))); - } catch (IOException err) { - future.completeExceptionally(err); - } + SchemaData data = SchemaData + .builder() + .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema( + response.getData().getBytes(StandardCharsets.UTF_8))) + .type(response.getType()) + .props(response.getProperties()) + .build(); + future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data))); } else { future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response))); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java index 8acbf26559b7b..881ad424669d2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java @@ -359,8 +359,7 @@ private static byte[] getKeyOrValueSchemaBytes(JsonElement jsonElement) { * @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes * @return the key/value schema info data bytes */ - public static byte[] convertKeyValueDataStringToSchemaInfoSchema( - byte[] keyValueSchemaInfoDataJsonBytes) throws IOException { + public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) { JsonObject jsonObject = (JsonObject) toJsonElement(new String(keyValueSchemaInfoDataJsonBytes, UTF_8)); byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key")); byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value"));