Skip to content

Commit

Permalink
Add support for dynamic definition of process sets
Browse files Browse the repository at this point in the history
Circulate the request to all daemons. Have each daemon
pass the new pset definition to is PMIx server so that
the server can properly respond to membership requests.

Signed-off-by: Ralph Castain <rhc@pmix.org>
(cherry picked from commit 0347baa)
  • Loading branch information
rhc54 committed Sep 7, 2023
1 parent d3485fe commit 9b1cf9e
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/mca/odls/odls_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -60,6 +60,7 @@ typedef uint8_t prte_daemon_cmd_flag_t;
#define PRTE_DAEMON_HALT_VM_CMD (prte_daemon_cmd_flag_t) 19
#define PRTE_DAEMON_HALT_DVM_CMD (prte_daemon_cmd_flag_t) 20
#define PRTE_DAEMON_REPORT_JOB_COMPLETE (prte_daemon_cmd_flag_t) 21
#define PRTE_DAEMON_DEFINE_PSET (prte_daemon_cmd_flag_t) 50

/* request proc resource usage */
#define PRTE_DAEMON_TOP_CMD (prte_daemon_cmd_flag_t) 22
Expand Down
74 changes: 70 additions & 4 deletions src/prted/pmix/pmix_server_gen.c
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p
requestor->nspace, requestor->rank);

for (m = 0; m < ndirs; m++) {
if (0 == strncmp(directives[m].key, PMIX_JOB_CTRL_KILL, PMIX_MAX_KEYLEN)) {
if (PMIX_CHECK_KEY(&directives[m], PMIX_JOB_CTRL_KILL)) {
/* convert the list of targets to a pointer array */
if (NULL == targets) {
ptrarray = NULL;
Expand Down Expand Up @@ -844,7 +844,13 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p
}
PMIX_DESTRUCT(&parray);
}
} else if (0 == strncmp(directives[m].key, PMIX_JOB_CTRL_TERMINATE, PMIX_MAX_KEYLEN)) {
if (PMIX_SUCCESS != rc) {
return rc;
}
return PMIX_OPERATION_SUCCEEDED;
}

if (PMIX_CHECK_KEY(&directives[m], PMIX_JOB_CTRL_TERMINATE)) {
if (NULL == targets) {
/* terminate the daemons and all running jobs */
PMIX_DATA_BUFFER_CREATE(cmd);
Expand All @@ -866,8 +872,14 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p
}
PMIX_DATA_BUFFER_RELEASE(cmd);
PMIX_RELEASE(sig);
if (PMIX_SUCCESS != rc) {
return rc;
}
return PMIX_OPERATION_SUCCEEDED;
}
} else if (0 == strncmp(directives[m].key, PMIX_JOB_CTRL_SIGNAL, PMIX_MAX_KEYLEN)) {
}

if (PMIX_CHECK_KEY(&directives[m], PMIX_JOB_CTRL_SIGNAL)) {
PMIX_DATA_BUFFER_CREATE(cmd);
cmmnd = PRTE_DAEMON_SIGNAL_LOCAL_PROCS;
/* pack the command */
Expand Down Expand Up @@ -912,10 +924,64 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p
}
PMIX_DATA_BUFFER_RELEASE(cmd);
PMIX_RELEASE(sig);
if (PMIX_SUCCESS != rc) {
return rc;
}
return PMIX_OPERATION_SUCCEEDED;
}

#ifdef PMIX_JOB_CTRL_DEFINE_PSET
if (PMIX_CHECK_KEY(&directives[m], PMIX_JOB_CTRL_DEFINE_PSET)) {
// goes to all daemons
PMIX_DATA_BUFFER_CREATE(cmd);
cmmnd = PRTE_DAEMON_DEFINE_PSET;
/* pack the command */
rc = PMIx_Data_pack(NULL, cmd, &cmmnd, 1, PMIX_UINT8);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DATA_BUFFER_RELEASE(cmd);
return rc;
}
// pack the pset name
rc = PMIx_Data_pack(NULL, cmd, (void*)&directives[m].value.data.string, 1, PMIX_STRING);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DATA_BUFFER_RELEASE(cmd);
return rc;
}
// pack the #targets
rc = PMIx_Data_pack(NULL, cmd, &ntargets, 1, PMIX_INT32);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DATA_BUFFER_RELEASE(cmd);
return rc;
}
// pack the targets
rc = PMIx_Data_pack(NULL, cmd, (void*)targets, ntargets, PMIX_PROC);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DATA_BUFFER_RELEASE(cmd);
return rc;
}
/* goes to all daemons */
sig = PMIX_NEW(prte_grpcomm_signature_t);
sig->signature = (pmix_proc_t *) malloc(sizeof(pmix_proc_t));
sig->sz = 1;
PMIX_LOAD_PROCID(&sig->signature[0], PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD);
if (PRTE_SUCCESS != (rc = prte_grpcomm.xcast(sig, PRTE_RML_TAG_DAEMON, cmd))) {
PRTE_ERROR_LOG(rc);
}
PMIX_DATA_BUFFER_RELEASE(cmd);
PMIX_RELEASE(sig);
if (PMIX_SUCCESS != rc) {
return rc;
}
return PMIX_OPERATION_SUCCEEDED;
}
#endif
}

return PMIX_OPERATION_SUCCEEDED;
return PMIX_ERR_NOT_SUPPORTED;
}

static void relcb(void *cbdata)
Expand Down
43 changes: 41 additions & 2 deletions src/prted/prted_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Copyright (c) 2016-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2020 IBM Corporation. All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -105,7 +105,7 @@ void prte_daemon_recv(int status, pmix_proc_t *sender,
pmix_nspace_t job;
pmix_data_buffer_t data, *answer;
prte_job_t *jdata;
pmix_proc_t proc;
pmix_proc_t proc, *pptr;
int32_t i, num_replies;
pmix_pointer_array_t procarray;
prte_proc_t *proct;
Expand Down Expand Up @@ -340,6 +340,45 @@ void prte_daemon_recv(int status, pmix_proc_t *sender,

break;

/**** DEFINE PSET ****/
case PRTE_DAEMON_DEFINE_PSET:
// get pset name
n = 1;
ret = PMIx_Data_unpack(NULL, buffer, &cmd_str, &n, PMIX_STRING);
if (PMIX_SUCCESS != ret) {
PRTE_ERROR_LOG(ret);
goto CLEANUP;
}
// get number of target procs
n = 1;
ret = PMIx_Data_unpack(NULL, buffer, &num_procs, &n, PMIX_INT32);
if (PMIX_SUCCESS != ret) {
PRTE_ERROR_LOG(ret);
goto CLEANUP;
}
// create space for them
pptr = PMIx_Proc_create(num_procs);
if (NULL == pptr) {
PRTE_ERROR_LOG(PRTE_ERR_OUT_OF_RESOURCE);
goto CLEANUP;
}
// unpack the targets
n = num_procs;
ret = PMIx_Data_unpack(NULL, buffer, pptr, &n, PMIX_PROC);
if (PMIX_SUCCESS != ret) {
PRTE_ERROR_LOG(ret);
goto CLEANUP;
}
// define the pset
ret = PMIx_server_define_process_set(pptr, num_procs, cmd_str);
free(cmd_str);
cmd_str = NULL;
PMIx_Proc_free(pptr, num_procs);
if (PMIX_SUCCESS != ret) {
PMIX_ERROR_LOG(ret);
}
break;

/**** EXIT COMMAND ****/
case PRTE_DAEMON_EXIT_CMD:
if (prte_debug_daemons_flag) {
Expand Down

0 comments on commit 9b1cf9e

Please sign in to comment.