forked from krka/futures-guide
-
Notifications
You must be signed in to change notification settings - Fork 0
/
OnTimeoutTest.java
148 lines (126 loc) · 4.29 KB
/
OnTimeoutTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package se.krka.futures;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Tests showing that a blocking callback attached to a timeout-driven
 * {@link CompletableFuture} delays other, unrelated timeouts.
 *
 * <p>Each "blocking" test first occupies the timeout thread with a callback that
 * sleeps for {@link #BLOCK_MILLIS} ms, then measures how long a fresh
 * {@code orTimeout(1 ms)} future takes to fail. While the thread is occupied the
 * measured time is expected to be close to the full sleep duration.
 *
 * <p>The tests share one static flag, so they are not safe to run in parallel
 * with each other.
 */
public class OnTimeoutTest {

    /**
     * Thread identifier this test looks for to decide whether a callback is running
     * on the JDK's timeout thread. Compared against {@code Util.currThread()}, which
     * presumably returns the current thread's name (helper is defined elsewhere).
     */
    private static final String DELAY_SCHEDULER_THREAD = "CompletableFutureDelayScheduler";

    /** How long a blocking callback sleeps, in milliseconds. */
    private static final int BLOCK_MILLIS = 1000;

    /** Upper bound (exclusive, ms) for a timeout that is considered "not delayed". */
    private static final long FAST_LIMIT_MILLIS = 200;

    /** Lower bound (inclusive, ms) for a timeout that is considered "delayed". */
    private static final long SLOW_LIMIT_MILLIS = 900;

    /** Maximum time the spin-wait helpers poll before failing instead of hanging. */
    private static final long MAX_WAIT_NANOS = TimeUnit.SECONDS.toNanos(10);

    /** True while a callback is sleeping on the thread named {@link #DELAY_SCHEDULER_THREAD}. */
    private static final AtomicBoolean BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER = new AtomicBoolean(false);

    @Test
    public void testTimeoutNotBlocking() throws Exception {
        waitForNotBlocking();
        expectFast();
    }

    @Test
    public void testTimeoutNotBlockingTwice() throws Exception {
        waitForNotBlocking();
        expectFast();
        expectFast();
    }

    @Test
    public void testTimeoutBlocking() throws Exception {
        waitForNotBlocking();
        // The Killer joins the blocking future on exit so the sleep has finished
        // before the next test starts.
        try (Killer ignored = new Killer(killTimeout())) {
            expectFailure();
        }
    }

    @Test
    public void testTimeoutBlocking2() throws Exception {
        waitForNotBlocking();
        try (Killer ignored = new Killer(killTimeout2())) {
            expectFailure();
        }
    }

    @Test
    public void testTimeoutBlockingWithDelayedExecutor() throws Exception {
        waitForNotBlocking();
        try (Killer ignored = new Killer(killTimeout3())) {
            expectFailure();
        }
    }

    /** Asserts that a 1 ms timeout fires promptly (in under {@link #FAST_LIMIT_MILLIS} ms). */
    private void expectFast() throws Exception {
        long time = measureTimeout();
        assertTrue("time was " + time, time < FAST_LIMIT_MILLIS);
    }

    /**
     * Asserts that a 1 ms timeout is delayed to at least {@link #SLOW_LIMIT_MILLIS} ms,
     * i.e. that the timeout mechanism "fails" to fire on time while it is blocked.
     */
    private void expectFailure() throws Exception {
        long time = measureTimeout();
        assertTrue("time was " + time, time >= SLOW_LIMIT_MILLIS);
    }

    /**
     * Creates a future with a 1 ms {@code orTimeout} and measures how long it takes
     * for that timeout to actually complete the future.
     *
     * @return elapsed milliseconds between starting to wait and having verified the
     *     {@link TimeoutException}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the future is still incomplete after 10 seconds
     */
    private long measureTimeout() throws InterruptedException, TimeoutException {
        CompletableFuture<String> timeoutFuture =
                new CompletableFuture<String>().orTimeout(1, TimeUnit.MILLISECONDS);
        // nanoTime is monotonic; wall-clock time may jump and distort the measurement.
        final long startNanos = System.nanoTime();
        try {
            timeoutFuture.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            assertEquals(TimeoutException.class, Util.exceptionFromCallback(timeoutFuture).getClass());
            assertEquals(TimeoutException.class, e.getCause().getClass());
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        }
        // Nothing ever completes the future normally, so get() must have thrown.
        throw new AssertionError("Unreachable");
    }

    /**
     * Blocks the timeout thread via {@code orTimeout}: the dependent stages are
     * non-async, so they run on whichever thread completes the future.
     *
     * @return the future that completes once the sleeping callback has finished
     */
    private CompletableFuture<String> killTimeout() {
        waitForNotBlocking();
        CompletableFuture<String> future = new CompletableFuture<String>()
                .orTimeout(10, TimeUnit.MILLISECONDS)
                .exceptionally(Throwable::getMessage)
                .thenApply(s -> sleepOnThread(s, BLOCK_MILLIS));
        waitForBlocking();
        return future;
    }

    /**
     * Same as {@link #killTimeout()} but triggers the callback through
     * {@code completeOnTimeout} instead of {@code orTimeout}.
     *
     * @return the future that completes once the sleeping callback has finished
     */
    private CompletableFuture<String> killTimeout2() {
        CompletableFuture<String> future = new CompletableFuture<String>()
                .completeOnTimeout("", 10, TimeUnit.MILLISECONDS)
                .thenApply(s -> sleepOnThread(s, BLOCK_MILLIS));
        waitForBlocking();
        return future;
    }

    /**
     * Blocks the timeout thread through {@code CompletableFuture.delayedExecutor}
     * backed by a direct executor, so the submitted task is not handed off to
     * another pool after the delay.
     *
     * @return a future completed with {@code "value"} after the task has slept
     */
    private CompletableFuture<String> killTimeout3() {
        Executor executor =
                CompletableFuture.delayedExecutor(1, TimeUnit.MILLISECONDS, MoreExecutors.directExecutor());
        CompletableFuture<String> future = new CompletableFuture<>();
        executor.execute(() -> {
            sleepOnThread(null, BLOCK_MILLIS);
            future.complete("value");
        });
        waitForBlocking();
        return future;
    }

    /**
     * Sleeps on the current thread and, when that thread is the delay scheduler,
     * raises the shared "blocking" flag for the duration of the sleep.
     *
     * @param value value passed through unchanged, so this can be used inside {@code thenApply}
     * @param millis how long to sleep, in milliseconds
     * @return {@code value}
     */
    private <T> T sleepOnThread(T value, int millis) {
        final boolean onDelayScheduler = Util.currThread().equals(DELAY_SCHEDULER_THREAD);
        if (onDelayScheduler) {
            BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER.set(true);
        }
        try {
            System.out.println("Sleeping on " + Util.currThread() + " for " + millis + " ms");
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            if (onDelayScheduler) {
                BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER.set(false);
            }
        }
        return value;
    }

    /** Spins until a callback is sleeping on the delay scheduler thread. */
    private void waitForBlocking() {
        awaitBlockingFlag(true);
    }

    /** Spins until no callback is sleeping on the delay scheduler thread. */
    private void waitForNotBlocking() {
        awaitBlockingFlag(false);
    }

    /**
     * Polls the shared flag until it equals {@code expected}.
     *
     * <p>The wait is bounded: if the callback ends up on a different thread (for
     * example because the timeout fired before the dependent stage was attached),
     * the flag never changes and an unbounded spin would hang the whole test run.
     *
     * @param expected the flag value to wait for
     * @throws AssertionError if the flag has not reached {@code expected} within the limit
     */
    private void awaitBlockingFlag(boolean expected) {
        final long startNanos = System.nanoTime();
        while (BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER.get() != expected) {
            if (System.nanoTime() - startNanos > MAX_WAIT_NANOS) {
                throw new AssertionError("Timed out waiting for the delay scheduler to be "
                        + (expected ? "blocked" : "released"));
            }
            Thread.yield();
        }
    }

    /**
     * Resource wrapper that waits for a blocking future to finish when closed, so a
     * test does not return while its sleeping callback is still running.
     */
    private static final class Killer implements Closeable {
        private final CompletableFuture<String> future;

        private Killer(CompletableFuture<String> future) {
            this.future = future;
        }

        /** Blocks until the wrapped future has completed. */
        @Override
        public void close() {
            future.join();
        }
    }
}