generated from DataStax-Examples/datastax-examples-template
-
Notifications
You must be signed in to change notification settings - Fork 5
/
LimitConcurrencyCustomAsync.java
190 lines (166 loc) · 7.13 KB
/
LimitConcurrencyCustomAsync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.examples;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
/**
* Creates a keyspace and table, and loads data using an async API.
*
* <p>This example makes usage of a {@link CqlSession#executeAsync(String)} method, which is
* responsible for executing requests in a non-blocking way. It uses {@link CompletableFuture} to
* limit number of concurrent request to {@code CONCURRENCY_LEVEL}.
*
* <p>Preconditions:
*
* <ul>
* <li>An Apache Cassandra(R) cluster is running and accessible through the contact points
* identified by basic.contact-points (see application.conf).
* </ul>
*
* <p>Side effects:
*
* <ul>
* <li>creates a new keyspace "examples" in the session. If a keyspace with this name already
* exists, it will be reused;
* <li>creates a table "examples.tbl_sample_kv". If it exist already, it will be reused;
* <li>inserts a TOTAL_NUMBER_OF_INSERTS of rows into the table.
* </ul>
*
* @see <a href="http://datastax.github.io/java-driver/manual/">Java driver online manual</a>
*/
public class LimitConcurrencyCustomAsync {
private static final int CONCURRENCY_LEVEL = 32;
private static final int TOTAL_NUMBER_OF_INSERTS = 10_000;
// Used to track number of total inserts
private static final AtomicInteger INSERTS_COUNTER = new AtomicInteger();
public static void main(String[] args) throws InterruptedException, ExecutionException {
try (CqlSession session = new CqlSessionBuilder().build()) {
createSchema(session);
insertConcurrent(session);
}
}
private static void insertConcurrent(CqlSession session)
throws InterruptedException, ExecutionException {
PreparedStatement pst =
session.prepare(
insertInto("examples", "tbl_sample_kv")
.value("id", bindMarker("id"))
.value("value", bindMarker("value"))
.build());
// Construct CONCURRENCY_LEVEL number of ranges.
// Each range will be executed independently.
List<Range> ranges = createRanges(CONCURRENCY_LEVEL, TOTAL_NUMBER_OF_INSERTS);
// List of pending CONCURRENCY_LEVEL features that we will wait for at the end of the program.
List<CompletableFuture<?>> pending = new ArrayList<>();
// Every range will have dedicated CompletableFuture handling the execution.
for (Range range : ranges) {
pending.add(executeOneAtATime(session, pst, range));
}
// Wait for completion of all CONCURRENCY_LEVEL pending CompletableFeatures
CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).get();
System.out.println(
String.format(
"LimitConcurrencyCustomAsync finished executing %s queries with a concurrency level of %s.",
INSERTS_COUNTER.get(), CONCURRENCY_LEVEL));
}
private static CompletableFuture<?> executeOneAtATime(
CqlSession session, PreparedStatement pst, Range range) {
CompletableFuture<?> lastFeature = null;
for (int i = range.getFrom(); i < range.getTo(); i++) {
int counter = i;
// If this is a first request init the lastFeature.
if (lastFeature == null) {
lastFeature = executeInsert(session, pst, counter);
} else {
// If lastFeature is already created, chain next async action.
// The next action will execute only after the lastFeature will finish.
// If the lastFeature finishes with failure, the subsequent chained executions
// will not be invoked. If you wish to alter that behaviour and recover from failure
// add the exceptionally() call after whenComplete() of lastFeature.
lastFeature = lastFeature.thenComposeAsync((ignored) -> executeInsert(session, pst, counter));
}
}
return lastFeature;
}
private static CompletableFuture<? extends AsyncResultSet> executeInsert(
CqlSession session, PreparedStatement pst, int counter) {
return session
.executeAsync(pst.bind().setUuid("id", UUID.randomUUID()).setInt("value", counter))
.toCompletableFuture()
.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable == null) {
// When the Feature completes and there is no exception - increment counter.
INSERTS_COUNTER.incrementAndGet();
} else {
// On production you should leverage logger and use logger.error() method.
throwable.printStackTrace();
}
});
}
private static List<Range> createRanges(int concurrencyLevel, int totalNumberOfInserts) {
ArrayList<Range> ranges = new ArrayList<>();
int numberOfElementsInRange = totalNumberOfInserts / concurrencyLevel;
// Create concurrencyLevel number of Ranges.
for (int i = 0; i < concurrencyLevel; i++) {
// If this is a last range give it all remaining elements.
// It may be longer than numberOfElementsInRange in case of
// totalNumberOfInserts / concurrencyLevel will return floating point number.
if (i == concurrencyLevel - 1) {
ranges.add(new Range(i * numberOfElementsInRange, totalNumberOfInserts));
} else {
// Construct Ranges with numberOfElementsInRange elements.
ranges.add(new Range(i * numberOfElementsInRange, (i + 1) * numberOfElementsInRange));
}
}
return ranges;
}
private static void createSchema(CqlSession session) {
session.execute(
"CREATE KEYSPACE IF NOT EXISTS examples "
+ "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
session.execute(
"CREATE TABLE IF NOT EXISTS examples.tbl_sample_kv (id uuid, value int, PRIMARY KEY (id))");
}
private static class Range {
private final int from;
private final int to;
private Range(int from, int to) {
this.from = from;
this.to = to;
}
public int getFrom() {
return from;
}
public int getTo() {
return to;
}
@Override
public String toString() {
return "Range{" + "from=" + from + ", to=" + to + '}';
}
}
}