Skip to content
This repository has been archived by the owner on May 4, 2019. It is now read-only.

Commit

Permalink
Ensure payloads are released in DefaultNetifiSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
rdegnan committed May 16, 2018
1 parent cd4cc25 commit c678c06
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 173 deletions.
316 changes: 144 additions & 172 deletions client/src/main/java/io/netifi/proteus/rs/DefaultNetifiSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,225 +69,197 @@ public ByteBuf getRoute() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
ByteBuf metadataToWrap = payload.sliceMetadata();
ByteBuf data = payload.sliceData();
ByteBuf route = getRoute();

int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf data = payload.sliceData();
ByteBuf metadataToWrap = payload.sliceMetadata();

SecureRSocket secureRSocket = rSocketSupplier.get();
return secureRSocket
.getCurrentSessionCounter()
.flatMap(
counter -> {
long count = counter.incrementAndGet();

return secureRSocket
.getCurrentSessionToken()
.flatMap(
key -> {
byte[] currentRequestToken =
sessionUtil.generateSessionToken(key, data, count);
int requestToken =
sessionUtil.generateRequestToken(currentRequestToken, data, count);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

return secureRSocket.fireAndForget(
ByteBufPayload.create(payload.sliceData(), metadata));
});
});

return secureRSocket.getCurrentSessionCounter()
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
long count = counter.incrementAndGet();
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
})
.flatMap(requestToken -> {
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
payload.release();

return secureRSocket.fireAndForget(wrappedPayload);
});
} catch (Throwable t) {
payload.release();
return Mono.error(t);
}
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
ByteBuf metadataToWrap = payload.sliceMetadata();
ByteBuf route = getRoute();
ByteBuf data = payload.sliceData();
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf metadataToWrap = payload.sliceMetadata();

SecureRSocket secureRSocket = rSocketSupplier.get();
return secureRSocket
.getCurrentSessionCounter()
.flatMap(
counter -> {
long count = counter.incrementAndGet();

return secureRSocket
.getCurrentSessionToken()
.flatMap(
key -> {
byte[] currentRequestToken =
sessionUtil.generateSessionToken(key, data, count);
int requestToken =
sessionUtil.generateRequestToken(currentRequestToken, data, count);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

return secureRSocket.requestResponse(
ByteBufPayload.create(payload.sliceData(), metadata));
});
});
return secureRSocket.getCurrentSessionCounter()
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
long count = counter.incrementAndGet();
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
})
.flatMap(requestToken -> {
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
payload.release();

return secureRSocket.requestResponse(wrappedPayload);
});
} catch (Throwable t) {
payload.release();
return Mono.error(t);
}
}

@Override
public Flux<Payload> requestStream(Payload payload) {
try {
ByteBuf metadataToWrap = payload.sliceMetadata();
ByteBuf route = getRoute();
ByteBuf data = payload.sliceData();

int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf metadataToWrap = payload.sliceMetadata();

SecureRSocket secureRSocket = rSocketSupplier.get();
return secureRSocket
.getCurrentSessionCounter()
.flatMapMany(
counter -> {
long count = counter.incrementAndGet();

return secureRSocket
.getCurrentSessionToken()
.flatMapMany(
key -> {
byte[] currentRequestToken =
sessionUtil.generateSessionToken(key, data, count);
int requestToken =
sessionUtil.generateRequestToken(currentRequestToken, data, count);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

return secureRSocket.requestStream(
ByteBufPayload.create(payload.sliceData(), metadata));
});
});

return secureRSocket.getCurrentSessionCounter()
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
long count = counter.incrementAndGet();
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
})
.flatMapMany(requestToken -> {
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
payload.release();

return secureRSocket.requestStream(wrappedPayload);
});
} catch (Throwable t) {
payload.release();
return Flux.error(t);
}
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
SecureRSocket secureRSocket = rSocketSupplier.get();
ByteBuf route = getRoute();
Flux<Payload> payloadFlux =

SecureRSocket secureRSocket = rSocketSupplier.get();
Flux<Payload> wrappedPayloads =
Flux.from(payloads)
.flatMap(
payload -> {
ByteBuf data = payload.sliceData();
ByteBuf metadataToWrap = payload.sliceMetadata();
int length =
RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);

return secureRSocket
.getCurrentSessionCounter()
.flatMapMany(
counter -> {
long count = counter.incrementAndGet();

return secureRSocket
.getCurrentSessionToken()
.map(
key -> {
byte[] currentRequestToken =
sessionUtil.generateSessionToken(key, data, count);
int requestToken =
sessionUtil.generateRequestToken(
currentRequestToken, data, count);
ByteBuf metadata =
ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

return ByteBufPayload.create(payload.sliceData(), metadata);
});
});
});

return secureRSocket.requestChannel(payloadFlux);
.concatMap(payload -> {
try {
ByteBuf data = payload.sliceData();
ByteBuf metadataToWrap = payload.sliceMetadata();

return secureRSocket.getCurrentSessionCounter()
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
long count = counter.incrementAndGet();
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
})
.map(requestToken -> {
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
payload.release();

return wrappedPayload;
});
} catch (Throwable t) {
payload.release();
return Flux.error(t);
}
});

return secureRSocket.requestChannel(wrappedPayloads);
}

@Override
public Mono<Void> metadataPush(Payload payload) {
try {
ByteBuf route = getRoute();
ByteBuf unwrappedMetadata = payload.sliceMetadata();
ByteBuf data = payload.sliceData();
ByteBuf metadataToWrap = payload.sliceMetadata();

int length = RoutingFlyweight.computeLength(true, fromDestination, route);
SecureRSocket secureRSocket = rSocketSupplier.get();

return secureRSocket
.getCurrentSessionCounter()
.flatMap(
counter -> {
long count = counter.incrementAndGet();

return secureRSocket
.getCurrentSessionToken()
.flatMap(
key -> {
byte[] currentRequestToken =
sessionUtil.generateSessionToken(key, data, count);
int requestToken =
sessionUtil.generateRequestToken(currentRequestToken, data, count);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
unwrappedMetadata);

return secureRSocket.metadataPush(
ByteBufPayload.create(payload.sliceData(), metadata));
});
});
return secureRSocket.getCurrentSessionCounter()
.zipWith(secureRSocket.getCurrentSessionToken(), (counter, key) -> {
long count = counter.incrementAndGet();
byte[] currentRequestToken = sessionUtil.generateSessionToken(key, data, count);
return sessionUtil.generateRequestToken(currentRequestToken, data, count);
})
.flatMap(requestToken -> {
int length = RoutingFlyweight.computeLength(true, fromDestination, route, metadataToWrap);
ByteBuf metadata = ByteBufAllocator.DEFAULT.directBuffer(length);
RoutingFlyweight.encode(
metadata,
true,
requestToken,
accessKey,
fromDestination,
generator.nextId(),
route,
metadataToWrap);

Payload wrappedPayload = ByteBufPayload.create(data.retain(), metadata);
payload.release();

return secureRSocket.metadataPush(wrappedPayload);
});

} catch (Throwable t) {
payload.release();
return Mono.error(t);
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
group=io.netifi.proteus
version=0.6.2
version=0.6.3

0 comments on commit c678c06

Please sign in to comment.