Skip to content

Commit

Permalink
models: replace workflow.run_number with major and minor run numbers
Browse files Browse the repository at this point in the history
Change the workflow table to split the run_number into two integers: one
is the major run number (the number before the dot), and the other one
is the minor run number (after the dot), which increases when
restarting a workflow, thus removing the limit of 9 restarts.

Closes #186.
  • Loading branch information
giuseppe-steduto committed Oct 30, 2023
1 parent 6271ecd commit 58fa915
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 53 deletions.
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The list of contributors in alphabetical order:
- `Camila Diaz <https://orcid.org/0000-0001-5543-797X>`_
- `Diego Rodriguez <https://orcid.org/0000-0003-0649-2002>`_
- `Dinos Kousidis <https://orcid.org/0000-0002-4914-4289>`_
- `Giuseppe Steduto <https://orcid.org/0009-0002-1258-8553>`_
- `Jan Okraska <https://orcid.org/0000-0002-1416-3244>`_
- `Leticia Wanderley <https://orcid.org/0000-0003-4649-6630>`_
- `Marco Donadoni <https://orcid.org/0000-0003-2922-5505>`_
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changes
=======

Version 0.9.3 (UNRELEASED)
--------------------------
- Changes the ``Workflow`` table to replace the ``run_number`` column with two new columns ``run_number_major`` and ``run_number_minor``, in order to allow for more than 9 restarts.

Version 0.9.2 (2023-09-26)
--------------------------

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Separate run number into major and minor run numbers.
Revision ID: b85c3e601de4
Revises: 377cfbfccf75
Create Date: 2023-10-02 12:08:18.292490
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "b85c3e601de4"
down_revision = "377cfbfccf75"
branch_labels = None
depends_on = None


def upgrade():
"""Upgrade to b85c3e601de4 revision."""
# Add new columns (run_number_major, run_number_minor)
op.add_column(
"workflow", sa.Column("run_number_major", sa.Integer()), schema="__reana"
)
op.add_column(
"workflow",
sa.Column("run_number_minor", sa.Integer(), default=0),
schema="__reana",
)

# Data migration (split run_number into run_number_major and run_number_minor)
op.get_bind().execute(
sa.text(
"UPDATE __reana.workflow"
" SET run_number_major = FLOOR(run_number), "
" run_number_minor = (run_number - FLOOR(run_number)) * 10"
),
)

# Delete old constraint
op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

# Drop old run_number column
op.drop_column("workflow", "run_number", schema="__reana")

# Add new constraint (the primary key is not run_number anymore, but with major and minor run number
op.create_unique_constraint(
"_user_workflow_run_uc",
"workflow",
["name", "owner_id", "run_number_major", "run_number_minor"],
schema="__reana",
)

# Update run_number_minor for workflows that have been restarted more than 10 times
# (thus erroneously having the following run_number_major), in case some of them
# were created before the limit on 9 restarts was introduced.
op.get_bind().execute(
sa.text(
"""
UPDATE __reana.workflow AS w
SET
run_number_major = to_be_updated.new_major_run_number,
run_number_minor = (w.run_number_minor + (w.run_number_major - to_be_updated.new_major_run_number) * 10)
FROM (
SELECT MIN(w1.run_number_major) - 1 AS new_major_run_number, w1.workspace_path
FROM __reana.workflow w1
WHERE w1.restart AND w1.run_number_minor = 0
GROUP BY w1.workspace_path
) AS to_be_updated
WHERE w.workspace_path = to_be_updated.workspace_path
"""
),
)


def downgrade():
"""Downgrade to 377cfbfccf75 revision."""
# Revert constraint
op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

# Add old run_number column back
op.add_column("workflow", sa.Column("run_number", sa.Float()), schema="__reana")

# Check that there are no workflows discarded more than 10 times
# This is because of the way the info about restarts is stored in
# the run_number column (see https://github.com/reanahub/reana-db/issues/186)
restarted_ten_times = (
op.get_bind()
.execute("SELECT COUNT(*) FROM __reana.workflow WHERE run_number_minor >= 10")
.fetchone()[0]
)
if restarted_ten_times != 0:
raise ValueError(
"Cannot migrate database because some workflows have been restarted 10 or more times,"
" and the previous database revision only supports up to 9 restarts."
" If you want to downgrade, you should manually delete them."
)

