diff --git a/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java b/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java index 38c8b1d5..845ff5af 100644 --- a/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java +++ b/client/src/main/java/io/streamnative/oxia/client/notify/ShardNotificationReceiver.java @@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; @@ -54,6 +55,8 @@ public class ShardNotificationReceiver extends GrpcResponseStream { private final @NonNull NotificationMetrics metrics; private @NonNull Optional startingOffset = Optional.empty(); + + private Scheduler scheduler; private long offset; ShardNotificationReceiver( @@ -75,6 +78,15 @@ public void start(@NonNull Optional offset) { this.start(); } + @Override + public void close() { + super.close(); + + if (scheduler != null) { + scheduler.dispose(); + } + } + @Override protected @NonNull CompletableFuture start( @NonNull ReactorOxiaClientStub stub, @NonNull Consumer consumer) { @@ -87,13 +99,14 @@ public void start(@NonNull Optional offset) { signal -> log.warn("Retrying receiving notifications for shard {}: {}", shardId, signal)); var threadName = String.format("shard-%s-notifications", shardId); + scheduler = Schedulers.newSingle(threadName); var disposable = Flux.defer(() -> stub.getNotifications(request.build())) .doOnError(t -> log.warn("Error receiving notifications for shard {}", shardId, t)) .doOnEach(metrics::recordBatch) .retryWhen(retrySpec) .repeat() - .publishOn(Schedulers.newSingle(threadName)) + .publishOn(scheduler) .subscribe(this::notify); consumer.accept(disposable); return completedFuture(null); diff --git a/client/src/main/java/io/streamnative/oxia/client/session/Session.java b/client/src/main/java/io/streamnative/oxia/client/session/Session.java index 8237798d..2cc5680c 100644 --- a/client/src/main/java/io/streamnative/oxia/client/session/Session.java +++ b/client/src/main/java/io/streamnative/oxia/client/session/Session.java @@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; @@ -55,6 +56,7 @@ public class Session implements AutoCloseable { private final @NonNull SessionHeartbeat heartbeat; private final @NonNull SessionMetrics metrics; + private Scheduler scheduler; private Disposable keepAliveSubscription; Session( @@ -72,6 +74,8 @@ public class Session implements AutoCloseable { sessionId, SessionHeartbeat.newBuilder().setShardId(shardId).setSessionId(sessionId).build(), metrics); + var threadName = String.format("session-[id=%s,shard=%s]-keep-alive", sessionId, shardId); + scheduler = Schedulers.newSingle(threadName); } void start() { @@ -84,8 +88,6 @@ void start() { sessionId, shardId, signal)); - var threadName = String.format("session-[id=%s,shard=%s]-keep-alive", sessionId, shardId); - keepAliveSubscription = Mono.just(heartbeat) .repeat() @@ -93,7 +95,7 @@ void start() { .flatMap(hb -> stubByShardId.apply(shardId).keepAlive(hb)) .retryWhen(retrySpec) .timeout(sessionTimeout) - .publishOn(Schedulers.newSingle(threadName)) + .publishOn(scheduler) .doOnEach(metrics::recordKeepAlive) .doOnError( t -> log.warn("Session keep-alive error: [id={},shard={}]", sessionId, shardId, t)) @@ -107,6 +109,7 @@ public void close() throws Exception { var request = CloseSessionRequest.newBuilder().setShardId(shardId).setSessionId(sessionId).build(); stub.closeSession(request).block(); + scheduler.dispose(); } @RequiredArgsConstructor(access = PACKAGE) diff --git a/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java b/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java index 74cf27cf..27e86e5d 100644 --- a/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java +++ b/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java @@ -57,6 +57,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; @@ -67,6 +68,8 @@ public class ShardManager extends GrpcResponseStream implements AutoCloseable { private final @NonNull CompositeConsumer callbacks; private final @NonNull ShardAssignmentMetrics metrics; + private final Scheduler scheduler; + @VisibleForTesting ShardManager( @NonNull Supplier stubFactory, @@ -77,6 +80,7 @@ public class ShardManager extends GrpcResponseStream implements AutoCloseable { this.assignments = assignments; this.callbacks = callbacks; this.metrics = metrics; + this.scheduler = Schedulers.newSingle("shard-assignments"); } public ShardManager( @@ -90,6 +94,12 @@ public ShardManager( ShardAssignmentMetrics.create(metrics)); } + @Override + public void close() { + super.close(); + scheduler.dispose(); + } + @Override protected CompletableFuture start( ReactorOxiaClientStub stub, Consumer consumer) { @@ -107,7 +117,7 @@ protected CompletableFuture start( .doOnError(this::processError) .retryWhen(retrySpec) .repeat() - .publishOn(Schedulers.newSingle("shard-assignments")) + .publishOn(scheduler) .doOnNext(this::updateAssignments) .doOnEach(metrics::recordAssignments) .publish();