Skip to content

Commit

Permalink
Merge branch 'master' into dependency-refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
rmichela authored Apr 11, 2023
2 parents bac393d + ae3f588 commit 0e9eb9d
Show file tree
Hide file tree
Showing 14 changed files with 716 additions and 37 deletions.
8 changes: 7 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,15 @@ jobs:
- image: circleci/openjdk:11-jdk
<<: *shared

java-17:
docker:
- image: circleci/openjdk:17-jdk-buster
<<: *shared

workflows:
version: 2
java-8-and-11:
jobs:
- java-8
- java-11
- java-11
- java-17
6 changes: 0 additions & 6 deletions common/reactive-grpc-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@
<artifactId>rxgrpc-stub</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void request(long n) {
private volatile boolean done;
private Throwable error;

private volatile Subscriber<? super T> downstream;
protected volatile Subscriber<? super T> downstream;

private volatile boolean cancelled;

Expand Down Expand Up @@ -226,7 +226,7 @@ private void drainFused(final Subscriber<? super T> subscriber) {

for (;;) {
if (cancelled) {
queue.clear();
discardQueue(queue);
downstream = null;
return;
}
Expand Down Expand Up @@ -283,7 +283,7 @@ private void drain() {

private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> subscriber, Queue<T> q) {
if (cancelled) {
q.clear();
discardQueue(q);
downstream = null;
return true;
}
Expand All @@ -305,6 +305,7 @@ private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T>
@Override
public void onNext(T t) {
if (done || cancelled) {
discardElement(t);
return;
}

Expand Down Expand Up @@ -419,7 +420,7 @@ public void cancel() {

if (!outputFused) {
if (WIP.getAndIncrement(this) == 0) {
queue.clear();
discardQueue(queue);
downstream = null;
}
}
Expand Down Expand Up @@ -456,4 +457,10 @@ public boolean isEmpty() {
public void clear() {
queue.clear();
}

protected void discardQueue(Queue<T> q) {
q.clear();
}

protected void discardElement(T t) { }
}
8 changes: 3 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<canteen.plugin.version>1.1.0</canteen.plugin.version>

<!-- Dependency Versions -->
<reactive.streams.version>1.0.3</reactive.streams.version>
<reactive.streams.version>1.0.4</reactive.streams.version>
<grpc.version>1.54.0</grpc.version>
<protoc.version>3.22.2</protoc.version> <!-- Same version as grpc-proto -->
<jprotoc.version>1.2.0</jprotoc.version>
Expand Down Expand Up @@ -308,11 +308,9 @@
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M3</version>
<configuration>
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
</configuration>
<version>3.0.0</version>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public static <TRequest, TResponse> Mono<TResponse> manyToOne(
s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) s),
subscriberAndGRPCProducer::cancel
);
delegate.apply(observerAndPublisher);

return Flux.from(observerAndPublisher)
.singleOrEmpty();
.doOnSubscribe(s -> delegate.apply(observerAndPublisher))
.singleOrEmpty();
} catch (Throwable throwable) {
return Mono.error(throwable);
}
Expand All @@ -134,9 +134,8 @@ public static <TRequest, TResponse> Flux<TResponse> manyToMany(
s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) s),
subscriberAndGRPCProducer::cancel, prefetch, lowTide
);
delegate.apply(observerAndPublisher);

return Flux.from(observerAndPublisher);
return Flux.from(observerAndPublisher).doOnSubscribe(s -> delegate.apply(observerAndPublisher));
} catch (Throwable throwable) {
return Flux.error(throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
import com.salesforce.reactivegrpc.common.AbstractClientStreamObserverAndPublisher;
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

import java.util.Queue;

/**
* TODO: Explain what this class does.
* @param <T> T
Expand Down Expand Up @@ -46,4 +50,20 @@ public int requestFusion(int requestedMode) {
}
return Fuseable.NONE;
}

@Override
protected void discardQueue(Queue<T> q) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
} else {
q.clear();
}
}

@Override
protected void discardElement(T t) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ServerCallStreamObserver;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

import java.util.Queue;

/**
* TODO: Explain what this class does.
* @param <T> T
Expand All @@ -38,4 +42,20 @@ public int requestFusion(int requestedMode) {
}
return Fuseable.NONE;
}

@Override
protected void discardQueue(Queue<T> q) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
} else {
q.clear();
}
}

@Override
protected void discardElement(T t) {
if (downstream instanceof CoreSubscriber) {
Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@

package com.salesforce.reactorgrpc.stub;

import java.util.concurrent.ForkJoinPool;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;


public class ReactorClientStreamObserverAndPublisherTest {

Expand All @@ -24,7 +27,7 @@ public class ReactorClientStreamObserverAndPublisherTest {
@Test
public void multiThreadedProducerTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 100000;
TestCallStreamObserverProducer observer =
new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread);
Expand All @@ -34,21 +37,70 @@ public void multiThreadedProducerTest() {
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1,
(countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
public void producerFusedTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 100000;
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread);
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(),
processor, countPerThread);
processor.beforeStart(observer);
StepVerifier.create(Flux.from(processor))
.expectFusion(Fuseable.ANY, Fuseable.ASYNC)
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1,
(countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
public void discardQueueTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 5;
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(),
processor, countPerThread);
processor.beforeStart(observer);

ConcurrentLinkedQueue<Integer> discardedByObserverAndPublisher = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Integer> discardedByPublishOn = new ConcurrentLinkedQueue<>();

AtomicBoolean firstHandled = new AtomicBoolean();
Flux<Integer> consumer =
Flux.from(processor)
.doOnDiscard(Integer.class, discardedByObserverAndPublisher::add)
.log("processor")
.limitRate(1)
.publishOn(Schedulers.parallel())
.limitRate(1)
.doOnDiscard(Integer.class, discardedByPublishOn::add)
.<Integer>handle((i, sink) -> {
if (firstHandled.compareAndSet(false, true)) {
try {
Thread.sleep(100);
} catch (Exception e) {
// noop
}
sink.next(i);
} else {
sink.complete();
}
})
.log("handled");

StepVerifier.create(consumer)
.expectNext(0)
.verifyComplete();

// 1 is dropped in handle without invoking the discard hook,
assertThat(discardedByObserverAndPublisher).containsExactly(3, 4);
// impl details: processor is able to schedule 2 before it's cancelled
// also, discard hooks are cumulative, so not using containsExactly
assertThat(discardedByPublishOn).contains(2);
}
}
Loading

0 comments on commit 0e9eb9d

Please sign in to comment.