Skip to content

Commit

Permalink
Merge pull request #30 from zeebe-io/json
Browse files Browse the repository at this point in the history
feat(exporter): serialize records as JSON
  • Loading branch information
saig0 authored Feb 17, 2020
2 parents 2828706 + 8f6fdc7 commit eebd51a
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 3 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ In the Zeebe configuration file, you can change
* the ringbuffer's name
* the ringbuffer's capacity
* the ringbuffer's time-to-live
* the record serialization format

Default values:

Expand All @@ -115,6 +116,9 @@ className = "io.zeebe.hazelcast.exporter.HazelcastExporter"
# Hazelcast ringbuffer's time-to-live in seconds
timeToLiveInSeconds = 3600
# record serialization format: [protobuf|json]
format = "protobuf"
```

## Build it from Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class ExporterConfiguration {
public int capacity = -1;
public int timeToLiveInSeconds = -1;

public String format = "protobuf";

public String enabledValueTypes = "JOB,WORKFLOW_INSTANCE,DEPLOYMENT,INCIDENT";
public String enabledRecordTypes = "EVENT";

Expand All @@ -26,6 +28,8 @@ public String toString() {
+ capacity
+ ", timeToLiveInSeconds="
+ timeToLiveInSeconds
+ ", format="
+ format
+ "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -29,6 +30,8 @@ public class HazelcastExporter implements Exporter {
private HazelcastInstance hazelcast;
private Ringbuffer<byte[]> ringbuffer;

private Function<Record, byte[]> recordTransformer;

@Override
public void configure(Context context) {
logger = context.getLogger();
Expand All @@ -55,6 +58,23 @@ public boolean acceptValue(ValueType valueType) {
return enabledValueTypes.contains(valueType);
}
});

configureFormat();
}

private void configureFormat() {
if (config.format.equalsIgnoreCase("protobuf")) {
recordTransformer = this::recordToProtobuf;

} else if (config.format.equalsIgnoreCase("json")) {
recordTransformer = this::recordToJson;

} else {
throw new IllegalArgumentException(
String.format(
"Expected the parameter 'format' to be one fo 'protobuf' or 'json' but was '%s'",
config.format));
}
}

private Stream<String> parseList(String list) {
Expand Down Expand Up @@ -112,15 +132,20 @@ public void close() {
public void export(Record record) {

if (ringbuffer != null) {
final byte[] protobuf = transformRecord(record);
ringbuffer.add(protobuf);
final byte[] transformedRecord = recordTransformer.apply(record);
ringbuffer.add(transformedRecord);
}

controller.updateLastExportedRecordPosition(record.getPosition());
}

private byte[] transformRecord(Record record) {
private byte[] recordToProtobuf(Record record) {
final Schema.Record dto = RecordTransformer.toGenericRecord(record);
return dto.toByteArray();
}

private byte[] recordToJson(Record record) {
final var json = record.toJson();
return json.getBytes();
}
}
78 changes: 78 additions & 0 deletions exporter/src/test/java/io/zeebe/hazelcast/ExporterJsonTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.zeebe.hazelcast;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.Ringbuffer;
import io.zeebe.client.ZeebeClient;
import io.zeebe.hazelcast.exporter.ExporterConfiguration;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.test.ZeebeTestRule;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

public class ExporterJsonTest {

private static final BpmnModelInstance WORKFLOW =
Bpmn.createExecutableProcess("process")
.startEvent("start")
.sequenceFlowId("to-task")
.serviceTask("task", s -> s.zeebeTaskType("test"))
.sequenceFlowId("to-end")
.endEvent("end")
.done();

private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration();

@Rule
public final ZeebeTestRule testRule =
new ZeebeTestRule("zeebe-json.test.cfg.toml", Properties::new);

private ZeebeClient client;
private HazelcastInstance hz;

@Before
public void init() {
client = testRule.getClient();

final ClientConfig clientConfig = new ClientConfig();
clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701");
hz = HazelcastClient.newHazelcastClient(clientConfig);
}

@After
public void cleanUp() {
hz.shutdown();
}

@Test
public void shouldExportEventsAsProtobuf() throws Exception {
// given
final Ringbuffer<byte[]> buffer = hz.getRingbuffer(CONFIGURATION.name);

var sequence = buffer.headSequence();

// when
client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();

// then
final var message = buffer.readOne(sequence);
assertThat(message).isNotNull();

final var jsonRecord = new String(message);

assertThat(jsonRecord)
.startsWith("{")
.endsWith("}")
.contains("\"valueType\":\"DEPLOYMENT\"")
.contains("\"recordType\":\"EVENT\"")
.contains("\"intent\":\"CREATED\"");
}
}
6 changes: 6 additions & 0 deletions exporter/src/test/resources/zeebe-json.test.cfg.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[[exporters]]
id = "hazelcast"
className = "io.zeebe.hazelcast.exporter.HazelcastExporter"

[exporters.args]
format = "json"

0 comments on commit eebd51a

Please sign in to comment.