Skip to content

Commit

Permalink
feat(rest): add services field to workflow endpoints (#612)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Jan 16, 2025
1 parent fbab460 commit d48e9e5
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
11 changes: 10 additions & 1 deletion reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
build_unique_component_name,
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, RunStatus
from reana_db.models import Job, JobCache, Workflow, RunStatus, Service
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.attributes import flag_modified

Expand Down Expand Up @@ -338,3 +338,12 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)

dask_service = (
Session.query(Service)
.filter_by(name=f"dask-dashboard-{workflow.id_}")
.one_or_none()
)
workflow.services.remove(dask_service)
Session.delete(dask_service)
Session.object_session(workflow).commit()
47 changes: 46 additions & 1 deletion reana_workflow_controller/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,23 @@
from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.utils import build_unique_component_name
from reana_db.database import Session
from reana_db.models import RunStatus, User, UserWorkflow, Workflow, WorkflowResource
from reana_db.models import (
RunStatus,
User,
UserWorkflow,
Workflow,
WorkflowResource,
Service,
ServiceType,
)
from reana_db.utils import (
_get_workflow_by_uuid,
_get_workflow_with_uuid_or_name,
build_workspace_path,
get_default_quota_resource,
)
from reana_workflow_controller.config import (
REANA_HOSTNAME,
DEFAULT_NAME_FOR_WORKFLOWS,
MAX_WORKFLOW_SHARING_MESSAGE_LENGTH,
)
Expand All @@ -50,6 +59,7 @@
)

from reana_workflow_controller.k8s import check_pod_status_by_prefix
from reana_workflow_controller.dask import requires_dask

START = "start"
STOP = "stop"
Expand Down Expand Up @@ -396,6 +406,30 @@ def get_workflows(args, paginate=None): # noqa
"owner_email": owner_email,
"shared_with": shared_with,
}

if requires_dask(workflow):

dask_service = workflow.services.first()
if dask_service and dask_service.status == RunStatus.created:
pod_status = check_pod_status_by_prefix(
pod_name_prefix=f"reana-run-dask-{workflow.id_}"
)
if pod_status == "Running":
dask_service.status = RunStatus.running
db_session = Session.object_session(dask_service)
db_session.commit()

services = workflow.services.all()
services_serialized = [
{
"name": service.name,
"type": service.type_.name,
"status": service.status.name,
}
for service in services
]
workflow_response["services"] = services_serialized

if type_ == "interactive" or verbose:
int_session = workflow.sessions.first()
if int_session:
Expand Down Expand Up @@ -589,8 +623,19 @@ def create_workflow(): # noqa
),
launcher_url=request.json.get("launcher_url"),
)
if requires_dask(workflow):
dask_service = Service(
name=f"dask-dashboard-{workflow_uuid}",
uri=f"https://{REANA_HOSTNAME}/{workflow_uuid}/dashboard/status",
type_=ServiceType.dask,
status=RunStatus.created,
owner_id=request.args["user"],
)
workflow.services.append(dask_service)

Session.add(workflow)
Session.object_session(workflow).commit()

retention_rules = request.json.get("retention_rules", [])
if retention_rules:
workflow.set_workspace_retention_rules(retention_rules)
Expand Down

0 comments on commit d48e9e5

Please sign in to comment.