diff --git a/README.md b/README.md index 601bcac..06d4e4b 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,7 @@ In the Zeebe configuration file, you can change * the ringbuffer's name * the ringbuffer's capacity * the ringbuffer's time-to-live +* the record serialization format Default values: @@ -115,6 +116,9 @@ className = "io.zeebe.hazelcast.exporter.HazelcastExporter" # Hazelcast ringbuffer's time-to-live in seconds timeToLiveInSeconds = 3600 + + # record serialization format: [protobuf|json] + format = "protobuf" ``` ## Build it from Source diff --git a/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java b/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java index 160883d..37a8046 100644 --- a/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java +++ b/exporter/src/main/java/io/zeebe/hazelcast/exporter/ExporterConfiguration.java @@ -9,6 +9,8 @@ public class ExporterConfiguration { public int capacity = -1; public int timeToLiveInSeconds = -1; + public String format = "protobuf"; + public String enabledValueTypes = "JOB,WORKFLOW_INSTANCE,DEPLOYMENT,INCIDENT"; public String enabledRecordTypes = "EVENT"; @@ -26,6 +28,8 @@ public String toString() { + capacity + ", timeToLiveInSeconds=" + timeToLiveInSeconds + + ", format=" + + format + "]"; } } diff --git a/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java b/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java index f9e3e29..cf7c592 100644 --- a/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java +++ b/exporter/src/main/java/io/zeebe/hazelcast/exporter/HazelcastExporter.java @@ -17,6 +17,7 @@ import java.util.Arrays; import java.util.List; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -29,6 +30,8 @@ public class HazelcastExporter implements Exporter { private HazelcastInstance hazelcast; private Ringbuffer ringbuffer; + private Function recordTransformer; + @Override public void configure(Context context) { logger = context.getLogger(); @@ -55,6 +58,23 @@ public boolean acceptValue(ValueType valueType) { return enabledValueTypes.contains(valueType); } }); + + configureFormat(); + } + + private void configureFormat() { + if (config.format.equalsIgnoreCase("protobuf")) { + recordTransformer = this::recordToProtobuf; + + } else if (config.format.equalsIgnoreCase("json")) { + recordTransformer = this::recordToJson; + + } else { + throw new IllegalArgumentException( + String.format( + "Expected the parameter 'format' to be one fo 'protobuf' or 'json' but was '%s'", + config.format)); + } } private Stream parseList(String list) { @@ -112,15 +132,20 @@ public void close() { public void export(Record record) { if (ringbuffer != null) { - final byte[] protobuf = transformRecord(record); - ringbuffer.add(protobuf); + final byte[] transformedRecord = recordTransformer.apply(record); + ringbuffer.add(transformedRecord); } controller.updateLastExportedRecordPosition(record.getPosition()); } - private byte[] transformRecord(Record record) { + private byte[] recordToProtobuf(Record record) { final Schema.Record dto = RecordTransformer.toGenericRecord(record); return dto.toByteArray(); } + + private byte[] recordToJson(Record record) { + final var json = record.toJson(); + return json.getBytes(); + } } diff --git a/exporter/src/test/java/io/zeebe/hazelcast/ExporterJsonTest.java b/exporter/src/test/java/io/zeebe/hazelcast/ExporterJsonTest.java new file mode 100644 index 0000000..9d13394 --- /dev/null +++ b/exporter/src/test/java/io/zeebe/hazelcast/ExporterJsonTest.java @@ -0,0 +1,78 @@ +package io.zeebe.hazelcast; + +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.ringbuffer.Ringbuffer; +import io.zeebe.client.ZeebeClient; +import io.zeebe.hazelcast.exporter.ExporterConfiguration; +import io.zeebe.model.bpmn.Bpmn; +import io.zeebe.model.bpmn.BpmnModelInstance; +import io.zeebe.test.ZeebeTestRule; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ExporterJsonTest { + + private static final BpmnModelInstance WORKFLOW = + Bpmn.createExecutableProcess("process") + .startEvent("start") + .sequenceFlowId("to-task") + .serviceTask("task", s -> s.zeebeTaskType("test")) + .sequenceFlowId("to-end") + .endEvent("end") + .done(); + + private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration(); + + @Rule + public final ZeebeTestRule testRule = + new ZeebeTestRule("zeebe-json.test.cfg.toml", Properties::new); + + private ZeebeClient client; + private HazelcastInstance hz; + + @Before + public void init() { + client = testRule.getClient(); + + final ClientConfig clientConfig = new ClientConfig(); + clientConfig.getNetworkConfig().addAddress("127.0.0.1:5701"); + hz = HazelcastClient.newHazelcastClient(clientConfig); + } + + @After + public void cleanUp() { + hz.shutdown(); + } + + @Test + public void shouldExportEventsAsProtobuf() throws Exception { + // given + final Ringbuffer buffer = hz.getRingbuffer(CONFIGURATION.name); + + var sequence = buffer.headSequence(); + + // when + client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join(); + + // then + final var message = buffer.readOne(sequence); + assertThat(message).isNotNull(); + + final var jsonRecord = new String(message); + + assertThat(jsonRecord) + .startsWith("{") + .endsWith("}") + .contains("\"valueType\":\"DEPLOYMENT\"") + .contains("\"recordType\":\"EVENT\"") + .contains("\"intent\":\"CREATED\""); + } +} diff --git a/exporter/src/test/resources/zeebe-json.test.cfg.toml b/exporter/src/test/resources/zeebe-json.test.cfg.toml new file mode 100644 index 0000000..71138f8 --- /dev/null +++ b/exporter/src/test/resources/zeebe-json.test.cfg.toml @@ -0,0 +1,6 @@ +[[exporters]] +id = "hazelcast" +className = "io.zeebe.hazelcast.exporter.HazelcastExporter" + + [exporters.args] + format = "json"