Skip to content

Commit

Permalink
DAGs: fixes
Browse files Browse the repository at this point in the history
* Inspire matomo record update fix.
* Annual Reports scheduler date fix to every 15 days.
  • Loading branch information
ErnestaP committed Aug 19, 2024
1 parent 890a13c commit 8529734
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion dags/annual_reports/annual_reports_categories.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

@dag(
start_date=pendulum.today("UTC").add(days=-1),
schedule="@monthly",
schedule="0 0 */15 * *",
)
def annual_reports_categories_dag():
@task(executor_config=kubernetes_executor_config)
Expand Down
5 changes: 4 additions & 1 deletion dags/annual_reports/anuual_reports_publications.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
years = list(range(2004, current_year + 1))


@dag(start_date=pendulum.today("UTC").add(days=-1), schedule="@monthly")
@dag(
start_date=pendulum.today("UTC").add(days=-1),
schedule="0 0 */15 * *",
)
def annual_reports_publications_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_publication_report_count(year, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion dags/common/models/inspire_matomo/inspire_matomo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ class MatomoData(Base):
visits = Column(Integer)
unique_visitors = Column(Integer)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
updated_at = Column(DateTime, default=func.now())
15 changes: 7 additions & 8 deletions dags/inspire_matomo/inspire_visits_per_day_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from common.operators.sqlalchemy_operator import sqlalchemy_task
from executor_config import kubernetes_executor_config
from inspire_matomo.utils import get_parameters
from sqlalchemy import func
from tenacity import retry_if_exception_type, stop_after_attempt

now = datetime.now()
Expand Down Expand Up @@ -64,20 +65,18 @@ def fetch_unique_visitors_per_day(**kwargs):

@sqlalchemy_task(conn_id="superset")
def populate_database(visits_per_day, unique_visitors_per_day, session, **kwargs):
print(json.loads(visits_per_day))
visits_per_day_json = json.loads(visits_per_day)
unique_visitors_per_day_json = json.loads(unique_visitors_per_day)
date = kwargs["params"].get("date")
parsed_date = datetime.strptime(date, "%Y-%m-%d").date()

record = (
session.query(MatomoData)
.filter_by(date=visits_per_day_json.get("date"))
.first()
)
record = session.query(MatomoData).filter_by(date=parsed_date).first()
if record:
record.visits = int(visits_per_day_json.visits)
record.unique_visitors = int(unique_visitors_per_day_json.unique_visitors)
record.visits = int(visits_per_day_json.get("value"))
record.unique_visitors = int(unique_visitors_per_day_json.get("value"))
record.updated_at = func.now()
else:
parsed_date = datetime.strptime(date, "%Y-%m-%d").date()
new_record = MatomoData(
visits=int(visits_per_day_json.get("value")),
unique_visitors=int(unique_visitors_per_day_json.get("value")),
Expand Down

0 comments on commit 8529734

Please sign in to comment.