Skip to content

Commit

Permalink
[fix][broker] Fixed getting incorrect KeyValue schema version (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Nov 29, 2023
1 parent e820f90 commit 81a9a52
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
Expand Down Expand Up @@ -161,10 +162,14 @@ public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
String schemaId = getSchemaId();
final SchemaType schemaType = SchemaType.valueOf(payload.getType());
byte[] data = payload.getSchema().getBytes(StandardCharsets.UTF_8);
if (schemaType.getValue() == SchemaType.KEY_VALUE.getValue()) {
data = SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(data);
}
return pulsar().getSchemaRegistryService()
.findSchemaVersion(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
.isDeleted(false).timestamp(clock.millis())
SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,21 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -93,6 +98,7 @@ protected void setup() throws Exception {
Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock, null);
setupDefaultTenantAndNamespace();
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -385,4 +391,27 @@ private static SchemaData getSchemaData(String schemaJson) {
private SchemaVersion version(long version) {
return new LongSchemaVersion(version);
}

@Test
public void testKeyValueSchema() throws Exception {
final String topicName = "persistent://public/default/testKeyValueSchema";
admin.topics().createNonPartitionedTopic(BrokerTestUtil.newUniqueName(topicName));

final SchemaInfo schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
"keyValue",
SchemaInfo.builder().type(SchemaType.STRING).schema(new byte[0])
.build(),
SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0])
.build(), KeyValueEncodingType.SEPARATED);
admin.schemas().createSchema(topicName, schemaInfo);

final SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName);
Assert.assertEquals(schemaInfoWithVersion.getVersion(), 0);

final Long version1 = admin.schemas().getVersionBySchema(topicName, schemaInfo);
Assert.assertEquals(version1, 0);

final Long version2 = admin.schemas().getVersionBySchema(topicName, schemaInfoWithVersion.getSchemaInfo());
Assert.assertEquals(version2, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;

import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -178,18 +177,14 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
}
httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
if (response.getType() == SchemaType.KEY_VALUE) {
try {
SchemaData data = SchemaData
.builder()
.data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
response.getData().getBytes(StandardCharsets.UTF_8)))
.type(response.getType())
.props(response.getProperties())
.build();
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
} catch (IOException err) {
future.completeExceptionally(err);
}
SchemaData data = SchemaData
.builder()
.data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
response.getData().getBytes(StandardCharsets.UTF_8)))
.type(response.getType())
.props(response.getProperties())
.build();
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
} else {
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,7 @@ private static byte[] getKeyOrValueSchemaBytes(JsonElement jsonElement) {
* @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes
* @return the key/value schema info data bytes
*/
public static byte[] convertKeyValueDataStringToSchemaInfoSchema(
byte[] keyValueSchemaInfoDataJsonBytes) throws IOException {
public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) {
JsonObject jsonObject = (JsonObject) toJsonElement(new String(keyValueSchemaInfoDataJsonBytes, UTF_8));
byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key"));
byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value"));
Expand Down

0 comments on commit 81a9a52

Please sign in to comment.