diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index ec2d5296..45e90422 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -33,7 +33,7 @@ build_unique_component_name, ) from reana_db.database import Session -from reana_db.models import Job, JobCache, Workflow, RunStatus +from reana_db.models import Job, JobCache, Workflow, RunStatus, Service from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm.attributes import flag_modified @@ -338,3 +338,12 @@ def _delete_dask_cluster(workflow: Workflow) -> None: delete_dask_dashboard_ingress( f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_ ) + + dask_service = ( + Session.query(Service) + .filter_by(name=f"dask-service-{workflow.id_}") + .one_or_none() + ) + workflow.services.remove(dask_service) + Session.delete(dask_service) + Session.object_session(workflow).commit() diff --git a/reana_workflow_controller/rest/workflows.py b/reana_workflow_controller/rest/workflows.py index 13983f47..892202ee 100644 --- a/reana_workflow_controller/rest/workflows.py +++ b/reana_workflow_controller/rest/workflows.py @@ -24,7 +24,15 @@ from reana_commons.config import WORKFLOW_TIME_FORMAT from reana_commons.utils import build_unique_component_name from reana_db.database import Session -from reana_db.models import RunStatus, User, UserWorkflow, Workflow, WorkflowResource +from reana_db.models import ( + RunStatus, + User, + UserWorkflow, + Workflow, + WorkflowResource, + Service, + ServiceType, +) from reana_db.utils import ( _get_workflow_by_uuid, _get_workflow_with_uuid_or_name, @@ -32,6 +40,7 @@ get_default_quota_resource, ) from reana_workflow_controller.config import ( + REANA_HOSTNAME, DEFAULT_NAME_FOR_WORKFLOWS, MAX_WORKFLOW_SHARING_MESSAGE_LENGTH, ) @@ -50,6 +59,7 @@ ) from reana_workflow_controller.k8s import check_pod_status_by_prefix +from reana_workflow_controller.dask import requires_dask START = "start" STOP = "stop" @@ -396,6 +406,30 @@ def get_workflows(args, paginate=None): # noqa "owner_email": owner_email, "shared_with": shared_with, } + + if requires_dask(workflow): + + dask_service = workflow.services.first() + if dask_service and dask_service.status == RunStatus.created: + pod_status = check_pod_status_by_prefix( + pod_name_prefix=f"reana-run-dask-{workflow.id_}" + ) + if pod_status == "Running": + dask_service.status = RunStatus.running + db_session = Session.object_session(dask_service) + db_session.commit() + + services = workflow.services.all() + services_serialized = [ + { + "name": service.name, + "type": service.type_.name, + "status": service.status.name, + } + for service in services + ] + workflow_response["services"] = services_serialized + if type_ == "interactive" or verbose: int_session = workflow.sessions.first() if int_session: @@ -589,8 +623,19 @@ def create_workflow(): # noqa ), launcher_url=request.json.get("launcher_url"), ) + if requires_dask(workflow): + dask_service = Service( + name=f"dask-service-{workflow_uuid}", + uri=f"{REANA_HOSTNAME}{workflow_uuid}/dashboard/status", + type_=ServiceType.dask, + status=RunStatus.created, + owner_id=request.args["user"], + ) + workflow.services.append(dask_service) + Session.add(workflow) Session.object_session(workflow).commit() + retention_rules = request.json.get("retention_rules", []) if retention_rules: workflow.set_workspace_retention_rules(retention_rules)