Skip to content

Commit

Permalink
broker/boot_pmi: add dlopen/singleton support
Browse files Browse the repository at this point in the history
Problem: support dlopen()ing libpmi.so and for
falling back to singleton operation was removed
from the Flux libpmi.so, but the broker needs
this capability.

Re-add this capability, with significantly reduced
complexity given that it need not be a faithful
PMI-1 API interface.
  • Loading branch information
garlick committed May 29, 2019
1 parent 151a57c commit 065a174
Show file tree
Hide file tree
Showing 6 changed files with 563 additions and 91 deletions.
4 changes: 4 additions & 0 deletions src/broker/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ libbroker_la_SOURCES = \
boot_config.c \
boot_pmi.h \
boot_pmi.c \
pmiutil.h \
pmiutil.c \
liblist.h \
liblist.c \
publisher.h \
publisher.c

Expand Down
163 changes: 72 additions & 91 deletions src/broker/boot_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#endif
#include <sys/param.h>
#include <unistd.h>
#include <assert.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/oom.h"
Expand All @@ -26,6 +27,8 @@
#include "attr.h"
#include "overlay.h"
#include "boot_pmi.h"
#include "pmiutil.h"


/* Generally accepted max, although some go higher (IE is 2083) */
#define ENDPOINT_MAX 2048
Expand Down Expand Up @@ -142,148 +145,126 @@ static int update_endpoint_attr (attr_t *attrs, const char *name,

int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k)
{
int spawned;
int size;
int rank;
int appnum;
int parent_rank;
const char *child_uri;
int kvsname_len;
int key_len;
int val_len;
char *kvsname = NULL;
char *key = NULL;
char *val = NULL;
int e;
int rc = -1;
char key[64];
char val[1024];
const char *tbonendpoint = NULL;
struct pmi_handle *pmi;
struct pmi_params pmi_params;
int result;

if ((e = PMI_Init (&spawned)) != PMI_SUCCESS) {
log_msg ("PMI_Init: %s", pmi_strerror (e));
goto done;
}

/* Get rank, size, appnum
*/
if ((e = PMI_Get_size (&size)) != PMI_SUCCESS) {
log_msg ("PMI_Get_size: %s", pmi_strerror (e));
goto done;
memset (&pmi_params, 0, sizeof (pmi_params));
if (!(pmi = broker_pmi_create ())) {
log_err ("broker_pmi_create");
goto error;
}
if ((e = PMI_Get_rank (&rank)) != PMI_SUCCESS) {
log_msg ("PMI_Get_rank: %s", pmi_strerror (e));
goto done;
result = broker_pmi_init (pmi);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_init: %s", pmi_strerror (result));
goto error;
}
if ((e = PMI_Get_appnum (&appnum)) != PMI_SUCCESS) {
log_msg ("PMI_Get_appnum: %s", pmi_strerror (e));
goto done;
result = broker_pmi_get_params (pmi, &pmi_params);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_get_params: %s", pmi_strerror (result));
goto error;
}

overlay_init (overlay, (uint32_t)size, (uint32_t)rank, tbon_k);
overlay_init (overlay,
(uint32_t)pmi_params.size,
(uint32_t)pmi_params.rank,
tbon_k);

/* Set session-id attribute from PMI appnum if not already set.
*/
if (attr_get (attrs, "session-id", NULL, NULL) < 0) {
if (attr_add_int (attrs, "session-id", appnum,
if (attr_add_int (attrs, "session-id", pmi_params.appnum,
FLUX_ATTRFLAG_IMMUTABLE) < 0)
goto done;
goto error;
}

if (update_endpoint_attr (attrs, "tbon.endpoint", &tbonendpoint,
"tcp://%h:*") < 0)
goto done;
goto error;

overlay_set_child (overlay, tbonendpoint);

/* Prepare for PMI KVS operations by grabbing the kvsname,
* and buffers for keys and values.
*/
if ((e = PMI_KVS_Get_name_length_max (&kvsname_len)) != PMI_SUCCESS) {
log_msg ("PMI_KVS_Get_name_length_max: %s", pmi_strerror (e));
goto done;
}
kvsname = xzmalloc (kvsname_len);
if ((e = PMI_KVS_Get_my_name (kvsname, kvsname_len)) != PMI_SUCCESS) {
log_msg ("PMI_KVS_Get_my_name: %s", pmi_strerror (e));
goto done;
}
if ((e = PMI_KVS_Get_key_length_max (&key_len)) != PMI_SUCCESS) {
log_msg ("PMI_KVS_Get_key_length_max: %s", pmi_strerror (e));
goto done;
}
key = xzmalloc (key_len);
if ((e = PMI_KVS_Get_value_length_max (&val_len)) != PMI_SUCCESS) {
log_msg ("PMI_KVS_Get_value_length_max: %s", pmi_strerror (e));
goto done;
}
val = xzmalloc (val_len);

/* Bind to addresses to expand URI wildcards, so we can exchange
* the real addresses.
*/
if (overlay_bind (overlay) < 0) {
log_err ("overlay_bind failed"); /* function is idempotent */
goto done;
goto error;
}

/* Write the URI of downstream facing socket under the rank (if any).
*/
if ((child_uri = overlay_get_child (overlay))) {
if (snprintf (key, key_len, "cmbd.%d.uri", rank) >= key_len) {

if (snprintf (key, sizeof (key),
"cmbd.%d.uri", pmi_params.rank) >= sizeof (key)) {
log_msg ("pmi key string overflow");
goto done;
goto error;
}
if (snprintf (val, val_len, "%s", child_uri) >= val_len) {
if (snprintf (val, sizeof (val), "%s", child_uri) >= sizeof (val)) {
log_msg ("pmi val string overflow");
goto done;
goto error;
}
if ((e = PMI_KVS_Put (kvsname, key, val)) != PMI_SUCCESS) {
log_msg ("PMI_KVS_Put: %s", pmi_strerror (e));
goto done;
result = broker_pmi_kvs_put (pmi, pmi_params.kvsname, key, val);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_kvs_put: %s", pmi_strerror (result));
goto error;
}
}

/* Puts are complete, now we synchronize and begin our gets.
*/
if ((e = PMI_KVS_Commit (kvsname)) != PMI_SUCCESS) {
log_msg ("PMI_KVS_Commit: %s", pmi_strerror (e));
goto done;
result = broker_pmi_kvs_commit (pmi, pmi_params.kvsname);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_kvs_commit: %s", pmi_strerror (result));
goto error;
}
if ((e = PMI_Barrier ()) != PMI_SUCCESS) {
log_msg ("PMI_Barrier: %s", pmi_strerror (e));
goto done;
result = broker_pmi_barrier (pmi);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_barrier: %s", pmi_strerror (result));
goto error;
}

/* Read the uri of our parent, after computing its rank
*/
if (rank > 0) {
parent_rank = kary_parentof (tbon_k, (uint32_t)rank);
if (snprintf (key, key_len, "cmbd.%d.uri", parent_rank) >= key_len) {
if (pmi_params.rank > 0) {
parent_rank = kary_parentof (tbon_k, (uint32_t)pmi_params.rank);
if (snprintf (key, sizeof (key),
"cmbd.%d.uri", parent_rank) >= sizeof (key)) {
log_msg ("pmi key string overflow");
goto done;
goto error;
}
if ((e = PMI_KVS_Get (kvsname, key, val, val_len)) != PMI_SUCCESS) {
log_msg ("pmi_kvs_get: %s", pmi_strerror (e));
goto done;
result = broker_pmi_kvs_get (pmi, pmi_params.kvsname,
key, val, sizeof (val));
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_kvs_get: %s", pmi_strerror (result));
goto error;
}
overlay_set_parent (overlay, "%s", val);
}

if ((e = PMI_Barrier ()) != PMI_SUCCESS) {
log_msg ("PMI_Barrier: %s", pmi_strerror (e));
goto done;
result = broker_pmi_barrier (pmi);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_barrier: %s", pmi_strerror (result));
goto error;
}
PMI_Finalize ();
rc = 0;
done:
if (kvsname)
free (kvsname);
if (key)
free (key);
if (val)
free (val);
if (rc != 0)
errno = EPROTO;
return rc;

result = broker_pmi_finalize (pmi);
if (result != PMI_SUCCESS) {
log_msg ("broker_pmi_finalize: %s", pmi_strerror (result));
goto error;
}

broker_pmi_destroy (pmi);
return 0;
error:
broker_pmi_destroy (pmi);
return -1;
}

/*
Expand Down
142 changes: 142 additions & 0 deletions src/broker/liblist.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <czmq.h>
#include <argz.h>

#include "liblist.h"

static int liblist_append_from_environment (zlist_t *libs, const char *libname)
{
const char *path;
char *argz = NULL;
size_t argz_len;
int rc = -1;
char *filename, *entry = NULL;

if ((path = getenv ("LD_LIBRARY_PATH"))) {
if (argz_create_sep (path, ':', &argz, &argz_len) != 0)
goto done;
while ((entry = argz_next (argz, argz_len, entry))) {
if (asprintf (&filename, "%s/%s", entry, libname) < 0)
goto done;
if (access (filename, F_OK) < 0) {
free (filename);
continue;
}
if (zlist_append (libs, filename) < 0) {
free (filename);
goto done;
}
}
}
rc = 0;
done:
if (argz)
free (argz);
return rc;
}

static int split2 (char *s, int delim, char **w1, char **w2)
{
char *p = strchr (s, delim);
if (!p)
return -1;
*p++ = '\0';
*w1 = s;
*w2 = p;
return 0;
}

static void trim_end (char *s, int ch)
{
int len = strlen (s);
while (len > 0) {
if (s[len - 1] != ch)
break;
s[--len] = '\0';
}
}

static int liblist_append_from_ldconfig (zlist_t *libs, const char *libname)
{
FILE *f;
const char *cmd = "ldconfig -p | sed -e 's/([^(]*)[\t ]*//'" \
" | awk -F\" => \" '{print $1 \":\" $2};'";
char line[1024];
int rc = -1;

if (!(f = popen (cmd, "r")))
goto done;
while (fgets (line, sizeof (line), f) != NULL) {
char *name, *path, *cpy;
if (split2 (line, ':', &name, &path) < 0)
continue;
while (isspace (*name))
name++;
if (strcmp (name, libname) != 0)
continue;
trim_end (path, '\n');
if (!(cpy = strdup (path)))
goto done;
if (zlist_append (libs, cpy) < 0) {
free (cpy);
goto done;
}
}
rc = 0;
done:
if (f)
fclose (f);
return rc;
}

void liblist_destroy (zlist_t *libs)
{
char *entry;
if (libs) {
while ((entry = zlist_pop (libs)))
free (entry);
zlist_destroy (&libs);
}
}

zlist_t *liblist_create (const char *libname)
{
zlist_t *libs = NULL;

if (!(libs = zlist_new ()))
goto error;
if (strchr (libname, '/')) {
char *cpy = strdup (libname);
if (!cpy)
goto error;
if (zlist_append (libs, cpy) < 0) {
free (cpy);
goto error;
}
} else {
if (liblist_append_from_environment (libs, libname) < 0)
goto error;
if (liblist_append_from_ldconfig (libs, libname) < 0)
goto error;
}
return libs;
error:
liblist_destroy (libs);
return NULL;
}

/*
* vi:tabstop=4 shiftwidth
*/
Loading

0 comments on commit 065a174

Please sign in to comment.