# Data migration (combine run_number_major and restart_number back to run_number)
op.get_bind().execute(
"UPDATE __reana.workflow SET run_number=run_number_major+(run_number_minor * 1.0 /10)"
)

# Drop new columns
op.drop_column("workflow", "run_number_major", schema="__reana")
op.drop_column("workflow", "run_number_minor", schema="__reana")

# Restore old constraint
op.create_unique_constraint(
"_user_workflow_run_uc",
"workflow",
["name", "owner_id", "run_number"],
schema="__reana",
)
3 changes: 0 additions & 3 deletions reana_db/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,3 @@
os.getenv("REANA_PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY", "false")
)
"""Whether to run the periodic (cronjob) resource quota updater."""

LIMIT_RESTARTS = 9
"""Maximum number of times a workflow can be restarted."""
91 changes: 54 additions & 37 deletions reana_db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import uuid
from datetime import datetime
from functools import reduce
from typing import Dict, List
from typing import Dict, List, Tuple

from reana_commons.config import (
MQ_MAX_PRIORITY,
Expand Down Expand Up @@ -54,12 +54,12 @@
DB_SECRET_KEY,
DEFAULT_QUOTA_LIMITS,
DEFAULT_QUOTA_RESOURCES,
LIMIT_RESTARTS,
WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY,
)
from reana_db.utils import (
build_workspace_path,
store_workflow_disk_quota,
split_run_number,
update_users_cpu_quota,
update_users_disk_quota,
update_workflow_cpu_quota,
Expand Down Expand Up @@ -459,7 +459,8 @@ class Workflow(Base, Timestamp, QuotaBase):
run_started_at = Column(DateTime)
run_finished_at = Column(DateTime)
run_stopped_at = Column(DateTime)
_run_number = Column("run_number", Float)
run_number_major = Column(Integer)
run_number_minor = Column(Integer, default=0)
job_progress = Column(JSONType, default=dict)
workspace_path = Column(String)
restart = Column(Boolean, default=False)
Expand Down Expand Up @@ -487,7 +488,11 @@ class Workflow(Base, Timestamp, QuotaBase):

__table_args__ = (
UniqueConstraint(
"name", "owner_id", "run_number", name="_user_workflow_run_uc"
"name",
"owner_id",
"run_number_major",
"run_number_minor",
name="_user_workflow_run_uc",
),
{"schema": "__reana"},
)
Expand Down Expand Up @@ -527,7 +532,9 @@ def __init__(
self.git_repo = git_repo
self.git_provider = git_provider
self.restart = restart
self._run_number = self.assign_run_number(run_number)
self.run_number_major, self.run_number_minor = self.get_new_run_number(
run_number
)
self.workspace_path = workspace_path or build_workspace_path(
self.owner_id, self.id_
)
Expand All @@ -537,54 +544,66 @@ def __repr__(self):
"""Workflow string representation."""
return "<Workflow %r>" % self.id_

@hybrid_property
def run_number(self):
"""Property of run_number."""
if self._run_number.is_integer():
return int(self._run_number)
return self._run_number

@run_number.expression
def run_number(cls):
return func.abs(cls._run_number)

def assign_run_number(self, run_number):
"""Assing run number."""
@property
def run_number(self) -> str:
"""Get workflow run number."""
if self.run_number_minor != 0:
return f"{self.run_number_major}.{self.run_number_minor}"
return str(self.run_number_major)

def _get_last_workflow(self, run_number):
"""Fetch the last workflow restart given a certain run number."""
from .database import Session

if run_number:
run_number_major, run_number_minor = split_run_number(run_number)
last_workflow = (
Session.query(Workflow)
.filter(
Workflow.name == self.name,
Workflow.run_number >= int(run_number),
Workflow.run_number < int(run_number) + 1,
Workflow.run_number_major == run_number_major,
Workflow.owner_id == self.owner_id,
)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
)
.first()
)
else:
last_workflow = (
Session.query(Workflow)
.filter_by(name=self.name, restart=False, owner_id=self.owner_id)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
)
.first()
)
if last_workflow and self.restart:
# FIXME: remove the limit of nine restarts when we fix the way in which
# we save `run_number` in the DB
num_restarts = round(last_workflow.run_number * 10) % 10
if num_restarts == LIMIT_RESTARTS:
return last_workflow

