Riffl on AWS with Kinesis Data Analytics (KDA)

Prerequisites

Environment

The config name, bucket, IAM role and CloudWatch logging settings should match the values in the riffl-$RIFFL_VERSION.json file.

export CONFIG_PATH=example/
export CONFIG_NAME=application-glue-iceberg-kda.yaml
export CONFIG_BUCKET=<S3 bucket>
export RIFFL_VERSION=0.3.0
export RIFFL_RELEASE=https://github.com/riffl/riffl/releases/download/release-$RIFFL_VERSION/riffl-runtime-kda-$RIFFL_VERSION-all.jar
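
The commands that follow expand an unset variable to an empty string without complaint, so a quick check of the environment can save a failed upload. A minimal sketch; the function name `check_env` is hypothetical and not part of Riffl, and it relies on `printenv`, so it only sees exported variables:

```shell
# Hypothetical helper (not part of Riffl): report every required variable
# that is not exported or is empty. Returns non-zero if any is missing.
check_env() {
  local missing=0 name
  for name in "$@"; do
    # printenv prints the value of an exported variable, or nothing if unset
    if [ -z "$(printenv "$name")" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage (run after the exports above):
# check_env CONFIG_PATH CONFIG_NAME CONFIG_BUCKET RIFFL_VERSION RIFFL_RELEASE
```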

Dependencies

# Download locally and upload Riffl KDA runtime to S3
curl -L -o riffl-runtime-kda-$RIFFL_VERSION-all.jar $RIFFL_RELEASE
aws s3 cp riffl-runtime-kda-$RIFFL_VERSION-all.jar s3://$CONFIG_BUCKET/

# Upload configuration file
# NOTE: Correct S3 bucket must be configured before uploading the config file
aws s3 cp $CONFIG_PATH$CONFIG_NAME s3://$CONFIG_BUCKET/$CONFIG_NAME
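
The note above asks for the correct bucket to be set in the config file before it is uploaded. If the file carries a literal `<S3 bucket>` placeholder (an assumption; check the actual file contents), the substitution could be scripted roughly as follows. The function name `render_config` and the `/tmp` output path are hypothetical:

```shell
# Hypothetical helper: write a copy of a config file in which every literal
# "<S3 bucket>" placeholder is replaced by the given bucket name.
# Assumes the placeholder text matches what is in the file; valid S3 bucket
# names contain no "|" or "&", so they are safe in this sed expression.
render_config() {
  # $1 = source file, $2 = bucket name, $3 = output file
  sed "s|<S3 bucket>|$2|g" "$1" > "$3"
}

# Usage:
# render_config "$CONFIG_PATH$CONFIG_NAME" "$CONFIG_BUCKET" "/tmp/$CONFIG_NAME"
# aws s3 cp "/tmp/$CONFIG_NAME" "s3://$CONFIG_BUCKET/$CONFIG_NAME"
```

Writing to a separate output file keeps the checked-in example untouched.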

Deploy

Create KDA application

# NOTE: Correct S3 bucket, IAM Role and CloudWatch logging must be configured before creating the application
aws kinesisanalyticsv2 create-application --cli-input-json file://riffl-$RIFFL_VERSION.json

Start

aws kinesisanalyticsv2 start-application --cli-input-json file://riffl-start.json

Stop

aws kinesisanalyticsv2 stop-application --cli-input-json file://riffl-stop.json
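
Start and stop requests return before the application has finished changing state, so it can be useful to poll until it reports the expected status (for example `RUNNING` after a start, `READY` after a stop). A rough sketch using `describe-application`; the function name `wait_for_status` is hypothetical, and the application name must be whatever `ApplicationName` is set to in the JSON files, which is not shown here:

```shell
# Hypothetical helper: poll until the application reaches the wanted status.
# $1 = application name (assumed to match ApplicationName in the JSON files)
# $2 = wanted status, e.g. RUNNING or READY
# $3 = max attempts (default 30), $4 = seconds between attempts (default 10)
wait_for_status() {
  local app="$1" want="$2" tries="${3:-30}" delay="${4:-10}" status i=0
  while [ "$i" -lt "$tries" ]; do
    status=$(aws kinesisanalyticsv2 describe-application \
      --application-name "$app" \
      --query 'ApplicationDetail.ApplicationStatus' --output text)
    echo "status: $status"
    [ "$status" = "$want" ] && return 0
    i=$((i + 1))
    sleep "$delay"
  done
  # Gave up without seeing the wanted status
  return 1
}

# Usage:
# wait_for_status "<application name>" RUNNING
```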

Example application

The example application 'example/application-glue-iceberg-kda.yaml' is configured with a data-generating source, "datagen", and a sink that writes to S3 in the Apache Iceberg format with metadata stored in Glue. Data is written to two Glue tables: one stores the data as-is and the other applies in-flight optimization. Queries can be executed using AWS Athena.

Configure dependencies

Follow the steps above to set up Environment and Dependencies.

Create tables in Athena

The S3 bucket below needs to be accessible from Athena and from the KDA application.

CREATE DATABASE riffl;

CREATE TABLE riffl.product_optimized (
  id BIGINT,
  type INT,
  name STRING,
  price DECIMAL(10, 2),
  buyer_name STRING,
  buyer_address STRING,
  ts TIMESTAMP,
  dt STRING,
  hr STRING) 
PARTITIONED BY (dt, hr) 
LOCATION 's3://<S3 bucket>/riffl.db/product_optimized' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='PARQUET',
  'write_compression'='zstd'
);

CREATE TABLE riffl.product_default (
  id BIGINT,
  type INT,
  name STRING,
  price DECIMAL(10, 2),
  buyer_name STRING,
  buyer_address STRING,
  ts TIMESTAMP,
  dt STRING,
  hr STRING) 
PARTITIONED BY (dt, hr) 
LOCATION 's3://<S3 bucket>/riffl.db/product_default' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='PARQUET',
  'write_compression'='zstd'
);

Create application and deploy

# NOTE: Correct S3 bucket, IAM Role and CloudWatch logging must be configured before creating the application
aws kinesisanalyticsv2 create-application --cli-input-json file://riffl-$RIFFL_VERSION.json
aws kinesisanalyticsv2 start-application --cli-input-json file://riffl-start.json

# Stop the application once enough data has been produced
aws kinesisanalyticsv2 stop-application --cli-input-json file://riffl-stop.json

Run SQL Queries in Athena

use iceberg.riffl;

SELECT 
  avg(price), 
  max(ts)
FROM product_optimized
WHERE type = 1 
  AND dt = '2022-11-09';
  
SELECT 
  avg(price), 
  max(ts)
FROM product_default
WHERE type = 1 
  AND dt = '2022-11-09';
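
The queries above can also be submitted from the command line rather than the Athena console. A minimal sketch with the AWS CLI; the function name `submit_athena_query` and the results location are hypothetical, and the database name `riffl` is taken from the `CREATE DATABASE` statement above:

```shell
# Hypothetical helper: submit one SQL statement to Athena and print the
# resulting query execution id.
# $1 = SQL text
# $2 = S3 location for query results (assumed, e.g. s3://<S3 bucket>/athena-results/)
submit_athena_query() {
  aws athena start-query-execution \
    --query-string "$1" \
    --query-execution-context Database=riffl \
    --result-configuration "OutputLocation=$2" \
    --query 'QueryExecutionId' --output text
}

# Usage:
# id=$(submit_athena_query \
#   "SELECT avg(price), max(ts) FROM product_optimized WHERE type = 1 AND dt = '2022-11-09'" \
#   "s3://<S3 bucket>/athena-results/")
# aws athena get-query-results --query-execution-id "$id"
```

Athena runs queries asynchronously, so `get-query-results` only succeeds once the query has finished.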