Skip to content

Commit

Permalink
Merge pull request #220 from NASA-IMPACT/add/generic-vector-ingests
Browse files Browse the repository at this point in the history
Add/generic vector ingests
  • Loading branch information
paridhi-parajuli authored Aug 14, 2024
2 parents f252f3d + 8bcfbad commit cee64f5
Show file tree
Hide file tree
Showing 9 changed files with 447 additions and 2 deletions.
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,66 @@ Currently, the client id and domain of an existing Cognito user pool programmati
# Gitflow Model
[VEDA pipeline gitflow](./GITFLOW.md)
# Ingestion Pipeline Overview
This pipeline is designed to handle the ingestion of both vector and raster data. The ingestion can be performed using the `veda-discover` DAG. Below are examples of configurations for both vector and raster data.
## Ingestion Configuration
### Vector Data Ingestion
```json
{
"collection": "",
"bucket": "",
"prefix": "",
"filename_regex": ".*.csv$",
"id_template": "-{}",
"datetime_range": "",
"vector": true,
"x_possible": "longitude",
"y_possible": "latitude",
"source_projection": "EPSG:4326",
"target_projection": "EPSG:4326",
"extra_flags": ["-overwrite", "-lco", "OVERWRITE=YES"]
}
```

### Raster Data Ingestion
```json
{
"collection": "",
"bucket": "",
"prefix": "",
"filename_regex": ".*.tif$",
"datetime_range": "",
"assets": {
"co2": {
"title": "",
"description": ".",
"regex": ".*.tif$"
}
},
"id_regex": ".*_(.*).tif$",
"id_template": "-{}"
}

```
## Configuration Fields Description
- `collection`: The collection_id of the raster or vector data.
- `bucket`: The name of the S3 bucket where the data is stored.
- `prefix`: The location within the bucket where the files are to be discovered.
- `filename_regex`: A regex expression used to filter files based on naming patterns.
- `id_template`: The format used to create item identifiers in the system.
- `vector`: Set to true to trigger the generic vector ingestion pipeline.
- `vector_eis`: Set to true to trigger the EIS Fire specific vector ingestion pipeline.


## Pipeline Behaviour
Since this pipeline can ingest both raster and vector data, the configuration can be adjusted accordingly. Setting `"vector": true` triggers the `generic_ingest_vector` DAG. If a `collection` is provided, it is used as the table name for ingestion (the `append` extra flag is recommended in this case). When no `collection` is provided, the table name is generated from the `id_template` by appending the ingested filename to it (the `overwrite` extra flag is recommended).

Setting `"vector_eis": true` will trigger the EIS Fire specific `ingest_vector` dag. If neither of these flags is set, the raster ingestion will be triggered, with the configuration typically looking like the raster ingestion example above.

# License
This project is licensed under **Apache 2**, see the [LICENSE](LICENSE) file for more details.

11 changes: 10 additions & 1 deletion dags/veda_data_pipeline/groups/discover_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def vector_raster_choice(ti):
dynamic_group_id = ti.task_id.split(".")[0]

if payload.get("vector"):
return f"{dynamic_group_id}.parallel_run_process_generic_vectors"
if payload.get("vector_eis"):
return f"{dynamic_group_id}.parallel_run_process_vectors"
return f"{dynamic_group_id}.parallel_run_process_rasters"

Expand Down Expand Up @@ -101,10 +103,17 @@ def subdag_discover(event={}):
python_callable=get_files_to_process,
)

run_process_generic_vector = TriggerMultiDagRunOperator(
task_id="parallel_run_process_generic_vectors",
trigger_dag_id="veda_generic_ingest_vector",
python_callable=get_files_to_process,
)

# extra no-op, needed to run in dynamic mapping context
end_discover = EmptyOperator(task_id="end_discover", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,)

discover_from_s3 >> raster_vector_branching >> [run_process_raster, run_process_vector]
discover_from_s3 >> raster_vector_branching >> [run_process_raster, run_process_vector, run_process_generic_vector]
run_process_raster >> end_discover
run_process_vector >> end_discover
run_process_generic_vector >> end_discover

120 changes: 120 additions & 0 deletions dags/veda_data_pipeline/veda_process_generic_vector_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import pendulum
from airflow import DAG
from airflow.models.variable import Variable
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
# EmptyOperator replaces the deprecated DummyOperator; the sibling
# discover_group.py in this repo already uses EmptyOperator.
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

from datetime import timedelta

dag_doc_md = """
### Generic Ingest Vector
#### Purpose
This DAG is supposed to be triggered by `veda_discover`. But you still can trigger this DAG manually or through an API
#### Notes
- This DAG can run with the following configuration <br>
```json
{
    "collection": "",
    "prefix": "transformed_csv/",
    "bucket": "ghgc-data-store-develop",
    "filename_regex": ".*.csv$",
    "discovery": "s3",
    "datetime_range": "month",
    "vector": true,
    "id_regex": "",
    "id_template": "NIST_Urban_Testbed_test-{}",
    "x_possible": "longitude",
    "y_possible": "latitude",
    "source_projection": "EPSG:4326",
    "target_projection": "EPSG:4326",
    "extra_flags": ["-overwrite", "-lco", "OVERWRITE=YES"],
    "discovered": 33,
    "payload": "s3://data-pipeline-ghgc-dev-mwaa-597746869805/events/test_layer_name2/s3_discover_output_f88257e8-ee50-4a14-ace4-5612ae6ebf38.json"
}
```
- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines)
"""

