Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

models: replace workflow.run_number with generation and restart number #205

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The list of contributors in alphabetical order:
- `Camila Diaz <https://orcid.org/0000-0001-5543-797X>`_
- `Diego Rodriguez <https://orcid.org/0000-0003-0649-2002>`_
- `Dinos Kousidis <https://orcid.org/0000-0002-4914-4289>`_
- `Giuseppe Steduto <https://orcid.org/0009-0002-1258-8553>`_
- `Jan Okraska <https://orcid.org/0000-0002-1416-3244>`_
- `Leticia Wanderley <https://orcid.org/0000-0003-4649-6630>`_
- `Marco Donadoni <https://orcid.org/0000-0003-2922-5505>`_
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changes
=======

Version 0.9.3 (UNRELEASED)
--------------------------
- Changes the ``Workflow`` table to replace the ``run_number`` column with two new columns ``run_number_major`` and ``run_number_minor``, in order to allow for more than 9 restarts.

Version 0.9.2 (2023-09-26)
--------------------------

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Separate run number into major and minor run numbers.

Revision ID: b85c3e601de4
Revises: 377cfbfccf75
Create Date: 2023-10-02 12:08:18.292490

"""
from alembic import op
import sqlalchemy as sa


# Revision identifiers, used by Alembic.
# `revision` is the ID of this migration; `down_revision` is the ID of the
# migration it revises (see "Revises" in the module docstring).
revision = "b85c3e601de4"
down_revision = "377cfbfccf75"
# This migration declares no branch labels and no extra dependencies.
branch_labels = None
depends_on = None


def upgrade():
    """Upgrade to b85c3e601de4 revision.

    Replace the float ``run_number`` column of ``__reana.workflow`` with two
    integer columns, ``run_number_major`` and ``run_number_minor``, migrate the
    existing values and recreate the ``_user_workflow_run_uc`` unique
    constraint on top of the new columns.
    """
    connection = op.get_bind()

    # Add new columns (run_number_major, run_number_minor)
    op.add_column(
        "workflow", sa.Column("run_number_major", sa.Integer()), schema="__reana"
    )
    # NOTE(review): `default=0` is presumably a client-side default only, so
    # existing rows are populated by the data migration below, not by the DB.
    op.add_column(
        "workflow",
        sa.Column("run_number_minor", sa.Integer(), default=0),
        schema="__reana",
    )

    # Data migration (split run_number into run_number_major and run_number_minor).
    # The fractional part of a float is not exact (e.g. 2.3 - 2 is slightly
    # less than 0.3), so round explicitly instead of relying on the implicit
    # float-to-integer assignment cast.
    connection.execute(
        sa.text(
            "UPDATE __reana.workflow"
            " SET run_number_major = FLOOR(run_number), "
            " run_number_minor = ROUND((run_number - FLOOR(run_number)) * 10)"
        ),
    )

    # Delete old constraint
    op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

    # Drop old run_number column
    op.drop_column("workflow", "run_number", schema="__reana")

    # Add new constraint (uniqueness is not based on run_number anymore, but on
    # the major and minor run numbers)
    op.create_unique_constraint(
        "_user_workflow_run_uc",
        "workflow",
        ["name", "owner_id", "run_number_major", "run_number_minor"],
        schema="__reana",
    )

    # Update run numbers for workflows that have been restarted 10 or more times
    # (thus erroneously having the following run_number_major), in case some of
    # them were created before the limit on 9 restarts was introduced.
    # A restarted workflow with run_number_minor = 0 marks such an overflow; all
    # the workflows sharing its workspace are moved back to the original major
    # run number, and the overflowed ones get their minor number shifted by 10
    # for each major number they are moved back.
    connection.execute(
        sa.text(
            """
            UPDATE __reana.workflow AS w
            SET
                run_number_major = to_be_updated.new_major_run_number,
                run_number_minor = (w.run_number_minor + (w.run_number_major - to_be_updated.new_major_run_number) * 10)
            FROM (
                SELECT MIN(w1.run_number_major) - 1 AS new_major_run_number, w1.workspace_path
                FROM __reana.workflow w1
                WHERE w1.restart AND w1.run_number_minor = 0
                GROUP BY w1.workspace_path
            ) AS to_be_updated
            WHERE w.workspace_path = to_be_updated.workspace_path
            """
        ),
    )


def downgrade():
    """Downgrade to 377cfbfccf75 revision.

    Merge ``run_number_major`` and ``run_number_minor`` back into the single
    float ``run_number`` column and restore the old ``_user_workflow_run_uc``
    unique constraint.

    :raises ValueError: if some workflow has ``run_number_minor >= 10``, as
        such a value cannot be stored in the old ``run_number`` column.
    """
    connection = op.get_bind()

    # Check that there are no workflows restarted 10 or more times.
    # This is because of the way the info about restarts is stored in
    # the run_number column (see https://github.com/reanahub/reana-db/issues/186).
    # The check is done before touching the schema, so that a failed downgrade
    # leaves the database untouched.
    restarted_ten_times = connection.execute(
        sa.text("SELECT COUNT(*) FROM __reana.workflow WHERE run_number_minor >= 10")
    ).scalar()
    if restarted_ten_times != 0:
        raise ValueError(
            "Cannot migrate database because some workflows have been restarted 10 or more times,"
            " and the previous database revision only supports up to 9 restarts."
            " If you want to downgrade, you should manually delete them."
        )

    # Revert constraint
    op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

    # Add old run_number column back
    op.add_column("workflow", sa.Column("run_number", sa.Float()), schema="__reana")

    # Data migration (combine run_number_major and run_number_minor back to run_number)
    connection.execute(
        sa.text(
            "UPDATE __reana.workflow SET run_number=run_number_major+(run_number_minor * 1.0 /10)"
        )
    )

    # Drop new columns
    op.drop_column("workflow", "run_number_major", schema="__reana")
    op.drop_column("workflow", "run_number_minor", schema="__reana")

    # Restore old constraint
    op.create_unique_constraint(
        "_user_workflow_run_uc",
        "workflow",
        ["name", "owner_id", "run_number"],
        schema="__reana",
    )
3 changes: 0 additions & 3 deletions reana_db/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,3 @@
os.getenv("REANA_PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY", "false")
)
"""Whether to run the periodic (cronjob) resource quota updater."""

LIMIT_RESTARTS = 9
"""Maximum number of times a workflow can be restarted."""
91 changes: 54 additions & 37 deletions reana_db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import uuid
from datetime import datetime
from functools import reduce
from typing import Dict, List
from typing import Dict, List, Tuple

from reana_commons.config import (
MQ_MAX_PRIORITY,
Expand Down Expand Up @@ -54,12 +54,12 @@
DB_SECRET_KEY,
DEFAULT_QUOTA_LIMITS,
DEFAULT_QUOTA_RESOURCES,
LIMIT_RESTARTS,
WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY,
)
from reana_db.utils import (
build_workspace_path,
store_workflow_disk_quota,
split_run_number,
update_users_cpu_quota,
update_users_disk_quota,
update_workflow_cpu_quota,
Expand Down Expand Up @@ -459,7 +459,8 @@
run_started_at = Column(DateTime)
run_finished_at = Column(DateTime)
run_stopped_at = Column(DateTime)
_run_number = Column("run_number", Float)
run_number_major = Column(Integer)
run_number_minor = Column(Integer, default=0)
job_progress = Column(JSONType, default=dict)
workspace_path = Column(String)
restart = Column(Boolean, default=False)
Expand Down Expand Up @@ -487,7 +488,11 @@

__table_args__ = (
UniqueConstraint(
"name", "owner_id", "run_number", name="_user_workflow_run_uc"
"name",
"owner_id",
"run_number_major",
"run_number_minor",
name="_user_workflow_run_uc",
),
{"schema": "__reana"},
)
Expand Down Expand Up @@ -527,7 +532,9 @@
self.git_repo = git_repo
self.git_provider = git_provider
self.restart = restart
self._run_number = self.assign_run_number(run_number)
self.run_number_major, self.run_number_minor = self.get_new_run_number(
run_number
)
self.workspace_path = workspace_path or build_workspace_path(
self.owner_id, self.id_
)
Expand All @@ -537,54 +544,66 @@
"""Workflow string representation."""
return "<Workflow %r>" % self.id_

@hybrid_property
def run_number(self):
"""Property of run_number."""
if self._run_number.is_integer():
return int(self._run_number)
return self._run_number

@run_number.expression
def run_number(cls):
return func.abs(cls._run_number)

def assign_run_number(self, run_number):
"""Assing run number."""
@property
def run_number(self) -> str:
mdonadoni marked this conversation as resolved.
Show resolved Hide resolved
"""Get workflow run number."""
if self.run_number_minor != 0:
return f"{self.run_number_major}.{self.run_number_minor}"
return str(self.run_number_major)

def _get_last_workflow(self, run_number):
"""Fetch the last workflow restart given a certain run number."""
from .database import Session

if run_number:
run_number_major, run_number_minor = split_run_number(run_number)
last_workflow = (
Session.query(Workflow)
.filter(
Workflow.name == self.name,
Workflow.run_number >= int(run_number),
Workflow.run_number < int(run_number) + 1,
Workflow.run_number_major == run_number_major,
Workflow.owner_id == self.owner_id,
)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
)
.first()
)
else:
last_workflow = (
Session.query(Workflow)
.filter_by(name=self.name, restart=False, owner_id=self.owner_id)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
)
.first()
)
if last_workflow and self.restart:
# FIXME: remove the limit of nine restarts when we fix the way in which
# we save `run_number` in the DB
num_restarts = round(last_workflow.run_number * 10) % 10
if num_restarts == LIMIT_RESTARTS:
mdonadoni marked this conversation as resolved.
Show resolved Hide resolved
return last_workflow

def get_new_run_number(self, run_number) -> Tuple[int, int]:
"""Return the major and minor run numbers for a new workflow.

