Skip to content

Commit

Permalink
Shutdown client reactor threads on close (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Jan 18, 2024
1 parent d7d8726 commit 14d137b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;
Expand All @@ -54,6 +55,8 @@ public class ShardNotificationReceiver extends GrpcResponseStream {
private final @NonNull NotificationMetrics metrics;
private @NonNull Optional<Long> startingOffset = Optional.empty();


private Scheduler scheduler;
private long offset;

ShardNotificationReceiver(
Expand All @@ -75,6 +78,15 @@ public void start(@NonNull Optional<Long> offset) {
this.start();
}

@Override
public void close() {
super.close();

if (scheduler != null) {
scheduler.dispose();
}
}

@Override
protected @NonNull CompletableFuture<Void> start(
@NonNull ReactorOxiaClientStub stub, @NonNull Consumer<Disposable> consumer) {
Expand All @@ -87,13 +99,14 @@ public void start(@NonNull Optional<Long> offset) {
signal ->
log.warn("Retrying receiving notifications for shard {}: {}", shardId, signal));
var threadName = String.format("shard-%s-notifications", shardId);
scheduler = Schedulers.newSingle(threadName);
var disposable =
Flux.defer(() -> stub.getNotifications(request.build()))
.doOnError(t -> log.warn("Error receiving notifications for shard {}", shardId, t))
.doOnEach(metrics::recordBatch)
.retryWhen(retrySpec)
.repeat()
.publishOn(Schedulers.newSingle(threadName))
.publishOn(scheduler)
.subscribe(this::notify);
consumer.accept(disposable);
return completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;
Expand All @@ -55,6 +56,7 @@ public class Session implements AutoCloseable {
private final @NonNull SessionHeartbeat heartbeat;
private final @NonNull SessionMetrics metrics;

private Scheduler scheduler;
private Disposable keepAliveSubscription;

Session(
Expand All @@ -72,6 +74,8 @@ public class Session implements AutoCloseable {
sessionId,
SessionHeartbeat.newBuilder().setShardId(shardId).setSessionId(sessionId).build(),
metrics);
var threadName = String.format("session-[id=%s,shard=%s]-keep-alive", sessionId, shardId);
scheduler = Schedulers.newSingle(threadName);
}

void start() {
Expand All @@ -84,16 +88,14 @@ void start() {
sessionId,
shardId,
signal));
var threadName = String.format("session-[id=%s,shard=%s]-keep-alive", sessionId, shardId);

keepAliveSubscription =
Mono.just(heartbeat)
.repeat()
.delayElements(heartbeatInterval)
.flatMap(hb -> stubByShardId.apply(shardId).keepAlive(hb))
.retryWhen(retrySpec)
.timeout(sessionTimeout)
.publishOn(Schedulers.newSingle(threadName))
.publishOn(scheduler)
.doOnEach(metrics::recordKeepAlive)
.doOnError(
t -> log.warn("Session keep-alive error: [id={},shard={}]", sessionId, shardId, t))
Expand All @@ -107,6 +109,7 @@ public void close() throws Exception {
var request =
CloseSessionRequest.newBuilder().setShardId(shardId).setSessionId(sessionId).build();
stub.closeSession(request).block();
scheduler.dispose();
}

@RequiredArgsConstructor(access = PACKAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;
Expand All @@ -67,6 +68,8 @@ public class ShardManager extends GrpcResponseStream implements AutoCloseable {
private final @NonNull CompositeConsumer<ShardAssignmentChanges> callbacks;
private final @NonNull ShardAssignmentMetrics metrics;

private final Scheduler scheduler;

@VisibleForTesting
ShardManager(
@NonNull Supplier<ReactorOxiaClientStub> stubFactory,
Expand All @@ -77,6 +80,7 @@ public class ShardManager extends GrpcResponseStream implements AutoCloseable {
this.assignments = assignments;
this.callbacks = callbacks;
this.metrics = metrics;
this.scheduler = Schedulers.newSingle("shard-assignments");
}

public ShardManager(
Expand All @@ -90,6 +94,12 @@ public ShardManager(
ShardAssignmentMetrics.create(metrics));
}

@Override
public void close() {
super.close();
scheduler.dispose();
}

@Override
protected CompletableFuture<Void> start(
ReactorOxiaClientStub stub, Consumer<Disposable> consumer) {
Expand All @@ -107,7 +117,7 @@ protected CompletableFuture<Void> start(
.doOnError(this::processError)
.retryWhen(retrySpec)
.repeat()
.publishOn(Schedulers.newSingle("shard-assignments"))
.publishOn(scheduler)
.doOnNext(this::updateAssignments)
.doOnEach(metrics::recordAssignments)
.publish();
Expand Down

0 comments on commit 14d137b

Please sign in to comment.