OmniStream is a high-performance library for synchronizing data queues across multiple hosts over a network. Inspired by the flow of the river and the all-encompassing journey in Hermann Hesse's Siddhartha, OmniStream provides real-time data streaming and synchronization with an emphasis on simplicity and integrity.
To include OmniStream in your project, add the following dependency to your project's build file.
Maven (pom.xml):
    <dependency>
        <groupId>io.github.vuhoangha</groupId>
        <artifactId>omni-stream</artifactId>
        <version>1.0.15</version>
    </dependency>
Gradle (build.gradle):
    dependencies {
        implementation 'io.github.vuhoangha:omni-stream:1.0.15'
    }
- High Throughput: Utilizes Chronicle Queue, LMAX Disruptor, and ZeroMQ to ensure high-speed data processing and low latency.
- Scalability: Effortlessly scales across multiple nodes to accommodate growing data demands.
- Resilience: Built to handle failures gracefully, ensuring continuous data availability.
- Simplicity: Easy to set up and integrate into existing infrastructures with minimal configuration.
Ensure you have the following installed:
- Java 8 or higher
- Maven or Gradle
OmniStream is compatible with the long-term support releases Java 8, Java 11, and Java 17. All new releases can run under Java 17 on the classpath (not yet on the module path).
Java 17 introduces "Strongly Encapsulated JDK Internals" (JEP 403), enhancing the robustness and security of the execution environment. Since Chronicle libraries leverage a tightly integrated subset of JDK internals for performance, certain adjustments are necessary when migrating to Java 17. These adjustments are also recommended when running under Java 11 to avoid warnings.
For applications explicitly started with the java command, the following command line parameters need to be included to ensure compatibility with Java 11 and Java 17:
--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-opens=jdk.compiler/com.sun.tools.javac=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
These parameters can often be included via the environment variable JAVA_OPTS.
If you run Java applications through Maven, for example with the exec-maven-plugin, set the MAVEN_OPTS environment variable as follows:
    export MAVEN_OPTS="--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
      --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
      --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
      --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \
      --add-opens=jdk.compiler/com.sun.tools.javac=ALL-UNNAMED \
      --add-opens=java.base/java.lang=ALL-UNNAMED \
      --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
      --add-opens=java.base/java.io=ALL-UNNAMED \
      --add-opens=java.base/java.util=ALL-UNNAMED"
Although only a subset of the above command line options is required for Java 11, we recommend applying the same parameters to Java 11 as to Java 17 to eliminate certain warnings that may appear in output logs.
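A missing option usually surfaces only as an access error or warning deep inside a Chronicle call, so it can help to check the options at startup. The following is a minimal sketch, not part of OmniStream: the class name JvmFlagCheck and its methods are hypothetical, and it relies only on the standard RuntimeMXBean API. It is only meaningful on Java 9 or later, and some JVMs may not report options that were supplied through environment variables.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not an OmniStream class): reports which of the
// recommended JVM options are absent from a list of JVM arguments.
final class JvmFlagCheck {

    // The options listed above, in "--flag=value" form.
    static final List<String> REQUIRED = Arrays.asList(
        "--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED",
        "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED",
        "--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED",
        "--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED",
        "--add-opens=jdk.compiler/com.sun.tools.javac=ALL-UNNAMED",
        "--add-opens=java.base/java.lang=ALL-UNNAMED",
        "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
        "--add-opens=java.base/java.io=ALL-UNNAMED",
        "--add-opens=java.base/java.util=ALL-UNNAMED");

    // Joins "--add-opens value" pairs into "--add-opens=value" so that both
    // spellings of an option compare equal.
    static List<String> normalize(List<String> jvmArgs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < jvmArgs.size(); i++) {
            String arg = jvmArgs.get(i);
            boolean splitForm = arg.equals("--add-opens") || arg.equals("--add-exports");
            if (splitForm && i + 1 < jvmArgs.size()) {
                out.add(arg + "=" + jvmArgs.get(++i));
            } else {
                out.add(arg);
            }
        }
        return out;
    }

    // Returns the required options that do not appear in jvmArgs.
    static List<String> missingFlags(List<String> jvmArgs) {
        List<String> present = normalize(jvmArgs);
        List<String> missing = new ArrayList<>();
        for (String flag : REQUIRED) {
            if (!present.contains(flag)) {
                missing.add(flag);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Arguments the running JVM was started with (excludes main() arguments).
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        for (String flag : missingFlags(jvmArgs)) {
            System.err.println("Missing JVM option: " + flag);
        }
    }
}
```

Calling JvmFlagCheck.missingFlags(...) early in your own main method and logging the result is one way to fail fast instead of chasing reflective-access errors later.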
Here's a simple example to get you started with OmniStream:
- Data type for testing
    import net.openhft.chronicle.wire.SelfDescribingMarshallable;

    // Sample payload type. SelfDescribingMarshallable is the Chronicle Wire
    // base class that makes the object serializable into the queue.
    public class PeopleTest extends SelfDescribingMarshallable {
        private int index;
        private String name;

        // Constructors
        public PeopleTest() {
        }

        public PeopleTest(int index, String name) {
            this.index = index;
            this.name = name;
        }

        // Getters and setters
        public int getIndex() {
            return index;
        }

        public void setIndex(int index) {
            this.index = index;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "People{" +
                    "index=" + index +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
- Fanout/Sinkin
    import java.util.concurrent.locks.LockSupport;
    // Fanout, FanoutCfg, Sinkin, SinkinCfg and SinkinHandler come from the OmniStream library.

    public class OneToManyExample {
        public static String sourcePath = "xxx"; // queue directory used by the Fanout
        public static String sinkPath = "zzz";   // queue directory used by the Sinkin

        public static void run() {
            // Start the source first, then give it 2 seconds before starting the sink.
            new Thread(OneToManyExample::runSource).start();
            LockSupport.parkNanos(2_000_000_000L);
            new Thread(OneToManyExample::runSink).start();
        }

        public static void runSink() {
            // Called for every message replicated from the Fanout.
            SinkinHandler<PeopleTest> handler = (byte version, PeopleTest data, long seq, long id) -> {
                System.out.println("\uD83D\uDCE9Received");
                System.out.println("Version: " + version);
                System.out.println("People: " + data.toString());
                System.out.println("Seq: " + seq);
                System.out.println("ID: " + id);
            };
            new Sinkin(
                    SinkinCfg.builder()
                            .setQueuePath(sinkPath)
                            .setSourceIP("127.0.0.1"),
                    PeopleTest.class,
                    handler);
        }

        public static void runSource() {
            Fanout<PeopleTest> fanout = new Fanout<>(
                    FanoutCfg.builder().setQueuePath(sourcePath),
                    PeopleTest.class);
            PeopleTest people = new PeopleTest();
            int count = 0;
            // Publish one message every 2 seconds, reusing the same object.
            while (true) {
                count++;
                people.setIndex(count);
                people.setName("people " + count);
                System.out.println("\n\uD83D\uDE80Send: " + people);
                fanout.write(people);
                LockSupport.parkNanos(2_000_000_000L);
            }
        }
    }
- Snipper/Collector
    import java.util.concurrent.locks.LockSupport;
    // Collector, CollectorCfg, Snipper and SnipperCfg come from the OmniStream library.

    public class ManyToOneExample {
        public static String collectorPath = "xxx"; // queue directory used by the Collector

        public static void run() {
            // Start the Collector first, then two Snippers with different index ranges.
            new Thread(ManyToOneExample::runCollector).start();
            LockSupport.parkNanos(2_000_000_000L);
            new Thread(() -> ManyToOneExample.runSnipper(1)).start();
            LockSupport.parkNanos(500_000_000L);
            new Thread(() -> ManyToOneExample.runSnipper(100000)).start();
        }

        public static void runCollector() {
            new Collector<>(
                    CollectorCfg.builder()
                            .setQueuePath(collectorPath)
                            .setReaderName("reader_name"),
                    PeopleTest.class,
                    // Called for every message written to the Collector's queue.
                    (people, index) -> {
                        System.out.println("\uD83D\uDCE9Received");
                        System.out.println("index: " + index);
                        System.out.println("people: " + people);
                    }
            );
        }

        public static void runSnipper(int startIndex) {
            Snipper<PeopleTest> snipper = new Snipper<>(SnipperCfg.builder().setCollectorIP("localhost"));
            int count = startIndex;
            // Send one new message per second.
            while (true) {
                PeopleTest people = new PeopleTest(count, "people " + count);
                System.out.println("\n\uD83D\uDE80Send: " + people);
                snipper.send(people);
                count++;
                LockSupport.parkNanos(1_000_000_000L);
            }
        }
    }
The Fanout/Sinkin pattern synchronizes a queue from one host (Fanout) to multiple other hosts (Sinkin). It preserves the order of queue items and replicates them exactly across hosts.
The Fanout class is responsible for distributing data across multiple hosts. It operates as follows:
- Data Input: Threads from the main application send data to an LMAX Disruptor, a high-performance inter-thread messaging library.
- Data Storage: The Disruptor writes this data into a Chronicle Queue, a low-latency, high-throughput, persisted queue.
- Data Distribution: A dedicated processor listens for new messages written to the Chronicle Queue and forwards them via ZeroMQ to other hosts (Sinkin) for synchronization.
The Sinkin class handles receiving messages from the Fanout host and ensures they are processed correctly:
- Data Subscription: Uses ZeroMQ to subscribe to new messages sent by the Fanout.
- Data Recording: Records these messages into a Chronicle Queue on the local host.
- Data Processing: A processor listens for new messages added to the local queue and forwards them to the application for further processing.
- Message Delivery: While the pub/sub model facilitates real-time data distribution, message loss can occur under certain conditions.
- Reliability Mechanism: To address potential data loss, an additional ZeroMQ Req/Rep mechanism is implemented. This mechanism checks for any missed messages and synchronizes them accordingly, ensuring complete data integrity across all hosts.
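To make the recovery step concrete, here is a minimal sketch of how a subscriber could turn a gap in sequence numbers into a series of bounded catch-up requests. It is an illustration under stated assumptions, not OmniStream's implementation: the class CatchUpPlanner and its method are hypothetical, sequence numbers are assumed to increase by one per message, and maxBatch plays the role that numberMsgInBatch plays in the configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not an OmniStream class) of gap detection on
// the pub/sub channel followed by batched requests on the req/rep channel.
final class CatchUpPlanner {

    // Given the last sequence applied locally and the sequence that just
    // arrived, returns the inclusive ranges {from, to} that must be fetched
    // first. An empty list means there is no gap.
    static List<long[]> planCatchUp(long lastApplied, long receivedSeq, int maxBatch) {
        if (maxBatch <= 0) {
            throw new IllegalArgumentException("maxBatch must be positive");
        }
        List<long[]> requests = new ArrayList<>();
        // Walk the missing sequences in steps of at most maxBatch messages.
        for (long from = lastApplied + 1; from < receivedSeq; from += maxBatch) {
            long to = Math.min(from + maxBatch - 1, receivedSeq - 1);
            requests.add(new long[] {from, to});
        }
        return requests;
    }

    public static void main(String[] args) {
        // Last applied message was 3 and message 9 just arrived: 4..8 are missing.
        for (long[] range : planCatchUp(3, 9, 2)) {
            System.out.println("request " + range[0] + ".." + range[1]);
        }
    }
}
```

With a batch size of 2, the example prints three requests (4..5, 6..7, 8..8). Capping each request keeps a single interrupted transfer from forcing the whole backlog to be sent again, at the cost of more round trips.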
- queuePath *: Configures the directory that will contain the queue data on disk. Example: /var/lib/yourapp/queue
- realtimePort: Port used by ZeroMQ to listen for connections for publishing/subscribing to new messages. Default: 5555
- confirmPort: Port used by ZeroMQ to listen for connections to retrieve missed messages. Default: 5556
- numberMsgInBatch: Specifies the maximum number of messages sent in one request when a Sinkin starts. This setting balances the number of requests with potential data transmission interruptions. Default: 10,000
- disruptorWaitStrategy: The wait strategy of the LMAX Disruptor for batching messages from multiple threads. Consider the trade-off between latency and CPU performance based on your usage needs. Default: YieldingWaitStrategy. More info: LMAX Disruptor User Guide
- ringBufferSize: Size of the LMAX Disruptor's ring buffer for sending/receiving messages. Must be a power of two. Default: 131,072
- queueWaitStrategy: The wait strategy of the Processor for new messages from Chronicle Queue. Default: OmniWaitStrategy.YIELD
- maxNumberMsgInCachePub: Maximum number of messages in the ZeroMQ publisher's cache. Default: 1,000,000
- version: Version identifier for the messages currently in the queue, used for future checks and validations. Default: -128
- rollCycles: Frequency at which the Chronicle Queue rolls over from an old file to a new file. Default: LargeRollCycles.LARGE_DAILY (daily rollover). More info: Chronicle Queue FAQ
- enableBindingCore: Allows the entire Fanout process to run on a dedicated CPU core. Default: false
- cpu: Specifies the logical processor for the Fanout process. Default: Constance.CPU_TYPE.ANY
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableDisruptorBindingCore: Allows the LMAX Disruptor to run on a dedicated CPU core. Default: false
- disruptorCpu: Specifies the logical processor for the LMAX Disruptor. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableQueueBindingCore: Allows the Chronicle Queue to run on a dedicated CPU core. Default: false
- queueCpu: Specifies the logical processor for the Chronicle Queue. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableHandleConfirmBindingCore: Allows the confirm handler to run on a dedicated CPU core. Default: false
- handleConfirmCpu: Specifies the logical processor for the confirm handler. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
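The ringBufferSize option in the list above must be a power of two, which is easy to get wrong when the value is derived from an expected message volume. The sketch below shows one way to validate or round up a candidate size before passing it to the configuration. The class RingBufferSizes is a hypothetical helper, not part of OmniStream or the LMAX Disruptor.

```java
// Hypothetical helper (not an OmniStream class) for choosing a valid
// ringBufferSize, which must be a power of two.
final class RingBufferSizes {

    // A positive int is a power of two when exactly one bit is set.
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // Smallest power of two that is >= n, for 1 <= n <= 2^30.
    static int nextPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n must be between 1 and 2^30: " + n);
        }
        int highest = Integer.highestOneBit(n);
        return highest == n ? n : highest << 1;
    }

    public static void main(String[] args) {
        int wanted = 100_000; // e.g. an estimate of in-flight messages
        System.out.println(isPowerOfTwo(wanted));   // false
        System.out.println(nextPowerOfTwo(wanted)); // 131072, the documented default
    }
}
```

Power-of-two sizes let a ring buffer map a sequence number to a slot with a bit mask instead of a modulo operation, which is the usual reason for this constraint.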
- queuePath *: Specifies the directory that will contain the queue data on disk. Example: /var/lib/yourapp/queue
- sourceIP *: IP address of the Fanout host from which the Sinkin receives data.
- realtimePort: Port used by the Fanout to publish data to the Sinkins. Default: 5555
- confirmPort: Port used by the Fanout to respond to requests for retrieving missed messages from Sinkins. Default: 5556
- maxTimeWaitMS: Maximum time a message can stay in the queue before it is considered as missing preceding messages, prompting a request to the Fanout to retrieve them. Default: 1000 ms
- maxObjectsPoolWait: Maximum number of messages in the ObjectsPool waiting to be processed. Default: 30,000
- zmqSubBufferSize: Maximum size of the ZeroMQ SUB buffer for pending messages. Default: 1,000,000
- timeRateGetLatestMsgMS: Frequency in milliseconds to fetch the latest messages from the Fanout. Default: 3000 ms
- timeRateGetMissMsgMS: Frequency in milliseconds to check and retrieve missing messages. Default: 3000 ms
- timeoutSendReqMissMsg: Timeout for sending requests to retrieve missing messages from the Fanout. Default: 5000 ms
- timeoutRecvReqMissMsg: Timeout for receiving responses for missing messages from the Fanout. Default: 5000 ms
- waitStrategy: The wait strategy used by LMAX Disruptor for batching and processing messages. Default: BlockingWaitStrategy. More info: LMAX Disruptor User Guide
- ringBufferSize: Size of the Disruptor's ring buffer for message processing. Must be a power of two. Default: 131,072
- rollCycles: Period for rolling over from an old file to a new file in the queue. The LargeRollCycles.LARGE_DAILY setting provides a balance between indexing and file management. Default: LargeRollCycles.LARGE_DAILY. More info: Chronicle Queue FAQ
- enableBindingCore: Allows the entire Sinkin process to run on a dedicated CPU core. Default: false
- cpu: Specifies the logical processor for the Sinkin process. Default: Constance.CPU_TYPE.ANY
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableDisruptorProcessMsgBindingCore: Allows the Disruptor message processing to run on a dedicated CPU core. Default: false
- disruptorProcessMsgCpu: Specifies the logical processor for the Disruptor message processing. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableCheckMissMsgAndSubQueueBindingCore: Allows the process that checks for missed messages and subscribes to new items in the queue to run on a dedicated CPU core. Default: false
- checkMissMsgAndSubQueueCpu: Specifies the logical processor for checking missed messages and subscribing to new queue items. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableSubMsgBindingCore: Allows the process for subscribing to new messages to run on a dedicated CPU core. Default: false
- subMsgCpu: Specifies the logical processor for subscribing to new messages. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- queueWaitStrategy: The wait strategy of the Processor for new messages from Chronicle Queue. Default: OmniWaitStrategy.YIELD
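The maxTimeWaitMS option in the Sinkin list above describes a message that arrived out of order and is held until its predecessors show up or a deadline passes. A minimal sketch of that idea follows. It is an assumption-laden illustration rather than OmniStream's code: the class ReorderBufferSketch and its methods are hypothetical, sequence numbers are assumed to increase by one, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration (not an OmniStream class): holds out-of-order
// messages and signals when the blocked one has waited too long.
final class ReorderBufferSketch {
    private final long maxTimeWaitMs;
    private long nextExpected;
    // Buffered sequence number -> arrival time in milliseconds.
    private final TreeMap<Long, Long> arrivalBySeq = new TreeMap<>();

    ReorderBufferSketch(long firstExpectedSeq, long maxTimeWaitMs) {
        this.nextExpected = firstExpectedSeq;
        this.maxTimeWaitMs = maxTimeWaitMs;
    }

    // Accepts a message and returns how many messages became deliverable
    // in order as a result (0 if it is a duplicate or still blocked).
    int offer(long seq, long nowMs) {
        if (seq < nextExpected) {
            return 0; // already delivered
        }
        arrivalBySeq.putIfAbsent(seq, nowMs);
        int delivered = 0;
        while (!arrivalBySeq.isEmpty() && arrivalBySeq.firstKey() == nextExpected) {
            arrivalBySeq.pollFirstEntry();
            nextExpected++;
            delivered++;
        }
        return delivered;
    }

    // True when the lowest buffered message has waited longer than
    // maxTimeWaitMs, i.e. its predecessors should be requested explicitly.
    boolean shouldRequestMissing(long nowMs) {
        Map.Entry<Long, Long> blocked = arrivalBySeq.firstEntry();
        return blocked != null && nowMs - blocked.getValue() > maxTimeWaitMs;
    }

    long nextExpected() {
        return nextExpected;
    }
}
```

A short wait tolerates ordinary reordering without extra traffic, while the deadline bounds how long a genuinely lost message can stall delivery.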
The Snipper/Collector pattern is designed to aggregate messages from multiple hosts into a single Collector host. It delivers messages efficiently and reliably into a central queue for processing.
The Snipper class is tasked with capturing and forwarding data from various sources. Its operations are as follows:
- Data Input: Threads from the main application send data to an LMAX Disruptor, which serves as a high-performance inter-thread messaging platform.
- Data Transmission: The Disruptor sends this data to the Collector host via ZeroMQ.
The Collector class handles the centralized collection and processing of messages:
- Data Reception: Utilizes ZeroMQ to listen for incoming messages from Snippers.
- Data Recording: Upon receiving messages, they are recorded into a Chronicle Queue on the Collector host.
- Data Processing: A dedicated processor listens for new messages in the queue and forwards them to the application for further processing.
- Acknowledgment Mechanism: The communication between Snippers and the Collector includes an acknowledgment (ACK) protocol. The Collector, upon receiving a message, notifies the respective Snipper, confirming receipt.
- Loss Handling: In case of a message timeout or failure, the Snipper marks the message as undelivered, allowing for corrective actions to be taken.
- Data Replay: The Collector can perform data replays from past records, enabling recovery and continuity in case of disruptions.
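The acknowledgment and loss-handling steps above can be sketched as a small bookkeeping structure on the sending side. This is a simplified model under stated assumptions, not the Snipper's implementation: the class AckTrackerSketch and its method names are hypothetical, each message is assumed to carry a unique numeric ID, and the clock is passed in explicitly so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not an OmniStream class): tracks messages
// awaiting an ACK and reports those whose timeout has elapsed.
final class AckTrackerSketch {
    private final long timeoutMs;
    // Message ID -> deadline in milliseconds, in send order.
    private final Map<Long, Long> deadlineById = new LinkedHashMap<>();

    AckTrackerSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Records that a message was sent and starts its timeout.
    void onSend(long id, long nowMs) {
        deadlineById.put(id, nowMs + timeoutMs);
    }

    // Returns true if the ACK matched a pending message; false if the
    // message was unknown or had already been marked undelivered.
    boolean onAck(long id) {
        return deadlineById.remove(id) != null;
    }

    // Removes and returns the IDs whose deadline has passed, so the caller
    // can retry them or surface the failure.
    List<Long> expire(long nowMs) {
        List<Long> undelivered = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = deadlineById.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                undelivered.add(entry.getKey());
                it.remove();
            }
        }
        return undelivered;
    }
}
```

In this model an ACK that arrives after the timeout is reported as unmatched, which leaves the decision about duplicates to the caller. How OmniStream itself treats late ACKs is not described here.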
- queuePath *: Path to the folder containing queue data. Example: /var/lib/yourapp/queue
- port: Port that the Collector listens on to receive requests from the Snipper. Default: 5557
- readerName *: Name used as an ID for the reader. This name allows the Collector to continue reading from the last position in the queue after a restart, rather than starting over from the beginning. Example: defaultReader
- startId: Starting index from which the queue will be read. If set to -1, the reading will begin from the start of the queue.
- queueWaitStrategy: The wait strategy used for listening to new items written to the queue. OmniWaitStrategy optimizes performance by reducing CPU usage while waiting. Default: OmniWaitStrategy.YIELD
- enableBindingCore: Allows the entire Collector process to run on a dedicated CPU core. Default: false
- cpu: Specifies the logical processor for the entire Collector process. Default: Constance.CPU_TYPE.ANY
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableZRouterBindingCore: Allows the ZeroMQ Router to run on a dedicated CPU core. Default: false
- zRouterCpu: Specifies the logical processor for the ZeroMQ Router. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
- enableQueueBindingCore: Allows the Queue Listener to run on a dedicated CPU core. Default: false
- queueCpu: Specifies the logical processor for the Queue Listener. Default: Constance.CPU_TYPE.NONE
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
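The readerName option in the Collector list above lets a reader resume where it stopped after a restart. The sketch below models that behaviour with a small position store kept in a properties file. It only illustrates the concept: the class ReaderPositionStore is hypothetical, and OmniStream's actual mechanism is not described in this document (Chronicle Queue itself offers named tailers, which may or may not be what is used).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical illustration (not an OmniStream class): remembers the last
// index processed by each named reader so that reading can resume later.
final class ReaderPositionStore {
    private final Path file;
    private final Properties positions = new Properties();

    ReaderPositionStore(Path file) {
        this.file = file;
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                positions.load(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Index to resume from: one past the last committed index, or 0 when
    // the reader name has never been seen.
    long resumeIndex(String readerName) {
        String last = positions.getProperty(readerName);
        return last == null ? 0L : Long.parseLong(last) + 1;
    }

    // Records that the reader has fully processed the given index.
    void commit(String readerName, long index) {
        positions.setProperty(readerName, Long.toString(index));
        try (OutputStream out = Files.newOutputStream(file)) {
            positions.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Two stores created over the same file behave like a process before and after a restart: a reader name that committed index 41 resumes at 42, while an unknown name starts at 0.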
- collectorIP *: IP address of the Collector host to which the Snipper sends data.
- timeout: Timeout for sending and receiving messages. Default: 10,000 ms
- waitStrategy: The wait strategy used by LMAX Disruptor to batch messages from multiple threads before sending them via ZeroMQ to the Collector. This setting balances throughput and system responsiveness. Default: BlockingWaitStrategy. More info: LMAX Disruptor User Guide
- port: Port that the Collector listens on to receive messages from the Snipper. Default: 5557
- ringBufferSize: Size of the Disruptor's ring buffer for sending/receiving messages. Must be a power of two to ensure efficient data handling. Default: 131,072
- disruptorWaitStrategy: The wait strategy of the Processor for new messages from the application. Default: OmniWaitStrategy.YIELD
- enableDisruptorBindingCore: Allows the Disruptor message processing to run on a dedicated CPU core. Default: false
- disruptorCpu: Specifies the logical processor for the Disruptor message processing. Default: Constance.CPU_TYPE.ANY
  - Constance.CPU_TYPE.ANY: Runs on any available logical processor, prioritizing isolated ones if available.
  - Constance.CPU_TYPE.NONE: Runs on multiple logical processors as managed by the operating system.
  - >= 0: Runs on the specified logical processor.
For detailed documentation, examples, and API references, please visit OmniStream Documentation.
We welcome contributions from the community! If you'd like to contribute, please follow our contributing guidelines.
OmniStream is released under the MIT License. See the LICENSE file for more details.
- Inspired by Hermann Hesse's philosophical exploration in Siddhartha.
- Thanks to the open source community for continuous support and inspiration.
For support, contact vuhoangha100995@gmail.com.