diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index f884180c0..fe614f5c3 100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -1,6 +1,6 @@ import datetime -from sqlalchemy import exists, func, select +from sqlalchemy import select from structlog import get_logger from .. import config @@ -8,12 +8,12 @@ from ..models import PipelineRun, PipelineRunResult, PlenarySession from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline from ..query import session_is_current_at -from .worker import SkipPipelineError, Weekday, Worker +from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully log = get_logger(__name__) -def op_rcv() -> None: +def op_rcv_midday() -> None: """Checks if there is a current plenary session and, if yes, fetches the latest roll-call vote results.""" today = datetime.date.today() @@ -21,7 +21,25 @@ def op_rcv() -> None: if not _is_session_day(today): raise SkipPipelineError() - if _ran_successfully(RCVListPipeline, today): + if pipeline_ran_successfully(RCVListPipeline, today): + raise SkipPipelineError() + + pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) + pipeline.run() + + +def op_rcv_evening() -> None: + """While on most plenary days, there’s only one voting session around midday, on some days + there is another sesssion in the evening, usually around 17:00. The vote results of the + evening sessions are appended to the same source document that also contains the results + of the midday votes. This method fetches the latest roll-call vote results, even if they + have been fetched successfully earlier on the same day.""" + today = datetime.date.today() + + if not _is_session_day(today): + raise SkipPipelineError() + + if pipeline_ran_successfully(RCVListPipeline, today, count=2): raise SkipPipelineError() pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) @@ -59,19 +77,6 @@ def _is_session_day(date: datetime.date) -> bool: return session is not None -def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: - """Check if a given pipeline has been run successfully on a given day.""" - query = ( - exists() - .where(PipelineRun.pipeline == pipeline.__name__) - .where(func.date(PipelineRun.started_at) == func.date(date)) - .where(PipelineRun.result == PipelineRunResult.SUCCESS) - .select() - ) - - return bool(Session.execute(query).scalar()) - - worker = Worker() # Mon at 04:00 @@ -94,7 +99,7 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: # Mon-Thu between 12:00 and 15:00, every 10 mins worker.schedule( - op_rcv, + op_rcv_midday, name=RCVListPipeline.__name__, weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, hours=range(12, 15), @@ -102,6 +107,16 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: tz=config.TIMEZONE, ) +# Mon-Thu between 17:00 and 20:00, every 10 mins +worker.schedule( + op_rcv_evening, + name=RCVListPipeline.__name__, + weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, + hours=range(17, 20), + minutes=range(0, 60, 10), + tz=config.TIMEZONE, +) + # Mon-Thu, between 13:00 and 20:00, every 30 mins worker.schedule( op_press, diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py index 199dcb28c..df8f2ffe2 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -9,6 +9,7 @@ from prometheus_client import Counter, Gauge, Histogram from prometheus_client import start_http_server as start_metrics_server from schedule import Scheduler +from sqlalchemy import func, select from structlog import get_logger from .. import config @@ -58,6 +59,24 @@ class Weekday(enum.Enum): Handler = Callable[..., Any] +def pipeline_ran_successfully( + pipeline: type[object], + date: datetime.date, + count: int = 1, +) -> bool: + """Check if a given pipeline has been run successfully on a given day.""" + query = ( + select(func.count()) + .select_from(PipelineRun) + .where(PipelineRun.pipeline == pipeline.__name__) + .where(func.date(PipelineRun.started_at) == func.date(date)) + .where(PipelineRun.result == PipelineRunResult.SUCCESS) + ) + result = Session.execute(query).scalar() or 0 + + return result >= count + + class Worker: """Running a worker starts a long-running process that executes data pipelines in regular intervals and stores the result of the pipeline runs in the database.""" diff --git a/backend/tests/worker/test_worker.py b/backend/tests/worker/test_worker.py index b98a759db..dd924e24b 100644 --- a/backend/tests/worker/test_worker.py +++ b/backend/tests/worker/test_worker.py @@ -5,7 +5,7 @@ from howtheyvote.models import PipelineRun, PipelineRunResult from howtheyvote.pipelines import DataUnavailableError, PipelineError -from howtheyvote.worker.worker import Weekday, Worker +from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully def get_handler(): @@ -174,3 +174,45 @@ def pipeline_error(): assert runs[1].pipeline == "pipeline_error" assert runs[1].result == PipelineRunResult.FAILURE + + +def test_pipeline_ran_successfully(db_session): + class TestPipeline: + pass + + now = datetime.datetime.now() + today = now.date() + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.FAILURE, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is False + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is True + assert pipeline_ran_successfully(TestPipeline, today, count=2) is False + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today, count=2) is True