A consumer is an application that processes all data from a Kinesis data stream. This demo is built with Apache Flink (a popular framework and engine for processing data streams) and can be deployed as an application in Amazon Kinesis Data Analytics. It shows how to invoke an external endpoint asynchronously, which is useful when you want to enrich or filter incoming events.
To access the external endpoint, the application uses the Asynchronous I/O API of Apache Flink. Asynchronous interaction with the external system means that a single parallel function instance can send many requests and receive their responses concurrently, instead of waiting for each response before sending the next request. The RichAsyncFunction implementation is in the Sig4SignedHttpRequestAsyncFunction class:
public class Sig4SignedHttpRequestAsyncFunction<T> extends RichAsyncFunction<HttpRequest<T>, HttpResponse<T>> {

    @Override
    public void open(Configuration parameters) throws Exception { ... }

    @Override
    public void close() throws Exception { ... }

    @Override
    public void asyncInvoke(HttpRequest<T> request, ResultFuture<HttpResponse<T>> resultFuture) throws Exception { ... }
}
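The concurrency idea behind asyncInvoke can be illustrated without Flink. The following is a minimal sketch using only the JDK, not the implementation in this repository: `AsyncCallSketch`, `callEndpoint`, and `enrichAll` are hypothetical names, the endpoint is simulated with a short sleep, and a `CompletableFuture` stands in for Flink's `ResultFuture`. It submits every request before waiting for any response, so the simulated latencies overlap instead of adding up.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AsyncCallSketch {

    // Hypothetical stand-in for the external endpoint: sleeps to simulate
    // network latency, then returns a made-up response for the ride id.
    static String callEndpoint(String rideId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rideId + ":predicted";
    }

    // Submits all requests first, then collects the responses in input order.
    static List<String> enrichAll(List<String> rideIds, ExecutorService pool) {
        List<CompletableFuture<String>> pending = rideIds.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> callEndpoint(id), pool))
                .collect(Collectors.toList());
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(enrichAll(List.of("ride-1", "ride-2", "ride-3"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

In a Flink job, the equivalent step is typically to complete the `ResultFuture` passed to asyncInvoke from the HTTP client's callback, and to attach the function to a stream with `AsyncDataStream.unorderedWait` or `AsyncDataStream.orderedWait`.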
The sample application is configured to read events from an existing dataset and invoke an endpoint to predict the fare amount. To customize it, follow the steps below:
By default, the solution will use a Lambda function, but that can be replaced by any integration supported by API Gateway (such as an Amazon SageMaker endpoint).
There are two classes used for deserialization of incoming records: Event and RideRequest. You can use them as a reference for your use case.
The sample application includes several examples of how to handle HTTP requests and responses. Update them, or use them as a reference, for your use case.
By default, the sample application will read data from a Kinesis data stream and invoke an API Gateway endpoint (both values are passed as runtime properties). The complete list of settings for the source (FlinkKinesisConsumer) can be found on GitHub.
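As an illustration of how source settings are usually supplied, the sketch below builds a `java.util.Properties` object using only the JDK. It is not taken from the sample application: `SourceSettingsSketch` and `consumerProperties` are hypothetical names, and the region and starting position are placeholder values. The two keys are, to the best of my knowledge, the string values behind the connector constants `AWSConfigConstants.AWS_REGION` and `ConsumerConfigConstants.STREAM_INITIAL_POSITION`.

```java
import java.util.Properties;

public class SourceSettingsSketch {

    // Builds the settings object that a Kinesis source would receive.
    // Assumed keys: "aws.region" (AWSConfigConstants.AWS_REGION) and
    // "flink.stream.initpos" (ConsumerConfigConstants.STREAM_INITIAL_POSITION).
    static Properties consumerProperties(String region, String initialPosition) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);
        props.setProperty("flink.stream.initpos", initialPosition);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = consumerProperties("us-east-1", "LATEST");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // In a Flink job, the object would be passed to the source, e.g.
        // new FlinkKinesisConsumer<>(streamName, deserializationSchema, props)
    }
}
```

Using the connector's constants instead of raw strings is preferable in real code, since it avoids typos in the keys; raw strings are used here only to keep the sketch free of Flink dependencies.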
mvn clean package --quiet -Dflink.version=1.15.4