Skip to content

Commit

Permalink
Add a toggle to turn Crossref Events off
Browse files Browse the repository at this point in the history
  • Loading branch information
jdddog committed Apr 25, 2024
1 parent 23e2169 commit 763bbc2
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
66 changes: 49 additions & 17 deletions dags/oaebu_workflows/onix_workflow/onix_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import jsonlines
import pendulum
from airflow.decorators import dag, task, task_group
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import chain
from airflow.utils.trigger_rule import TriggerRule
from google.cloud.bigquery import Client, SourceFormat
from jinja2 import Environment, FileSystemLoader
from ratelimit import limits, sleep_and_retry
Expand All @@ -49,6 +51,7 @@
bq_find_schema,
bq_load_table,
bq_run_query,
bq_select_latest_table,
bq_select_table_shard_dates,
bq_sharded_table_id,
bq_table_id,
Expand Down Expand Up @@ -231,6 +234,7 @@ def create_dag(
bq_worksid_error_table_name: str = "onix_workid_isbn_errors",
bq_workfamilyid_table_name: str = "onix_workfamilyid_isbn",
oaebu_intermediate_match_suffix: str = "_matched",
skip_downloading_crossref_events: bool = True,
# Run parameters
data_partners: List[Union[str, OaebuPartner]] = None,
ga3_views_field="page_views",
Expand Down Expand Up @@ -275,6 +279,8 @@ def create_dag(
:param bq_worksid_error_table_name: table ID of the worksid error table
:param bq_workfamilyid_table_name: table ID of the workfamilyid table
:param oaebu_intermediate_match_suffix: Suffix to append to intermediate tables
:param skip_downloading_crossref_events: skip fetching new data for Crossref Events, when True, falls back to using
a previous version of the table.
:param data_partners: OAEBU data sources.
:param ga3_views_field: The name of the GA3 views field - should be either 'page_views' or 'unique_views'
Expand Down Expand Up @@ -490,12 +496,15 @@ def create_crossref_metadata_table(release: dict, **context) -> None:
)
set_task_state(state, context["ti"].task_id, release=release)

@task()
@task(pool="crossref_events_pool")
def create_crossref_events_table(release: dict, **context) -> None:
"""Download, transform, upload and create a table for crossref events"""

release = OnixWorkflowRelease.from_dict(release)

if skip_downloading_crossref_events:
raise AirflowSkipException("create_crossref_events_table: skipping fetching new Crossref Events data")

# Get the unique dois from the metadata table
metadata_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
Expand All @@ -506,6 +515,7 @@ def create_crossref_events_table(release: dict, **context) -> None:
client = Client(project=cloud_workspace.project_id)
dois = dois_from_table(metadata_table_id, doi_column_name="DOI", client=client)
doi_prefixes = get_doi_prefixes(dois)
logging.info(f"Found DOI prefixes: {doi_prefixes}")

# Download and transform all events
start_date = crossref_start_date
Expand Down Expand Up @@ -535,7 +545,7 @@ def create_crossref_events_table(release: dict, **context) -> None:
)
set_task_state(state, context["ti"].task_id, release=release)

@task()
@task(trigger_rule=TriggerRule.ALL_DONE)
def create_book_table(release: dict, **context) -> None:
"""Create the oaebu book table using the crossref event and metadata tables"""

Expand All @@ -555,11 +565,14 @@ def create_book_table(release: dict, **context) -> None:
bq_oaebu_crossref_metadata_table_name,
release.snapshot_date,
)
crossref_events_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
bq_oaebu_crossref_dataset_id,
bq_crossref_events_table_name,
release.snapshot_date,
client = Client(project=cloud_workspace.project_id)
crossref_events_table_id = bq_select_latest_table(
table_id=bq_table_id(
cloud_workspace.project_id, bq_oaebu_crossref_dataset_id, bq_crossref_events_table_name
),
end_date=release.snapshot_date,
sharded=True,
client=client,
)
sql = render_template(
os.path.join(sql_folder(workflow_module="onix_workflow"), "book.sql.jinja2"),
Expand All @@ -568,7 +581,6 @@ def create_book_table(release: dict, **context) -> None:
)
logging.info(sql)

client = Client(project=cloud_workspace.project_id)
status = bq_create_table_from_query(
sql=sql,
table_id=book_table_id,
Expand Down Expand Up @@ -1042,11 +1054,7 @@ def cleanup_workflow(release: dict, **context):
xcom_release = make_release()
task_aggregate_works = aggregate_works(xcom_release)
task_create_crossref_metadata_table = create_crossref_metadata_table(xcom_release)
task_create_crossref_events_table = create_crossref_events_table(
xcom_release,
pool=crossref_events_pool.pool_name,
pool_slots=1,
)
task_create_crossref_events_table = create_crossref_events_table(xcom_release)
task_create_book_table = create_book_table(xcom_release)
task_group_create_intermediate_tables = create_tasks_intermediate_tables(xcom_release)
task_create_book_product_table = create_book_product_table(xcom_release)
Expand Down Expand Up @@ -1083,11 +1091,12 @@ def get_doi_prefixes(dois: Iterable[str]) -> set[str]:

prefixes = set()
for doi in dois:
parts = doi.split("/")
if len(parts) == 2:
prefixes.add(parts[0])
match = re.search(r"^(10\.\d{4,9})", doi)
if match:
doi_prefix = match.group(1)
prefixes.add(doi_prefix)
else:
logging.warning(f"DOI is not made of two parts: {doi}")
logging.warning(f"No DOI prefix found: {doi}")

return prefixes

Expand Down Expand Up @@ -1132,6 +1141,29 @@ def download_crossref_events(
:return: All events for the input DOIs.
"""

# Create queries per year
# requests = []
# for prefix in doi_prefixes:
# for dt in pendulum.Period(start=start_date, end=end_date).range("years"):
# period_start = dt.start_of("year")
# period_end = dt.end_of("year")
#
# # Make sure the end date does not exceed the original end date
# if period_end > end_date:
# period_end = end_date
#
# request = Request(
# method="GET",
# url="https://api.eventdata.crossref.org/v1/events",
# params={
# "mailto": mailto,
# "from-collected-date": period_start.strftime("%Y-%m-%d"),
# "until-collected-date": period_end.strftime("%Y-%m-%d"),
# "rows": 1000,
# "obj-id.prefix": prefix,
# },
# )
# requests.append(request)
requests = [
Request(
method="GET",
Expand Down
1 change: 1 addition & 0 deletions tests/onix_workflow/test_onix_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ def vcr_ignore_condition(request):
sensor_dag_ids=sensor_dag_ids,
start_date=start_date,
crossref_start_date=pendulum.datetime(year=2018, month=5, day=14),
skip_downloading_crossref_events=False,
)

# Skip dag existence check in sensor.
Expand Down

0 comments on commit 763bbc2

Please sign in to comment.