Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch results of evening voting sessions #957

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions backend/howtheyvote/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,45 @@
import datetime

from sqlalchemy import exists, func, select
from sqlalchemy import select
from structlog import get_logger

from .. import config
from ..db import Session
from ..models import PipelineRun, PipelineRunResult, PlenarySession
from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline
from ..query import session_is_current_at
from .worker import SkipPipelineError, Weekday, Worker
from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully

log = get_logger(__name__)


def op_rcv() -> None:
def op_rcv_midday() -> None:
"""Checks if there is a current plenary session and, if yes, fetches the latest roll-call
vote results."""
today = datetime.date.today()

if not _is_session_day(today):
raise SkipPipelineError()

if _ran_successfully(RCVListPipeline, today):
if pipeline_ran_successfully(RCVListPipeline, today):
raise SkipPipelineError()

pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today)
pipeline.run()


def op_rcv_evening() -> None:
"""While on most plenary days, there’s only one voting session around midday, on some days
there is another session in the evening, usually around 17:00. The vote results of the
evening sessions are appended to the same source document that also contains the results
of the midday votes. This method fetches the latest roll-call vote results, even if they
have been fetched successfully earlier on the same day."""
today = datetime.date.today()

if not _is_session_day(today):
raise SkipPipelineError()

if pipeline_ran_successfully(RCVListPipeline, today, count=2):
raise SkipPipelineError()

pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today)
Expand Down Expand Up @@ -59,19 +77,6 @@ def _is_session_day(date: datetime.date) -> bool:
return session is not None


def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool:
    """Tell whether at least one successful run of a pipeline started on a given day.

    Looks for a `PipelineRun` record whose pipeline name equals `pipeline.__name__`, whose
    start date equals `date`, and whose result is `PipelineRunResult.SUCCESS`.
    """
    conditions = (
        PipelineRun.pipeline == pipeline.__name__,
        func.date(PipelineRun.started_at) == func.date(date),
        PipelineRun.result == PipelineRunResult.SUCCESS,
    )
    statement = exists().where(*conditions).select()
    found = Session.execute(statement).scalar()

    return bool(found)


worker = Worker()

# Mon at 04:00
Expand All @@ -94,14 +99,24 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool:

# Mon-Thu between 12:00 and 15:00, every 10 mins
worker.schedule(
op_rcv,
op_rcv_midday,
name=RCVListPipeline.__name__,
weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU},
hours=range(12, 15),
minutes=range(0, 60, 10),
tz=config.TIMEZONE,
)

# Mon-Thu between 17:00 and 20:00, every 10 mins
worker.schedule(
op_rcv_evening,
name=RCVListPipeline.__name__,
weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU},
hours=range(17, 20),
minutes=range(0, 60, 10),
tz=config.TIMEZONE,
)

# Mon-Thu, between 13:00 and 20:00, every 30 mins
worker.schedule(
op_press,
Expand Down
19 changes: 19 additions & 0 deletions backend/howtheyvote/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client import start_http_server as start_metrics_server
from schedule import Scheduler
from sqlalchemy import func, select
from structlog import get_logger

from .. import config
Expand Down Expand Up @@ -58,6 +59,24 @@ class Weekday(enum.Enum):
Handler = Callable[..., Any]


def pipeline_ran_successfully(
    pipeline: type[object],
    date: datetime.date,
    count: int = 1,
) -> bool:
    """Tell whether a pipeline has completed successfully often enough on a given day.

    Counts the `PipelineRun` records whose pipeline name equals `pipeline.__name__`, whose
    start date equals `date`, and whose result is `PipelineRunResult.SUCCESS`. Returns
    `True` when that number is at least `count` (one by default), `False` otherwise.
    """
    conditions = (
        PipelineRun.pipeline == pipeline.__name__,
        func.date(PipelineRun.started_at) == func.date(date),
        PipelineRun.result == PipelineRunResult.SUCCESS,
    )
    statement = select(func.count()).select_from(PipelineRun).where(*conditions)

    # Fall back to zero when the query yields no (or a falsy) scalar value.
    successful_runs = Session.execute(statement).scalar() or 0

    return successful_runs >= count


class Worker:
"""Running a worker starts a long-running process that executes data pipelines in regular
intervals and stores the result of the pipeline runs in the database."""
Expand Down
44 changes: 43 additions & 1 deletion backend/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from howtheyvote.models import PipelineRun, PipelineRunResult
from howtheyvote.pipelines import DataUnavailableError, PipelineError
from howtheyvote.worker.worker import Weekday, Worker
from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully


def get_handler():
Expand Down Expand Up @@ -174,3 +174,45 @@ def pipeline_error():

assert runs[1].pipeline == "pipeline_error"
assert runs[1].result == PipelineRunResult.FAILURE


def test_pipeline_ran_successfully(db_session):
    """Failed runs are ignored and `count` sets the required number of successful runs."""

    class TestPipeline:
        pass

    now = datetime.datetime.now()
    today = now.date()

    def record_run(result):
        # Persist one run of the dummy pipeline that started and finished at `now`.
        db_session.add(
            PipelineRun(
                started_at=now,
                finished_at=now,
                pipeline=TestPipeline.__name__,
                result=result,
            )
        )
        db_session.commit()

    # A failed run alone does not count.
    record_run(PipelineRunResult.FAILURE)
    assert pipeline_ran_successfully(TestPipeline, today) is False

    # One successful run satisfies the default threshold, but not a threshold of two.
    record_run(PipelineRunResult.SUCCESS)
    assert pipeline_ran_successfully(TestPipeline, today) is True
    assert pipeline_ran_successfully(TestPipeline, today, count=2) is False

    # A second successful run satisfies the threshold of two.
    record_run(PipelineRunResult.SUCCESS)
    assert pipeline_ran_successfully(TestPipeline, today, count=2) is True