Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <glazarov95@gmail.com>
  • Loading branch information
georgi-l95 committed Dec 16, 2024
1 parent 1eb06b9 commit 713b515
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Defines the configuration data for the block stream in the Hedera Block Simulator.
*
* @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @param delayBetweenBlockItems the delay in microseconds between streaming each block item
* @param maxBlockItemsToStream the maximum number of block items to stream before stopping
* @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based)
Expand All @@ -38,7 +39,7 @@ public record BlockStreamConfig(
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
@ConfigProperty(defaultValue = "10") int millisecondsPerBlock,
@ConfigProperty(defaultValue = "1000") int millisecondsPerBlock,
@ConfigProperty(defaultValue = "1000") int blockItemsBatchSize) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.inject.Inject;
Expand All @@ -56,13 +57,14 @@ public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient {

// State
private final int lastKnownStatusesCapacity;
private final ArrayDeque<String> lastKnownStatuses;
private final Deque<String> lastKnownStatuses;
private CountDownLatch streamLatch;

/**
* Constructs a new ConsumerStreamGrpcClientImpl with the specified configuration and metrics service.
*
* @param grpcConfig The configuration for gRPC connection settings
* @param blockStreamConfig The configuration for the block stream
* @param metricsService The service for recording consumption metrics
* @throws NullPointerException if any parameter is null
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;

Expand All @@ -44,21 +44,22 @@ public class ConsumerStreamObserver implements StreamObserver<SubscribeStreamRes
// State
private final CountDownLatch streamLatch;
private final int lastKnownStatusesCapacity;
private final ArrayDeque<String> lastKnownStatuses;
private final Deque<String> lastKnownStatuses;

/**
* Constructs a new ConsumerStreamObserver.
*
* @param metricsService The service for recording consumption metrics
* @param streamLatch A latch used to coordinate stream completion
* @param lastKnownStatuses List to store the most recent status messages
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @throws NullPointerException if any parameter is null
*/
public ConsumerStreamObserver(
@NonNull final MetricsService metricsService,
@NonNull final CountDownLatch streamLatch,
@NonNull final ArrayDeque<String> lastKnownStatuses,
@NonNull final int lastKnownStatusesCapacity) {
@NonNull final Deque<String> lastKnownStatuses,
final int lastKnownStatusesCapacity) {
this.metricsService = requireNonNull(metricsService);
this.streamLatch = requireNonNull(streamLatch);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
Expand All @@ -75,7 +76,7 @@ public ConsumerStreamObserver(
public void onNext(SubscribeStreamResponse subscribeStreamResponse) {
final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase();
if (lastKnownStatuses.size() == lastKnownStatusesCapacity) {
lastKnownStatuses.removeFirst();
lastKnownStatuses.pollFirst();

Check warning on line 79 in simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java

View check run for this annotation

Codecov / codecov/patch

simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java#L79

Added line #L79 was not covered by tests
}
lastKnownStatuses.add(subscribeStreamResponse.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
Expand All @@ -64,7 +65,7 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
// State
private final AtomicBoolean streamEnabled;
private final int lastKnownStatusesCapacity;
private final ArrayDeque<String> lastKnownStatuses;
private final Deque<String> lastKnownStatuses;

/**
* Creates a new PublishStreamGrpcClientImpl with the specified dependencies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -37,19 +37,20 @@ public class PublishStreamObserver implements StreamObserver<PublishStreamRespon
// State
private final AtomicBoolean streamEnabled;
private final int lastKnownStatusesCapacity;
private final ArrayDeque<String> lastKnownStatuses;
private final Deque<String> lastKnownStatuses;

/**
* Creates a new PublishStreamObserver instance.
*
* @param streamEnabled Controls whether streaming should continue
* @param lastKnownStatuses List to store the most recent status messages
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @throws NullPointerException if any parameter is null
*/
public PublishStreamObserver(
@NonNull final AtomicBoolean streamEnabled,
@NonNull final ArrayDeque<String> lastKnownStatuses,
@NonNull final int lastKnownStatusesCapacity) {
@NonNull final Deque<String> lastKnownStatuses,
final int lastKnownStatusesCapacity) {
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
Expand All @@ -63,7 +64,7 @@ public PublishStreamObserver(
@Override
public void onNext(PublishStreamResponse publishStreamResponse) {
if (lastKnownStatuses.size() == lastKnownStatusesCapacity) {
lastKnownStatuses.removeFirst();
lastKnownStatuses.pollFirst();

Check warning on line 67 in simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java

View check run for this annotation

Codecov / codecov/patch

simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java#L67

Added line #L67 was not covered by tests
}
lastKnownStatuses.add(publishStreamResponse.toString());
LOGGER.log(INFO, "Received Response: " + publishStreamResponse.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hedera.block.suites.grpc.positive;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.hedera.block.simulator.BlockStreamSimulatorApp;
Expand Down Expand Up @@ -74,10 +73,10 @@ void verifyPublishBlockStreamEndpoint() throws IOException, InterruptedException
blockStreamSimulatorApp.stop();
StreamStatus streamStatus = blockStreamSimulatorApp.getStreamStatus();
assertTrue(streamStatus.publishedBlocks() > 0);
// We just need to make sure that number of published blocks is equal or greater than the statuses. Statuses are tracked in a queue to avoid unnecessary memory usage, therefore will always be less or equal to published.
assertTrue(
streamStatus.publishedBlocks() >=
streamStatus.lastKnownPublisherStatuses().size());
// We just need to make sure that number of published blocks is equal or greater than the statuses. Statuses are
// tracked in a queue to avoid unnecessary memory usage, therefore will always be less or equal to published.
assertTrue(streamStatus.publishedBlocks()
>= streamStatus.lastKnownPublisherStatuses().size());

// Verify each status contains the word "acknowledgement"
streamStatus
Expand Down

0 comments on commit 713b515

Please sign in to comment.