Skip to content

Commit

Permalink
chore: Batch ack requests and sink responses for better performance (#…
Browse files Browse the repository at this point in the history
…149)

Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 authored Oct 31, 2024
1 parent 0a803d7 commit 134414a
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ public void read(ReadRequest request, OutputObserver observer) {

@Override
public void ack(AckRequest request) {
Long offset = Longs.fromByteArray(request.getOffset().getValue());
// remove the acknowledged messages from the map
messages.remove(offset);
for (Offset offset : request.getOffsets()) {
Long decoded_offset = Longs.fromByteArray(offset.getValue());
// remove the acknowledged messages from the map
messages.remove(decoded_offset);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ public void test_ReadAndAck() {
offsets.add(message.getOffset());
}

for (Offset offset : offsets) {
SourcerTestKit.TestAckRequest ackRequest = SourcerTestKit.TestAckRequest.builder()
.offset(offset).build();
simpleSource.ack(ackRequest);
}
SourcerTestKit.TestAckRequest ackRequest = SourcerTestKit.TestAckRequest.builder()
.offsets(offsets).build();
simpleSource.ack(ackRequest);

// Try reading 6 more messages
// Since the previous batch got acked, the data source should allow us to read more messages
Expand Down
21 changes: 10 additions & 11 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ public void onNext(SinkOuterClass.SinkRequest request) {
datumStream.writeMessage(HandlerDatum.EOF_DATUM);

ResponseList responses = result.join();
responses.getResponses().forEach(response -> {
SinkOuterClass.SinkResponse sinkResponse = buildResponse(response);
responseObserver.onNext(sinkResponse);
});
SinkOuterClass.SinkResponse.Builder responseBuilder = SinkOuterClass.SinkResponse.newBuilder();
for (Response response : responses.getResponses()) {
responseBuilder.addResults(buildResult(response));
}
responseObserver.onNext(responseBuilder.build());

// send eot response to indicate end of transmission for the batch
SinkOuterClass.SinkResponse eotResponse = SinkOuterClass.SinkResponse
Expand Down Expand Up @@ -113,15 +114,13 @@ public void onCompleted() {
};
}

private SinkOuterClass.SinkResponse buildResponse(Response response) {
private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK :
response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
return SinkOuterClass.SinkResponse.newBuilder()
.setResult(SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setErrMsg(response.getErr() == null ? "" : response.getErr())
.setStatus(status)
.build())
return SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setErrMsg(response.getErr() == null ? "" : response.getErr())
.setStatus(status)
.build();
}

Expand Down
23 changes: 13 additions & 10 deletions src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,22 @@ public void onCompleted() {
if (result.getHandshake().getSot()) {
continue;
}

if (result.hasStatus() && result.getStatus().getEot()) {
continue;
}
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result
.getResult()
.getId()));
} else if (result.getResult().getStatus() == SinkOuterClass.Status.FALLBACK) {
responseListBuilder.addResponse(Response.responseFallback(
result.getResult().getId()));
} else {
responseListBuilder.addResponse(Response.responseFailure(
result.getResult().getId(), result.getResult().getErrMsg()));

for (SinkOuterClass.SinkResponse.Result response : result.getResultsList()) {
if (response.getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(response
.getId()));
} else if (response.getStatus() == SinkOuterClass.Status.FALLBACK) {
responseListBuilder.addResponse(Response.responseFallback(
response.getId()));
} else {
responseListBuilder.addResponse(Response.responseFailure(
response.getId(), response.getErrMsg()));
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/sourcer/AckRequest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.numaproj.numaflow.sourcer;


import java.util.List;

/**
* AckRequest request for acknowledging messages.
*/
public interface AckRequest {
/**
* @return the offset to be acknowledged
* @return the list of offsets to be acknowledged
*/
Offset getOffset();
List<Offset> getOffsets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

import lombok.AllArgsConstructor;

import java.util.List;

/**
* AckRequestImpl is the implementation of AckRequest.
*/
@AllArgsConstructor
class AckRequestImpl implements AckRequest {
private final Offset offset;
private final List<Offset> offsets;

@Override
public Offset getOffset() {
return this.offset;
public List<Offset> getOffsets() {
return this.offsets;
}
}
13 changes: 8 additions & 5 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.numaproj.numaflow.source.v1.SourceOuterClass;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod;
Expand Down Expand Up @@ -115,14 +116,16 @@ public void onNext(SourceOuterClass.AckRequest request) {
return;
}

SourceOuterClass.Offset offset = request.getRequest().getOffset();
List<Offset> offsets = new ArrayList<>(request.getRequest().getOffsetsCount());
for (SourceOuterClass.Offset offset : request.getRequest().getOffsetsList()) {
offsets.add(new Offset(
offset.getOffset().toByteArray(),
offset.getPartitionId()));
}

AckRequestImpl ackRequest = new AckRequestImpl(new Offset(
offset.getOffset().toByteArray(),
offset.getPartitionId()));
AckRequestImpl ackRequest = new AckRequestImpl(offsets);

// invoke the sourcer's ack method

sourcer.ack(ackRequest);

// send an ack response to the client after acking the message
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* SourcerTestKit is a test kit for testing Sourcer implementations.
Expand Down Expand Up @@ -165,14 +166,15 @@ public void onCompleted() {
*/
public void sendAckRequest(AckRequest request) throws Exception {
CompletableFuture<SourceOuterClass.AckResponse> future = new CompletableFuture<>();

SourceOuterClass.AckRequest.Request.Builder builder = SourceOuterClass.AckRequest.Request
.newBuilder()
.setOffset(SourceOuterClass.Offset.newBuilder()
.setOffset(com.google.protobuf.ByteString.copyFrom(request
.getOffset()
.getValue()))
.setPartitionId(request.getOffset().getPartitionId())
.build());
.addAllOffsets(request.getOffsets().stream().map(
offset -> SourceOuterClass.Offset.newBuilder()
.setOffset(com.google.protobuf.ByteString.copyFrom(offset.getValue()))
.setPartitionId(offset.getPartitionId())
.build()
).collect(Collectors.toList()));

SourceOuterClass.AckRequest grpcRequest = SourceOuterClass.AckRequest.newBuilder()
.setRequest(builder.build())
Expand Down Expand Up @@ -288,7 +290,7 @@ public static class TestReadRequest implements ReadRequest {
@Setter
@Builder
public static class TestAckRequest implements AckRequest {
Offset offset;
List<Offset> offsets;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Result result = 1;
repeated Result results = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
4 changes: 2 additions & 2 deletions src/main/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ message ReadResponse {
*/
message AckRequest {
message Request {
// Required field holding the offset to be acked
Offset offset = 1;
// Required field holding the offsets to be acked
repeated Offset offsets = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,19 @@ public void sinkerSuccess() {

while (!outputStreamObserver.completed.get()) ;
List<SinkOuterClass.SinkResponse> responseList = outputStreamObserver.getSinkResponse();
assertEquals(111, responseList.size());
assertEquals(21, responseList.size());
// first response is the handshake response
assertTrue(responseList.get(0).getHandshake().getSot());

responseList = responseList.subList(1, responseList.size());
responseList.forEach(response -> {
if (response.hasStatus() && response.getStatus().getEot()) {
var response = responseList.get(0);
response.getResultsList().forEach(result -> {
if (result.getStatus() == SinkOuterClass.Status.FAILURE) {
assertEquals(result.getErrMsg(), "error message");
return;
}
assertEquals(response.getResult().getId(), expectedId);
if (response.getResult().getStatus() == SinkOuterClass.Status.FAILURE) {
assertEquals(response.getResult().getErrMsg(), "error message");
}
assertEquals(result.getId(), expectedId);
assertEquals(result.getStatus(), SinkOuterClass.Status.SUCCESS);
});
}

Expand Down
8 changes: 5 additions & 3 deletions src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void onNext(SourceOuterClass.ReadResponse readResponse) {
.newBuilder()
.getRequest()
.toBuilder()
.setOffset(offset)
.addOffsets(offset)
.build();
ackRequests.add(SourceOuterClass.AckRequest
.newBuilder()
Expand Down Expand Up @@ -243,8 +243,10 @@ public List<Integer> getPartitions() {

@Override
public void ack(AckRequest request) {
Integer offset = ByteBuffer.wrap(request.getOffset().getValue()).getInt();
yetToBeAcked.remove(offset);
for (Offset offset : request.getOffsets()) {
Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt();
yetToBeAcked.remove(decoded_offset);
}
}

@Override
Expand Down

0 comments on commit 134414a

Please sign in to comment.