A Slow Event Streaming client for Git
Lego Mindstorms EV3
is the first programmable brick which it is able to run a Linux OS
inside, and
it has Internet connection capabilities
. Using the Brick, you can develop educational robots which
run software to manage data
from Sensors & Actuators. In the ecosystem you could find many awesome examples
but it is not common to use the bricks in a remote way, and it is hard to find Bricks communicating each others outside
of your local network.
In real world, large distributed systems implements Event-Driven architectures (EDA) and Streaming architectures to manage a huge amount of data to solve problems.
Why not explore with the help of Bricks the development of some examples to "imitate"
some architectural patterns
with Bricks?
is a free and open source distributed version control system. On Internet exist some companies
which offer free
git accounts to store the code.
Why not use a Git repository as a persistant event storage for your events?
- The producer
- The consumer
- The reader
- The topics
- The partitions
- Produce Messages
- Consume Messages
- Read Messages
- Initial partition support
- It is not possible to create a Node in High Availability
- It is not possible to send multiple message in parallel with the same Producer Object
public class PingPongDemoTest extends TestContainersBaseTest {
public void given_PingPong_when_execute_then_Ok() {
final String topic1 = "PING";
final String topic2 = "PONG";
final int iterations = 5;
final int timeout = 60;
var playersList = List.of(
new Player(topic1, topic2, iterations, timeout),
new Player(topic2, topic1, iterations, timeout));
var futureRequests = playersList.stream()
var results = futureRequests.stream()
.reduce(0L, (x1, x2) -> x1 + x2);
then(results).isEqualTo((iterations - 1) * 2);
verify(topic1, iterations);
verify(topic2, iterations);
private void verify(String TOPIC, int iterations) {
GitBrokerClient client = GitBrokerClient.builder()
Reader<String> reader = client.newReader()
int counter = 0;
while (true) {
if (reader.hasReachedEndOfTopic()) {
Message<String> value = reader.readNext();
LOGGER.info("{}", counter);
private interface Client<T> {
T run();
CompletableFuture<T> runAsync();
private static class Player implements Client<Long> {
private final String TOPIC_PRODUCE;
private final String TOPIC_CONSUME;
private final int iterations;
private final GitBrokerClient client;
private final Producer<String> producer;
private final Consumer<String> consumer;
private final int timeout;
private final Authentication authentication =
new Authentication("user", "user@my-email.com", "xxx", "yyy");
public Player(
@NonNull String topicProduce,
@NonNull String topicConsume,
@NonNull int iterations,
@NonNull int timeout) {
this.TOPIC_PRODUCE = topicProduce;
this.TOPIC_CONSUME = topicConsume;
this.iterations = iterations;
this.timeout = timeout;
client = GitBrokerClient.builder()
producer = client.newProducer()
consumer = client.newConsumer()
public Long run() {
return IntStream.rangeClosed(1, iterations).boxed()
.mapToLong(x -> {
LOGGER.info("Iteration {}: {}", TOPIC_PRODUCE, x);
Messages<String> messages = consumer.batchReceive();
return StreamSupport.stream(messages.spliterator(), false).count();
.reduce(0L, (x, y) -> x + y);
public CompletableFuture<Long> runAsync() {
LOGGER.info("Thread: {}", Thread.currentThread().getName());
CompletableFuture<Long> future = CompletableFuture
.supplyAsync(() -> run())
.orTimeout(this.timeout, TimeUnit.SECONDS)
.handle((response, ex) -> {
if (!Objects.isNull(ex)) {
LOGGER.error(ex.getLocalizedMessage(), ex);
return response;
return future;
public void given_MultipleProducers_when_sendAsyncInParallel_then_Ok() {
Authentication authentication =
new Authentication("user", "user@my-email.com", "xxx", "yyy");
GitBrokerClient client = GitBrokerClient.builder()
final String topic = "PINGPONG";
Producer<String> producer = client.newProducer()
Producer<String> producer2 = client.newProducer()
final String message = "Hello World";
var futures = List.of(
var list = futures.stream()
.peek(s -> LOGGER.info(s))
public void given_MultipleConsumers_when_batchReceiveAsync_then_Ok() {
Authentication authentication =
new Authentication("user", "user@my-email.com", "xxx", "yyy");
GitBrokerClient client = GitBrokerClient.builder()
final String topic = "PINGPONG";
Producer<String> producer = client.newProducer()
String expectedMessage = "Hello World";
final String node1 = "PING-NODE1";
final String node2 = "PING-NODE2";
Consumer<String> consumer1 = client.newConsumer()
Consumer<String> consumer2 = client.newConsumer()
var futures = List.of(
var list = futures.stream()
.flatMap(messages ->
StreamSupport.stream(messages.spliterator(), false)
.map(m -> m.getValue()))
.peek(s -> LOGGER.info(s))
Reader<String> reader = client.newReader()
int counter = 0;
while (true) {
if (reader.hasReachedEndOfTopic()) {
Message<String> value = reader.readNext();
LOGGER.info("D: {}", value.getValue());
Current default JVM has an issue with CA Certificates
and it is necessary to
install a complete JDK. Create a ssh session with your EV3 Brick and execute
the following steps:
wget https://ci.adoptopenjdk.net/view/ev3dev/job/eljbuild/job/stretch-14/lastSuccessfulBuild/artifact/build/jdk-ev3.tar.gz
sudo tar -zxvf jdk-ev3.tar.gz -C /opt
sudo mv /opt/jdk/ /opt/jdk-14
sudo update-alternatives --install /usr/bin/java java /opt/jdk-14/bin/java 2014
java -version
You should see:
openjdk version "14" 2020-03-17
OpenJDK Runtime Environment (build 14+36-ev3-unreleased)
OpenJDK Client VM (build 14+36-ev3-unreleased, mixed mode, sharing)
In order to use this library, you need to add the following dependency:
mvn clean test
# Generate Checkstyle report
mvn clean site -DskipTests
mvn clean test site