Skip to content

River-Kt/river

Repository files navigation

License Maven Central

river

Introducing River, a powerful and flexible reactive stream library for Kotlin that simplifies the process of using and building connectors for multiple enterprise protocols and tools. Heavily inspired by Apache Camel and Alpakka, River makes use of Kotlin's Flow and coroutines to provide a scalable, efficient, and user-friendly way to handle asynchronous and event-based data streams. This library is designed to be flexible and customizable, allowing you to handle complex data streams and integrate with different technologies seamlessly.

Whether you're building a new application or integrating with existing systems, River makes it easy to build reactive and scalable data pipelines that can handle even the most demanding workloads. With its powerful capabilities and easy-to-use API, River allows you to focus on your business logic while it'll take care of the complex task of handling data streams and integrating with multiple technologies.

Disclaimer: This project is heavily under development, anything is subject to change until the first final release.

The core module

Kotlin's Flow API is a powerful and flexible tool for processing data streams, but it can sometimes require a lot of boilerplate code for certain tasks. That's where River-Kt's core module comes in. It provides a set of higher-level abstractions that make working with flows more intuitive and efficient.

At the heart of every connector, the core module offers a range of extension functions that can optimize the processing of data streams. By leveraging these high-level functions, developers can easily build connectors for different protocols. This allows for seamless integration with a variety of services and protocols.

The mapAsync is just an example of how the core module can simplify and speed up data processing by concurrently applying transformations to flow elements. Other functions such as split and chunked can also be used to break down large flows into smaller, more manageable chunks or groups, based on a count or time window strategy. Other useful functions are also provided, such as collecting data into lists, applying timeouts and delays, and continuously polling for data. All of these functions leverage Kotlin's Flow API, which ensures that data processing is done in an asynchronous, non-blocking, and backpressure-safe way.

Check the following example:

suspend fun fetchUsers(page: Int): List<User> = ...

// pollWithState allows polling for data continuously while maintaining a state, which makes it straightforward to handle halts and perform aggregations.
// The shouldStop predicate checks if the polling should be stopped before every poll request.
// In this case, we're stopping when the page is -1, which means we have already paginated the entire API.
// We're using the page number as the state to control which page to call next.
val users: Flow<User> = pollWithState(initial = 1, shouldStop = { it == -1 }) { page ->
    val usersPage = fetchUsers(page)
    
    val next = 
        if (usersPage.isEmpty()) -1 // no more pages available
        else page + 1 // maybe there's one page more
        
    next to usersPage
}

suspend fun fetchAddresses(userId: Long): List<Address> = ...

users
    // Runs the map function concurrently with a maximum of 10 concurrent executions.
    .mapAsync(10) { user -> UserWithAddresses(user, fetchAddresses(user.id)) }
    // Chunks the UserWithAddresses into lists of 100, or whatever fits into the chunk after the defined timeout of 1 second.
    .chunked(100, 1.seconds) 
    .collect { chunk: List<UserWithAddresses> ->
       // Do some processing with the chunk of UserWithAddresses...
    }
    
// Since it's still Kotlin's Flow, everything leverages asynchronous non-blocking backpressure:
// if the processing in the collect block is slow, the polling stage will slow down accordingly.

Are you a visual learner? The code basically represents the flow below:

Alt text

To get to know the core module a bit further, you can refer to the site's kdoc or you can check the code directly.

To start using it, simply add the following dependencies:

implementation("com.river-kt:core:$latestVersion")

The connectors

Each connector leverages Kotlin's Flow API, coroutines and the core module to provide a simple and efficient way to interact with various protocols and services. You can check it out all the modules implemented currently.

