diff --git a/AUTHORS.rst b/AUTHORS.rst index 35ea030..008f205 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -7,6 +7,7 @@ The list of contributors in alphabetical order: - `Camila Diaz `_ - `Diego Rodriguez `_ - `Dinos Kousidis `_ +- `Giuseppe Steduto `_ - `Jan Okraska `_ - `Leticia Wanderley `_ - `Marco Donadoni `_ diff --git a/CHANGES.rst b/CHANGES.rst index 9996a3f..3229242 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,10 @@ Changes ======= +Version 0.9.3 (UNRELEASED) +-------------------------- +- Replaces ``run_number`` column of the ``Workflow`` table with two new columns ``generation_number`` and ``restart_number``, to allow for more than 9 restarts. + Version 0.9.2 (2023-09-26) -------------------------- diff --git a/reana_db/alembic/versions/20231002_1208_b85c3e601de4_separate_run_and_restart_number.py b/reana_db/alembic/versions/20231002_1208_b85c3e601de4_separate_run_and_restart_number.py new file mode 100644 index 0000000..2112afc --- /dev/null +++ b/reana_db/alembic/versions/20231002_1208_b85c3e601de4_separate_run_and_restart_number.py @@ -0,0 +1,114 @@ +"""Separate run number into generation and restart number. + +Revision ID: b85c3e601de4 +Revises: 377cfbfccf75 +Create Date: 2023-10-02 12:08:18.292490 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "b85c3e601de4" +down_revision = "377cfbfccf75" +branch_labels = None +depends_on = None + + +def upgrade(): + """Upgrade to b85c3e601de4 revision.""" + # Add new columns (generation_number, restart_number) + op.add_column( + "workflow", sa.Column("generation_number", sa.Integer()), schema="__reana" + ) + op.add_column( + "workflow", + sa.Column("restart_number", sa.Integer(), default=0), + schema="__reana", + ) + + # Data migration (split run_number into generation_number and restart_number) + op.get_bind().execute( + sa.text( + "UPDATE __reana.workflow" + " SET generation_number = FLOOR(run_number), " + " restart_number = (run_number - FLOOR(run_number)) * 10" + ), + ) + + # Delete old constraint + op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana") + + # Drop old run_number column + op.drop_column("workflow", "run_number", schema="__reana") + + # Add new constraint (the primary key is not run_number anymore, but with generation and restart number + op.create_unique_constraint( + "_user_workflow_run_uc", + "workflow", + ["name", "owner_id", "generation_number", "restart_number"], + schema="__reana", + ) + + # Update restart_number for workflows that have been restarted more than 10 times + # (thus erroneously having the following generation_number), in case some of them + # were created before the limit on 9 restarts was introduced. + op.get_bind().execute( + sa.text( + """ + UPDATE __reana.workflow AS w + SET + generation_number = to_be_updated.new_generation_number, + restart_number = (w.restart_number + (w.generation_number - to_be_updated.new_generation_number) * 10) + FROM ( + SELECT MIN(w1.generation_number) - 1 AS new_generation_number, w1.workspace_path + FROM __reana.workflow w1 + WHERE w1.restart AND w1.restart_number = 0 + GROUP BY w1.workspace_path + ) AS to_be_updated + WHERE w.workspace_path = to_be_updated.workspace_path + """ + ), + ) + + +def downgrade(): + """Downgrade to 377cfbfccf75 revision.""" + # Revert constraint + op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana") + + # Add old run_number column back + op.add_column("workflow", sa.Column("run_number", sa.Float()), schema="__reana") + + # Check that there are no workflows discarded more than 10 times + # This is because of the way the info about restarts is stored in + # the run_number column (see https://github.com/reanahub/reana-db/issues/186) + restarted_ten_times = ( + op.get_bind() + .execute("SELECT COUNT(*) FROM __reana.workflow WHERE restart_number >= 10") + .fetchone()[0] + ) + if restarted_ten_times != 0: + raise ValueError( + "Cannot migrate database because some workflows have been restarted 10 or more times," + " and the previous database revision only supports up to 9 restarts." + " If you want to downgrade, you should manually delete them." + ) + + # Data migration (combine generation_number and restart_number back to run_number) + op.get_bind().execute( + "UPDATE __reana.workflow SET run_number=generation_number+(restart_number * 1.0 /10)" + ) + + # Drop new columns + op.drop_column("workflow", "generation_number", schema="__reana") + op.drop_column("workflow", "restart_number", schema="__reana") + + # Restore old constraint + op.create_unique_constraint( + "_user_workflow_run_uc", + "workflow", + ["name", "owner_id", "run_number"], + schema="__reana", + ) diff --git a/reana_db/config.py b/reana_db/config.py index 99ebabb..5fd404f 100644 --- a/reana_db/config.py +++ b/reana_db/config.py @@ -85,6 +85,3 @@ os.getenv("REANA_PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY", "false") ) """Whether to run the periodic (cronjob) resource quota updater.""" - -LIMIT_RESTARTS = 9 -"""Maximum number of times a workflow can be restarted.""" diff --git a/reana_db/models.py b/reana_db/models.py index d62c211..d5a8403 100644 --- a/reana_db/models.py +++ b/reana_db/models.py @@ -16,7 +16,7 @@ import uuid from datetime import datetime from functools import reduce -from typing import Dict, List +from typing import Dict, List, Tuple from reana_commons.config import ( MQ_MAX_PRIORITY, @@ -54,12 +54,12 @@ DB_SECRET_KEY, DEFAULT_QUOTA_LIMITS, DEFAULT_QUOTA_RESOURCES, - LIMIT_RESTARTS, WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY, ) from reana_db.utils import ( build_workspace_path, store_workflow_disk_quota, + split_run_number, update_users_cpu_quota, update_users_disk_quota, update_workflow_cpu_quota, @@ -459,7 +459,8 @@ class Workflow(Base, Timestamp, QuotaBase): run_started_at = Column(DateTime) run_finished_at = Column(DateTime) run_stopped_at = Column(DateTime) - _run_number = Column("run_number", Float) + generation_number = Column(Integer) + restart_number = Column(Integer, default=0) job_progress = Column(JSONType, default=dict) workspace_path = Column(String) restart = Column(Boolean, default=False) @@ -487,7 +488,11 @@ class Workflow(Base, Timestamp, QuotaBase): __table_args__ = ( UniqueConstraint( - "name", "owner_id", "run_number", name="_user_workflow_run_uc" + "name", + "owner_id", + "generation_number", + "restart_number", + name="_user_workflow_run_uc", ), {"schema": "__reana"}, ) @@ -527,7 +532,9 @@ def __init__( self.git_repo = git_repo self.git_provider = git_provider self.restart = restart - self._run_number = self.assign_run_number(run_number) + self.generation_number, self.restart_number = self.get_new_run_number( + run_number + ) self.workspace_path = workspace_path or build_workspace_path( self.owner_id, self.id_ ) @@ -537,54 +544,65 @@ def __repr__(self): """Workflow string representation.""" return "" % self.id_ - @hybrid_property - def run_number(self): - """Property of run_number.""" - if self._run_number.is_integer(): - return int(self._run_number) - return self._run_number - - @run_number.expression - def run_number(cls): - return func.abs(cls._run_number) - - def assign_run_number(self, run_number): - """Assing run number.""" + def run_number(self) -> str: + """Get workflow run number.""" + if self.restart_number != 0: + return f"{self.generation_number}.{self.restart_number}" + return str(self.generation_number) + + def _get_last_workflow(self, run_number): + """Fetch the last workflow restart given a certain run number.""" from .database import Session if run_number: + generation_number, restart_number = split_run_number(run_number) last_workflow = ( Session.query(Workflow) .filter( Workflow.name == self.name, - Workflow.run_number >= int(run_number), - Workflow.run_number < int(run_number) + 1, + Workflow.generation_number == generation_number, Workflow.owner_id == self.owner_id, ) - .order_by(Workflow.run_number.desc()) + .order_by( + Workflow.generation_number.desc(), Workflow.restart_number.desc() + ) .first() ) else: last_workflow = ( Session.query(Workflow) .filter_by(name=self.name, restart=False, owner_id=self.owner_id) - .order_by(Workflow.run_number.desc()) + .order_by( + Workflow.generation_number.desc(), Workflow.restart_number.desc() + ) .first() ) - if last_workflow and self.restart: - # FIXME: remove the limit of nine restarts when we fix the way in which - # we save `run_number` in the DB - num_restarts = round(last_workflow.run_number * 10) % 10 - if num_restarts == LIMIT_RESTARTS: + return last_workflow + + def get_new_run_number(self, run_number) -> Tuple[int, int]: + """Return the generation and restart numbers for a new workflow. + + Return a tuple where the first element is the generation number and the + second element is the restart number. + """ + last_workflow = self._get_last_workflow(run_number) + + if not last_workflow: + if self.restart: raise REANAValidationError( - f"Cannot restart a workflow more than {LIMIT_RESTARTS} times" + "Cannot restart a workflow that has not been run before." ) - return round(last_workflow.run_number + 0.1, 1) + return 1, 0 # First generation, no restart + else: - if not last_workflow: - return 1 + if not self.restart: + generation_number = last_workflow.generation_number + 1 + restart_number = 0 else: - return last_workflow.run_number + 1 + generation_number = last_workflow.generation_number + restart_number = last_workflow.restart_number + 1 + + return generation_number, restart_number def get_input_parameters(self): """Return workflow parameters.""" @@ -604,7 +622,7 @@ def get_owner_access_token(self): def get_full_workflow_name(self): """Return full workflow name including run number.""" - return "{}.{}".format(self.name, str(self.run_number)) + return "{}.{}".format(self.name, self.run_number()) def get_workspace_disk_usage(self, summarize=False, search=None): """Retrieve disk usage information of a workspace.""" @@ -643,15 +661,13 @@ def get_all_restarts(self): """Get all the restarts of this workflow, including the original workflow. Returns all the restarts of this workflow, that is all the workflows that have - the same name and the same run number (up to the dot). This includes the + the same name and the same generation number. This includes the original workflow, as well as all the following restarts. """ - run_number = int(self.run_number) restarts = Workflow.query.filter( Workflow.name == self.name, Workflow.owner_id == self.owner_id, - Workflow.run_number >= run_number, - Workflow.run_number < run_number + 1, + Workflow.generation_number == self.generation_number, ) return restarts diff --git a/reana_db/utils.py b/reana_db/utils.py index e56d91d..dec82ec 100644 --- a/reana_db/utils.py +++ b/reana_db/utils.py @@ -52,6 +52,14 @@ def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None): return workspace_path +def split_run_number(run_number): + """Split run number into generation and restart numbers.""" + run_number = str(run_number) + if "." in run_number: + return tuple(map(int, run_number.split(".", maxsplit=1))) + return int(run_number), 0 + + def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid): """Get Workflow from database with uuid or name. @@ -128,21 +136,31 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid): return _get_workflow_by_name(workflow_name, user_uuid) # `run_number` was specified. - # Check `run_number` is valid. try: - run_number = float(run_number) + generation_number, restart_number = run_number.split(".", maxsplit=1) + except ValueError: + # There were not enough dot-separated substrings, so probably + # the `restart_number` was not specified. + generation_number = run_number + restart_number = 0 + + # Check `run_number` and `restart_number` are valid. + try: + generation_number = int(generation_number) + restart_number = int(restart_number) except ValueError: - # `uuid_or_name` was split, so it is a dot-separated string + # `uuid_or_name` was split, so it is a dot-separated string, # but it didn't contain a valid `run_number`. # Assume that this dot-separated string is the name of # the workflow and search with it. return _get_workflow_by_name(uuid_or_name, user_uuid) # `run_number` is valid. - # Search by `run_number` since it is a primary key. + # Search by `generation_number` and `restart_number`, since it is a primary key. workflow = Workflow.query.filter( Workflow.name == workflow_name, - Workflow.run_number == run_number, + Workflow.generation_number == generation_number, + Workflow.restart_number == restart_number, Workflow.owner_id == user_uuid, ).one_or_none() if not workflow: @@ -169,7 +187,7 @@ def _get_workflow_by_name(workflow_name, user_uuid): Workflow.query.filter( Workflow.name == workflow_name, Workflow.owner_id == user_uuid ) - .order_by(Workflow.run_number.desc()) + .order_by(Workflow.generation_number.desc(), Workflow.restart_number.desc()) .first() ) if not workflow: diff --git a/tests/test_models.py b/tests/test_models.py index 0831c6a..92ef68c 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -51,7 +51,9 @@ def test_workflow_run_number_assignment(db, session, new_user): ) session.add(first_workflow) session.commit() - assert first_workflow.run_number == 1 + assert first_workflow.run_number() == "1" + assert first_workflow.generation_number == 1 + assert first_workflow.restart_number == 0 second_workflow = Workflow( id_=str(uuid4()), name=workflow_name, @@ -62,7 +64,9 @@ def test_workflow_run_number_assignment(db, session, new_user): ) session.add(second_workflow) session.commit() - assert second_workflow.run_number == 2 + assert second_workflow.run_number() == "2" + assert second_workflow.generation_number == 2 + assert second_workflow.restart_number == 0 first_workflow_restart = Workflow( id_=str(uuid4()), name=workflow_name, @@ -71,11 +75,13 @@ def test_workflow_run_number_assignment(db, session, new_user): type_="serial", logs="", restart=True, - run_number=first_workflow.run_number, + run_number=first_workflow.run_number(), ) session.add(first_workflow_restart) session.commit() - assert first_workflow_restart.run_number == 1.1 + assert first_workflow_restart.run_number() == "1.1" + assert first_workflow_restart.generation_number == 1 + assert first_workflow_restart.restart_number == 1 first_workflow_second_restart = Workflow( id_=str(uuid4()), name=workflow_name, @@ -84,11 +90,13 @@ def test_workflow_run_number_assignment(db, session, new_user): type_="serial", logs="", restart=True, - run_number=first_workflow_restart.run_number, + run_number=first_workflow_restart.run_number(), ) session.add(first_workflow_second_restart) session.commit() - assert first_workflow_second_restart.run_number == 1.2 + assert first_workflow_second_restart.run_number() == "1.2" + assert first_workflow_second_restart.generation_number == 1 + assert first_workflow_second_restart.restart_number == 2 def test_workflow_retention_rules(db, session, new_user): diff --git a/tests/test_utils.py b/tests/test_utils.py index 558b4ca..3a66d88 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -16,7 +16,27 @@ from reana_commons.config import SHARED_VOLUME_PATH from reana_db.models import Workflow -from reana_db.utils import _get_workflow_with_uuid_or_name +from reana_db.utils import _get_workflow_with_uuid_or_name, split_run_number + + +@pytest.mark.parametrize( + "run_number, generation_number, restart_number", + [ + ("1", 1, 0), + ("156.12", 156, 12), + ("2.4", 2, 4), + (3.22, 3, 22), + pytest.param( + "1.2.3", + None, + None, + marks=pytest.mark.xfail(raises=ValueError, strict=True), + ), + ], +) +def test_split_run_number(run_number, generation_number, restart_number): + """Tests for split_run_number().""" + assert split_run_number(run_number) == (generation_number, restart_number) @pytest.mark.parametrize(