diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 342dd503..7f59b6ba 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -33,7 +33,7 @@ build_unique_component_name, ) from reana_db.database import Session -from reana_db.models import Job, JobCache, Workflow, RunStatus +from reana_db.models import Job, JobCache, Workflow, RunStatus, Service from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm.attributes import flag_modified @@ -338,3 +338,12 @@ def _delete_dask_cluster(workflow: Workflow) -> None: delete_dask_dashboard_ingress( f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_ ) + + dask_service = ( + Session.query(Service) + .filter_by(name=f"dask-dashboard-{workflow.id_}") + .one_or_none() + ) + workflow.services.remove(dask_service) + Session.delete(dask_service) + Session.object_session(workflow).commit() diff --git a/reana_workflow_controller/rest/workflows.py b/reana_workflow_controller/rest/workflows.py index 13983f47..f77966a5 100644 --- a/reana_workflow_controller/rest/workflows.py +++ b/reana_workflow_controller/rest/workflows.py @@ -24,7 +24,16 @@ from reana_commons.config import WORKFLOW_TIME_FORMAT from reana_commons.utils import build_unique_component_name from reana_db.database import Session -from reana_db.models import RunStatus, User, UserWorkflow, Workflow, WorkflowResource +from reana_db.models import ( + RunStatus, + User, + UserWorkflow, + Workflow, + WorkflowResource, + Service, + ServiceType, + ServiceStatus, +) from reana_db.utils import ( _get_workflow_by_uuid, _get_workflow_with_uuid_or_name, @@ -32,6 +41,7 @@ get_default_quota_resource, ) from reana_workflow_controller.config import ( + REANA_HOSTNAME, DEFAULT_NAME_FOR_WORKFLOWS, MAX_WORKFLOW_SHARING_MESSAGE_LENGTH, ) @@ -50,6 +60,7 @@ ) from reana_workflow_controller.k8s import check_pod_status_by_prefix +from reana_workflow_controller.dask import requires_dask START = "start" STOP = "stop" @@ -396,6 +407,31 @@ def get_workflows(args, paginate=None): # noqa "owner_email": owner_email, "shared_with": shared_with, } + + if requires_dask(workflow): + + dask_service = workflow.services.first() + if dask_service and dask_service.status == ServiceStatus.created: + pod_status = check_pod_status_by_prefix( + pod_name_prefix=f"reana-run-dask-{workflow.id_}" + ) + + if pod_status == "Running": + dask_service.status = ServiceStatus.running + db_session = Session.object_session(dask_service) + db_session.commit() + + services = workflow.services.all() + services_serialized = [ + { + "name": service.name, + "type": service.type_.name, + "status": service.status.name, + } + for service in services + ] + workflow_response["services"] = services_serialized + if type_ == "interactive" or verbose: int_session = workflow.sessions.first() if int_session: @@ -589,8 +625,19 @@ def create_workflow(): # noqa ), launcher_url=request.json.get("launcher_url"), ) + if requires_dask(workflow): + dask_service = Service( + name=f"dask-dashboard-{workflow_uuid}", + uri=f"https://{REANA_HOSTNAME}/{workflow_uuid}/dashboard/status", + type_=ServiceType.dask, + status=ServiceStatus.created, + owner_id=request.args["user"], + ) + workflow.services.append(dask_service) + Session.add(workflow) Session.object_session(workflow).commit() + retention_rules = request.json.get("retention_rules", []) if retention_rules: workflow.set_workspace_retention_rules(retention_rules) diff --git a/tests/test_views.py b/tests/test_views.py index b47652e6..52b69db2 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -69,6 +69,7 @@ def test_get_workflows(app, session, user0, cwl_workflow_with_name): "user": str(workflow.owner_id), "created": response_data[0]["created"], "progress": response_data[0]["progress"], + "services": response_data[0]["services"], "size": {"raw": -1, "human_readable": ""}, "launcher_url": None, "owner_email": user0.email,