This repository provides CDK scripts and sample code that show how to implement a simple web analytics system.
The diagram below shows what we are implementing.
The cdk.json file tells the CDK Toolkit how to execute your app.
This project is set up like a standard Python project. The initialization
process also creates a virtualenv within this project, stored under the .venv
directory. To create the virtualenv it assumes that there is a python3
(or python for Windows) executable in your path with access to the venv
package. If for any reason the automatic creation of the virtualenv fails,
you can create the virtualenv manually.
To manually create a virtualenv on macOS and Linux:
$ python3 -m venv .venv
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
$ source .venv/bin/activate
If you are on a Windows platform, activate the virtualenv like this:
% .venv\Scripts\activate.bat
Once the virtualenv is activated, you can install the required dependencies.
(.venv) $ pip install -r requirements.txt
Before deployment, upload the zipped code files to S3 like this:
(.venv) $ aws s3api create-bucket --bucket your-s3-bucket-name-for-lambda-layer-code --region region-name
(.venv) $ ./build-aws-lambda-layer-package.sh your-s3-bucket-name-for-lambda-layer-code
⚠️ To create a bucket outside of the us-east-1 region, the aws s3api create-bucket command requires the appropriate LocationConstraint to be specified for the desired region. For more information, see these examples.
⚠️ Make sure you have Docker installed.
For example,
(.venv) $ aws s3api create-bucket --bucket lambda-layer-resources --region us-east-1
(.venv) $ ./build-aws-lambda-layer-package.sh lambda-layer-resources
For more information about how to create a package for an AWS Lambda layer, see here.
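The LocationConstraint rule from the warning above can be sketched in Python. This is a minimal illustration, not part of the repository: the helper name `create_bucket_kwargs` is hypothetical, and it only builds the arguments that a boto3 `create_bucket` call would take.

```python
from typing import Any, Dict


def create_bucket_kwargs(bucket_name: str, region: str) -> Dict[str, Any]:
    """Build keyword arguments for an S3 CreateBucket call in `region`."""
    kwargs: Dict[str, Any] = {"Bucket": bucket_name}
    # us-east-1 is the default location and takes no LocationConstraint;
    # any other region needs a constraint naming that region.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


if __name__ == "__main__":
    # With boto3 installed and credentials configured, the result could be used as
    #   boto3.client("s3", region_name=region).create_bucket(**kwargs)
    print(create_bucket_kwargs("lambda-layer-resources", "us-east-1"))
    print(create_bucket_kwargs("lambda-layer-resources", "ap-northeast-2"))
```

The same rule applies on the command line, where the constraint is passed with `--create-bucket-configuration LocationConstraint=region-name`.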
Before synthesizing the CloudFormation template for this code, update the cdk.context.json file.
In particular, fill in the S3 location of the Lambda layer code that you uploaded in the previous step.
For example,
{
  "firehose_data_tranform_lambda": {
    "s3_bucket_name": "lambda-layer-resources",
    "s3_object_key": "var/fastavro-lib.zip"
  },
  ....
}
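Before running cdk synth, it can help to confirm that the Lambda layer settings are actually filled in. The following is a small sketch under the assumption that only the two keys shown above are required; the helper name `missing_layer_settings` is hypothetical and not part of this project.

```python
import json
from typing import List

# Keys taken from the cdk.context.json example above.
REQUIRED_KEYS = ("s3_bucket_name", "s3_object_key")


def missing_layer_settings(
    context_text: str, section: str = "firehose_data_tranform_lambda"
) -> List[str]:
    """Return the required keys that are absent or empty in `section`."""
    context = json.loads(context_text)
    settings = context.get(section, {})
    return [key for key in REQUIRED_KEYS if not settings.get(key)]


if __name__ == "__main__":
    sample = json.dumps(
        {
            "firehose_data_tranform_lambda": {
                "s3_bucket_name": "lambda-layer-resources",
                "s3_object_key": "var/fastavro-lib.zip",
            }
        }
    )
    # An empty list means both settings are present.
    print(missing_layer_settings(sample))
```

To check a real file, read it with `open("cdk.context.json").read()` and pass the text to the function.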
Now you are ready to synthesize the CloudFormation template for this code.
(.venv) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
(.venv) $ export CDK_DEFAULT_REGION=$(aws configure get region)
(.venv) $ cdk synth --all
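The two exported variables are what a CDK app typically reads to decide which account and region to target. As a rough sketch (the helper `resolve_cdk_env` is hypothetical, and how this project's app.py actually consumes the variables is an assumption), the lookup could be written like this:

```python
import os
from typing import Dict, Mapping


def resolve_cdk_env(environ: Mapping[str, str]) -> Dict[str, str]:
    """Read the target account and region, failing early if either is unset."""
    names = ("CDK_DEFAULT_ACCOUNT", "CDK_DEFAULT_REGION")
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))
    return {
        "account": environ["CDK_DEFAULT_ACCOUNT"],
        "region": environ["CDK_DEFAULT_REGION"],
    }


if __name__ == "__main__":
    try:
        # In a CDK v2 Python app the result could be passed on as
        #   aws_cdk.Environment(**resolve_cdk_env(os.environ))
        print(resolve_cdk_env(os.environ))
    except ValueError as err:
        print(err)
```

Failing early here gives a clearer message than a synth that silently produces environment-agnostic stacks.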
Use the cdk deploy command to create the stack shown above.
(.venv) $ cdk deploy --require-approval never --all
After all CDK stacks are successfully deployed, make sure that the AWS Lambda function that merges many small files into a few large Parquet files has been granted the appropriate Lake Formation permissions.
Check the permissions in the AWS Lake Formation web console.
If they are missing, grant the appropriate Lake Formation permissions to the Lambda function's execution role by running the following commands:
(.venv) $ MERGE_SMALL_FILES_JOB_ROLE_ARN=$(aws cloudformation describe-stacks \
            --stack-name WebAnalyticsMergeSmallFiles | \
            jq -r '.Stacks[0].Outputs[] | select(.OutputKey | endswith("LambdaExecRoleArn")) | .OutputValue')
(.venv) $ aws lakeformation grant-permissions \
            --principal DataLakePrincipalIdentifier=${MERGE_SMALL_FILES_JOB_ROLE_ARN} \
            --permissions CREATE_TABLE DESCRIBE ALTER DROP \
            --resource '{ "Database": { "Name": "mydatabase" } }'
(.venv) $ aws lakeformation grant-permissions \
            --principal DataLakePrincipalIdentifier=${MERGE_SMALL_FILES_JOB_ROLE_ARN} \
            --permissions SELECT DESCRIBE ALTER INSERT DELETE DROP \
            --resource '{ "Table": {"DatabaseName": "mydatabase", "TableWildcard": {}} }'
ℹ️ mydatabase is the database for access logs specified as OLD_DATABASE and NEW_DATABASE in the cdk.context.json file.
ℹ️ WebAnalyticsMergeSmallFiles is the name of the CDK stack that creates the Lambda function, which merges small files into large ones by running an Amazon Athena Create Table As Select (CTAS) query.
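The same lookup and grants can be expressed in Python. The sketch below only builds the request payloads, so it runs without AWS access; the helper names are hypothetical, and the payload shapes follow the boto3 `describe_stacks` response and `grant_permissions` parameters.

```python
from typing import Any, Dict, Optional


def find_output_value(stacks_response: Dict[str, Any], key_suffix: str) -> Optional[str]:
    """Return the first stack output whose key ends with `key_suffix`."""
    for output in stacks_response["Stacks"][0].get("Outputs", []):
        if output["OutputKey"].endswith(key_suffix):
            return output["OutputValue"]
    return None


def database_grant(role_arn: str, database: str) -> Dict[str, Any]:
    """Arguments for granting database-level permissions to the role."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"Database": {"Name": database}},
        "Permissions": ["CREATE_TABLE", "DESCRIBE", "ALTER", "DROP"],
    }


def table_grant(role_arn: str, database: str) -> Dict[str, Any]:
    """Arguments for granting permissions on every table in the database."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"Table": {"DatabaseName": database, "TableWildcard": {}}},
        "Permissions": ["SELECT", "DESCRIBE", "ALTER", "INSERT", "DELETE", "DROP"],
    }


if __name__ == "__main__":
    # Sample response with a made-up output key and role ARN.
    response = {
        "Stacks": [
            {
                "Outputs": [
                    {
                        "OutputKey": "ExampleLambdaExecRoleArn",
                        "OutputValue": "arn:aws:iam::123456789012:role/example-role",
                    }
                ]
            }
        ]
    }
    arn = find_output_value(response, "LambdaExecRoleArn")
    # With boto3 these could be sent as
    #   boto3.client("lakeformation").grant_permissions(**database_grant(arn, "mydatabase"))
    print(database_grant(arn, "mydatabase"))
    print(table_grant(arn, "mydatabase"))
```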
To add additional dependencies, for example other CDK libraries, just add
them to your setup.py file and rerun the pip install -r requirements.txt
command.
- Run the GET /streams method to invoke ListStreams in Kinesis
$ curl -X GET https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1/streams
The response is:
{
  "HasMoreStreams": false,
  "StreamNames": [
    "PUT-Firehose-aEhWz"
  ],
  "StreamSummaries": [
    {
      "StreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/PUT-Firehose-aEhWz",
      "StreamCreationTimestamp": 1661612556,
      "StreamModeDetails": {
        "StreamMode": "ON_DEMAND"
      },
      "StreamName": "PUT-Firehose-aEhWz",
      "StreamStatus": "ACTIVE"
    }
  ]
}
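The stream name needed in the next step can be pulled out of this response programmatically. Here is a minimal sketch that works on the JSON shown above; the helper name `active_stream_names` is hypothetical.

```python
import json
from typing import Any, Dict, List


def active_stream_names(response: Dict[str, Any]) -> List[str]:
    """Return the names of streams that the response reports as ACTIVE."""
    summaries = response.get("StreamSummaries")
    if summaries is None:
        # Without summaries only the plain names are known; return them as-is.
        return list(response.get("StreamNames", []))
    return [s["StreamName"] for s in summaries if s.get("StreamStatus") == "ACTIVE"]


if __name__ == "__main__":
    body = json.loads(
        '{"HasMoreStreams": false, "StreamNames": ["PUT-Firehose-aEhWz"],'
        ' "StreamSummaries": [{"StreamName": "PUT-Firehose-aEhWz",'
        ' "StreamStatus": "ACTIVE"}]}'
    )
    print(active_stream_names(body))
```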
- Generate test data.
(.venv) $ pip install -r requirements-dev.txt
(.venv) $ python src/utils/gen_fake_data.py --max-count 5 --stream-name PUT-Firehose-aEhWz --api-url 'https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1' --api-method records
[200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260289903462649185194773668901646666226496176178","ShardId":"shardId-000000000003"}]}
[200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260289903462649185194774877827466280924390359090","ShardId":"shardId-000000000003"}]}
[200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260223001227053593325351479598467950537766600706","ShardId":"shardId-000000000000"}]}
[200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260245301972252123948494224242560213528447287314","ShardId":"shardId-000000000001"}]}
[200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260223001227053593325353897450107179933554966530","ShardId":"shardId-000000000000"}]}
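The internals of gen_fake_data.py are not shown here, so the following is only a hypothetical stand-in that illustrates what one test record might look like. It assumes the record fields match the columns of the web_log_json table defined later in this guide, and that timestamps use the `yyyy-MM-dd HH:mm:ss` form that Athena reads as a timestamp. All sample values are made up.

```python
import json
import random
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional

# Column names taken from the web_log_json table definition.
FIELDS = ("userId", "sessionId", "referrer", "userAgent", "ip",
          "hostname", "os", "timestamp", "uri")


def make_fake_record(rng: random.Random, now: Optional[datetime] = None) -> Dict[str, str]:
    """Build one fake web access log record as a flat dict of strings."""
    now = now or datetime.now(timezone.utc)
    return {
        "userId": f"user-{rng.randint(1, 1000)}",
        "sessionId": uuid.UUID(int=rng.getrandbits(128)).hex,
        "referrer": rng.choice(["https://example.com/", "https://example.org/search"]),
        "userAgent": rng.choice(["Mozilla/5.0", "curl/8.0"]),
        "ip": ".".join(str(rng.randint(1, 254)) for _ in range(4)),
        "hostname": "www.example.com",
        "os": rng.choice(["Linux", "macOS", "Windows"]),
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "uri": rng.choice(["/index.html", "/products/1", "/cart"]),
    }


if __name__ == "__main__":
    rng = random.Random()
    for _ in range(3):
        print(json.dumps(make_fake_record(rng)))
```

Passing a seeded `random.Random` makes the generated values reproducible, which is convenient when comparing query results across runs.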
- Create and load a table with partitioned data in Amazon Athena
Go to Athena in the AWS Management Console.
- (step 1) Specify the workgroup to use
To run queries, switch to the appropriate workgroup like this:
- (step 2) Create a database
To create a new database called mydatabase, enter the following statement in the Athena query editor and click the Run button to execute the query.
CREATE DATABASE IF NOT EXISTS mydatabase
- (step 3) Create a table
Copy the following query into the Athena query editor, replace the xxxxx in the last line under LOCATION with the string of your S3 bucket, and execute the query to create a new table.
CREATE EXTERNAL TABLE mydatabase.web_log_json (
  `userId` string,
  `sessionId` string,
  `referrer` string,
  `userAgent` string,
  `ip` string,
  `hostname` string,
  `os` string,
  `timestamp` timestamp,
  `uri` string)
PARTITIONED BY (
  `year` int,
  `month` int,
  `day` int,
  `hour` int)
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  's3://web-analytics-xxxxx/json-data'
If the query is successful, a table named web_log_json is created and displayed on the left panel under the Tables section.
If you get an error, check that (a) you have updated the LOCATION to the correct S3 bucket name, (b) you have mydatabase selected under the Database dropdown, and (c) you have AwsDataCatalog selected as the Data source.
ℹ️ If you fail to create the table, give Athena users access permissions on mydatabase through AWS Lake Formation, or grant the user who runs the Athena queries access to mydatabase by running the following commands:
(.venv) $ aws lakeformation grant-permissions \
            --principal DataLakePrincipalIdentifier=arn:aws:iam::{account-id}:user/example-user-id \
            --permissions CREATE_TABLE DESCRIBE ALTER DROP \
            --resource '{ "Database": { "Name": "mydatabase" } }'
(.venv) $ aws lakeformation grant-permissions \
            --principal DataLakePrincipalIdentifier=arn:aws:iam::{account-id}:user/example-user-id \
            --permissions SELECT DESCRIBE ALTER INSERT DELETE DROP \
            --resource '{ "Table": {"DatabaseName": "mydatabase", "TableWildcard": {}} }'
- (step 4) Load the partition data
Run the following query to load the partition data.
MSCK REPAIR TABLE mydatabase.web_log_json;
After you run this command, the data is ready for querying.
Instead of the MSCK REPAIR TABLE command, you can use the ALTER TABLE ADD PARTITION command to add each partition manually.
For example, to load the data in s3://web-analytics-xxxxx/json-data/year=2023/month=01/day=10/hour=06/ you can run the following query.
ALTER TABLE mydatabase.web_log_json ADD IF NOT EXISTS
PARTITION (year=2023, month=1, day=10, hour=6)
LOCATION 's3://web-analytics-xxxxx/json-data/year=2023/month=01/day=10/hour=06/';
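When partitions are added manually for many hours, the statement can be generated rather than typed. The sketch below reproduces the ALTER TABLE example above for a given hour; the function name `add_partition_statement` is hypothetical, and the zero-padded S3 prefix layout is taken from the example path.

```python
from datetime import datetime


def add_partition_statement(table: str, base_location: str, hour: datetime) -> str:
    """Build an ALTER TABLE ADD PARTITION statement for one hourly partition."""
    # The S3 prefix uses zero-padded month, day, and hour, as in the example path.
    prefix = (
        f"{base_location.rstrip('/')}/year={hour:%Y}/month={hour:%m}"
        f"/day={hour:%d}/hour={hour:%H}/"
    )
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS\n"
        f"PARTITION (year={hour.year}, month={hour.month}, day={hour.day}, hour={hour.hour})\n"
        f"LOCATION '{prefix}';"
    )


if __name__ == "__main__":
    print(
        add_partition_statement(
            "mydatabase.web_log_json",
            "s3://web-analytics-xxxxx/json-data",
            datetime(2023, 1, 10, 6),
        )
    )
```

The partition values are written as plain integers because the partition columns are declared as int, while the path keeps the padded form used in the bucket.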
- (Optional) (step 5) Check partitions
Run the following query to list all the partitions in an Athena table in unsorted order.
SHOW PARTITIONS mydatabase.web_log_json;
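Because SHOW PARTITIONS returns rows in no particular order, it can be handy to parse and sort them after downloading the results. This is a minimal sketch that assumes each row has the form `year=2023/month=1/day=10/hour=6`; the helper names are hypothetical.

```python
from typing import Dict, Iterable, List


def parse_partition(line: str) -> Dict[str, int]:
    """Turn 'year=2023/month=1/day=10/hour=6' into a dict of integers."""
    pairs = (part.split("=", 1) for part in line.strip().split("/"))
    return {key: int(value) for key, value in pairs}


def sort_partitions(lines: Iterable[str]) -> List[Dict[str, int]]:
    """Parse partition rows and order them chronologically."""
    parsed = [parse_partition(line) for line in lines if line.strip()]
    return sorted(parsed, key=lambda p: (p["year"], p["month"], p["day"], p["hour"]))


if __name__ == "__main__":
    rows = ["year=2023/month=1/day=10/hour=7", "year=2023/month=1/day=10/hour=6"]
    for partition in sort_partitions(rows):
        print(partition)
```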
- Run test query
Enter the following SQL statement and execute the query.
SELECT COUNT(*) FROM mydatabase.web_log_json;
- Merge small files into large ones
When real-time incoming data is stored in S3 using Kinesis Data Firehose, many small files are created.
To improve the query performance of Amazon Athena, it is recommended to combine small files into one large file.
It is also better to use a columnar data format (e.g., Parquet, ORC) instead of JSON in Amazon Athena.
To run these tasks periodically, an AWS Lambda function that executes Athena's Create Table As Select (CTAS) query has been deployed.
Now create an Athena table to query the large files that the periodic merge task produces.
CREATE EXTERNAL TABLE mydatabase.web_log_parquet (
  `userId` string,
  `sessionId` string,
  `referrer` string,
  `userAgent` string,
  `ip` string,
  `hostname` string,
  `os` string,
  `timestamp` timestamp,
  `uri` string)
PARTITIONED BY (
  `year` int,
  `month` int,
  `day` int,
  `hour` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://web-analytics-xxxxx/parquet-data'
After the table is created and the merge task has completed, the data is ready for querying.
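To make the merge step more concrete, here is a rough sketch of the kind of CTAS statement such a task might issue for one hourly partition. The query that the deployed Lambda function actually runs is not shown in this guide, so the function name, the temporary table name, and the output path layout below are all assumptions.

```python
# Data columns of web_log_json; partition columns are left out so that they
# are not duplicated inside the Parquet files.
DATA_COLUMNS = ("userId", "sessionId", "referrer", "userAgent", "ip",
                "hostname", "os", "timestamp", "uri")


def build_ctas_query(database: str, source_table: str, tmp_table: str,
                     base_location: str, year: int, month: int,
                     day: int, hour: int) -> str:
    """Build a CTAS query that rewrites one hourly partition as Parquet."""
    columns = ", ".join(f'"{name}"' for name in DATA_COLUMNS)
    # Assumed layout: the same year/month/day/hour prefixes as the JSON data.
    location = (
        f"{base_location.rstrip('/')}/year={year}/month={month:02d}"
        f"/day={day:02d}/hour={hour:02d}/"
    )
    return (
        f"CREATE TABLE {database}.{tmp_table}\n"
        f"WITH (format = 'PARQUET', external_location = '{location}') AS\n"
        f"SELECT {columns}\n"
        f"FROM {database}.{source_table}\n"
        f"WHERE year = {year} AND month = {month} AND day = {day} AND hour = {hour}"
    )


if __name__ == "__main__":
    print(
        build_ctas_query(
            "mydatabase", "web_log_json", "tmp_web_log_parquet",
            "s3://web-analytics-xxxxx/parquet-data", 2023, 1, 10, 6,
        )
    )
```

In a flow like this, the temporary table would usually be dropped afterwards and the new location registered as a partition of web_log_parquet, for example with ALTER TABLE ADD PARTITION.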
Delete the CloudFormation stacks by running the command below.
(.venv) $ cdk destroy --force --all
- cdk ls          list all stacks in the app
- cdk synth       emits the synthesized CloudFormation template
- cdk deploy      deploy this stack to your default AWS account/region
- cdk diff        compare deployed stack with current state
- cdk docs        open CDK documentation
Enjoy!
- Web Analytics
- Tutorial: Create a REST API as an Amazon Kinesis proxy in API Gateway
- Streaming Data Solution for Amazon Kinesis
- Serverless Patterns Collection
- aws-samples/serverless-patterns
- Building fine-grained authorization using Amazon Cognito, API Gateway, and IAM
- AWS Lake Formation - Create a data lake administrator
- AWS Lake Formation Permissions Reference
- Tutorial: Schedule AWS Lambda Functions Using CloudWatch Events
- Amazon Athena Workshop
- Curl Cookbook
- fastavro - Fast read/write of AVRO files
- Apache Avro Specification
- How to create a Lambda layer using a simulated Lambda environment with Docker
$ cat <<EOF > requirements-Lambda-Layer.txt
> fastavro==1.6.1
> EOF
$ docker run -v "$PWD":/var/task "public.ecr.aws/sam/build-python3.11" /bin/sh -c "pip install -r requirements-Lambda-Layer.txt -t python/lib/python3.11/site-packages/; exit"
$ zip -r fastavro-lib.zip python > /dev/null
$ aws s3 mb s3://my-bucket-for-lambda-layer-packages
$ aws s3 cp fastavro-lib.zip s3://my-bucket-for-lambda-layer-packages/
See CONTRIBUTING for more information.
This library is licensed under the MIT-0 License. See the LICENSE file.