Connector Name Description Module
Apacke Kafka Provides a Kotlin DSL to interact with Kafka brokers, leveraging Kotlin's Flow API as well. Developers can continuously receive, send and commit messages connector-apache-kafka
Advanced Message Queuing Protocol (AMQP) Provides functionality to interact with AMQP brokers using Kotlin's Flow API. Developers can continuously receive, send, and delete messages from AMQP queues using configurable chunk strategies and concurrency. connector-amqp
Amazon Simple Storage Service (Amazon S3) Allows developers to interact with Amazon S3 buckets via Kotlin's Flow API. It provides high-level functions to download and upload S3 objects. connector-aws-s3
Amazon Simple Queue Service (Amazon SQS) Provides functionality to interact with Amazon SQS using Kotlin's Flow API. Developers can continuously receive, send, and delete messages from Amazon SQS using configurable chunk strategies and concurrency. connector-aws-sqs
Amazon Simple Notification Service (Amazon SNS) Allows developers to send messages through Amazon SNS using Kotlin's Flow API using configurable chunk strategies and concurrency. connector-aws-sns
Amazon Simple Email Service (SES) Allows users to send emails through AWS SES using Kotlin's Flow API. connector-aws-ses
AWS Lambda Provides functionality to interact with AWS Lambda using Kotlin's Flow API. Users can invoke AWS Lambda functions and receive the results through a Flow stream using configurable concurrency. connector-aws-lambda
Console Provides functionality to interact with the console using Kotlin's Flow API. connector-console
Java Database Connectivity (JDBC) Provides an implementation of the RDBMS connector using JDBC driver. It allows users to interact with JDBC-compatible databases through Kotlin's Flow API using configurable chunk strategies and concurrency. connector-rdbms-jdbc
Reactive Relational Database Connectivity (R2DBC) Provides an implementation of the RDBMS connector using R2DBC driver. It allows users to interact with R2DBC-compatible databases through Kotlin's Flow API using configurable chunk strategies and concurrency. connector-rdbms-r2dbc
Debezium Allows users to capture database events and stream them via Kotlin's Flow API. It supports multiple databases, including MySQL, PostgreSQL, and MongoDB. The emitted records can be sent to any other connector available in a seamless manner, enabling easy integration with a variety of services and protocols. connector-redhat-debezium
Azure Queue Storage Provides functionality to interact with Azure Queue Storage using Kotlin's Flow API. Developers can continuously receive, create, update, and delete messages using configurable chunk strategies and concurrency. connector-azure-queue-storage
JSON Provides functionality to read and write JSON data using Kotlin's Flow API. It leverages Jackson, supports JSON serialization and deserialization with configurable options. connector-format-json
Positional Flat Line Provides functionality to read and write Positional Flat Line data using Kotlin's Flow API. connector-format-positional-flat-line
CSV Provides functionality to read and write CSV data using Kotlin's Flow API. connector-format-csv
File Allows users to read and write files using Kotlin's Flow API. It provides functionality to read or write data to files and input streams. connector-file
FTP Provides functionality to interact with FTP servers using Kotlin's Flow API. It's important to know that this module encausplates blocking I/O calls connector-ftp
HTTP Works on top of the java.net.HttpClient and provides functionality to interact with HTTP servers using Kotlin's Flow API. connector-http
Java Message Service (JMS) Provides functionality to interact with JMS brokers using Kotlin's Flow API. Developers can continuously receive, send, and delete messages from JMS queues using configurable chunk strategies and concurrency. connector-jms
MongoDB Provides functionality to interact with MongoDB using Kotlin's Flow API. connector-mongodb

To install a module, you can add the dependency as follows:

/**
 * Can be any of:
 *  [
 *   connector-apache-kafka, connector-aws-s3, connector-aws-sqs, connector-aws-sns, connector-aws-ses, 
 *   connector-aws-lambda, connector-rdbms-jdbc, connector-rdbms-r2dbc, connector-redhat-debezium,
 *   connector-azure-queue-storage, connector-format-json, connector-format-csv,
 *   connector-file, connector-amqp, connector-aws-java-11-http-spi, connector-console, 
 *   connector-format-positional-flat-line, connector-ftp, connector-http, connector-jms, connector-mongodb
 *  ]
*/
val connectorName = "connector-aws-sqs"

implementation("com.river-kt:$connectorName:$latestVersion")

Talking is cheap, show me the code!

Explore the Examples module and take a look at several of use cases for River. More examples are continually being added.

Contributing

Do you want to help us? Please, read our Contributing Guidelines. Don't forget to read our Code of conduct as well.

License

This project is licensed under the MIT License - see the LICENSE.md file for details