
Demo consumer application

A consumer is an application that processes all data from a Kinesis data stream. This demo is built with Apache Flink, a popular framework and engine for processing data streams, and can be deployed as an application in Amazon Kinesis Data Analytics (since renamed Amazon Managed Service for Apache Flink). It shows how to invoke an external endpoint asynchronously, which is useful when you want to enrich or filter incoming events.

Asynchronous I/O for External Data Access

To access the external endpoint, the application uses Flink's Asynchronous I/O API. Because requests to the external system are asynchronous, a single parallel instance of the function can have many requests in flight at once and process each response as it arrives, instead of blocking until every call returns. The RichAsyncFunction implementation lives in the Sig4SignedHttpRequestAsyncFunction class, which, as its name indicates, signs each HTTP request with AWS Signature Version 4 (SigV4):

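```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// HttpRequest<T> and HttpResponse<T> are the sample's own request/response types (not java.net.http).
public class Sig4SignedHttpRequestAsyncFunction<T> extends RichAsyncFunction<HttpRequest<T>, HttpResponse<T>> {
    // Called once per parallel instance before any records arrive; typically creates the HTTP client.
    @Override
    public void open(Configuration parameters) throws Exception { ... }

    // Called when the job shuts down; typically releases the HTTP client.
    @Override
    public void close() throws Exception { ... }

    // Called per record: send the request without blocking, then complete resultFuture from the response callback.
    @Override
    public void asyncInvoke(HttpRequest<T> request, ResultFuture<HttpResponse<T>> resultFuture) throws Exception { ... }
}
```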
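
The contract behind asyncInvoke can be illustrated without Flink. The JDK-only sketch below is not the sample's code: ResultCallback is a hypothetical stand-in for Flink's ResultFuture, the client function is a placeholder for a non-blocking HTTP call, and the Semaphore imitates the limit on in-flight requests. It shows the key rule: asyncInvoke returns as soon as the request is issued, and the result is delivered later from a completion callback.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical stand-in for Flink's ResultFuture<OUT>: complete it exactly once per input.
interface ResultCallback<OUT> {
    void complete(Collection<OUT> results);
    void completeExceptionally(Throwable error);
}

// JDK-only sketch of the asyncInvoke pattern; not Flink's API.
class BoundedAsyncInvoker<IN, OUT> {
    private final Function<IN, CompletableFuture<OUT>> client; // hypothetical non-blocking client
    private final Semaphore inFlight;                          // imitates the 'capacity' limit

    BoundedAsyncInvoker(Function<IN, CompletableFuture<OUT>> client, int capacity) {
        this.client = client;
        this.inFlight = new Semaphore(capacity);
    }

    // Returns without waiting for the response; waits only when 'capacity' requests are pending.
    void asyncInvoke(IN input, ResultCallback<OUT> callback) {
        inFlight.acquireUninterruptibly();
        CompletableFuture<OUT> future;
        try {
            future = client.apply(input);
        } catch (RuntimeException e) {
            // The request could not even be issued: free the slot and report the failure.
            inFlight.release();
            callback.completeExceptionally(e);
            return;
        }
        // Runs when the response (or error) arrives, on whichever thread completed the future.
        future.whenComplete((result, error) -> {
            inFlight.release();
            if (error != null) {
                callback.completeExceptionally(error);
            } else {
                callback.complete(Collections.singleton(result));
            }
        });
    }
}
```

In the real job, Flink supplies the ResultFuture and enforces the capacity itself; the function is attached to a stream with AsyncDataStream.unorderedWait(stream, function, timeout, timeUnit, capacity), or orderedWait when results must keep the input order.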

Modifying the application

The sample application is configured to read events from an existing dataset and invoke an endpoint that predicts the fare amount. By default, that endpoint is backed by an AWS Lambda function, but you can replace it with any integration supported by Amazon API Gateway (such as an Amazon SageMaker endpoint). To customize the application, follow the steps below:

1. Add classes defining the incoming schema

The sample uses two classes to deserialize incoming records: Event and RideRequest. Use them as a reference when defining the classes for your own records.
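
A schema class can be sketched as below. Every field name and the comma-separated input format are hypothetical (the sample's Event and RideRequest classes remain the reference); the parser only stands in for whatever deserializer your records actually need. The class follows Flink's POJO rules (public no-arg constructor, public fields) so Flink can serialize it efficiently; in a real project, also declare the class itself public in its own file.

```java
// Hypothetical schema class: field names and the CSV layout are illustrative only.
// In a real project, declare it `public` in its own RideRequest.java; Flink only treats
// public classes with a public no-arg constructor and public (or getter/setter) fields as POJOs.
class RideRequest {
    public String rideId;
    public double pickupLatitude;
    public double pickupLongitude;
    public double dropoffLatitude;
    public double dropoffLongitude;
    public int passengerCount;

    // Required by Flink's POJO serializer.
    public RideRequest() {}

    // Hypothetical parser for one comma-separated line in the field order above.
    static RideRequest fromCsv(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 6) {
            throw new IllegalArgumentException("expected 6 fields, got " + parts.length);
        }
        RideRequest r = new RideRequest();
        r.rideId = parts[0].trim();
        r.pickupLatitude = Double.parseDouble(parts[1].trim());
        r.pickupLongitude = Double.parseDouble(parts[2].trim());
        r.dropoffLatitude = Double.parseDouble(parts[3].trim());
        r.dropoffLongitude = Double.parseDouble(parts[4].trim());
        r.passengerCount = Integer.parseInt(parts[5].trim());
        return r;
    }
}
```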

2. Update the operators in the main method

The sample application includes several examples of how to build HTTP requests and handle the responses. Update them, or use them as a reference, for your use case.
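
As an illustration of the two kinds of logic these operators typically hold, the sketch below maps a ride to an HTTP request and decides which responses to keep. It uses the JDK's java.net.http.HttpRequest rather than the sample's own HttpRequest<T> wrapper, and the JSON field names and endpoint URL are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Locale;

// JDK-only sketch; the sample's HttpRequest<T>/HttpResponse<T> wrappers are different classes.
class FareRequests {
    // Map step: turn one ride into a POST request with a hypothetical JSON payload.
    static HttpRequest toPredictionRequest(URI endpoint, double tripDistanceKm, int passengerCount) {
        // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM's locale.
        String body = String.format(Locale.ROOT,
            "{\"trip_distance\": %.3f, \"passenger_count\": %d}", tripDistanceKm, passengerCount);
        return HttpRequest.newBuilder(endpoint)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    // Filter step: keep only successful (2xx) responses.
    static boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }
}
```

Keeping the mapping and the filter as small pure functions makes them easy to unit test before they are wired into Flink map and filter operators.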

3. (Optional) Update the settings in the createSource methods

By default, the sample application reads data from a Kinesis data stream and invokes an API Gateway endpoint; both the stream name and the endpoint are passed as runtime properties. The complete list of settings for the source (FlinkKinesisConsumer) can be found on GitHub.
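
The sketch below shows one way to turn runtime properties into source settings. The keys "aws.region" and "flink.stream.initpos" are the string values of Flink's AWSConfigConstants.AWS_REGION and ConsumerConfigConstants.STREAM_INITIAL_POSITION; the runtime property names "Region" and "InitialPosition" are hypothetical. When deployed, the runtime properties would typically come from KinesisAnalyticsRuntime.getApplicationProperties(); here a plain java.util.Properties stands in so the sketch runs on the JDK alone.

```java
import java.util.Properties;

// Sketch: build FlinkKinesisConsumer settings from (hypothetically named) runtime properties.
class SourceSettings {
    // Fail fast at startup instead of when the first record is read.
    static String require(Properties runtime, String key) {
        String value = runtime.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("missing runtime property: " + key);
        }
        return value;
    }

    static Properties consumerConfig(Properties runtime) {
        Properties config = new Properties();
        config.setProperty("aws.region", require(runtime, "Region"));
        // LATEST reads only new records; TRIM_HORIZON starts from the oldest retained record.
        config.setProperty("flink.stream.initpos", runtime.getProperty("InitialPosition", "LATEST"));
        return config;
    }
}
```

The resulting Properties object is what the FlinkKinesisConsumer constructor takes as its third argument, after the stream name and the deserialization schema.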

4. Build the application using Apache Maven

```bash
mvn clean package --quiet -Dflink.version=1.15.4
```