Skip to content

Commit

Permalink
Merge pull request #6279 from garlick/issue#6278
Browse files Browse the repository at this point in the history
broker: call PMI_Abort() if something goes wrong during PMI bootstrap
  • Loading branch information
mergify[bot] authored Sep 12, 2024
2 parents 95fc972 + fba4dee commit 591b9f8
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 19 deletions.
5 changes: 5 additions & 0 deletions doc/man1/flux-pmi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ barrier

Execute N barrier (step 2) operations (default 1).

.. option:: --abort=RANK

Instead of entering the barrier, arrange for RANK to call the PMI
abort function.

exchange
--------

Expand Down
10 changes: 9 additions & 1 deletion src/broker/boot_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
}
if (upmi_initialize (upmi, &info, &error) < 0) {
log_msg ("%s: initialize: %s", upmi_describe (upmi), error.text);
goto error;
upmi_destroy (upmi);
return -1;
}
if (set_instance_level_attr (upmi, info.name, attrs) < 0) {
log_err ("set_instance_level_attr");
Expand Down Expand Up @@ -506,6 +507,13 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
topology_decref (topo);
return 0;
error:
/* We've logged error to stderr before getting here so the fatal
* error message passed to the PMI server does not necessarily need
* to be highly detailed. Some implementations of abort may not
* return.
*/
if (upmi_abort (upmi, "fatal bootstrap error", &error) < 0)
log_msg ("upmi_abort: %s", error.text);
free (bizcard);
upmi_destroy (upmi);
hostlist_destroy (hl);
Expand Down
15 changes: 15 additions & 0 deletions src/cmd/builtin/pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "src/common/libpmi/upmi.h"
#include "src/common/libutil/monotime.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/errprintf.h"
#include "ccan/str/str.h"

static struct upmi *upmi;
Expand Down Expand Up @@ -59,6 +60,7 @@ static int internal_cmd_barrier (optparse_t *p, int argc, char *argv[])
{
int n = optparse_option_index (p);
int count = optparse_get_int (p, "count", 1);
int abort = optparse_get_int (p, "abort", -1);
struct timespec t;
const char *label;
flux_error_t error;
Expand All @@ -78,6 +80,17 @@ static int internal_cmd_barrier (optparse_t *p, int argc, char *argv[])
if (upmi_barrier (upmi, &error) < 0)
log_msg_exit ("barrier: %s", error.text);

// abort one rank if --abort was specified
if (abort != -1) {
if (info.rank == abort) {
flux_error_t e;
errprintf (&e, "flux-pmi: rank %d is aborting", info.rank);
if (upmi_abort (upmi, e.text, &error) < 0) {
log_msg_exit ("abort: %s", error.text);
}
}
}

while (count-- > 0) {
monotime (&t);
if (upmi_barrier (upmi, &error) < 0)
Expand Down Expand Up @@ -216,6 +229,8 @@ static int cmd_pmi (optparse_t *p, int argc, char *argv[])
/* Option table for the barrier subcommand.
 *   --count=N     execute N barrier operations (default 1)
 *   --abort=RANK  have RANK call the PMI abort function
 * The table is terminated by OPTPARSE_TABLE_END.
 */
static struct optparse_option barrier_opts[] = {
{ .name = "count", .has_arg = 1, .arginfo = "N",
.usage = "Execute N barrier operations (default 1)", },
{ .name = "abort", .has_arg = 1, .arginfo = "RANK",
.usage = "RANK calls abort instead of barrier", },
OPTPARSE_TABLE_END,
};
static struct optparse_option get_opts[] = {
Expand Down
33 changes: 30 additions & 3 deletions src/cmd/flux-start.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,19 @@ static void pmi_debug_trace (void *client, const char *buf)
fprintf (stderr, "%d: %s", cli->rank, buf);
}

int pmi_kvs_put (void *arg, const char *kvsname,
const char *key, const char *val)
/* PMI server kvs_put callback: store a private copy of 'val' under 'key'
 * in the flux-start PMI hash, registering free() as the value destructor.
 * 'arg' and 'kvsname' are not consulted.  Always returns 0.
 */
int pmi_kvs_put (void *arg,
                 const char *kvsname,
                 const char *key,
                 const char *val)
{
    char *copy = xstrdup (val);

    zhash_update (ctx.pmi.kvs, key, copy);
    zhash_freefn (ctx.pmi.kvs, key, (zhash_free_fn *)free);

    return 0;
}

int pmi_kvs_get (void *arg, void *client, const char *kvsname,
int pmi_kvs_get (void *arg,
void *client,
const char *kvsname,
const char *key)
{
char *v = zhash_lookup (ctx.pmi.kvs, key);
Expand All @@ -510,6 +514,28 @@ int pmi_kvs_get (void *arg, void *client, const char *kvsname,
return 0;
}

/* PMI server abort callback.
 *
 * Logs which rank called PMI_Abort() (with its message, if one was given),
 * then sends SIGKILL to every entry in ctx.clients that has a subprocess
 * handle.  'arg' and 'exit_code' are not used.
 */
void pmi_abort (void *arg,
                void *client,
                int exit_code,
                const char *error_message)
{
    struct client *caller = client;
    struct client *victim;
    const char *sep = error_message ? ": " : "";
    const char *text = error_message ? error_message : "";

    log_msg ("%d: PMI_Abort()%s%s", caller->rank, sep, text);

    for (victim = zlist_first (ctx.clients);
         victim != NULL;
         victim = zlist_next (ctx.clients)) {
        if (!victim->p)
            continue;
        // The kill result is not examined; the future is released right away.
        flux_future_t *f = flux_subprocess_kill (victim->p, SIGKILL);
        flux_future_destroy (f);
    }
}

int execvp_argz (char *argz, size_t argz_len)
{
char **av = malloc (sizeof (char *) * (argz_count (argz, argz_len) + 1));
Expand Down Expand Up @@ -715,6 +741,7 @@ void pmi_server_initialize (int flags)
struct taskmap *map;
const char *mode = optparse_get_str (ctx.opts, "test-pmi-clique", "single");
struct pmi_simple_ops ops = {
.abort = pmi_abort,
.kvs_put = pmi_kvs_put,
.kvs_get = pmi_kvs_get,
.barrier_enter = NULL,
Expand Down
22 changes: 22 additions & 0 deletions src/common/libpmi/upmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,28 @@ int upmi_finalize (struct upmi *upmi, flux_error_t *errp)
return 0;
}

/* Ask the active PMI plugin to abort the parallel program.
 *
 * upmi - handle through which the "upmi.abort" operation is dispatched
 * msg  - reason for the abort, passed to the plugin as the "msg" argument
 * errp - receives a description of the failure when -1 is returned
 *
 * Returns 0 on success, or -1 if 'upmi' or 'msg' is NULL or the plugin
 * call fails.  The plugin's abort may terminate the process, so the
 * success path is not necessarily reached.
 */
int upmi_abort (struct upmi *upmi, const char *msg, flux_error_t *errp)
{
    flux_error_t error;

    if (!upmi || !msg) {
        /* No trailing newline: callers embed this text in their own log
         * line, which already supplies the line terminator.
         */
        errprintf (errp, "invalid argument");
        return -1;
    }
    if (upmi_call (upmi,
                   "upmi.abort",
                   &error,
                   "{s:s}",
                   "msg", msg) < 0) {
        errprintf (errp, "%s", error.text);
        upmi_trace (upmi, "abort: %s", error.text);
        return -1;
    }
    // possibly not reached
    upmi_trace (upmi, "abort: success");
    return 0;
}

int upmi_put (struct upmi *upmi,
const char *key,
const char *value,
Expand Down
1 change: 1 addition & 0 deletions src/common/libpmi/upmi.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ int upmi_get (struct upmi *upmi,
flux_error_t *error);
int upmi_barrier (struct upmi *upmi,
flux_error_t *error);
int upmi_abort (struct upmi *upmi, const char *msg, flux_error_t *error);

#endif /* !_LIBPMI_UPMI_H */

Expand Down
25 changes: 24 additions & 1 deletion src/common/libpmi/upmi_libpmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct plugin_ctx {
void *dso;
int (*init) (int *spawned);
int (*finalize) (void);
int (*abort) (int exit_code, const char *error_msg);
int (*get_size) (int *size);
int (*get_rank) (int *rank);
int (*barrier) (void);
Expand Down Expand Up @@ -122,7 +123,8 @@ static struct plugin_ctx *plugin_ctx_create (const char *path,
|| !(ctx->kvs_get_my_name = dlsym (ctx->dso, "PMI_KVS_Get_my_name"))
|| !(ctx->kvs_put = dlsym (ctx->dso, "PMI_KVS_Put"))
|| !(ctx->kvs_commit = dlsym (ctx->dso, "PMI_KVS_Commit"))
|| !(ctx->kvs_get = dlsym (ctx->dso, "PMI_KVS_Get"))) {
|| !(ctx->kvs_get = dlsym (ctx->dso, "PMI_KVS_Get"))
|| !(ctx->abort = dlsym (ctx->dso, "PMI_Abort"))) {
errprintf (error, "%s: missing required PMI_* symbols", path);
goto error;
}
Expand Down Expand Up @@ -215,6 +217,26 @@ static int op_barrier (flux_plugin_t *p,
return 0;
}

/* "upmi.abort" handler for the dlopen'ed PMI-1 library.
 *
 * Unpacks the "msg" string from the plugin input arguments and passes it,
 * with exit code 1, to the library's abort entry point.  Returns 0 if that
 * call reports PMI_SUCCESS; otherwise the result of upmi_seterror().
 */
static int op_abort (flux_plugin_t *p,
                     const char *topic,
                     flux_plugin_arg_t *args,
                     void *data)
{
    struct plugin_ctx *ctx = flux_plugin_aux_get (p, plugin_name);
    const char *msg;
    int rc;

    if (flux_plugin_arg_unpack (args,
                                FLUX_PLUGIN_ARG_IN,
                                "{s:s}",
                                "msg", &msg) < 0) {
        return upmi_seterror (p, args, "error unpacking abort arguments");
    }

    rc = ctx->abort (1, msg);
    if (rc == PMI_SUCCESS)
        return 0;
    return upmi_seterror (p, args, "%s", pmi_strerror (rc));
}

static int op_initialize (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -313,6 +335,7 @@ static const struct flux_plugin_handler optab[] = {
{ "upmi.put", op_put, NULL },
{ "upmi.get", op_get, NULL },
{ "upmi.barrier", op_barrier, NULL },
{ "upmi.abort", op_abort, NULL },
{ "upmi.initialize", op_initialize, NULL },
{ "upmi.finalize", op_finalize, NULL },
{ "upmi.preinit", op_preinit, NULL },
Expand Down
25 changes: 25 additions & 0 deletions src/common/libpmi/upmi_libpmi2.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct plugin_ctx {
void *dso;
int (*init) (int *spawned, int *size, int *rank, int *appnum);
int (*finalize) (void);
int (*abort) (int flag, const char *msg);
int (*job_getid) (char *jobid, int jobid_size);
int (*kvs_put) (const char *key, const char *value);
int (*kvs_fence) (void);
Expand Down Expand Up @@ -129,6 +130,7 @@ static struct plugin_ctx *plugin_ctx_create (const char *path,
goto error;
if (!(ctx->init = dlsym (ctx->dso, "PMI2_Init"))
|| !(ctx->finalize = dlsym (ctx->dso, "PMI2_Finalize"))
|| !(ctx->abort = dlsym (ctx->dso, "PMI2_Abort"))
|| !(ctx->job_getid = dlsym (ctx->dso, "PMI2_Job_GetId"))
|| !(ctx->kvs_put = dlsym (ctx->dso, "PMI2_KVS_Put"))
|| !(ctx->kvs_fence = dlsym (ctx->dso, "PMI2_KVS_Fence"))
Expand Down Expand Up @@ -319,6 +321,28 @@ static int op_barrier (flux_plugin_t *p,
return 0;
}

/* "upmi.abort" handler for the dlopen'ed PMI-2 library.
 *
 * Unpacks the "msg" string from the plugin input arguments and passes it
 * to the library's abort entry point with flag=1 (abort all processes in
 * the job).  Returns 0 if that call reports PMI2_SUCCESS; otherwise the
 * result of upmi_seterror().
 */
static int op_abort (flux_plugin_t *p,
                     const char *topic,
                     flux_plugin_arg_t *args,
                     void *data)
{
    struct plugin_ctx *ctx = flux_plugin_aux_get (p, plugin_name);
    const int abort_all = 1; // abort all processes in the job
    const char *msg;
    int rc;

    if (flux_plugin_arg_unpack (args,
                                FLUX_PLUGIN_ARG_IN,
                                "{s:s}",
                                "msg", &msg) < 0) {
        return upmi_seterror (p, args, "error unpacking abort arguments");
    }

    rc = ctx->abort (abort_all, msg);
    if (rc == PMI2_SUCCESS)
        return 0;
    return upmi_seterror (p, args, "%s", pmi_strerror (rc));
}

static int op_initialize (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -422,6 +446,7 @@ static const struct flux_plugin_handler optab[] = {
{ "upmi.put", op_put, NULL },
{ "upmi.get", op_get, NULL },
{ "upmi.barrier", op_barrier, NULL },
{ "upmi.abort", op_abort, NULL },
{ "upmi.initialize", op_initialize, NULL },
{ "upmi.finalize", op_finalize, NULL },
{ "upmi.preinit", op_preinit, NULL },
Expand Down
21 changes: 21 additions & 0 deletions src/common/libpmi/upmi_simple.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ static int op_barrier (flux_plugin_t *p,
return 0;
}

/* "upmi.abort" handler for the simple PMI wire-protocol client.
 *
 * Unpacks the "msg" string from the plugin input arguments and forwards
 * it, with exit code 1, via pmi_simple_client_abort().  Returns 0 if that
 * call reports PMI_SUCCESS; otherwise the result of upmi_seterror().
 */
static int op_abort (flux_plugin_t *p,
                     const char *topic,
                     flux_plugin_arg_t *args,
                     void *data)
{
    struct plugin_ctx *ctx = flux_plugin_aux_get (p, plugin_name);
    const char *msg;
    int rc;

    if (flux_plugin_arg_unpack (args,
                                FLUX_PLUGIN_ARG_IN,
                                "{s:s}",
                                "msg", &msg) < 0) {
        return upmi_seterror (p, args, "error unpacking abort arguments");
    }

    rc = pmi_simple_client_abort (ctx->client, 1, msg);
    if (rc == PMI_SUCCESS)
        return 0;
    return upmi_seterror (p, args, "%s", pmi_strerror (rc));
}

static int op_initialize (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -195,6 +215,7 @@ static const struct flux_plugin_handler optab[] = {
{ "upmi.put", op_put, NULL },
{ "upmi.get", op_get, NULL },
{ "upmi.barrier", op_barrier, NULL },
{ "upmi.abort", op_abort, NULL },
{ "upmi.initialize", op_initialize, NULL },
{ "upmi.finalize", op_finalize, NULL },
{ "upmi.preinit", op_preinit, NULL },
Expand Down
19 changes: 19 additions & 0 deletions src/common/libpmi/upmi_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,24 @@ static int op_barrier (flux_plugin_t *p,
return 0;
}

/* "upmi.abort" handler for the singleton plugin.
 *
 * With no PMI server to notify, the abort message is written to stderr
 * and the process exits with status 1.  The function only returns if the
 * "msg" argument cannot be unpacked, in which case it returns the result
 * of upmi_seterror().
 */
static int op_abort (flux_plugin_t *p,
                     const char *topic,
                     flux_plugin_arg_t *args,
                     void *data)
{
    const char *msg;

    if (flux_plugin_arg_unpack (args,
                                FLUX_PLUGIN_ARG_IN,
                                "{s:s}",
                                "msg", &msg) >= 0) {
        fprintf (stderr, "%s\n", msg);
        exit (1);
        //NOTREACHED
    }
    return upmi_seterror (p, args, "error unpacking abort arguments");
}

static int op_initialize (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand Down Expand Up @@ -148,6 +166,7 @@ static const struct flux_plugin_handler optab[] = {
{ "upmi.put", op_put, NULL },
{ "upmi.get", op_get, NULL },
{ "upmi.barrier", op_barrier, NULL },
{ "upmi.abort", op_abort, NULL },
{ "upmi.initialize", op_initialize, NULL },
{ "upmi.finalize", op_finalize, NULL },
{ "upmi.preinit", op_preinit, NULL },
Expand Down
Loading

0 comments on commit 591b9f8

Please sign in to comment.