From 5abf159efcc56e85d6e4cf9ca093dde76b1b7c8d Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Tue, 16 Apr 2024 11:59:51 +0200 Subject: [PATCH] OA: dag for gold open access mechanisms --- .../versions/64ac526a078b_my_db_revision.py | 2 +- .../gold_open_access_mechanisms.py | 75 +++++++++++++++++++ dags/open_access/utils.py | 6 ++ 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 dags/open_access/gold_open_access_mechanisms.py diff --git a/dags/migrations/versions/64ac526a078b_my_db_revision.py b/dags/migrations/versions/64ac526a078b_my_db_revision.py index ad0d7e7..1d0cab6 100644 --- a/dags/migrations/versions/64ac526a078b_my_db_revision.py +++ b/dags/migrations/versions/64ac526a078b_my_db_revision.py @@ -38,7 +38,7 @@ def upgrade(): sa.Column("cern_individual_apcs", sa.Integer, nullable=False), sa.Column("scoap3", sa.Integer, nullable=False), sa.Column("other", sa.Integer, nullable=False), - sa.Column("other_colectives", sa.Integer, nullable=False), + sa.Column("other_collective_models", sa.Integer, nullable=False), sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False), sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False), schema="oa", diff --git a/dags/open_access/gold_open_access_mechanisms.py b/dags/open_access/gold_open_access_mechanisms.py new file mode 100644 index 0000000..65999e9 --- /dev/null +++ b/dags/open_access/gold_open_access_mechanisms.py @@ -0,0 +1,75 @@ +from functools import reduce + +import open_access.utils as utils +import pendulum +from airflow.decorators import dag, task +from airflow.providers.postgres.operators.postgres import PostgresOperator + + +@dag( + start_date=pendulum.today("UTC").add(days=-1), + schedule="@monthly", + params={"year": 2023}, +) +def oa_gold_open_access_mechanisms(): + @task() + def fetch_data_task(query, **kwargs): + year = kwargs["params"].get("year") + golden_access_base_query = ( + r"(affiliation:CERN+or+595:'For+annual+report')" + + rf"and+year:{year}+not+980:ConferencePaper+" + + r"not+980:BookChapter+not+595:'Not+for+annual+report" + ) + type_of_query = [*query][0] + url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}") + data = utils.get_data(url) + total = utils.get_total_results_count(data.text) + return {type_of_query: total} + + @task(multiple_outputs=True) + def join(values, **kwargs): + results = reduce(lambda a, b: {**a, **b}, values) + results["years"] = kwargs["params"].get("year") + return results + + results = fetch_data_task.expand( + query=[ + {"cern_read_and_publish": utils.cern_read_and_publish}, + {"cern_individual_apcs": utils.cern_individual_apcs}, + {"scoap3": utils.scoap3}, + {"other": utils.other}, + {"other_collective_models": utils.other_collective_models}, + ], + ) + unpacked_results = join(results) + + PostgresOperator( + task_id="populate_open_access_table", + postgres_conn_id="superset_qa", + sql=""" + SET search_path TO oa; + INSERT INTO "oa.open_access" (year, cern_read_and_publish, cern_individual_apcs, + scoap3, other, other_collective_models, created_at, updated_at) + VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (year) + DO UPDATE SET + cern_read_and_publish = EXCLUDED.cern_read_and_publish, + cern_individual_apcs = EXCLUDED.cern_individual_apcs, + scoap3 = EXCLUDED.scoap3, + other = EXCLUDED.other, + other_collective_models = EXCLUDED.other_collective_models, + updated_at = CURRENT_TIMESTAMP; + """, + parameters={ + "years": unpacked_results["years"], + "cern_read_and_publish": unpacked_results["cern_read_and_publish"], + "cern_individual_apcs": unpacked_results["cern_individual_apcs"], + "scoap3": unpacked_results["scoap3"], + "other": unpacked_results["other"], + "other_collective_models": unpacked_results["other_collective_models"], + }, + ) + + +OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms() diff --git a/dags/open_access/utils.py b/dags/open_access/utils.py index 718b398..ddcf80d 100644 --- a/dags/open_access/utils.py +++ b/dags/open_access/utils.py @@ -38,6 +38,12 @@ def get_total_results_count(data): r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')" ) +cern_read_and_publish = r"540__f:'CERN-RP" +cern_individual_apcs = r"540__f:'CERN-APC'" +scoap3 = r"540__f:'SCOAP3'" +other = r"540__f:'Other'" +other_collective_models = r"540__f:'Collective'" + @backoff.on_exception( backoff.expo, requests.exceptions.ProxyError, max_time=120, max_tries=5