Skip to content

Commit

Permalink
Added function for early stopping. Will error for late additions
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Nov 6, 2024
1 parent e13be56 commit 4046c89
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from observatory_platform.files import convert, add_partition_date, save_jsonl_gz
from observatory_platform.google.gcs import gcs_upload_files, gcs_blob_uri, gcs_blob_name_from_path, gcs_download_blob
from observatory_platform.airflow.tasks import check_dependencies
from observatory_platform.google.bigquery import bq_load_table, bq_table_id, bq_create_dataset
from observatory_platform.google.bigquery import bq_load_table, bq_table_id, bq_create_dataset, bq_run_query
from observatory_platform.sftp import SftpFolders, make_sftp_connection
from observatory_platform.airflow.workflow import CloudWorkspace, cleanup
from observatory_platform.airflow.release import PartitionRelease, set_task_state
Expand Down Expand Up @@ -195,7 +195,13 @@ def fetch_releases(**context) -> List[dict]:
release_info[release_date] += sftp_files

if not bool(release_info):
raise AirflowSkipException("No new releases available. Skipping downstream DAG tasks.")
sales_table_id = bq_table_id(

Check warning on line 198 in dags/oaebu_workflows/google_books_telescope/google_books_telescope.py

View check run for this annotation

Codecov / codecov/patch

dags/oaebu_workflows/google_books_telescope/google_books_telescope.py#L198

Added line #L198 was not covered by tests
cloud_workspace.project_id, sales_partner.bq_dataset_id, sales_partner.bq_table_name
)
traffic_table_id = bq_table_id(

Check warning on line 201 in dags/oaebu_workflows/google_books_telescope/google_books_telescope.py

View check run for this annotation

Codecov / codecov/patch

dags/oaebu_workflows/google_books_telescope/google_books_telescope.py#L201

Added line #L201 was not covered by tests
cloud_workspace.project_id, traffic_partner.bq_dataset_id, traffic_partner.bq_table_name
)
_gb_early_stop(sales_table_id, traffic_table_id, cloud_workspace, logical_date=context["logical_date"])

Check warning on line 204 in dags/oaebu_workflows/google_books_telescope/google_books_telescope.py

View check run for this annotation

Codecov / codecov/patch

dags/oaebu_workflows/google_books_telescope/google_books_telescope.py#L204

Added line #L204 was not covered by tests

releases = []
run_id = context["run_id"]
Expand Down Expand Up @@ -448,3 +454,42 @@ def gb_transform(
save_path = sales_path if report_type == "sales" else traffic_path
print(f"SAVING REPORT '{report_type}' to {save_path}")
save_jsonl_gz(save_path, report_results)


def _gb_early_stop(
    sales_table_id: str, traffic_table_id: str, cloud_workspace: CloudWorkspace, logical_date: pendulum.DateTime
) -> None:
    """Decides how to stop when no new files are available.

    Will normally raise a skip exception so downstream tasks are skipped. However, if it's past
    the 4th of the month, will raise an AirflowException instead with the intention of making an
    alert through Slack. Will also check that the sales and traffic tables have the same
    partitions present.

    :param sales_table_id: The ID of the sales table
    :param traffic_table_id: The ID of the traffic table
    :param cloud_workspace: The cloud workspace object
    :param logical_date: The logical date of this run
    :raises AirflowException: If the two tables' partitions differ, or data is missing past the 4th
    :raises AirflowSkipException: If data is missing on or before the 4th of the month
    """

    client = Client(project=cloud_workspace.project_id)
    sales_dates = get_partitions(sales_table_id, client=client)
    traffic_dates = get_partitions(traffic_table_id, client=client)
    if sales_dates != traffic_dates:
        raise AirflowException(f"Tables do not have the same partitions: {sales_table_id} != {traffic_table_id}")

    # The partition this run is expected to produce: the last day of the previous month
    this_run_date = logical_date.subtract(months=1).end_of("month")
    # The most recent partition date; None when the tables are empty (counts as missing data,
    # whereas sorted(...)[-1] would raise IndexError)
    most_recent_pd = max((pendulum.parse(s) for s in sales_dates), default=None)
    if most_recent_pd is None or most_recent_pd < this_run_date:
        if logical_date.day > 4:
            raise AirflowException("It's past the 4th and there are no files available for upload!")
        else:
            raise AirflowSkipException("No files required for processing. Skipping.")


def get_partitions(table_id: str, partition_key: str = "release_date", client: Client = None) -> List[str]:
    """Queries the table and returns a list of distinct partitions

    :param table_id: The fully qualified table id to query
    :param partition_key: The name of the column that the table is partitioned on
    :param client: The BigQuery client to run the query with; passed through to bq_run_query
    :return: List of partitions in descending order
    """
    # NOTE(review): table_id/partition_key are interpolated into the SQL — callers must supply
    # trusted identifiers only
    query = f"SELECT DISTINCT({partition_key}) FROM {table_id} ORDER BY {partition_key} desc"
    return bq_run_query(query, client=client)

Check warning on line 495 in dags/oaebu_workflows/google_books_telescope/google_books_telescope.py

View check run for this annotation

Codecov / codecov/patch

dags/oaebu_workflows/google_books_telescope/google_books_telescope.py#L494-L495

Added lines #L494 - L495 were not covered by tests
46 changes: 45 additions & 1 deletion tests/google_books_telescope/test_google_books_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from unittest.mock import patch

import pendulum
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.connection import Connection
from airflow.utils.state import State
from click.testing import CliRunner
Expand All @@ -30,6 +30,7 @@
GoogleBooksRelease,
create_dag,
gb_transform,
_gb_early_stop,
)
from observatory_platform.airflow.workflow import Workflow
from observatory_platform.dataset_api import DatasetAPI
Expand Down Expand Up @@ -364,3 +365,46 @@ def test_gb_transform(self):
transform_traffic_path,
pendulum.parse("2020-03-01"),
)