Return a tuple where the first element is the major run number and the
second element is the minor run number.
"""
last_workflow = self._get_last_workflow(run_number)

if not last_workflow:
if self.restart:
raise REANAValidationError(
f"Cannot restart a workflow more than {LIMIT_RESTARTS} times"
"Cannot restart a workflow that has not been run before."
)
return round(last_workflow.run_number + 0.1, 1)
return 1, 0 # First workflow run

else:
if not last_workflow:
return 1
if not self.restart:
run_number_major = last_workflow.run_number_major + 1
run_number_minor = 0
else:
return last_workflow.run_number + 1
run_number_major = last_workflow.run_number_major
run_number_minor = last_workflow.run_number_minor + 1

return run_number_major, run_number_minor

def get_input_parameters(self):
"""Return workflow parameters."""
Expand All @@ -604,7 +623,7 @@

def get_full_workflow_name(self):
"""Return full workflow name including run number."""
return "{}.{}".format(self.name, str(self.run_number))
return "{}.{}".format(self.name, self.run_number)

Check warning on line 626 in reana_db/models.py

View check run for this annotation

Codecov / codecov/patch

reana_db/models.py#L626

Added line #L626 was not covered by tests

def get_workspace_disk_usage(self, summarize=False, search=None):
"""Retrieve disk usage information of a workspace."""
Expand Down Expand Up @@ -643,15 +662,13 @@
"""Get all the restarts of this workflow, including the original workflow.

