Skip to content
This repository has been archived by the owner on May 4, 2019. It is now read-only.

Commit

Permalink
added delay to retry for streamBrokerEvents, and changed the error me…
Browse files Browse the repository at this point in the history
…ssage
  • Loading branch information
robertroeser committed Jul 27, 2018
1 parent 628b034 commit ab7ca63
Showing 1 changed file with 38 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@
import io.rsocket.RSocket;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.ByteBufPayload;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.retry.Retry;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

public class DefaultProteusBrokerService implements ProteusBrokerService, Disposable {
private static final Logger logger = LoggerFactory.getLogger(DefaultProteusBrokerService.class);
Expand Down Expand Up @@ -117,14 +118,43 @@ public DefaultProteusBrokerService(

this.client = new BrokerInfoServiceClient(unwrappedGroup("com.netifi.proteus.brokerServices"));
this.presenceNotifier = new BrokerInfoPresenceNotifier(client);

Disposable disposable =
client
.streamBrokerEvents(Empty.getDefaultInstance())
.doOnNext(this::handleBrokerEvent)
.filter(event -> event.getType() == Event.Type.JOIN)
.doOnNext(event -> createConnection())
.doOnError(t -> logger.error("error streaming broker events", t))
.doOnError(
t -> {
logger.warn(
"error streaming broker events - make sure access key {} has a valid access token",
accessKey);
logger.trace("error streaming broker events", t);
})
.onErrorResume(
new Function<Throwable, Publisher<? extends Event>>() {
long attempts = 0;
long lastAttempt = System.currentTimeMillis();

@Override
public synchronized Publisher<? extends Event> apply(Throwable throwable) {
if (Duration.ofMillis(System.currentTimeMillis() - lastAttempt).getSeconds()
> 30) {
attempts = 0;
}

Mono<Event> then =
Mono.delay(Duration.ofMillis(attempts * 500)).then(Mono.error(throwable));
if (attempts < 30) {
attempts++;
}

lastAttempt = System.currentTimeMillis();

return then;
}
})
.retry()
.subscribe();

Expand Down Expand Up @@ -278,7 +308,7 @@ private ProteusSocket unwrappedGroup(String group) {
},
this::selectRSocket);
}

private ProteusSocket unwrappedBroadcast(String group) {
return new DefaultProteusSocket(
payload -> {
Expand Down Expand Up @@ -308,12 +338,12 @@ public ProteusSocket destination(String destination, String group) {
public ProteusSocket group(String group) {
return PresenceAwareRSocket.wrap(unwrappedGroup(group), null, group, presenceNotifier);
}

@Override
public ProteusSocket broadcast(String group) {
return PresenceAwareRSocket.wrap(unwrappedBroadcast(group), null, group, presenceNotifier);
}

@Override
public void dispose() {
onClose.onComplete();
Expand Down Expand Up @@ -445,7 +475,7 @@ private WeightedClientTransportSupplier selectClientTransportSupplier() {
}
}
}

logger.info("selecting socket {} with weight {}", supplier.toString(), supplier.weight());
if (logger.isDebugEnabled()) {
logger.debug("selecting socket {} with weight {}", supplier.toString(), supplier.weight());
Expand Down

0 comments on commit ab7ca63

Please sign in to comment.