Skip to content

Commit

Permalink
coll/base,tuned: introduce allgather_reduce allreduce algorithm
Browse files Browse the repository at this point in the history
This patch introduces a new allreduce algorithm implemented as an
allgather followed by local reduction.

The change is motivated by the longer latency of tcp/EFA traffic.
Current allreduce algorithms require a round trip to and from a selected
root process. This algorithm avoids the round trip over network and
therefore reduces total latency.

However, this communication pattern is not scalable for large
communicators, and should only be used for inter-node allreduce.

Co-authored-by: Matt Koop <[email protected]>
Co-authored-by: Wenduo Wang <[email protected]>

Signed-off-by: Wenduo Wang <[email protected]>
  • Loading branch information
wenduwan committed Aug 23, 2023
1 parent 8514e71 commit d045c94
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 0 deletions.
117 changes: 117 additions & 0 deletions ompi/mca/coll/base/coll_base_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
* Copyright (c) 2018 Siberian State University of Telecommunications
* and Information Science. All rights reserved.
* Copyright (c) 2022 Cisco Systems, Inc. All rights reserved.
* Copyright (c) Amazon.com, Inc. or its affiliates.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -1245,4 +1247,119 @@ int ompi_coll_base_allreduce_intra_redscat_allgather(
return err;
}

/**
* A greedy algorithm to exchange data among processes in the communicator via
* an allgather pattern, followed by a local reduction on each process. This
* avoids the round trip in a rooted communication pattern, e.g. reduce on the
* root and then broadcast to peers.
*
* This algorithm supports both commutative and non-commutative MPI operations.
* For non-commutative operations the reduction is applied to the data in the
* same rank order, e.g. rank 0, rank 1, ... rank N, on each process.
*
* This algorithm benefits inter-node allreduce over a high-latency network.
* Caution is needed on larger communicators(n) and data sizes(m), which will
* result in m*(n!) total traffic and potential network congestion.
*/
int ompi_coll_base_allreduce_intra_allgather_reduce(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
char *send_buf = sbuf;
int comm_size = ompi_comm_size(comm);
int err = MPI_SUCCESS;
int rank = ompi_comm_rank(comm);
bool commutative = ompi_op_is_commute(op);
ompi_request_t **reqs;

if (sbuf == MPI_IN_PLACE) {
send_buf = rbuf;
}

/* Allocate a large-enough buffer to receive from everyone else */
char *tmp_buf = NULL, *tmp_buf_raw = NULL, *tmp_recv = NULL;
ptrdiff_t lb, extent, dsize, gap = 0;
ompi_datatype_get_extent(dtype, &lb, &extent);
dsize = opal_datatype_span(&dtype->super, count * comm_size, &gap);
tmp_buf_raw = (char *) malloc(dsize);
if (NULL == tmp_buf_raw) {
return OMPI_ERR_OUT_OF_RESOURCE;
}

if (commutative) {
ompi_datatype_copy_content_same_ddt(dtype, count, (char *) rbuf, (char *) send_buf);
}

tmp_buf = tmp_buf_raw - gap;

/* Requests for send to AND receive from everyone else */
int reqs_needed = (comm_size - 1) * 2;
reqs = ompi_coll_base_comm_get_reqs(module->base_data, reqs_needed);

ptrdiff_t incr = extent * (ptrdiff_t) count;
tmp_recv = (char *) tmp_buf;

int req_index = 0;
for (int peer_rank = 0; peer_rank < comm_size; peer_rank++) {
tmp_recv = tmp_buf + (peer_rank * incr);
if (peer_rank == rank) {
/* Skip send/recv from self - later we copy the data to recv buffer */
continue;
}

err = MCA_PML_CALL(irecv(tmp_recv, count, dtype, peer_rank, MCA_COLL_BASE_TAG_ALLREDUCE,
comm, &reqs[req_index++]));
if (MPI_SUCCESS != err) {
goto err_hndl;
}

err = MCA_PML_CALL(isend(send_buf, count, dtype, peer_rank, MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[req_index++]));
if (MPI_SUCCESS != err) {
goto err_hndl;
}
}

/* Copy my data to recv buffer */
memcpy(tmp_buf + (rank * incr), send_buf, incr);

err = ompi_request_wait_all(req_index, reqs, MPI_STATUSES_IGNORE);

char *inbuf;
for (int peer_rank = 0; peer_rank < comm_size; peer_rank++) {
inbuf = tmp_buf + (peer_rank * incr);
if (0 == peer_rank && !commutative) {
/* Sort the data buffer for non-commutative operations */
memcpy(rbuf, inbuf, incr);
continue;
}
ompi_op_reduce(op, (void *) inbuf, rbuf, count, dtype);
}

err_hndl:
if (NULL != tmp_buf_raw)
free(tmp_buf_raw);

if (NULL != reqs) {
if (MPI_ERR_IN_STATUS == err) {
for (int i = 0; i < reqs_needed; i++) {
if (MPI_REQUEST_NULL == reqs[i])
continue;
if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR)
continue;
if (MPI_SUCCESS != reqs[i]->req_status.MPI_ERROR) {
err = reqs[i]->req_status.MPI_ERROR;
break;
}
}
}
ompi_coll_base_free_reqs(reqs, reqs_needed);
}

/* All done */
return err;
}

/* copied function (with appropriate renaming) ends here */
1 change: 1 addition & 0 deletions ompi/mca/coll/base/coll_base_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ int ompi_coll_base_allreduce_intra_ring(ALLREDUCE_ARGS);
int ompi_coll_base_allreduce_intra_ring_segmented(ALLREDUCE_ARGS, uint32_t segsize);
int ompi_coll_base_allreduce_intra_basic_linear(ALLREDUCE_ARGS);
int ompi_coll_base_allreduce_intra_redscat_allgather(ALLREDUCE_ARGS);
int ompi_coll_base_allreduce_intra_allgather_reduce(ALLREDUCE_ARGS);

/* AlltoAll */
int ompi_coll_base_alltoall_intra_pairwise(ALLTOALL_ARGS);
Expand Down
3 changes: 3 additions & 0 deletions ompi/mca/coll/tuned/coll_tuned_allreduce_decision.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static const mca_base_var_enum_value_t allreduce_algorithms[] = {
{4, "ring"},
{5, "segmented_ring"},
{6, "rabenseifner"},
{7, "allgather_reduce"},
{0, NULL}
};

Expand Down Expand Up @@ -146,6 +147,8 @@ int ompi_coll_tuned_allreduce_intra_do_this(const void *sbuf, void *rbuf, int co
return ompi_coll_base_allreduce_intra_ring_segmented(sbuf, rbuf, count, dtype, op, comm, module, segsize);
case (6):
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module);
case (7):
return ompi_coll_base_allreduce_intra_allgather_reduce(sbuf, rbuf, count, dtype, op, comm, module);
} /* switch */
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE]));
Expand Down

0 comments on commit d045c94

Please sign in to comment.