Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inspire Matomo: visits per day DAG #40

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ airflow standalone
### 6. Start Postgres with Docker Compose

If you're using Docker to manage your Postgres database, start the service.
IMPORTANT: Please add CDS_TOKEN value in Docker compose file
IMPORTANT: Please add CDS_TOKEN and MATOMO_AUTH_TOKEN value in Docker compose file

```sh
docker-compose -f docker-compose.standalone.yaml up
Expand Down
15 changes: 15 additions & 0 deletions dags/common/models/inspire_matomo/inspire_matomo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from sqlalchemy import Column, Date, DateTime, Integer, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class MatomoData(Base):
__tablename__ = "inspire_matomo_data"

id = Column(Integer, primary_key=True)
date = Column(Date)
visits = Column(Integer)
unique_visitors = Column(Integer)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
96 changes: 96 additions & 0 deletions dags/inspire_matomo/inspire_visits_per_day_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import json
import os
from datetime import datetime

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from common.models.inspire_matomo.inspire_matomo import MatomoData
from common.operators.sqlalchemy_operator import sqlalchemy_task
from executor_config import kubernetes_executor_config
from inspire_matomo.utils import get_parameters
from tenacity import retry_if_exception_type, stop_after_attempt

now = datetime.now()
formatted_date = now.strftime("%Y-%m-%d")


@dag(
start_date=pendulum.today("UTC").add(days=-1),
schedule="@monthly",
params={"period": "day", "date": formatted_date},
)
def inspire_visits_per_day_dag():
@task(executor_config=kubernetes_executor_config)
def fetch_visits_per_day(**kwargs):
http_hook = HttpHook(http_conn_id="matomo", method="GET")
period = kwargs["params"].get("period")
date = kwargs["params"].get("date")
parameters = get_parameters(
period=period, date=date, endpoint_key="visits_per_day"
)
token = os.environ.get("MATOMO_AUTH_TOKEN")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use the airflow variables for this kind of things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left the comment regarding this below👇

response = http_hook.run_with_advanced_retry(
endpoint="/index.php",
data=parameters,
headers={"Authorization": f"Bearer {token}"},
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(AirflowException),
},
)
return response.text

@task(executor_config=kubernetes_executor_config)
def fetch_unique_visitors_per_day(**kwargs):
http_hook = HttpHook(http_conn_id="matomo", method="GET")
period = kwargs["params"].get("period")
date = kwargs["params"].get("date")
parameters = get_parameters(
period=period, date=date, endpoint_key="unique_visitors"
)
token = os.environ.get("MATOMO_AUTH_TOKEN")
response = http_hook.run_with_advanced_retry(
endpoint="/index.php",
data=parameters,
headers={"Authorization": f"Bearer {token}"},
_retry_args={
"stop": stop_after_attempt(3),
"retry": retry_if_exception_type(AirflowException),
},
)
return response.text

@sqlalchemy_task(conn_id="superset")
def populate_database(visits_per_day, unique_visitors_per_day, session, **kwargs):
visits_per_day_json = json.loads(visits_per_day)
unique_visitors_per_day_json = json.loads(unique_visitors_per_day)
date = kwargs["params"].get("date")

record = (
session.query(MatomoData)
.filter_by(date=visits_per_day_json.get("date"))
.first()
)
if record:
record.visits = int(visits_per_day_json.visits)
record.unique_visitors = int(unique_visitors_per_day_json.unique_visitors)
else:
parsed_date = datetime.strptime(date, "%Y-%m-%d").date()
new_record = MatomoData(
visits=int(visits_per_day_json.get("value")),
unique_visitors=int(unique_visitors_per_day_json.get("value")),
date=parsed_date,
)
session.add(new_record)

visits_per_day = fetch_visits_per_day()
unique_visitors_per_day = fetch_unique_visitors_per_day()
populate_database(
visits_per_day=visits_per_day,
unique_visitors_per_day=unique_visitors_per_day,
)


inspire_visits_per_day = inspire_visits_per_day_dag()
22 changes: 22 additions & 0 deletions dags/inspire_matomo/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os


def get_endpoint(key):
end_point_map = {
"visits_per_day": "VisitsSummary.getVisits",
"unique_visitors": "VisitsSummary.getUniqueVisitors",
}
return end_point_map[key]


def get_parameters(period, date, endpoint_key):
return {
"module": "API",
"token_auth": os.environ.get("MATOMO_AUTH_TOKEN"),
"idSite": os.environ.get("MATOMO_SITE_ID"),
"date": str(date),
"period": period,
"format": "json",
"module": "API",
"method": get_endpoint(endpoint_key),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Database revision for Insipre Matomo

Revision ID: fc43c200a255
Revises: db8ba01db969
Create Date: 2024-08-15 17:20:55.998545

"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "fc43c200a255"
down_revision: Union[str, None] = "db8ba01db969"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.create_table(
"inspire_matomo_data",
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("date", sa.Date, nullable=False),
sa.Column("visits", sa.Integer, nullable=False),
sa.Column("unique_visitors", sa.Integer, nullable=False),
sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
)


def downgrade() -> None:
op.drop_table("inspire_matomo_data")
Loading