Apache Beam pipeline for moving Polygon data from Pub/Sub to BigQuery, deployed on Google Cloud Dataflow.
- Create a GCS bucket used for staging and temp location:

  ```bash
  gcloud config set project <your_gcp_project>
  PROJECT=$(gcloud config get-value project 2> /dev/null)
  ENVIRONMENT_INDEX=0
  BUCKET=${PROJECT}-dataflow-${ENVIRONMENT_INDEX} && echo "${BUCKET}"
  gsutil mb gs://${BUCKET}/
  ```
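
  To confirm the bucket was created, a quick check (an optional addition, not part of the original steps):

  ```bash
  # List the bucket to verify it exists.
  gsutil ls -b gs://${BUCKET}/
  ```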
- Create the errors table:

  ```bash
  bq mk --table --description "polygon ETL Streaming Errors" \
      ${PROJECT}:crypto_polygon.errors \
      src/main/resources/errors-schema.json
  ```
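
  The command above assumes the `crypto_polygon` dataset already exists. If it does not, a minimal sketch for creating it first (the US location is an assumption; use whatever location matches your deployment):

  ```bash
  # Create the BigQuery dataset that will hold the errors table.
  bq --location=US mk --dataset ${PROJECT}:crypto_polygon
  ```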
- Copy `exampleChainConfig.json` to `chainConfig.json` and update `chainConfig.json` with your values, as sketched below.
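
  A minimal sketch of the copy step, assuming commands are run from the repository root:

  ```bash
  # Start from the provided example, then edit the copy with project-specific values.
  cp exampleChainConfig.json chainConfig.json
  ```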
- Start the Dataflow job:

  ```bash
  mvn -e -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=io.blockchainetl.polygon.PolygonPubSubToBigQueryPipeline \
      -Dexec.args="--chainConfigFile=chainConfig.json \
      --tempLocation=gs://${BUCKET}/temp \
      --project=${PROJECT} \
      --runner=DataflowRunner \
      --jobName=polygon-pubsub-to-bigquery-`date +"%Y%m%d-%H%M%S"` \
      --workerMachineType=n1-standard-1 \
      --maxNumWorkers=1 \
      --diskSizeGb=30 \
      --region=us-central1 \
      --zone=us-central1-a \
      "
  ```
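
  Once the job is submitted, its status can be checked from the CLI (an optional addition; the region mirrors the `--region` flag above):

  ```bash
  # List active Dataflow jobs in the region to confirm the streaming job is running.
  gcloud dataflow jobs list --region=us-central1 --status=active
  ```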
Below are the commands for creating a Cloud Source Repository to hold `chainConfig.json`:
```bash
REPO_NAME=${PROJECT}-dataflow-config-${ENVIRONMENT_INDEX} && echo "Repo name ${REPO_NAME}"
gcloud source repos create ${REPO_NAME}
gcloud source repos clone ${REPO_NAME} && cd ${REPO_NAME}
# Put chainConfig.json at the root of the repo
git add chainConfig.json && git commit -m "Initial commit"
git push
```
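
To verify the repository was created and is visible in the project (an optional check, not part of the original steps):

```bash
# List Cloud Source Repositories in the project.
gcloud source repos list --project=${PROJECT}
```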
Operational procedures for the pipeline are documented in a separate file.