class TestGBEarlyStop(SandboxTestCase):
    """Tests for the _gb_early_stop function"""

    # Module path of the get_partitions helper that every test stubs out
    PATCH_TARGET = "oaebu_workflows.google_books_telescope.google_books_telescope.get_partitions"

    def setUp(self):
        """Set up test fixtures before each test method."""
        self.sales_table_id = "sales_table"
        self.traffic_table_id = "traffic_table"
        self.logical_date = pendulum.datetime(2024, 2, 1)

    def test_matching_partitions_with_current_data(self):
        """Test when partitions match and data is current - should not raise any exception."""
        partitions = ["2024-01-31", "2024-02-28"]
        with patch(self.PATCH_TARGET) as mock_partitions:
            # First call returns the sales dates, second call the traffic dates
            mock_partitions.side_effect = [list(partitions), list(partitions)]
            _gb_early_stop(self.sales_table_id, self.traffic_table_id, self.fake_cloud_workspace, self.logical_date)

    def test_mismatched_partitions(self):
        """Test when partitions don't match between tables."""
        with patch(self.PATCH_TARGET) as mock_partitions:
            mock_partitions.side_effect = [["2024-01-31", "2024-02-28"], ["2024-01-31"]]
            with self.assertRaisesRegex(AirflowException, "Tables do not have"):
                _gb_early_stop(
                    self.sales_table_id, self.traffic_table_id, self.fake_cloud_workspace, self.logical_date
                )

    def test_missing_data_before_fourth(self):
        """Test when data is missing but it's before the 4th of the month."""
        run_date = pendulum.datetime(2024, 2, 3)
        with patch(self.PATCH_TARGET) as mock_partitions:
            mock_partitions.side_effect = [["2023-12-31"], ["2023-12-31"]]
            with self.assertRaises(AirflowSkipException):
                _gb_early_stop(self.sales_table_id, self.traffic_table_id, self.fake_cloud_workspace, run_date)

    def test_missing_data_after_fourth(self):
        """Test when data is missing and it's after the 4th of the month."""
        run_date = pendulum.datetime(2024, 2, 5)
        with patch(self.PATCH_TARGET) as mock_partitions:
            mock_partitions.side_effect = [["2023-12-31"], ["2023-12-31"]]
            with self.assertRaisesRegex(AirflowException, "It's past the 4th"):
                _gb_early_stop(self.sales_table_id, self.traffic_table_id, self.fake_cloud_workspace, run_date)

0 comments on commit 4046c89

Please sign in to comment.