Flexible service for performing BigQuery file loads to existing tables. This service handles splitting load jobs when the data volume exceeds the BigQuery 15 TB load job limit. The goal of the service is to orchestrate BigQuery load jobs to many BigQuery datasets / tables from a single bucket, providing transparent configuration that is overridable at any level.
Data should be ingested to a prefix containing the destination dataset and table, like so:
gs://${INGESTION_BUCKET}/${BQ_DATASET}/${BQ_TABLE_NAME}/*
If the path does not contain a destination table, one can be provided via a destinationTable entry in a load.json file placed in a parent _config directory (see below).
Note: the table prefix can contain multiple sub-prefixes for handling partitions or for configuring historical / incremental loads differently.
By default we try to read the dataset, table, partition (or yyyy/mm/dd/hh), and batch id using the following Python regex:
DEFAULT_DESTINATION_REGEX = (
r"^(?P<dataset>[\w\-\.]+)/" # dataset (required)
r"(?P<table>[\w\-]+)/?" # table name (required)
# break up historical v.s. incremental to separate prefixes (optional)
r"(?:historical|incremental)?/?"
r"(?P<partition>\$[\d]+)?/?" # partition decorator (optional)
r"(?:" # [begin] yyyy/mm/dd/hh/ group (optional)
r"(?P<yyyy>[\d]{4})/?" # partition year (yyyy) (optional)
r"(?P<mm>[\d]{2})?/?" # partition month (mm) (optional)
r"(?P<dd>[\d]{2})?/?" # partition day (dd) (optional)
r"(?P<hh>[\d]{2})?/?" # partition hour (hh) (optional)
r")?" # [end]yyyy/mm/dd/hh/ group (optional)
r"(?P<batch>[\w\-]+)?/" # batch id (optional)
)
You can check whether this meets your needs in this regex playground.
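If you prefer to sanity check locally, a short Python snippet like the following (purely illustrative; the dataset, table, and batch names are made up) shows how the default regex parses an object name:
import re

# Reproduction of the default regex above, compiled for a local sanity check.
DEFAULT_DESTINATION_REGEX = re.compile(
    r"^(?P<dataset>[\w\-\.]+)/"        # dataset (required)
    r"(?P<table>[\w\-]+)/?"            # table name (required)
    r"(?:historical|incremental)?/?"   # optional historical / incremental prefix
    r"(?P<partition>\$[\d]+)?/?"       # partition decorator (optional)
    r"(?:"
    r"(?P<yyyy>[\d]{4})/?"
    r"(?P<mm>[\d]{2})?/?"
    r"(?P<dd>[\d]{2})?/?"
    r"(?P<hh>[\d]{2})?/?"
    r")?"
    r"(?P<batch>[\w\-]+)?/"            # batch id (optional)
)

# Hypothetical object name, relative to the bucket root.
match = DEFAULT_DESTINATION_REGEX.match(
    "my_dataset/my_table/incremental/2020/01/02/03/batch-001/_SUCCESS")
print(match.groupdict())
# {'dataset': 'my_dataset', 'table': 'my_table', 'partition': None,
#  'yyyy': '2020', 'mm': '01', 'dd': '02', 'hh': '03', 'batch': 'batch-001'}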
Otherwise, you can override the regex by setting the DESTINATION_REGEX environment variable to better fit your naming convention on GCS. Your regex must be a Python regex with named capturing groups for the destination dataset and table.
Note: the dataset group can optionally specify the destination project explicitly (i.e. gs://${BUCKET}/project_id.dataset_id/table/...). Alternatively, one can set the BQ_STORAGE_PROJECT environment variable to override the default target project for datasets at the function level. The default behavior is to infer the project from the Application Default Credentials (the project in which the Cloud Function is running, or the ADC configured in the Google Cloud SDK if invoked locally). This is useful in scenarios where a single deployment of the Cloud Function is responsible for ingesting data into BigQuery tables in projects other than the one it is deployed in. In these cases it is crucial to ensure that the service account the Cloud Function impersonates has the correct permissions on all destination projects.
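As an illustration only (this is not the function's exact code; the function name below is hypothetical), the project-resolution order described above could look roughly like this:
import os
from google.cloud import bigquery

def resolve_storage_project(dataset_group: str, client: bigquery.Client) -> str:
    """Sketch of the precedence described above."""
    if "." in dataset_group:
        # Explicit project encoded in the path, e.g. gs://bucket/project_id.dataset_id/table/...
        return dataset_group.split(".", 1)[0]
    # Function-level override, else the Application Default Credentials project.
    return os.getenv("BQ_STORAGE_PROJECT", client.project)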
Your regex can optionally include named groups for:
- partition: a BigQuery partition decorator with a leading $
- yyyy, mm, dd, hh: partition year, month, day, and hour (depending on your partition granularity)
- batch: an optional batch id to indicate multiple uploads for this partition.
For example, if your data files were laid out like this:
gs://${BUCKET}/${SOURCE_SYSTEM}/${DATASET}/${TABLE}/region=${LOCATION}/yyyy=${YEAR}/mm=${MONTH}/dd=${DAY}/hh=${HOUR}
i.e.
gs://my-bucket/on-prem-edw/my_product/transactions/region=US/yyyy=2020/mm=01/dd=02/hh=03/_SUCCESS
Then you could use this regex:
DESTINATION_REGEX='(?:[\w\-_0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/region=(?P<batch>[\w]+)/yyyy=(?P<yyyy>[0-9]{4})/mm=(?P<mm>[0-9]{2})/dd=(?P<dd>[0-9]{2})/hh=(?P<hh>[0-9]{2})/'
In this case we can take advantage of a known, rigid structure, so our regex is simpler (no optional capturing groups or optional slashes).
Note: We can use the region= value (which may have been a partition key in an upstream system such as Hive) as a batch ID, because we might expect an hourly partition to have multiple directories uploading to it (e.g. US, GB, etc.). Because the regex uses only named capturing groups, there are no strict ordering restrictions on whether the batch id appears before or after the partition information.
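As a purely illustrative check, matching this regex against the example path above yields the expected groups, with the region captured as the batch id:
import re

pattern = re.compile(
    r"(?:[\w\-_0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/"
    r"region=(?P<batch>[\w]+)/yyyy=(?P<yyyy>[0-9]{4})/mm=(?P<mm>[0-9]{2})/"
    r"dd=(?P<dd>[0-9]{2})/hh=(?P<hh>[0-9]{2})/")

match = pattern.match(
    "on-prem-edw/my_product/transactions/region=US/yyyy=2020/mm=01/dd=02/hh=03/_SUCCESS")
print(match.groupdict())
# {'dataset': 'my_product', 'table': 'transactions', 'batch': 'US',
#  'yyyy': '2020', 'mm': '01', 'dd': '02', 'hh': '03'}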
Instead of specifying a single constant DESTINATION_REGEX value which remains static for all Cloud Function invocations, you can provide a destination regex in a load.json config file (placed in a _config/ directory at the table or a parent directory level) as shown below:
{
"sourceFormat": "PARQUET",
"destinationRegex": "(?P<table>.*?)(?:[\\d]{4})?/?(?:[\\d]{2})?/?(?:[\\d]{2})?/?(?P<batch>[\\d]{2})/?",
}
A destination regex specified in a load.json config file takes precedence over the value defined in the DESTINATION_REGEX environment variable.
Providing the regex via load.json files allows one Cloud Function deployment to handle many different GCS path naming conventions.
When your dataset and table names are not present in the Cloud Storage file
paths, you can explicitly specify a destination table via a destinationTable
mapping in the load.json file as shown below:
{
"sourceFormat": "PARQUET",
"destinationRegex": "(?P<table>.*?)(?:[\\d]{4})?/?(?:[\\d]{2})?/?(?:[\\d]{2})?/?(?P<batch>[\\d]{2})/?",
"destinationTable": {
"projectId": "YOUR_PROJECT_ID",
"datasetId": "YOUR_DATASET_ID",
"tableId": "YOUR_TABLE_ID"
}
}
Note: Your destination regex must include a table group, but the rest of the groups are optional. The table group is needed in order for the cloud function to determine the table prefix of GCS data files. It uses the table prefix as the prefix path for the
_backlog
directory when performing ordered loads.
A destination table specified in a load.json config file takes precedence over a table matched by the destination regex group.
In most cases, it is recommended to have separate buckets / deployments of the Cloud Function for each naming convention, as different naming conventions typically mean the upstream systems are governed by different teams.
Sometimes many upstream systems might be using different naming conventions when uploading to the same bucket due to organizational constraints. In this case you have the following options:
- Provide an explicit destination table and destination regex in the load.json configuration file, as detailed above.
- Try to write a very flexible regex that handles all of your naming conventions with lots of optional groups. Regex101 is your friend!
- Create separate Pub/Sub notifications and a separate deployment of the Cloud Function, with the appropriate DESTINATION_REGEX environment variable, for each GCS prefix with a different naming convention.
The ingestion service supports many optional configuration files that should live in a special _config/ prefix at the root of the bucket and/or under the dataset, table, or partition prefixes.
For example if you have the following files:
gs://${INGESTION_BUCKET}/_config/load.json
gs://${INGESTION_BUCKET}/${BQ_DATASET}/_config/load.json
gs://${INGESTION_BUCKET}/${BQ_DATASET}/${BQ_TABLE_NAME}/_config/load.json
The JSON objects will be merged, with key conflicts resolved in favor of the config in the directory closest to the data. If the file contents were like this:
gs://${INGESTION_BUCKET}/_config/load.json
:
{
"sourceFormat": "CSV",
"writeDisposition": "WRITE_APPEND",
"schemaUpdateOptions": [
"ALLOW_FIELD_RELAXATION"
]
}
gs://${INGESTION_BUCKET}/${BQ_DATASET}/_config/load.json
:
{
"sourceFormat": "AVRO"
}
gs://${INGESTION_BUCKET}/${BQ_DATASET}/${BQ_TABLE_NAME}/_config/load.json
:
{
"writeDisposition": "WRTITE_TRUNCATE"
}
The result of merging these would be:
{
"sourceFormat": "AVRO",
"writeDisposition": "WRITE_TRUNCATE",
"schemaUpdateOptions": [
"ALLOW_FILED_RELAXATION"
]
}
This configuration system gives us the ability to DRY up common defaults but override them at whatever level is appropriate as new cases come up.
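A minimal sketch of this merge behavior (not the function's actual implementation; the helper name, bucket, and prefix below are placeholders) might look like:
import json
from google.cloud import storage

def merged_load_config(bucket: storage.Bucket, table_prefix: str) -> dict:
    """Merge load.json configs from the bucket root down to the table prefix."""
    config: dict = {}
    parts = table_prefix.strip("/").split("/")
    # Walk from the shallowest _config/ to the deepest so that the config
    # closest to the data wins on key conflicts.
    for depth in range(len(parts) + 1):
        blob = bucket.blob("/".join(parts[:depth] + ["_config", "load.json"]))
        if blob.exists():
            config.update(json.loads(blob.download_as_text()))
    return config

# Example usage (hypothetical bucket / prefix):
# merged_load_config(storage.Client().bucket("my-ingestion-bucket"), "my_dataset/my_table")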
For CSV loads, the fieldDelimiter in load.json or external.json should be specified as a unicode escape, not a hexadecimal character, as hexadecimal characters will confuse Python's json.load function. For example, ctrl-P should be specified as:
{
"fieldDelimiter": "\u0010"
}
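To see why, note that Python's json module interprets the \u escape into the actual control character (illustrative snippet):
import json

config = json.loads('{"fieldDelimiter": "\\u0010"}')
print(repr(config["fieldDelimiter"]))  # '\x10' -- the ctrl-P / DLE character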
In some cases we may need to perform transformations on the files in GCS before they can be loaded to BigQuery. This is handled by running a query on a temporary external table over the GCS objects as a proxy for a load job.
gs://${INGESTION_BUCKET}/${BQ_DATASET}/${BQ_TABLE_NAME}/_config/bq_transform.sql
By default, if a query job of statement type INSERT, UPDATE, DELETE, or MERGE finishes with numDmlRowsAffected = 0, this will be treated as a failure (see the Query Job Statistics API docs). This is usually due to a bad query / configuration with a bad DML predicate, for example running the following query on an empty table:
UPDATE foo.bar dest ... FROM temp_ext src WHERE src.id = dest.id
By failing on this condition we keep the backlog intact when we run a query job that unexpectedly did not affect any rows. This can be disabled by setting the environment variable FAIL_ON_ZERO_DML_ROWS_AFFECTED=False.
A CREATE OR REPLACE TABLE
is not DML and will not be subject to this behavior.
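Purely as an illustration of this condition (not the function's exact code; the function name is hypothetical), the check roughly amounts to the following, using the google-cloud-bigquery QueryJob properties:
from google.cloud import bigquery

def check_dml_rows_affected(job: bigquery.QueryJob, fail_on_zero: bool = True) -> None:
    """Raise if a finished DML job affected zero rows (illustrative sketch)."""
    if (fail_on_zero
            and job.statement_type in {"INSERT", "UPDATE", "DELETE", "MERGE"}
            and job.num_dml_affected_rows == 0):
        raise RuntimeError(
            f"{job.statement_type} job {job.job_id} affected zero rows; "
            "keeping the backlog intact")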
External queries will consume query slots from this project's reservation or count towards your on-demand billing. They will not use free tier load slots.
Note: The query should select from temp_ext, a temporary external table configured on the fly by the Cloud Function. The query must handle the logic for inserting into the destination table. This means it should use BigQuery DML to mutate the destination table.
For example:
INSERT {dest_dataset}.{dest_table}
SELECT * FROM temp_ext
Note: The {dest_dataset} and {dest_table} placeholders will be replaced at runtime (as sketched after this list) by either:
- The GCS path destination regex group matches for dataset and table.
- The destinationTable entry in a load.json file.
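For illustration, the substitution amounts to a simple string format over the SQL template (the dataset and table names below are hypothetical):
transform_sql = "INSERT {dest_dataset}.{dest_table}\nSELECT * FROM temp_ext"
rendered = transform_sql.format(dest_dataset="my_dataset", dest_table="transactions")
print(rendered)
# INSERT my_dataset.transactions
# SELECT * FROM temp_ext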
The query will be run with the
appropriate external table definitions
defined in:
gs://${INGESTION_BUCKET}/${BQ_DATASET}/${BQ_TABLE_NAME}/_config/external.json
If this file is missing, the external table will be assumed to be in PARQUET format.
Note: If the directory immediately before the triggering success file starts with a $, it will be treated as a BigQuery partition decorator for the destination table. This means that:
gs://${BUCKET}/foo/bar/$20201026/_SUCCESS
will trigger a load job with a destination table of foo.bar$20201026.
This allows you to specify write disposition at the partition level. This can be
helpful in reprocessing scenarios where you'd want to WRITE_TRUNCATE
a partition that had some data quality issue.
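As a rough illustration of the note above (not the function's actual parsing code), the decorator simply ends up appended to the destination table id:
# Hypothetical object name for the example above.
object_name = "foo/bar/$20201026/_SUCCESS"
dataset, table, leaf_dir = object_name.split("/")[:3]
destination = f"{dataset}.{table}"
if leaf_dir.startswith("$"):
    destination += leaf_dir  # BigQuery partition decorator
print(destination)  # foo.bar$20201026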
If your data will be uploaded to GCS from a Hadoop system that uses the supported default Hive partitioning layout, you can specify this in the hivePartitioningOptions key of load.json for that table.
Any non-trivial incremental loading to partitions should usually use the transformation SQL to define the INSERT / MERGE / UPDATE / DELETE logic into the target BQ table, as these DML semantics are much more flexible than the load job write dispositions. Furthermore, using an external query has the added benefit of circumventing the per-load-job bytes limit (default 15 TB) and committing large partitions atomically.
This solution introduces the concept of a batch_id which uniquely identifies a batch of data committed by an upstream system that needs to be picked up as an incremental load. You can again set the load job or external query configuration at any parent folder's _config prefix. This allows you to dictate "for this table any new batch should WRITE_TRUNCATE its parent partition/table" or "for that table any new batch should WRITE_APPEND to its parent partition/table".
By default BigQuery jobs will be submitted in the project where the Cloud
Function is deployed. To submit jobs in another BigQuery project set
the BQ_PROJECT
environment variable.
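For example (illustrative only, not the function's exact code), the job-submitting client can be constructed so that BQ_PROJECT wins when it is set:
import os
from google.cloud import bigquery

# Falls back to the Application Default Credentials project when BQ_PROJECT is unset.
client = bigquery.Client(project=os.getenv("BQ_PROJECT"))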
Monitoring what data has been loaded by this solution should be done with the BigQuery INFORMATION_SCHEMA jobs metadata. If more granular data is needed about a particular job, note that all load or external query jobs will have a job id with a prefix following this convention:
job_id_prefix = f"gcf-ingest-{dest_table_ref.dataset_id}-{dest_table_ref.table_id}"
Note: The prefix gcf-ingest- is configurable with the JOB_PREFIX environment variable.
All load or external query jobs are labelled with functional component and cloud function name.
DEFAULT_JOB_LABELS = {
"component": "event-based-gcs-ingest",
"cloud-function-name": getenv("FUNCTION_NAME"),
"bucket-id": "<bucket-for-this-notification>"
}
If the destination regex matches a batch group, there will be a batch-id
label.
SELECT
job_id,
job_type,
start_time,
end_time,
query,
total_bytes_processed,
total_slot_ms,
destination_table,
state,
error_result,
(SELECT value FROM UNNEST(labels) WHERE key = "component") as component,
(SELECT value FROM UNNEST(labels) WHERE key = "cloud-function-name") as cloud_function_name,
(SELECT value FROM UNNEST(labels) WHERE key = "batch-id") as batch_id,
FROM
`region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
(SELECT value FROM UNNEST(labels) WHERE key = "component") = "event-based-gcs-ingest"
If your external queries have multiple SQL statements, only the parent job will follow the gcf-ingest-* naming convention. Child jobs (one per statement) begin with the prefix _script_job. These jobs will still be labelled with component and cloud-function-name. For more information see Scripting in Standard SQL.
GCS Object Finalize triggers can communicate with Cloud Functions directly or via a Pub/Sub topic. This function supports reading the bucket / object id from either the Pub/Sub message attributes or the Cloud Functions event schema. Pub/Sub triggers offer some additional features compared to direct Cloud Functions triggers, including filtering notifications to a prefix within a bucket (rather than bucket-wide) and controlling the message retention period for the Pub/Sub topic. More info can be found here: Pub/Sub Storage Notification, Cloud Functions direct trigger.
- A trigger on a _SUCCESS file loads all other files in that directory.
- A trigger on a non-_SUCCESS file is a no-op.
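A rough sketch of handling both trigger shapes (illustrative; the real function's parsing and the helper name may differ) could look like:
def extract_bucket_and_object(event: dict) -> tuple:
    """Return (bucket, object name) from either trigger type."""
    attributes = event.get("attributes") or {}
    if "bucketId" in attributes:
        # Pub/Sub GCS notification: ids are carried in message attributes.
        return attributes["bucketId"], attributes["objectId"]
    # Direct Cloud Functions GCS trigger: the event is the object resource.
    return event["bucket"], event["name"]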
We run the following CI checks to ensure code quality and avoid common pitfalls:
This CI process is defined in cloudbuild.yaml and can be run locally with cloud-build-local from the root of the repo with:
cloud-build-local --config=tools/cloud_functions/gcs_event_based_ingest/cloudbuild.yaml --dryrun=false .
This CI system uses the kaniko cache to speed up builds and defaults cache expiration to two weeks. It notably does not pin Python package versions, so we know if one of our dependencies or CI checks has been updated in a way that breaks this tool. It's better for us to make a conscious decision to adopt new features, adjust CI configs, or pin an older version depending on the type of failure. This CI should be run on all new PRs and nightly.
Note: All functionality of the cloud function (including ordering) is integration tested against buckets with object versioning enabled to ensure this solution works for buckets using this feature.
# Build Docker image
PROJECT_ID=$(gcloud config get-value project)
docker build -t gcr.io/$PROJECT_ID/gcs_event_based_ingest_ci -f Dockerfile.ci .
# Run unit tests
docker run --rm -it gcr.io/$PROJECT_ID/gcs_event_based_ingest_ci -k "not IT"
# Run integration tests
docker run --rm -it gcr.io/$PROJECT_ID/gcs_event_based_ingest_ci -k "IT"
# Run all tests
docker run --rm -it gcr.io/$PROJECT_ID/gcs_event_based_ingest_ci
As an alternative to the local cloudbuild or using the docker container to run your tests, you can pip3 install -r requirements-dev.txt and select certain tests to run with python3 -m pytest.
Note: This is not quite the same as calling pytest without the python -m prefix (see the pytest invocation docs). This is mostly useful if you'd like to integrate with your IDE debugger.
Caution: Integration tests will spin up / tear down cloud resources that can incur a small cost. These resources will be spun up based on your Google Cloud SDK Application Default Credentials.
All pytest fixtures are DRY-ed up into tests/conftest.py. This is mostly to share fixtures between the main integration test for the Cloud Function and the integration tests for the backfill CLI. See more info on sharing pytest fixtures in the pytest docs.
python3 -m pytest
python3 -m pytest -m "not IT"
python3 -m pytest -m IT
The system tests assume that you have deployed the cloud function.
export TF_VAR_short_sha=$(git rev-parse --short=10 HEAD)
export TF_VAR_project_id=your-project-id
python3 -m pytest -vvv e2e
It is suggested to deploy this Cloud Function with the accompanying Terraform module.
Alternatively, you can deploy with Google Cloud SDK:
PROJECT_ID=your-project-id
TOPIC_ID=test-gcs-ocn
PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/${TOPIC_ID}
# Create Pub/Sub Object Change Notifications
gsutil notification create -f json -t ${PUBSUB_TOPIC} -e OBJECT_FINALIZE gs://${INGESTION_BUCKET}
# Deploy Cloud Function
gcloud functions deploy test-gcs-bq-ingest \
--region=us-west4 \
--source=gcs_ocn_bq_ingest \
--entrypoint=main \
--runtime=python38 \
--trigger-topic=${PUBSUB_TOPIC} \
--service-account=${SERVICE_ACCOUNT_EMAIL} \
--timeout=540 \
--set-env-vars='DESTINATION_REGEX=^(?:[\w\-0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/?(?:incremental|history)?/?(?P<yyyy>[0-9]{4})?/?(?P<mm>[0-9]{2})?/?(?P<dd>[0-9]{2})?/?(?P<hh>[0-9]{2})?/?(?P<batch>[0-9]+)?/?,FUNCTION_TIMEOUT_SEC=540'
PROJECT_ID=your-project-id
# Deploy Cloud Function
gcloud functions deploy test-gcs-bq-ingest \
--region=us-west4 \
--source=gcs_ocn_bq_ingest \
--entrypoint=main \
--runtime=python38 \
--trigger-resource ${INGESTION_BUCKET} \
--trigger-event google.storage.object.finalize \
--service-account=${SERVICE_ACCOUNT_EMAIL} \
--timeout=540 \
--set-env-vars='DESTINATION_REGEX=^(?:[\w\-0-9]+)/(?P<dataset>[\w\-_0-9]+)/(?P<table>[\w\-_0-9]+)/?(?:incremental|history)?/?(?P<yyyy>[0-9]{4})?/?(?P<mm>[0-9]{2})?/?(?P<dd>[0-9]{2})?/?(?P<hh>[0-9]{2})?/?(?P<batch>[0-9]+)?/?,FUNCTION_TIMEOUT_SEC=540'
In theory, one could set up Pub/Sub notifications from multiple GCS Buckets (owned by different teams but following a common naming convention) to the same Pub/Sub topic so that data uploaded to any of these buckets could get automatically loaded to BigQuery by a single deployment of the Cloud Function.
It is possible to configure the Cloud Function to apply incrementals in order if this is crucial to your data integrity. This naturally comes with a performance penalty as for a given table we cannot parallelize ingestion of batches. The ordering behavior and options are described in detail in ORDERING.md
There are some cases where you may have data already copied to GCS according to
the naming convention / with success files before the Object Change
Notifications or Cloud Function have been set up. In these cases, you can use
the backfill.py
CLI utility to crawl an existing bucket searching for success
files. The utility supports either invoking the Cloud Function main method
locally (in concurrent threads) or publishing notifications for the success
files (for a deployed Cloud Function to pick up).
If you use the ordering feature on a table (or function wide) you should use the
NOTIFICATIONS
mode to repost notifications to a Pub/Sub topic that your
deployed Cloud Function is listening to. The LOCAL
mode does not support
ordering because this feature relies on (re)posting files like _bqlock
,
_BACKFILL
and various claim files and getting re-triggered by object
notifications for these. The script will publish the notifications for success
files and the Cloud Function will add these to the appropriate table's backlog.
Once the script completes you can drop the START_BACKFILL_FILENAME
(e.g. _HISTORYDONE
) for each table you want to trigger the backfill for. In
general, it would not be safe for this utility to drop a _HISTORYDONE
for
every table because the parallel historical loads might still be in progress.
python3 -m backfill -h
usage: backfill.py [-h] --gcs-path GCS_PATH [--mode {LOCAL,NOTIFICATIONS}] [--pubsub-topic PUBSUB_TOPIC] [--success-filename SUCCESS_FILENAME] [--destination-regex DESTINATION_REGEX]
utility to backfill success file notifications or run the cloud function locally in concurrent threads.
optional arguments:
-h, --help show this help message and exit
--gcs-path GCS_PATH, -p GCS_PATH
GCS path (e.g. gs://bucket/prefix/to/search/)to search for existing _SUCCESS files
--mode {LOCAL,NOTIFICATIONS}, -m {LOCAL,NOTIFICATIONS}
How to perform the backfill: LOCAL run cloud function main method locally (in concurrent threads) or NOTIFICATIONS just push notifications to Pub/Sub for a deployed
version of the cloud function to pick up. Default is NOTIFICATIONS.
--pubsub-topic PUBSUB_TOPIC, --topic PUBSUB_TOPIC, -t PUBSUB_TOPIC
Pub/Sub notifications topic to post notifications for. i.e. projects/{PROJECT_ID}/topics/{TOPIC_ID} Required if using NOTIFICATIONS mode.
--success-filename SUCCESS_FILENAME, -f SUCCESS_FILENAME
Override the default success filename '_SUCCESS'
--destination-regex DESTINATION_REGEX, -r DESTINATION_REGEX
Override the default destination regex for determining BigQuery destination based on information encoded in the GCS path of the success file
bqtail is a similar serverless, configuration-driven ingest to BigQuery from GCS that achieves batching based on windows in processing time (as driven by Cloud Scheduler). bqtail has nice features for triggering post actions (BQ queries / GCS file moves or deletes) once the data is ingested, and Slack notifications. bqtail is well suited for use cases where the atomicity of an event-time partition is not important (e.g. many distributed publishers uploading logs to GCS). Due to the dependency of certain bqtail features on Cloud Scheduler, it cannot be used inside VPC-SC perimeters. This tool might be more appropriate when the publisher is authoritative on the atomicity of batches (e.g. an upstream Hadoop job responsible for committing an event-time hour's worth of data).
Cloud Storage Transfer can also be used, but it requires a separate configuration per table and supports only scheduled (rather than event-based) loads. This can cause issues if the upstream publisher of data is behind schedule. That service also does not support an external query to perform transformations upon ingest.