libpmi: add spinlock, use count on global context
Problem: multiple threads cannot make PMI calls using
the global context without potentially breaking the
wire protocol over the shared file descriptor.

Protect the global context with a spinlock, implemented
using gcc builtin atomics (which also work on Clang).
If two threads happen to make PMI API calls concurrently,
one will spin while the other completes its wire protocol
operation.

Add a usecount on the global PMI context so that each thread
may independently call PMI_Init() and PMI_Finalize().

Update DPRINTF(), DRETURN() macros so they need not access
the global PMI context under lock.  Instead stash the debug
level and rank in globals pmi_debug_level and pmi_debug_rank.
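
As a minimal usage sketch (not part of this commit; the pthread harness
and the "pmi.h" header name are assumptions for illustration), each
thread can now bracket its PMI calls with its own init/finalize pair:

/* Hypothetical example: two threads sharing the global PMI context.
 * PMI_Init() bumps the usecount if the context already exists, and
 * PMI_Finalize() destroys it only when the usecount drops to zero.
 */
#include <pthread.h>
#include <stdio.h>
#include "pmi.h"            /* assumed header name */

static void *worker (void *arg)
{
    int spawned, rank;

    if (PMI_Init (&spawned) != PMI_SUCCESS)
        return NULL;
    if (PMI_Get_rank (&rank) == PMI_SUCCESS)
        fprintf (stderr, "worker: rank=%d\n", rank);
    PMI_Finalize ();        /* last caller tears down the context */
    return NULL;
}

int main (void)
{
    pthread_t t[2];
    int i;

    for (i = 0; i < 2; i++)
        pthread_create (&t[i], NULL, worker, NULL);
    for (i = 0; i < 2; i++)
        pthread_join (t[i], NULL);
    return 0;
}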
garlick committed May 29, 2019
1 parent c07a6c4 commit 151a57c
Showing 1 changed file with 110 additions and 24 deletions.

src/common/libpmi/pmi.c
@@ -22,6 +22,13 @@
  * PMI_Init() will fail if PMI_FD, PMI_RANK, or PMI_SIZE is unset.
  * It is up to the caller to fall back to singleton operation, if desired.
  *
+ * Thread safety:
+ * This interface uses a global context protected by a spinlock against
+ * concurrent access from multiple threads. The global context is usecounted,
+ * so each thread may independently call PMI_Init() and PMI_Finalize().
+ * While it is not particularly efficient to share the global context in
+ * this way, the API should at least behave correctly.
+ *
  * See Flux RFC 13 for more detail.
  */

@@ -40,73 +47,109 @@
 #include "simple_client.h"
 #include "clique.h"

+/* Global context and its usecount are protected by pmi_lock. Enclose
+ * access with PMI_SPIN_LOCK (&pmi_lock) ... PMI_SPIN_UNLOCK (&pmi_lock).
+ */
 static struct pmi_simple_client *pmi_global_ctx;
+static int pmi_usecount;
+static volatile int pmi_lock;
+
+/* These globals are set once in PMI_Init() and not reset in PMI_Finalize().
+ * Only used by debugging macros so we don't take pmi_lock.
+ */
+static int pmi_debug_rank = -1;
+static int pmi_debug_level;
+
+/* N.B. spinlocks use gcc built-in functions for atomic memory access,
+ * which are somewhat more portable than C11 atomics.
+ */
+#define PMI_SPIN_LOCK(lock) do { \
+} while (__sync_lock_test_and_set(lock, 1))
+
+#define PMI_SPIN_UNLOCK(lock) \
+    __sync_lock_release (lock)

 #define DPRINTF(fmt,...) do { \
-    if (pmi_global_ctx && pmi_global_ctx->debug) \
+    if (pmi_debug_level) \
         fprintf (stderr, fmt, ##__VA_ARGS__); \
 } while (0)

 #define DRETURN(rc) do { \
-    DPRINTF ("%d: %s rc=%d %s\n", \
-             pmi_global_ctx ? pmi_global_ctx->rank : -1, \
-             __func__, (rc), \
-             rc == PMI_SUCCESS ? "" : pmi_strerror (rc)); \
+    if (pmi_debug_level) \
+        fprintf (stderr, "%d: %s rc=%d %s\n", pmi_debug_rank, __func__, (rc), \
+                 rc == PMI_SUCCESS ? "" : pmi_strerror (rc)); \
     return (rc); \
 } while (0);


 int PMI_Init (int *spawned)
 {
-    int result = PMI_FAIL;
+    int result;
     const char *pmi_debug;
     struct pmi_simple_client *ctx;

+    PMI_SPIN_LOCK (&pmi_lock);
+
+    if (pmi_global_ctx) {
+        pmi_usecount++;
+        result = PMI_SUCCESS;
+        goto out_unlock;
+    }
+
     pmi_debug = getenv ("FLUX_PMI_DEBUG");
     if (!pmi_debug)
         pmi_debug = getenv ("PMI_DEBUG");

-    if (pmi_global_ctx)
-        return PMI_ERR_INIT;
-
     ctx = pmi_simple_client_create_fd (getenv ("PMI_FD"),
                                        getenv ("PMI_RANK"),
                                        getenv ("PMI_SIZE"),
                                        pmi_debug,
                                        getenv ("PMI_SPAWNED"));
     if (!ctx) {
-        if (errno == ENOMEM)
-            return PMI_ERR_NOMEM;
-        return PMI_FAIL;
+        result = errno == ENOMEM ? PMI_ERR_NOMEM : PMI_FAIL;
+        goto out_unlock;
     }

     result = pmi_simple_client_init (ctx);
     if (result != PMI_SUCCESS) {
         pmi_simple_client_destroy (ctx);
-        return result;
+        goto out_unlock;
     }
     pmi_global_ctx = ctx;
+    pmi_usecount = 1;
+    pmi_debug_level = ctx->debug;
+    pmi_debug_rank = ctx->rank;
     if (spawned)
         *spawned = ctx->spawned;

+out_unlock:
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_Initialized (int *initialized)
 {
+    PMI_SPIN_LOCK (&pmi_lock);
     if (initialized)
         *initialized = pmi_global_ctx ? pmi_global_ctx->initialized : 0;
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (PMI_SUCCESS);
 }

 int PMI_Finalize (void)
 {
-    int result = PMI_ERR_INIT;
+    int result = PMI_SUCCESS;

-    if (pmi_global_ctx) {
+    PMI_SPIN_LOCK (&pmi_lock);
+    if (pmi_global_ctx && --pmi_usecount == 0) {
         result = pmi_simple_client_finalize (pmi_global_ctx);
         pmi_simple_client_destroy (pmi_global_ctx);
         pmi_global_ctx = NULL;
     }
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

@@ -124,53 +167,68 @@ int PMI_Get_size (int *size)
 {
     int result = PMI_ERR_INIT;

+    PMI_SPIN_LOCK (&pmi_lock);
     if (pmi_global_ctx) {
         *size = pmi_global_ctx->size;
         result = PMI_SUCCESS;
     }
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_Get_rank (int *rank)
 {
     int result = PMI_ERR_INIT;

+    PMI_SPIN_LOCK (&pmi_lock);
     if (pmi_global_ctx) {
         *rank = pmi_global_ctx->rank;
         result = PMI_SUCCESS;
     }
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_Get_universe_size (int *size)
 {
     int result;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_get_universe_size (pmi_global_ctx, size);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_Get_appnum (int *appnum)
 {
     int result;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_get_appnum (pmi_global_ctx, appnum);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_KVS_Get_my_name (char kvsname[], int length)
 {
     int result;
-    int rank = pmi_global_ctx ? pmi_global_ctx->rank : -1;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_kvs_get_my_name (pmi_global_ctx,
                                                 kvsname,
                                                 length);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     if (result == PMI_SUCCESS) {
-        DPRINTF ("%d: %s (\"%s\") rc=%d\n", rank, __func__, kvsname, result);
+        DPRINTF ("%d: %s (\"%s\") rc=%d\n",
+                 pmi_debug_rank, __func__, kvsname, result);
     } else {
-        DPRINTF ("%d: %s rc=%d %s\n", rank, __func__, result,
-                 pmi_strerror (result));
+        DPRINTF ("%d: %s rc=%d %s\n",
+                 pmi_debug_rank, __func__, result, pmi_strerror (result));
     }
     return result;
 }
@@ -179,43 +237,54 @@ int PMI_KVS_Get_name_length_max (int *length)
 {
     int result = PMI_ERR_INIT;

+    PMI_SPIN_LOCK (&pmi_lock);
     if (pmi_global_ctx && pmi_global_ctx->initialized) {
         *length = pmi_global_ctx->kvsname_max;
         result = PMI_SUCCESS;
     }
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_KVS_Get_key_length_max (int *length)
 {
     int result = PMI_ERR_INIT;

+    PMI_SPIN_LOCK (&pmi_lock);
     if (pmi_global_ctx && pmi_global_ctx->initialized) {
         *length = pmi_global_ctx->keylen_max;
         result = PMI_SUCCESS;
     }
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_KVS_Get_value_length_max (int *length)
 {
     int result = PMI_ERR_INIT;

+    PMI_SPIN_LOCK (&pmi_lock);
     if (pmi_global_ctx && pmi_global_ctx->initialized) {
         *length = pmi_global_ctx->vallen_max;
         result = PMI_SUCCESS;
     }
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_KVS_Put (const char kvsname[], const char key[], const char value[])
 {
     int result;
-    int rank = pmi_global_ctx ? pmi_global_ctx->rank : -1;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_kvs_put (pmi_global_ctx, kvsname, key, value);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DPRINTF ("%d: PMI_KVS_Put (\"%s\", \"%s\", \"%s\") rc=%d %s\n",
-             rank, kvsname, key, value, result,
+             pmi_debug_rank, kvsname, key, value, result,
              result == PMI_SUCCESS ? "" : pmi_strerror (result));
     return result;
 }
@@ -224,16 +293,18 @@ int PMI_KVS_Get (const char kvsname[], const char key[],
                  char value[], int length)
 {
     int result;
-    int rank = pmi_global_ctx ? pmi_global_ctx->rank : -1;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_kvs_get (pmi_global_ctx, kvsname, key,
                                         value, length);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     if (result == PMI_SUCCESS) {
         DPRINTF ("%d: PMI_KVS_Get (\"%s\", \"%s\", \"%s\") rc=%d\n",
-                 rank, kvsname, key, value, result);
+                 pmi_debug_rank, kvsname, key, value, result);
     } else {
         DPRINTF ("%d: PMI_KVS_Get (\"%s\", \"%s\") rc=%d %s\n",
-                 rank, kvsname, key, result, pmi_strerror (result));
+                 pmi_debug_rank, kvsname, key, result, pmi_strerror (result));
     }
     return result;
 }
@@ -242,16 +313,22 @@ int PMI_KVS_Commit (const char kvsname[])
 {
     int result = PMI_ERR_INIT;

+    PMI_SPIN_LOCK (&pmi_lock);
     if (pmi_global_ctx && pmi_global_ctx->initialized)
         result = PMI_SUCCESS; // no-op
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_Barrier (void)
 {
     int result = PMI_ERR_INIT;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_barrier (pmi_global_ctx);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

@@ -290,15 +367,21 @@ int PMI_Get_clique_ranks (int ranks[], int length)
 {
     int result;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_get_clique_ranks (pmi_global_ctx, ranks, length);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

 int PMI_Get_clique_size (int *size)
 {
     int result;

+    PMI_SPIN_LOCK (&pmi_lock);
     result = pmi_simple_client_get_clique_size (pmi_global_ctx, size);
+    PMI_SPIN_UNLOCK (&pmi_lock);
+
     DRETURN (result);
 }

@@ -307,6 +390,7 @@ int PMI_Get_id_length_max (int *length)
     int result;

     result = PMI_KVS_Get_name_length_max (length);
+
     DRETURN (result);
 }

@@ -315,6 +399,7 @@ int PMI_Get_id (char kvsname[], int length)
     int result;

     result = PMI_KVS_Get_my_name (kvsname, length);
+
     DRETURN (result);
 }

@@ -323,6 +408,7 @@ int PMI_Get_kvs_domain_id (char kvsname[], int length)
     int result;

     result = PMI_KVS_Get_my_name (kvsname, length);
+
     DRETURN (result);
 }

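For reference, the locking pattern in this change reduces to a plain
test-and-set spinlock over the gcc builtins; a standalone sketch follows
(function and variable names here are illustrative, not from the commit):

/* __sync_lock_test_and_set() atomically stores 1 and returns the prior
 * value (an acquire barrier), so the loop spins while another thread
 * holds the lock; __sync_lock_release() stores 0 (a release barrier).
 */
static volatile int demo_lock;

static void demo_spin_lock (volatile int *lock)
{
    while (__sync_lock_test_and_set (lock, 1))
        ;   /* prior value was 1: another thread holds the lock */
}

static void demo_spin_unlock (volatile int *lock)
{
    __sync_lock_release (lock);
}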