Returns all the restarts of this workflow, that is all the workflows that have
the same name and the same run number (up to the dot). This includes the
the same name and the same major run number. This includes the
original workflow, as well as all the following restarts.
"""
run_number = int(self.run_number)
restarts = Workflow.query.filter(
Workflow.name == self.name,
Workflow.owner_id == self.owner_id,
Workflow.run_number >= run_number,
Workflow.run_number < run_number + 1,
Workflow.run_number_major == self.run_number_major,
)
return restarts

Expand Down
23 changes: 15 additions & 8 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@
return workspace_path


def split_run_number(run_number):
    """Split run number into major and minor run numbers.

    The run number is converted to a string and cut at the first dot: the part
    before the dot is the major run number, the part after it is the minor run
    number. When there is no dot, the minor run number is ``0``.

    :param run_number: run number to split, e.g. ``"3"``, ``"3.2"`` or ``3``.
    :returns: tuple ``(run_number_major, run_number_minor)`` of integers.
    :raises ValueError: if either part cannot be parsed as an integer.
    """
    major_part, dot, minor_part = str(run_number).partition(".")
    if not dot:
        # No dot at all: this is a plain (non-restarted) run number.
        return int(major_part), 0
    return int(major_part), int(minor_part)


def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
"""Get Workflow from database with uuid or name.

Expand Down Expand Up @@ -128,21 +136,20 @@
return _get_workflow_by_name(workflow_name, user_uuid)

# `run_number` was specified.
# Check `run_number` is valid.
try:
run_number = float(run_number)
run_number_major, run_number_minor = split_run_number(run_number)
except ValueError:
# `uuid_or_name` was split, so it is a dot-separated string
# but it didn't contain a valid `run_number`.
# Assume that this dot-separated string is the name of
# The specified `run_number` is not valid.
# Assume that this string is the name of
# the workflow and search with it.
return _get_workflow_by_name(uuid_or_name, user_uuid)

# `run_number` is valid.
# Search by `run_number` since it is a primary key.
# Search by `run_number_major` and `run_number_minor`, since together with the name and owner they uniquely identify a workflow.
workflow = Workflow.query.filter(
Workflow.name == workflow_name,
Workflow.run_number == run_number,
Workflow.run_number_major == run_number_major,
Workflow.run_number_minor == run_number_minor,
Workflow.owner_id == user_uuid,
).one_or_none()
if not workflow:
Expand All @@ -169,7 +176,7 @@
Workflow.query.filter(
Workflow.name == workflow_name, Workflow.owner_id == user_uuid
)
.order_by(Workflow.run_number.desc())
.order_by(Workflow.run_number_major.desc(), Workflow.run_number_minor.desc())
.first()
)
if not workflow:
Expand Down
Loading