Nurkoo/BDA-spark

Tasks:

Stage 0:

* Prepare a script that creates an S3 bucket called "bda-data-bucket" (mocked by LocalStack, available at http://localhost:4566)
* Upload a sample .txt file to the bucket and verify that the upload worked (a minimal boto3 sketch follows this list)
* Useful links:
    * https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
    * https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/create_bucket.html
    * https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_object.html
* Localstack credentials:
    * region_name="eu-west-1"
    * aws_access_key_id="fake_key"
    * aws_secret_access_key="fake_key"
    * endpoint="http://localhost:4566" (if accessed from the local machine)
    * endpoint="http://localstack:4566" (if accessed from another container, e.g. the Spark container)
* View bucket contents from the web browser:
    * http://localhost:4566/<bucket_name>
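
A minimal sketch of Stage 0 using boto3 and the LocalStack credentials listed above; the file name and its contents are only examples:

```python
import boto3

# LocalStack credentials from the list above, not real AWS secrets.
s3 = boto3.client(
    "s3",
    region_name="eu-west-1",
    aws_access_key_id="fake_key",
    aws_secret_access_key="fake_key",
    endpoint_url="http://localhost:4566",  # use http://localstack:4566 from another container
)

# Create the bucket; LocationConstraint is required for regions other than us-east-1.
s3.create_bucket(
    Bucket="bda-data-bucket",
    CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
)

# Upload a sample file and verify that it landed in the bucket.
s3.put_object(Bucket="bda-data-bucket", Key="sample.txt", Body=b"hello from stage 0")
print(s3.list_objects_v2(Bucket="bda-data-bucket").get("Contents", []))
```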

Stage 1:

* Change the implementation of the API reader for claim-hub
* Each file should be written to AWS S3 (mocked by LocalStack, available at http://localhost:4566); see the sketch after this list
* s3://bda-data-bucket/claim-hub/<execution_date>/<table_name>_part.json
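
One way the write step could look with boto3, sketched under assumptions: the write_part helper, its arguments, and how the existing reader produces records are placeholders, and the key follows the pattern given above. Stages 2 and 3 follow the same pattern with their own prefixes and file names.

```python
import json

import boto3

s3 = boto3.client(
    "s3",
    region_name="eu-west-1",
    aws_access_key_id="fake_key",
    aws_secret_access_key="fake_key",
    endpoint_url="http://localstack:4566",  # the reader runs inside a container
)


def write_part(records: list, table_name: str, execution_date: str) -> None:
    """Write one API response part as JSON under the claim-hub prefix (placeholder signature)."""
    # Key pattern taken from the task above; adjust if the parts carry a number in their name.
    key = f"claim-hub/{execution_date}/{table_name}_part.json"
    s3.put_object(
        Bucket="bda-data-bucket",
        Key=key,
        Body=json.dumps(records).encode("utf-8"),
    )
```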

Stage 2:

* Change the implementation of the API reader for insure-wave
* Each file should be written to AWS S3 (mocked by LocalStack, available at http://localhost:4566)
* s3://bda-data-bucket/insure-wave/<execution_date>/<table_name>_part.json

Stage 3:

* Change the implementation of the API reader for claim-zone
* Each file should be written to AWS S3 (mocked by LocalStack, available at http://localhost:4566)
* s3://bda-data-bucket/claim-zone/<execution_date>/<table_name>.json

SPARK AWS SETTINGS

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "mykey")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "mysecret")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://localstack:4566")
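
Equivalently, the same settings can be passed as spark.hadoop.* options when building the session. A minimal sketch, assuming the LocalStack credentials from Stage 0 and that the hadoop-aws bundle is on the Spark classpath; the date in the read path is only an example:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("bda-s3-read")
    .config("spark.hadoop.fs.s3a.access.key", "fake_key")
    .config("spark.hadoop.fs.s3a.secret.key", "fake_key")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localstack:4566")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .getOrCreate()
)

# Read one of the ingested views, e.g. claim-hub for a given execution date.
df = spark.read.json("s3a://bda-data-bucket/claim-hub/2024-01-01/*.json")
df.show()
```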

Stage 4:

* Create a Spark job that reads all data for the claim table from the three sources (AWS S3 paths) for a single day
* For each view, the data should be read from the 5 parts saved in the previous step
* The date for which the data should be retrieved should be passed as an argument with spark-submit
* Combine all parts and all views into one table
* Remove duplicates
* Add a column called "exec_date" and fill it with the date for which the data was read from AWS S3
* Save the final table in Postgres (a minimal PySpark sketch follows this list)
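
A minimal sketch of the Stage 4 job (Stages 5 and 6 differ only in the table name and output table); the argument name, the file glob, and the Postgres URL, credentials, and table name are all assumptions, and Spark 3.1+ is assumed for unionByName with allowMissingColumns:

```python
import argparse
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

parser = argparse.ArgumentParser()
parser.add_argument("--exec-date", required=True)  # passed as an argument with spark-submit
args = parser.parse_args()

spark = SparkSession.builder.appName("claim-processing").getOrCreate()
# The fs.s3a.* settings from the section above are assumed to be configured.

sources = ["claim-hub", "insure-wave", "claim-zone"]
frames = [
    # Glob is an assumption about how the claim part files are named.
    spark.read.json(f"s3a://bda-data-bucket/{src}/{args.exec_date}/claim*.json")
    for src in sources
]

# Combine all parts and views, remove duplicates, add the execution date column.
claims = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), frames)
claims = claims.dropDuplicates().withColumn("exec_date", F.lit(args.exec_date))

# Write to Postgres; URL, credentials and table name are placeholders.
(claims.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/bda")
    .option("dbtable", "claim")
    .option("user", "postgres")
    .option("password", "postgres")
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save())
```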

Stage 5:

* Create a Spark job that reads all data for the motor table from the three sources (AWS S3 paths) for a single day
* For each view, the data should be read from the 5 parts saved in the previous step
* The date for which the data should be retrieved should be passed as an argument with spark-submit
* Combine all parts and all views into one table
* Remove duplicates
* Add a column called "exec_date" and fill it with the date for which the data was read from AWS S3
* Save the final table in Postgres

Stage 6:

* Create a Spark job that reads all data for the exposure table from the three sources (AWS S3 paths) for a single day
* For each view, the data should be read from the 5 parts saved in the previous step
* The date for which the data should be retrieved should be passed as an argument with spark-submit
* Combine all parts and all views into one table
* Remove duplicates
* Add a column called "exec_date" and fill it with the date for which the data was read from AWS S3
* Save the final table in Postgres

Stage 7:

* Create a simple DAG with EmptyOperators and PythonOperators (a minimal sketch follows this list)
* start (EmptyOperator) -> print_date (PythonOperator), print_folder_content (PythonOperator) -> end (EmptyOperator)
* print_date - prints current date
* print_folder_content - prints contents of the current folder
* Tasks print_date and print_folder_content should be in one TaskGroup and executed in parallel
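
A minimal DAG sketch for Stage 7, assuming Airflow 2.4+ (where EmptyOperator and the schedule argument are available); the dag_id and group_id are illustrative:

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="stage_7_simple_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    with TaskGroup(group_id="printing") as printing:
        # No dependency between the two tasks, so they run in parallel.
        PythonOperator(task_id="print_date", python_callable=lambda: print(datetime.now()))
        PythonOperator(task_id="print_folder_content", python_callable=lambda: print(os.listdir(".")))

    start >> printing >> end
```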

Before using the SparkSubmitOperator you need to configure the spark_default connection in Airflow.

[Screenshot: Airflow Spark connection settings]
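
With spark_default in place, a SparkSubmitOperator task can reference it via conn_id. A sketch (meant to sit inside a `with DAG(...)` block) where the task id, application path, and templated argument are illustrative:

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

ingest_claim_hub = SparkSubmitOperator(
    task_id="ingest_claim_hub",
    conn_id="spark_default",                               # the connection configured above
    application="/opt/airflow/jobs/claim_hub_reader.py",   # illustrative path to the job
    application_args=["--exec-date", "{{ ds }}"],          # Airflow-templated execution date
)
```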

Stage 8:

* Write the orchestration for Stages 1-6
* Stages 1-3 in a single TaskGroup called "Ingestion", executed in parallel
* Stages 4-6 in another TaskGroup called "Processing", executed in parallel
* The second task group should start only after all tasks from the first one have finished
* After the second task group, add a PythonOperator task that checks and prints the row counts of the tables in Postgres (a DAG sketch follows this list)
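
A sketch of the Stage 8 layout under the same Airflow assumptions as above; the job file paths, Postgres connection values, table names, and the availability of psycopg2 in the Airflow environment are all assumptions:

```python
from datetime import datetime

import psycopg2
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.task_group import TaskGroup


def print_table_counts():
    # Connection values are placeholders for the docker-compose Postgres service.
    conn = psycopg2.connect(host="postgres", dbname="bda", user="postgres", password="postgres")
    with conn, conn.cursor() as cur:
        for table in ("claim", "motor", "exposure"):
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            print(table, cur.fetchone()[0])
    conn.close()


with DAG(
    dag_id="stage_8_orchestration",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    with TaskGroup(group_id="Ingestion") as ingestion:
        # Stages 1-3: one ingestion job per source, run in parallel.
        for source in ("claim_hub", "insure_wave", "claim_zone"):
            SparkSubmitOperator(
                task_id=f"ingest_{source}",
                conn_id="spark_default",
                application=f"/opt/airflow/jobs/{source}_reader.py",  # illustrative paths
            )

    with TaskGroup(group_id="Processing") as processing:
        # Stages 4-6: one processing job per table, run in parallel.
        for table in ("claim", "motor", "exposure"):
            SparkSubmitOperator(
                task_id=f"process_{table}",
                conn_id="spark_default",
                application=f"/opt/airflow/jobs/{table}_job.py",
                application_args=["--exec-date", "{{ ds }}"],
            )

    check_counts = PythonOperator(task_id="check_counts", python_callable=print_table_counts)

    # Processing starts only after every ingestion task has finished.
    ingestion >> processing >> check_counts
```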