def get_new_run_number(self, run_number) -> Tuple[int, int]:
"""Return the major and minor run numbers for a new workflow.
Return a tuple where the first element is the major run number and the
second element is the minor run number.
"""
last_workflow = self._get_last_workflow(run_number)

if not last_workflow:
if self.restart:
raise REANAValidationError(
f"Cannot restart a workflow more than {LIMIT_RESTARTS} times"
"Cannot restart a workflow that has not been run before."
)
return round(last_workflow.run_number + 0.1, 1)
return 1, 0 # First workflow run

else:
if not last_workflow:
return 1
if not self.restart:
run_number_major = last_workflow.run_number_major + 1
run_number_minor = 0
else:
return last_workflow.run_number + 1
run_number_major = last_workflow.run_number_major
run_number_minor = last_workflow.run_number_minor + 1

return run_number_major, run_number_minor

def get_input_parameters(self):
"""Return workflow parameters."""
Expand All @@ -604,7 +623,7 @@ def get_owner_access_token(self):

def get_full_workflow_name(self):
"""Return full workflow name including run number."""
return "{}.{}".format(self.name, str(self.run_number))
return "{}.{}".format(self.name, self.run_number)

Check warning on line 626 in reana_db/models.py

View check run for this annotation

Codecov / codecov/patch

reana_db/models.py#L626

Added line #L626 was not covered by tests

def get_workspace_disk_usage(self, summarize=False, search=None):
"""Retrieve disk usage information of a workspace."""
Expand Down Expand Up @@ -643,15 +662,13 @@ def get_all_restarts(self):
"""Get all the restarts of this workflow, including the original workflow.
Returns all the restarts of this workflow, that is all the workflows that have
the same name and the same run number (up to the dot). This includes the
the same name and the same major run number. This includes the
original workflow, as well as all the following restarts.
"""
run_number = int(self.run_number)
restarts = Workflow.query.filter(
Workflow.name == self.name,
Workflow.owner_id == self.owner_id,
Workflow.run_number >= run_number,
Workflow.run_number < run_number + 1,
Workflow.run_number_major == self.run_number_major,
)
return restarts

Expand Down
23 changes: 15 additions & 8 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None):
return workspace_path


def split_run_number(run_number):

Check warning on line 55 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L55

Added line #L55 was not covered by tests
"""Split run number into major and minor run numbers."""
run_number = str(run_number)
if "." in run_number:
return tuple(map(int, run_number.split(".", maxsplit=1)))
return int(run_number), 0


def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
"""Get Workflow from database with uuid or name.
Expand Down Expand Up @@ -128,21 +136,20 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
return _get_workflow_by_name(workflow_name, user_uuid)

# `run_number` was specified.
# Check `run_number` is valid.
try:
run_number = float(run_number)
run_number_major, run_number_minor = split_run_number(run_number)
except ValueError:
# `uuid_or_name` was split, so it is a dot-separated string
# but it didn't contain a valid `run_number`.
# Assume that this dot-separated string is the name of
# `uuid_or_name` didn't contain a valid `run_number`.
# Assume that this string is the name of
# the workflow and search with it.
return _get_workflow_by_name(uuid_or_name, user_uuid)

# `run_number` is valid.
# Search by `run_number` since it is a primary key.
# Search by `run_number_major` and `run_number_minor`, since it is a primary key.
workflow = Workflow.query.filter(
Workflow.name == workflow_name,
Workflow.run_number == run_number,
Workflow.run_number_major == run_number_major,
Workflow.run_number_minor == run_number_minor,
Workflow.owner_id == user_uuid,
).one_or_none()
if not workflow:
Expand All @@ -169,7 +176,7 @@ def _get_workflow_by_name(workflow_name, user_uuid):
Workflow.query.filter(
Workflow.name == workflow_name, Workflow.owner_id == user_uuid
)
.order_by(Workflow.run_number.desc())
.order_by(Workflow.run_number_major.desc(), Workflow.run_number_minor.desc())
.first()
)
if not workflow:
Expand Down
Loading

0 comments on commit 58fa915

Please sign in to comment.