Skip to content

Commit

Permalink
OA: dag for gold open access mechanisms
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Apr 16, 2024
1 parent c4ed5b4 commit 5abf159
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dags/migrations/versions/64ac526a078b_my_db_revision.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def upgrade():
sa.Column("cern_individual_apcs", sa.Integer, nullable=False),
sa.Column("scoap3", sa.Integer, nullable=False),
sa.Column("other", sa.Integer, nullable=False),
sa.Column("other_colectives", sa.Integer, nullable=False),
sa.Column("other_collective_models", sa.Integer, nullable=False),
sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
schema="oa",
Expand Down
75 changes: 75 additions & 0 deletions dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from functools import reduce

import open_access.utils as utils
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(
start_date=pendulum.today("UTC").add(days=-1),
schedule="@monthly",
params={"year": 2023},
)
def oa_gold_open_access_mechanisms():
@task()
def fetch_data_task(query, **kwargs):
year = kwargs["params"].get("year")
golden_access_base_query = (
r"(affiliation:CERN+or+595:'For+annual+report')"
+ rf"and+year:{year}+not+980:ConferencePaper+"
+ r"not+980:BookChapter+not+595:'Not+for+annual+report"
)
type_of_query = [*query][0]
url = utils.get_url(f"{golden_access_base_query}+{query[type_of_query]}")
data = utils.get_data(url)
total = utils.get_total_results_count(data.text)
return {type_of_query: total}

@task(multiple_outputs=True)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
return results

results = fetch_data_task.expand(
query=[
{"cern_read_and_publish": utils.cern_read_and_publish},
{"cern_individual_apcs": utils.cern_individual_apcs},
{"scoap3": utils.scoap3},
{"other": utils.other},
{"other_collective_models": utils.other_collective_models},
],
)
unpacked_results = join(results)

PostgresOperator(
task_id="populate_open_access_table",
postgres_conn_id="superset_qa",
sql="""
SET search_path TO oa;
INSERT INTO "oa.open_access" (year, cern_read_and_publish, cern_individual_apcs,
scoap3, other, other_collective_models, created_at, updated_at)
VALUES (%(years)s, %(closed)s, %(bronze)s, %(green)s, %(gold)s,
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (year)
DO UPDATE SET
cern_read_and_publish = EXCLUDED.cern_read_and_publish,
cern_individual_apcs = EXCLUDED.cern_individual_apcs,
scoap3 = EXCLUDED.scoap3,
other = EXCLUDED.other,
other_collective_models = EXCLUDED.other_collective_models,
updated_at = CURRENT_TIMESTAMP;
""",
parameters={
"years": unpacked_results["years"],
"cern_read_and_publish": unpacked_results["cern_read_and_publish"],
"cern_individual_apcs": unpacked_results["cern_individual_apcs"],
"scoap3": unpacked_results["scoap3"],
"other": unpacked_results["other"],
"other_collective_models": unpacked_results["other_collective_models"],
},
)


OA_gold_open_access_mechanisms = oa_gold_open_access_mechanisms()
6 changes: 6 additions & 0 deletions dags/open_access/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ def get_total_results_count(data):
r"540__3:'publication'+and+" + r"(540__a:'CC-BY'+OR++540__a:'CC+BY')"
)

cern_read_and_publish = r"540__f:'CERN-RP"
cern_individual_apcs = r"540__f:'CERN-APC'"
scoap3 = r"540__f:'SCOAP3'"
other = r"540__f:'Other'"
other_collective_models = r"540__f:'Collective'"


@backoff.on_exception(
backoff.expo, requests.exceptions.ProxyError, max_time=120, max_tries=5
Expand Down

0 comments on commit 5abf159

Please sign in to comment.