diff --git a/database/metadata/databases/default/tables/public_view_crash_narratives_ocr_todo.yaml b/database/metadata/databases/default/tables/public_view_crash_narratives_ocr_todo.yaml
new file mode 100644
index 000000000..55f3dd6bc
--- /dev/null
+++ b/database/metadata/databases/default/tables/public_view_crash_narratives_ocr_todo.yaml
@@ -0,0 +1,3 @@
+table:
+  name: view_crash_narratives_ocr_todo
+  schema: public
diff --git a/database/metadata/databases/default/tables/tables.yaml b/database/metadata/databases/default/tables/tables.yaml
index 17e6d7b19..8fbf0a1ef 100644
--- a/database/metadata/databases/default/tables/tables.yaml
+++ b/database/metadata/databases/default/tables/tables.yaml
@@ -86,3 +86,4 @@
 - "!include public_units.yaml"
 - "!include public_units_cris.yaml"
 - "!include public_units_edits.yaml"
+- "!include public_view_crash_narratives_ocr_todo.yaml"
diff --git a/database/migrations/default/1727375054583_narrative_ocr/down.sql b/database/migrations/default/1727375054583_narrative_ocr/down.sql
new file mode 100644
index 000000000..5bad11e06
--- /dev/null
+++ b/database/migrations/default/1727375054583_narrative_ocr/down.sql
@@ -0,0 +1,7 @@
+drop view if exists view_crash_narratives_ocr_todo;
+
+alter table crashes_edits
+    drop column investigator_narrative_ocr_processed_at;
+
+alter table crashes
+    drop column investigator_narrative_ocr_processed_at;
diff --git a/database/migrations/default/1727375054583_narrative_ocr/up.sql b/database/migrations/default/1727375054583_narrative_ocr/up.sql
new file mode 100644
index 000000000..89254b2f2
--- /dev/null
+++ b/database/migrations/default/1727375054583_narrative_ocr/up.sql
@@ -0,0 +1,39 @@
+alter table crashes_edits
+add column investigator_narrative_ocr_processed_at timestamp with time zone;
+
+alter table crashes
+add column investigator_narrative_ocr_processed_at timestamp with time zone;
+
+comment on column crashes_edits.investigator_narrative_ocr_processed_at is 'The most recent
+timestamp at which the OCR process attempted to extract the investigator narrative. If null,
+indicates that OCR narrative extraction has never been attempted. This value should be set
+via the ETL process.';
+
+comment on column crashes.investigator_narrative_ocr_processed_at is 'The most recent
+timestamp at which the OCR process attempted to extract the investigator narrative. If null,
+indicates that OCR narrative extraction has never been attempted. This value should be set
+via the ETL process on the crashes_edits table.';
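+
+-- The view below is the work queue for the OCR ETL. Illustrative usage
+-- (the ETL issues an equivalent query via the GraphQL API):
+--   select id, cris_crash_id from view_crash_narratives_ocr_todo limit 1000;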
+create or replace view view_crash_narratives_ocr_todo as (
+    select
+        id,
+        cris_crash_id
+    from
+        crashes
+    where
+        cr3_stored_fl = TRUE
+        and investigator_narrative is NULL
+        and (
+            investigator_narrative_ocr_processed_at is NULL
+            or cr3_processed_at >= investigator_narrative_ocr_processed_at
+        )
+        -- this issue started in Sep 2024
+        -- we do not OCR very old crashes
+        and updated_at > '2024-09-01'
+    order by
+        cr3_processed_at asc,
+        id asc
+);
+
+comment on view view_crash_narratives_ocr_todo is 'View which lists crashes that need to
+be processed by the OCR narrative extraction ETL';
diff --git a/database/migrations/default/1727451510064_preserve_crash_narrative/down.sql b/database/migrations/default/1727451510064_preserve_crash_narrative/down.sql
new file mode 100644
index 000000000..a96afd21c
--- /dev/null
+++ b/database/migrations/default/1727451510064_preserve_crash_narrative/down.sql
@@ -0,0 +1,5 @@
+drop trigger if exists
+crashes_cris_preserve_investigator_narrative_on_update on crashes_cris;
+
+drop function if exists
+public.crashes_cris_set_old_investigator_narrative;
diff --git a/database/migrations/default/1727451510064_preserve_crash_narrative/up.sql b/database/migrations/default/1727451510064_preserve_crash_narrative/up.sql
new file mode 100644
index 000000000..8ada9ca52
--- /dev/null
+++ b/database/migrations/default/1727451510064_preserve_crash_narrative/up.sql
@@ -0,0 +1,50 @@
+create or replace function public.crashes_cris_set_old_investigator_narrative()
+    returns trigger
+    language plpgsql
+    as $function$
+begin
+    new.investigator_narrative = old.investigator_narrative;
+    return new;
+end;
+$function$;
+
+create or replace trigger crashes_cris_preserve_investigator_narrative_on_update
+before update on public.crashes_cris
+for each row
+when (
+    new.investigator_narrative is null and old.investigator_narrative is not null
+)
+execute procedure public.crashes_cris_set_old_investigator_narrative();
+
+comment on function public.crashes_cris_set_old_investigator_narrative is 'Sets the
+investigator_narrative to its previous value if the updated value is null. This
+trigger function addresses a known CRIS bug in which updated crash records are
+missing the investigator narrative. It is tracked via DTS issue
+https://github.com/cityofaustin/atd-data-tech/issues/18971 and CRIS ticket #854366';
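+
+-- Illustrative behavior (not part of the migration): if an update nulls a
+-- previously populated narrative, the trigger retains the old value, e.g.:
+--   update crashes_cris set investigator_narrative = null where id = 100;
+--   -- crash 100 keeps its existing investigator_narrative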
+
+--
+-- backfill narratives which have been erased
+-- run this manually to prevent a migration timeout
+--
+
+-- update
+--     crashes_cris
+-- set
+--     investigator_narrative = updates_todo.investigator_narrative_old
+-- from (select
+--         record_id as crash_pk,
+--         crashes.investigator_narrative as investigator_narrative_new,
+--         record_json -> 'old' ->> 'investigator_narrative' as investigator_narrative_old
+--     from
+--         change_log_crashes_cris as changes
+--         left join crashes on changes.record_id = crashes.id
+--     where
+--         record_json -> 'old' ->> 'investigator_narrative' is not null
+--         and record_json -> 'new' ->> 'investigator_narrative' is null
+--         and operation_type = 'UPDATE'
+--         and changes.created_at > '2024-09-09'
+--         and changes.created_by = 'cris'
+--     order by
+--         changes.id asc) as updates_todo
+-- where
+--     crashes_cris.id = updates_todo.crash_pk;
diff --git a/etl/cris_import/.gitignore b/etl/cris_import/.gitignore
index 42fa33a7e..bf15853d5 100644
--- a/etl/cris_import/.gitignore
+++ b/etl/cris_import/.gitignore
@@ -1,3 +1,4 @@
 extracts/*
 !extracts/*.md
 *.zip
+*.jpeg
diff --git a/etl/cris_import/Dockerfile b/etl/cris_import/Dockerfile
index e0531f7fe..dd946636b 100644
--- a/etl/cris_import/Dockerfile
+++ b/etl/cris_import/Dockerfile
@@ -6,7 +6,7 @@ COPY . /app
 RUN apt-get -y update
 # we need p7zip-full to extract CRIS zips
 # poppler-utils as a pdf2image dependency to snip crash diagrams out of pdfs
-RUN apt-get install -y p7zip-full poppler-utils
+# tesseract-ocr to extract crash narrative text from CR3 PDFs
+RUN apt-get install -y p7zip-full poppler-utils tesseract-ocr
 RUN chmod -R 755 /app/*
 RUN chmod +x /app/cris_import.py
 RUN pip install -r requirements.txt
diff --git a/etl/cris_import/README.md b/etl/cris_import/README.md
index 2a601598b..ce9e8e88a 100644
--- a/etl/cris_import/README.md
+++ b/etl/cris_import/README.md
@@ -2,21 +2,23 @@
 
 This ETL manages the processing and importing of TxDOT CRIS data into the Vision Zero database.
 
-All data processing is managed by a single script, `cris_import.py` which processes both CSV files and CR3 PDF crash reports. The script supports a number of CLI arguments to handle a number of data processing scenarios.
+The following scripts are available:
 
-## Quick start
+- **`cris_import.py`**: the primary data import script, which processes both CSV files and CR3 PDF crash reports
+- **`cr3_ocr_narrative.py`**: a utility script which extracts crash narrative text from CR3 PDFs using Optical Character Recognition (OCR)
+- **`_restore_zips_from_archive.py`**: a development helper script that moves CRIS extract zips from `./archive` to `./inbox` in the S3 bucket
 
-Set your `ENV` to `dev` in order to safely run the S3 operations locally.
+## Quick start - CRIS import
 
 1. Start your local Vision Zero cluster (database + Hasura + editor).
 
-2. Save a copy of the `env_template` file as `.env`, and fill in the details.
+2. Save a copy of the `env_template` file as `.env`, and fill in the details. Set the `BUCKET_ENV` variable to `dev` in order to safely run the S3 operations locally.
 
 3. Build and run the docker image. This will drop you into the docker image's shell:
 
 ```shell
-$ docker compose build # <- only need to do this once
-$ docker compose run cris_import
+docker compose build # <- do this once, and when dependencies change
+docker compose run cris_import
 ```
 
 4. Run the CRIS import script. This will download any extracts available in S3, load the CSV crash records into the database, crop crash diagrams out of the CR3 PDFs, and upload the CR3 PDFs and crash diagrams to the S3 bucket.
@@ -30,26 +32,27 @@ $ ./cris_import.py --s3-download --s3-upload --csv --pdf
 
 ## Environment
 
 Create your environment by saving a copy of the `env_template` file as `.env`. The template includes default values for local development. See the password store for more details.
 
-All interactions with AWS S3 occur with against a single bucket which has subdirectores for the `dev` and `prod` environments. If you set your `ENV` to `dev` you can safely run this ETL's S3 operations.
+All interactions with AWS S3 occur against a single bucket which has subdirectories for the `dev` and `prod` environments. If you set your `BUCKET_ENV` to `dev`, you can safely run this ETL's S3 operations.
 
 ```
-ENV=dev
-HASURA_GRAPHQL_ENDPOINT="http://localhost:8084/v1/graphql"
+BUCKET_ENV=dev
+HASURA_GRAPHQL_ENDPOINT=http://localhost:8084/v1/graphql
+HASURA_GRAPHQL_ADMIN_SECRET=hasurapassword
 AWS_ACCESS_KEY_ID=
 AWS_SECRET_ACCESS_KEY=
 BUCKET_NAME=
 EXTRACT_PASSWORD=
 ```
 
-## Usage examples
+## CRIS Import - `cris_import.py`
 
-The only script that should be run directly is `cris_import.py`. It supports the following CLI args:
+This is the primary data import script, which processes both CSV files and CR3 PDF crash reports. It supports a number of CLI args, with usage examples described below.
 
 ```shell
   --csv            Process CSV files. At least one of --csv or --pdf is required.
   --pdf            Process CR3 pdfs. At least one of --csv or --pdf is required.
   --s3-download    Source zip extracts from S3 bucket
-  --s3-upload      Upload cr3 pdfs and digrams to S3 bucket
+  --s3-upload      Upload CR3 PDFs and diagrams to S3 bucket
   --s3-archive     If using --s3-download, move the processed extracts from ./inbox to ./archive when done
  --skip-unzip     Only process files that are already unzipped in the local directory
   --verbose, -v    Sets logging level to DEBUG mode
@@ -69,7 +72,7 @@ $ ./cris_import.py --s3-download --s3-upload --s3-archive --csv --pdf --workers
 
 ### Local import
 
-Process any extract zips in your local `./extracts` directory. CSVs will be loaded ino the db, and crash diagrams will be extracted but not uploaded to S3.
+Process any extract zips in your local `./extracts` directory. CSVs will be loaded into the db, and crash diagrams will be extracted but not uploaded to S3.
 
 ```shell
 $ ./cris_import.py --csv --pdf
 ```
@@ -92,3 +95,13 @@ $ ./cris_import.py --skip-unzip --csv
 # process pdfs with more workers and in debug mode
 $ ./cris_import.py --pdf --skip-unzip --s3-upload --workers 8 --verbose
 ```
+
+## CR3 Narrative Extraction - `cr3_ocr_narrative.py`
+
+This utility script extracts crash narrative text from CR3 PDFs using Optical Character Recognition (OCR). Although CRIS provides an `investigator_narrative` column, it is often blank due to an unknown CRIS issue tracked [here](https://github.com/cityofaustin/atd-data-tech/issues/18971).
+
+It supports the `--verbose` flag to enable debug logging; the number of concurrent workers can be set with the `--workers` flag.
+
+```shell
+./cr3_ocr_narrative.py --verbose --workers 4
+```
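+
+To check how many crashes are pending OCR extraction, you can query the todo view directly. An illustrative example, assuming `psql` is installed and a connection string is available in `$DATABASE_URL` (not part of this repo's env template):
+
+```shell
+psql $DATABASE_URL -c "select count(*) from view_crash_narratives_ocr_todo"
+```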
diff --git a/etl/cris_import/_restore_zips_from_archive.py b/etl/cris_import/_restore_zips_from_archive.py
index 82481d775..97c6bacc2 100644
--- a/etl/cris_import/_restore_zips_from_archive.py
+++ b/etl/cris_import/_restore_zips_from_archive.py
@@ -13,7 +13,7 @@
 )
 
 BUCKET_NAME = os.environ["BUCKET_NAME"]
-ENV = os.environ["ENV"]
+ENV = os.environ["BUCKET_ENV"]
 
 
 def main():
diff --git a/etl/cris_import/cr3_ocr_narrative.py b/etl/cris_import/cr3_ocr_narrative.py
new file mode 100755
index 000000000..eb196fa88
--- /dev/null
+++ b/etl/cris_import/cr3_ocr_narrative.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+from datetime import datetime, timezone
+from concurrent.futures import ProcessPoolExecutor
+
+from pdf2image import convert_from_bytes
+from pytesseract import image_to_string
+
+from utils.cli import get_cli_args
+from utils.files import download_cr3_pdf
+from utils.graphql import (
+    make_hasura_request,
+    NARRATIVES_TODO_QUERY,
+    UPDATE_CRASH_NARRATIVE_OCR_MUTATION,
+)
+from utils.logging import init_logger
+from utils.process_pdfs import get_cr3_version
+from utils.settings import NARRATIVE_BBOX_PIXELS
+
+
+def extract_narrative_pdf(cris_crash_id, crash_pk, index):
+    """Handles narrative extraction of one PDF
+
+    Args:
+        cris_crash_id (int): the CRIS crash ID
+        crash_pk (int): the crash ID in the VZ database
+        index (int): a unique ID which captures the position of this
+            item in the list of narratives being processed by the
+            script. It enables better error logging during concurrency.
+    """
+    logger.info(f"Processing CRIS crash ID {cris_crash_id} ({index})")
+    pdf = download_cr3_pdf(cris_crash_id)
+
+    logger.debug("Converting PDF to image...")
+    page = convert_from_bytes(
+        pdf,
+        fmt="jpeg",  # jpeg is much faster than the default ppm fmt
+        first_page=2,  # page 2 has the crash narrative
+        last_page=2,
+        dpi=150,
+    )[0]
+
+    cr3_version = get_cr3_version(page)
+    logger.debug(f"CR3 version: {cr3_version}")
+
+    # uncomment to save a copy of the pdf page image
+    # page.save(f"temp/{cris_crash_id}_page_{cr3_version}.jpeg")
+
+    bbox = NARRATIVE_BBOX_PIXELS[cr3_version]
+
+    logger.debug("Cropping narrative from PDF...")
+    narrative_image = page.crop(bbox)
+
+    # uncomment to save a copy of the cropped narrative image
+    # narrative_image.save(f"temp/{cris_crash_id}_narrative_{cr3_version}.jpeg")
+
+    logger.debug("Extracting narrative text...")
+    narrative = image_to_string(narrative_image, lang="eng")
+
+    logger.debug(f"Extracted narrative: {narrative}")
+
+    variables = {
+        "id": crash_pk,
+        "updates": {
+            "investigator_narrative": narrative,
+            "investigator_narrative_ocr_processed_at": datetime.now(
+                timezone.utc
+            ).isoformat(),
+            "updated_by": "dts_automation",
+        },
+    }
+
+    logger.debug("Updating crash record...")
+    make_hasura_request(
+        query=UPDATE_CRASH_NARRATIVE_OCR_MUTATION, variables=variables
+    )
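+
+
+# Note: extraction runs in a ProcessPoolExecutor (rather than a thread pool)
+# because PDF rasterization and Tesseract OCR are CPU-bound; separate
+# processes sidestep the GIL. Each task downloads its own PDF, so workers
+# are fully independent.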
+def main(cli_args):
+    logger.info("Downloading crashes todo...")
+    todos = make_hasura_request(query=NARRATIVES_TODO_QUERY)[
+        "view_crash_narratives_ocr_todo"
+    ]
+
+    logger.info(f"{len(todos)} to process")
+    if not todos:
+        return
+
+    futures = []
+    with ProcessPoolExecutor(max_workers=cli_args.workers) as executor:
+        for index, crash in enumerate(todos):
+            future = executor.submit(
+                extract_narrative_pdf, crash["cris_crash_id"], crash["id"], index
+            )
+            futures.append(future)
+
+    errors = []
+    for index, future in enumerate(futures):
+        try:
+            future.result()
+        except Exception as e:
+            # look up the crash that failed in the todos list
+            crash = todos[index]
+            errors.append([crash["cris_crash_id"], e])
+
+    if errors:
+        logger.error(
+            f"Encountered {len(errors)} error(s) extracting narratives. Logging up to 10 errors and raising the first..."
+        )
+        for cris_crash_id, e in errors[:10]:
+            logger.info(f"Error processing CRIS crash ID {cris_crash_id}: {e}")
+        raise errors[0][1]
+
+
+if __name__ == "__main__":
+    cli_args = get_cli_args()
+    logger = init_logger(debug=cli_args.verbose)
+    main(cli_args)
diff --git a/etl/cris_import/env_template b/etl/cris_import/env_template
index 9106adf94..1a0cc4469 100644
--- a/etl/cris_import/env_template
+++ b/etl/cris_import/env_template
@@ -1,7 +1,7 @@
-ENV=dev
 AWS_ACCESS_KEY_ID=
 AWS_SECRET_ACCESS_KEY=
+BUCKET_ENV=dev
 BUCKET_NAME=
 EXTRACT_PASSWORD=
-HASURA_GRAPHQL_ENDPOINT="http://localhost:8084/v1/graphql"
+HASURA_GRAPHQL_ENDPOINT=http://localhost:8084/v1/graphql
 HASURA_GRAPHQL_ADMIN_SECRET=hasurapassword
diff --git a/etl/cris_import/requirements.txt b/etl/cris_import/requirements.txt
index 67e0dc982..b5fad467d 100644
--- a/etl/cris_import/requirements.txt
+++ b/etl/cris_import/requirements.txt
@@ -1,3 +1,4 @@
 boto3==1.*
 pdf2image==1.*
+pytesseract==0.*
 requests==2.*
diff --git a/etl/cris_import/utils/files.py b/etl/cris_import/utils/files.py
index 430ec5d14..bad003a86 100644
--- a/etl/cris_import/utils/files.py
+++ b/etl/cris_import/utils/files.py
@@ -1,13 +1,15 @@
+from io import BytesIO
 import os
 import subprocess
 
 from boto3 import client, resource
+from botocore.exceptions import ClientError
 
 from utils.logging import get_logger
 from utils.settings import LOCAL_EXTRACTS_DIR
 
-ENV = os.environ["ENV"]
+ENV = os.environ["BUCKET_ENV"]
 BUCKET_NAME = os.environ["BUCKET_NAME"]
 EXTRACT_PASSWORD = os.environ["EXTRACT_PASSWORD"]
 
@@ -144,3 +146,34 @@ def get_extract_dir(extract_name):
 
 def upload_file_to_s3(file_path, object_key):
     s3_client.upload_file(file_path, BUCKET_NAME, object_key)
+
+
+def download_cr3_pdf(cris_crash_id):
+    """Download a CR3 PDF from S3
+
+    Args:
+        cris_crash_id (int): The CRIS crash ID
+
+    Raises:
+        FileNotFoundError: When the requested PDF file is not found
+            in the bucket
+
+    Returns:
+        bytes: The downloaded file
+    """
+    pdf = BytesIO()
+    # CR3 PDFs are always sourced from the prod prefix, regardless of BUCKET_ENV
+    env = "prod"
+    object_key = f"{env}/cr3s/pdfs/{cris_crash_id}.pdf"
+    logger.debug(f"Downloading PDF: {object_key}")
+    try:
+        s3_client.download_fileobj(BUCKET_NAME, object_key, pdf)
+    except ClientError as e:
+        if e.response["Error"]["Code"] == "404":
+            raise FileNotFoundError(
+                f"'{object_key}' not found in bucket '{BUCKET_NAME}'"
+            )
+        else:
+            raise e
+    pdf.seek(0)
+    return pdf.read()
diff --git a/etl/cris_import/utils/graphql.py b/etl/cris_import/utils/graphql.py
index c7d9af0eb..984cd4046 100644
--- a/etl/cris_import/utils/graphql.py
+++ b/etl/cris_import/utils/graphql.py
@@ -105,6 +105,24 @@
 }
 
 
+NARRATIVES_TODO_QUERY = """
+    query GetNarrativesToOCR {
+        view_crash_narratives_ocr_todo(limit: 1000) {
+            id
+            cris_crash_id
+        }
+    }
+"""
+
+UPDATE_CRASH_NARRATIVE_OCR_MUTATION = """
+mutation UpdateCrashNarrativeOCR($updates: crashes_edits_set_input!, $id: Int!) {
+    update_crashes_edits_by_pk(pk_columns: {id: $id}, _set: $updates) {
+        id
+    }
+}
+"""
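+
+# Note: the OCR mutation writes to crashes_edits rather than to the unified
+# crashes record; per the column comments in the narrative_ocr migration,
+# values set on crashes_edits surface on the crashes table.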
+
+
 def make_hasura_request(*, query, variables=None):
     """Make a POST request to the graphql API.
diff --git a/etl/cris_import/utils/process_pdfs.py b/etl/cris_import/utils/process_pdfs.py
index f91a61e61..d1ea1edb1 100644
--- a/etl/cris_import/utils/process_pdfs.py
+++ b/etl/cris_import/utils/process_pdfs.py
@@ -5,21 +5,33 @@
 import time
 
-from pdf2image import convert_from_path, pdfinfo_from_path
+from pdf2image import convert_from_path
 
 from utils.graphql import UPDATE_CRASH_CR3_FIELDS, make_hasura_request
 from utils.logging import get_logger
 from utils.files import upload_file_to_s3
 from utils.settings import (
     DIAGRAM_BBOX_PIXELS,
-    NEW_CR3_FORM_TEST_PIXELS,
+    CR3_FORM_V2_TEST_PIXELS,
 )
 
-ENV = os.getenv("ENV")
+ENV = os.getenv("BUCKET_ENV")
 
 logger = get_logger()
 
 
-def get_cr3_version(page, page_width):
+def are_all_pixels_black(page, test_pixels, threshold=5):
+    """Return True if every sampled pixel is at or below the RGB threshold"""
+    for pixel in test_pixels:
+        rgb_pixel = page.getpixel(pixel)
+        if (
+            rgb_pixel[0] > threshold
+            or rgb_pixel[1] > threshold
+            or rgb_pixel[2] > threshold
+        ):
+            return False
+    return True
+
+
+def get_cr3_version(page):
     """Determine the CR3 form version.
 
     The check is conducted by sampling if various pixels are black.
@@ -35,24 +47,13 @@
     Returns:
         str: 'v1_small', 'v1_large', 'v2_large', or 'v2_small'
     """
-    page_size = "small" if page_width < 700 else "large"
-    test_pixels = NEW_CR3_FORM_TEST_PIXELS[page_size]
-
-    for pixel in test_pixels:
-        rgb_pixel = page.getpixel(pixel)
-        if rgb_pixel[0] > 5 or rgb_pixel[1] > 5 or rgb_pixel[2] > 5:
-            # the PDF fails our pixel checks, so assume it's the
-            # earliest version
-            return f"v1_{page_size}"
-
-    return f"v2_{page_size}"
+    # the rendered image's pixel width distinguishes the small and large
+    # form layouts, so pdfinfo is no longer needed
+    width, height = page.size
+    page_size = "small" if width < 2000 else "large"
 
+    if are_all_pixels_black(page, CR3_FORM_V2_TEST_PIXELS[page_size]):
+        return f"v2_{page_size}"
 
-def get_pdf_width(pdf_path):
-    """Return the width of the pdf in points"""
-    pdf_info = pdfinfo_from_path(pdf_path)
-    # parse width from a string that looks like '612 x 792 pts (letter)'
-    return int(pdf_info["Page size"].split(" ")[0])
+    return f"v1_{page_size}"
@@ -94,7 +95,6 @@ def process_pdf(extract_dir, filename, s3_upload, index):
     logger.info(f"Processing {filename} ({index})")
     cris_crash_id = int(filename.replace(".pdf", ""))
     pdf_path = os.path.join(extract_dir, "crashReports", filename)
-    page_width = get_pdf_width(pdf_path)
 
     logger.debug("Converting PDF to image...")
@@ -106,7 +106,7 @@
         dpi=150,
     )[0]
 
-    cr3_version = get_cr3_version(page, page_width)
+    cr3_version = get_cr3_version(page)
     bbox = DIAGRAM_BBOX_PIXELS[cr3_version]
 
     logger.debug("Cropping crash diagram...")
diff --git a/etl/cris_import/utils/settings.py b/etl/cris_import/utils/settings.py
index d80ee6e36..ac8aa3326 100644
--- a/etl/cris_import/utils/settings.py
+++ b/etl/cris_import/utils/settings.py
@@ -8,8 +8,16 @@
     "v2_large": (2496, 3036, 4836, 5464),
 }
 
+"""This is the bbox used to crop the crash narrative"""
+NARRATIVE_BBOX_PIXELS = {
+    "v1_small": (30, 928, 681, 1590),
+    "v1_large": (296, 3683, 2580, 5749),
+    "v2_small": (30, 791, 650, 1430),
+    "v2_large": (90, 3026, 2496, 5466),
+}
+
 """If all four of these pixels are black, it is a 'new' CR3 pdf"""
-NEW_CR3_FORM_TEST_PIXELS = {
+CR3_FORM_V2_TEST_PIXELS = {
     "small": [
         (115, 670),
         (300, 670),
@@ -24,6 +32,7 @@
     ],
 }
 
+
 CSV_UPLOAD_BATCH_SIZE = 1000
 
 # max number of CPU cores to utilize when processing PDFs concurrently