diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/Notification.java b/client-api/src/main/java/io/streamnative/oxia/client/api/Notification.java index db95e7d4..9bc25161 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/Notification.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/Notification.java @@ -19,7 +19,10 @@ /** A notification from an Oxia server indicating a change to a record associated with a key. */ public sealed interface Notification - permits Notification.KeyCreated, Notification.KeyModified, Notification.KeyDeleted { + permits Notification.KeyCreated, + Notification.KeyDeleted, + Notification.KeyModified, + Notification.KeyRangeDelete { /** * @return The key of the record. @@ -56,4 +59,18 @@ record KeyModified(@NonNull String key, long version) implements Notification { * @param key The key of the deleted record. */ record KeyDeleted(@NonNull String key) implements Notification {} + + /** + * The record associated with the key range has been deleted. + * + * @param startKeyInclusive The range deletion start key. (inclusive) + * @param endKeyExclusive The range deletion end key. (exclusive) + */ + record KeyRangeDelete(@NonNull String startKeyInclusive, @NonNull String endKeyExclusive) + implements Notification { + @Override + public String key() { + return startKeyInclusive; + } + } } diff --git a/client-it/src/test/java/io/streamnative/oxia/client/it/NotificationIt.java b/client-it/src/test/java/io/streamnative/oxia/client/it/NotificationIt.java new file mode 100644 index 00000000..5e1b43a8 --- /dev/null +++ b/client-it/src/test/java/io/streamnative/oxia/client/it/NotificationIt.java @@ -0,0 +1,114 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.it; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import io.streamnative.oxia.client.api.AsyncOxiaClient; +import io.streamnative.oxia.client.api.Notification; +import io.streamnative.oxia.client.api.OxiaClientBuilder; +import io.streamnative.oxia.testcontainers.OxiaContainer; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.images.PullPolicy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +@Slf4j +public class NotificationIt { + @Container + private static final OxiaContainer oxia = + new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME) + .withImagePullPolicy(PullPolicy.alwaysPull()) + .withShards(10) + .withLogConsumer(new Slf4jLogConsumer(log)); + + private static AsyncOxiaClient client; + + private static Queue notifications = new LinkedBlockingQueue<>(); + + private static InMemoryMetricReader metricReader; + + @BeforeAll + static void beforeAll() { + Resource resource = + Resource.getDefault() + .merge( + Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "logical-service-name"))); + + metricReader = InMemoryMetricReader.create(); + SdkMeterProvider sdkMeterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).setResource(resource).build(); + + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build(); + + client = + OxiaClientBuilder.create(oxia.getServiceAddress()) + .openTelemetry(openTelemetry) + .asyncClient() + .join(); + client.notifications(notifications::add); + } + + @AfterAll + static void afterAll() throws Exception { + if (client != null) { + client.close(); + } + } + + @Test + public void testDeleteRange() { + + for (int i = 0; i < 10; i++) { + client.put(i + "", (i + "").getBytes(StandardCharsets.UTF_8)).join(); + } + + Awaitility.await().untilAsserted(() -> Assertions.assertEquals(notifications.size(), 10)); + + notifications.clear(); + + client.deleteRange("0", "100").join(); + + Awaitility.await() + .untilAsserted( + () -> { + Assertions.assertEquals(notifications.size(), 10); // 10 shards + for (Notification notification : notifications) { + Assertions.assertInstanceOf(Notification.KeyRangeDelete.class, notification); + final Notification.KeyRangeDelete krd = (Notification.KeyRangeDelete) notification; + Assertions.assertEquals(krd.startKeyInclusive(), "0"); + Assertions.assertEquals(krd.endKeyExclusive(), "100"); + } + }); + } +} diff --git a/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java b/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java index a1d800b4..a0e1dfd5 100644 --- a/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java +++ b/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java @@ -100,6 +100,8 @@ public void onNext(NotificationBatch batch) { case KEY_CREATED -> new KeyCreated(key, notification.getVersionId()); case KEY_MODIFIED -> new KeyModified(key, notification.getVersionId()); case KEY_DELETED -> new KeyDeleted(key); + case KEY_RANGE_DELETED -> new Notification.KeyRangeDelete( + key, notification.getKeyRangeLast()); case UNRECOGNIZED -> null; }; diff --git a/client/src/main/proto/io/streamnative/oxia/client.proto b/client/src/main/proto/io/streamnative/oxia/client.proto index 767396f1..25794e1d 100644 --- a/client/src/main/proto/io/streamnative/oxia/client.proto +++ b/client/src/main/proto/io/streamnative/oxia/client.proto @@ -458,6 +458,7 @@ enum NotificationType { KEY_CREATED = 0; KEY_MODIFIED = 1; KEY_DELETED = 2; + KEY_RANGE_DELETED = 3; } message NotificationsRequest { @@ -477,4 +478,6 @@ message NotificationBatch { message Notification { NotificationType type = 1; optional int64 version_id = 2; + + optional string key_range_last = 3; }