# Template shown in the Airflow UI when triggering the DAG manually.
# NOTE(review): name has a typo ("templat") — kept as-is for backward
# compatibility in case anything references it by name.
templat_dag_run_conf = {
    "collection": "<collection_name>",
    "prefix": "<prefix>/",
    "bucket": "<bucket>",
    "filename_regex": "<filename_regex>",
    "id_template": "<id_template_prefix>-{}",
    "datetime_range": "<month>|<day>",
    "vector": "false | true",
    "x_possible": "<x_column_name>",
    "y_possible": "<y_column_name>",
    "source_projection": "<crs>",
    "target_projection": "<crs>",
    "extra_flags": "<args>",
    "payload": "<s3_uri_event_payload>",
}

# No schedule: this DAG is triggered by veda_discover (or manually/via API).
dag_args = {
    "start_date": pendulum.today("UTC").add(days=-1),
    "schedule_interval": None,
    "catchup": False,
    "doc_md": dag_doc_md,
}

with DAG(dag_id="veda_generic_ingest_vector", params=templat_dag_run_conf, **dag_args) as dag:
    start = EmptyOperator(task_id="Start", dag=dag)
    end = EmptyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)

    # Deployment-wide settings (PREFIX, networking, log group) come from the
    # MWAA_STACK_CONF Airflow Variable; vector-specific ECS networking from
    # VECTOR_ECS_CONF.
    mwaa_stack_conf = Variable.get(
        "MWAA_STACK_CONF", default_var={}, deserialize_json=True
    )
    vector_ecs_conf = Variable.get("VECTOR_ECS_CONF", deserialize_json=True)

    # Runs the generic vector ingest container on Fargate, passing the full
    # dag_run.conf as the --payload argument to handler.py.
    generic_ingest_vector = EcsRunTaskOperator(
        task_id="generic_ingest_vector",
        trigger_rule=TriggerRule.NONE_FAILED,
        cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster",
        task_definition=f"{mwaa_stack_conf.get('PREFIX')}-generic-vector-tasks",
        launch_type="FARGATE",
        do_xcom_push=True,
        execution_timeout=timedelta(minutes=120),
        overrides={
            "containerOverrides": [
                {
                    "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-generic_vector_ingest",
                    "command": [
                        "/var/lang/bin/python",
                        "handler.py",
                        "--payload",
                        "{}".format("{{ task_instance.dag_run.conf }}"),
                    ],
                    "environment": [
                        {
                            # Optional cross-account read role; empty string
                            # (not None) so the container env var is well-formed.
                            "name": "EXTERNAL_ROLE_ARN",
                            "value": Variable.get(
                                "ASSUME_ROLE_READ_ARN", default_var=""
                            ),
                        },
                        {
                            "name": "AWS_REGION",
                            "value": mwaa_stack_conf.get("AWS_REGION"),
                        },
                        {
                            # Secrets Manager secret holding the vector DB credentials.
                            "name": "VECTOR_SECRET_NAME",
                            "value": Variable.get("VECTOR_SECRET_NAME"),
                        },
                    ],
                },
            ],
        },
        network_configuration={
            "awsvpcConfiguration": {
                "securityGroups": vector_ecs_conf.get("VECTOR_SECURITY_GROUP") + mwaa_stack_conf.get("SECURITYGROUPS"),
                "subnets": vector_ecs_conf.get("VECTOR_SUBNETS"),
            },
        },
        awslogs_group=mwaa_stack_conf.get("LOG_GROUP_NAME"),
        awslogs_stream_prefix=f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-generic-vector_ingest",  # prefix with container name
    )

    start >> generic_ingest_vector >> end
2 changes: 1 addition & 1 deletion dags/veda_data_pipeline/veda_process_vector_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
{
"name": "EXTERNAL_ROLE_ARN",
"value": Variable.get(
"ASSUME_ROLE_READ_ARN", default_var=None
"ASSUME_ROLE_READ_ARN", default_var=""
),
},
{
Expand Down
10 changes: 10 additions & 0 deletions docker_tasks/generic_vector_ingest/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM --platform=linux/amd64 ghcr.io/lambgeo/lambda-gdal:3.6-python3.9

# Apply OS security updates, then drop the yum cache so it is not baked
# into the layer (smaller image).
RUN yum update -y && yum clean all && rm -rf /var/cache/yum

WORKDIR /app

# Clear the base image's Lambda entrypoint so the container runs plain commands.
ENTRYPOINT []

# Copy requirements before the source so dependency installation stays cached
# when only handler.py changes; --no-cache-dir avoids shipping pip's cache.
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

COPY handler.py handler.py
Loading

0 comments on commit cee64f5

Please sign in to comment.