[coll-comm] review update:
- fix include guards
- update docs
- implement copy/move constructors/assignment with tests
- add equality test for collective communicators (needed for testing)
- always enable neighborhood comm, just throw if openmpi is too old
- define moved-from state as MPI_COMM_NULL + empty sizes/offsets
- remove unnecessary namespace
- make virtual function protected

Co-authored-by: Pratik Nayak <[email protected]>
Co-authored-by: Tobias Ribizel <[email protected]>
3 people committed Oct 23, 2024
1 parent 5c22649 commit 1ebe59f
Showing 13 changed files with 483 additions and 154 deletions.
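The commit message above defines the moved-from state of the collective communicators as MPI_COMM_NULL plus empty size/offset vectors, and mentions accompanying tests. The following is a minimal sketch of how that contract could be exercised; it is not part of this diff and assumes a GTest-based MPI test, that the public header is available as ginkgo/core/distributed/dense_communicator.hpp, and that get_send_size()/get_recv_size() report zero once the vectors are cleared.

#include <utility>

#include <gtest/gtest.h>

#include <ginkgo/core/distributed/dense_communicator.hpp>

// Hypothetical test sketch (not from this commit): checks the documented
// moved-from state of a collective communicator.
TEST(DenseCommunicator, MoveLeavesDefinedEmptyState)
{
    gko::experimental::mpi::communicator comm{MPI_COMM_WORLD};
    gko::experimental::mpi::DenseCommunicator a{comm};

    gko::experimental::mpi::DenseCommunicator b{std::move(a)};

    // Moved-from state per this commit: MPI_COMM_NULL + empty sizes/offsets,
    // so the moved-from object reports zero-sized transfers.
    EXPECT_EQ(a.get_send_size(), 0);
    EXPECT_EQ(a.get_recv_size(), 0);
}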
6 changes: 1 addition & 5 deletions core/CMakeLists.txt
@@ -136,14 +136,10 @@ if(GINKGO_BUILD_MPI)
distributed/collective_communicator.cpp
distributed/dense_communicator.cpp
distributed/matrix.cpp
distributed/neighborhood_communicator.cpp
distributed/partition_helpers.cpp
distributed/vector.cpp
distributed/preconditioner/schwarz.cpp)
if(NOT GINKGO_HAVE_OPENMPI_PRE_4_1_X)
target_sources(${ginkgo_core}
PRIVATE
distributed/neighborhood_communicator.cpp)
endif()
endif()

# MSVC/shared: make ginkgo be the major library
9 changes: 9 additions & 0 deletions core/distributed/collective_communicator.cpp
@@ -21,6 +21,15 @@ const communicator& CollectiveCommunicator::get_base_communicator() const
}


request CollectiveCommunicator::i_all_to_all_v(
std::shared_ptr<const Executor> exec, const void* send_buffer,
MPI_Datatype send_type, void* recv_buffer, MPI_Datatype recv_type) const
{
return this->i_all_to_all_v_impl(std::move(exec), send_buffer, send_type,
recv_buffer, recv_type);
}


} // namespace mpi
} // namespace experimental
} // namespace gko
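
The forwarding function above corresponds to the "make virtual function protected" item in the commit message: the public i_all_to_all_v stays non-virtual and dispatches to a protected virtual hook that the concrete communicators override (the non-virtual-interface idiom). The declarations below are a presumed sketch only; the actual header is not shown in this excerpt.

// Illustrative header shape, not copied from the repository; request,
// Executor, and communicator are Ginkgo types from the mpi/core headers.
class CollectiveCommunicator {
public:
    // Public, non-virtual entry point that callers keep using.
    request i_all_to_all_v(std::shared_ptr<const Executor> exec,
                           const void* send_buffer, MPI_Datatype send_type,
                           void* recv_buffer, MPI_Datatype recv_type) const;

protected:
    // Protected customization point; DenseCommunicator and
    // NeighborhoodCommunicator override this instead of the public function.
    virtual request i_all_to_all_v_impl(std::shared_ptr<const Executor> exec,
                                        const void* send_buffer,
                                        MPI_Datatype send_type,
                                        void* recv_buffer,
                                        MPI_Datatype recv_type) const = 0;
};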
64 changes: 55 additions & 9 deletions core/distributed/dense_communicator.cpp
@@ -9,13 +9,22 @@ namespace experimental {
namespace mpi {


size_type get_comm_size_safe(const communicator& comm)
{
if (comm.get() == MPI_COMM_NULL) {
return 0;
}
return comm.size();
}


DenseCommunicator::DenseCommunicator(communicator base)
: CollectiveCommunicator(base),
comm_(base),
recv_sizes_(comm_.size()),
recv_offsets_(comm_.size() + 1),
send_sizes_(comm_.size()),
send_offsets_(comm_.size() + 1)
recv_sizes_(get_comm_size_safe(comm_)),
recv_offsets_(get_comm_size_safe(comm_) + 1),
send_sizes_(get_comm_size_safe(comm_)),
send_offsets_(get_comm_size_safe(comm_) + 1)
{}


@@ -60,6 +69,30 @@ GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(GKO_DECLARE_DENSE_CONSTRUCTOR);
#undef GKO_DECLARE_DENSE_CONSTRUCTOR


DenseCommunicator::DenseCommunicator(DenseCommunicator&& other)
: DenseCommunicator(other.get_base_communicator())
{
*this = std::move(other);
}


DenseCommunicator& DenseCommunicator::operator=(DenseCommunicator&& other)
{
if (this != &other) {
comm_ = std::exchange(other.comm_, MPI_COMM_NULL);
send_sizes_ =
std::exchange(other.send_sizes_, std::vector<comm_index_type>{});
send_offsets_ =
std::exchange(other.send_offsets_, std::vector<comm_index_type>{0});
recv_sizes_ =
std::exchange(other.recv_sizes_, std::vector<comm_index_type>{});
recv_offsets_ =
std::exchange(other.recv_offsets_, std::vector<comm_index_type>{0});
}
return *this;
}


DenseCommunicator::DenseCommunicator(
communicator base, const std::vector<comm_index_type>& recv_sizes,
const std::vector<comm_index_type>& recv_offsets,
@@ -74,11 +107,9 @@ DenseCommunicator::DenseCommunicator(
{}


request DenseCommunicator::i_all_to_all_v(std::shared_ptr<const Executor> exec,
const void* send_buffer,
MPI_Datatype send_type,
void* recv_buffer,
MPI_Datatype recv_type) const
request DenseCommunicator::i_all_to_all_v_impl(
std::shared_ptr<const Executor> exec, const void* send_buffer,
MPI_Datatype send_type, void* recv_buffer, MPI_Datatype recv_type) const
{
#ifdef GINKGO_HAVE_OPENMPI_PRE_4_1_X
comm_.all_to_all_v(exec, send_buffer, send_sizes_.data(),
@@ -125,6 +156,21 @@ comm_index_type DenseCommunicator::get_send_size() const
}


bool operator==(const DenseCommunicator& a, const DenseCommunicator& b)
{
return (a.comm_.is_identical(b.comm_) || a.comm_.is_congruent(b.comm_)) &&
a.send_sizes_ == b.send_sizes_ && a.recv_sizes_ == b.recv_sizes_ &&
a.send_offsets_ == b.send_offsets_ &&
a.recv_offsets_ == b.recv_offsets_;
}


bool operator!=(const DenseCommunicator& a, const DenseCommunicator& b)
{
return !(a == b);
}


} // namespace mpi
} // namespace experimental
} // namespace gko
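
The equality operators added above (and mirrored for NeighborhoodCommunicator further down) treat two communicators as equal when their base communicators are identical or congruent and their size/offset vectors match; operator!= is simply the negation. A hypothetical usage fragment, not part of the diff, assuming MPI is already initialized (e.g. inside an MPI test body) and <cassert> plus the dense_communicator header are included:

// Hypothetical fragment (not from this commit).
gko::experimental::mpi::communicator comm{MPI_COMM_WORLD};
gko::experimental::mpi::DenseCommunicator a{comm};
gko::experimental::mpi::DenseCommunicator b{comm};

// Equal: same base communicator and matching (all-zero) sizes/offsets.
assert(a == b);
assert(!(a != b));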
6 changes: 3 additions & 3 deletions core/distributed/device_partition.hpp
@@ -2,8 +2,8 @@
//
// SPDX-License-Identifier: BSD-3-Clause

#ifndef GINKGO_PARTITION_HPP
#define GINKGO_PARTITION_HPP
#ifndef GKO_CORE_DISTRIBUTED_PARTITION_HPP
#define GKO_CORE_DISTRIBUTED_PARTITION_HPP

#include <ginkgo/core/distributed/partition.hpp>

@@ -89,4 +89,4 @@ to_device_const(
} // namespace gko


#endif // GINKGO_PARTITION_HPP
#endif // GKO_CORE_DISTRIBUTED_PARTITION_HPP
81 changes: 64 additions & 17 deletions core/distributed/neighborhood_communicator.cpp
@@ -89,14 +89,14 @@ std::unique_ptr<CollectiveCommunicator>
NeighborhoodCommunicator::create_inverse() const
{
auto base_comm = this->get_base_communicator();
distributed::comm_index_type num_sources;
distributed::comm_index_type num_destinations;
distributed::comm_index_type weighted;
comm_index_type num_sources;
comm_index_type num_destinations;
comm_index_type weighted;
GKO_ASSERT_NO_MPI_ERRORS(MPI_Dist_graph_neighbors_count(
comm_.get(), &num_sources, &num_destinations, &weighted));

std::vector<distributed::comm_index_type> sources(num_sources);
std::vector<distributed::comm_index_type> destinations(num_destinations);
std::vector<comm_index_type> sources(num_sources);
std::vector<comm_index_type> destinations(num_destinations);
GKO_ASSERT_NO_MPI_ERRORS(MPI_Dist_graph_neighbors(
comm_.get(), num_sources, sources.data(), MPI_UNWEIGHTED,
num_destinations, destinations.data(), MPI_UNWEIGHTED));
Expand All @@ -120,10 +120,10 @@ comm_index_type NeighborhoodCommunicator::get_send_size() const


NeighborhoodCommunicator::NeighborhoodCommunicator(
communicator base, const std::vector<distributed::comm_index_type>& sources,
communicator base, const std::vector<comm_index_type>& sources,
const std::vector<comm_index_type>& recv_sizes,
const std::vector<comm_index_type>& recv_offsets,
const std::vector<distributed::comm_index_type>& destinations,
const std::vector<comm_index_type>& destinations,
const std::vector<comm_index_type>& send_sizes,
const std::vector<comm_index_type>& send_offsets)
: CollectiveCommunicator(base), comm_(MPI_COMM_NULL)
@@ -138,31 +138,35 @@ NeighborhoodCommunicator::NeighborhoodCommunicator(

NeighborhoodCommunicator::NeighborhoodCommunicator(communicator base)
: CollectiveCommunicator(std::move(base)),
comm_(MPI_COMM_SELF),
send_sizes_(),
comm_(MPI_COMM_NULL),
send_offsets_(1),
recv_sizes_(),
recv_offsets_(1)
{
// ensure that comm_ always has the correct topology
std::vector<comm_index_type> non_nullptr(1);
non_nullptr.resize(0);
comm_ = create_neighborhood_comm(this->get_base_communicator(), non_nullptr,
non_nullptr);
if (this->get_base_communicator().get() != MPI_COMM_NULL) {
// ensure that comm_ always has the correct topology
std::vector<comm_index_type> non_nullptr(1);
non_nullptr.resize(0);
comm_ = create_neighborhood_comm(this->get_base_communicator(),
non_nullptr, non_nullptr);
}
}


request NeighborhoodCommunicator::i_all_to_all_v(
request NeighborhoodCommunicator::i_all_to_all_v_impl(
std::shared_ptr<const Executor> exec, const void* send_buffer,
MPI_Datatype send_type, void* recv_buffer, MPI_Datatype recv_type) const
{
#if GINKGO_HAVE_OPENMPI_PRE_4_1_X
GKO_NOT_IMPLEMENTED;
#else
auto guard = exec->get_scoped_device_id_guard();
request req;
GKO_ASSERT_NO_MPI_ERRORS(MPI_Ineighbor_alltoallv(
send_buffer, send_sizes_.data(), send_offsets_.data(), send_type,
recv_buffer, recv_sizes_.data(), recv_offsets_.data(), recv_type,
comm_.get(), req.get()));
return req;
#endif
}


@@ -179,12 +183,55 @@ NeighborhoodCommunicator::create_with_same_type(
}


NeighborhoodCommunicator::NeighborhoodCommunicator(
NeighborhoodCommunicator&& other)
: NeighborhoodCommunicator(other.get_base_communicator())
{
*this = std::move(other);
}


NeighborhoodCommunicator& NeighborhoodCommunicator::operator=(
NeighborhoodCommunicator&& other)
{
if (this != &other) {
comm_ = std::exchange(other.comm_, MPI_COMM_NULL);
send_sizes_ =
std::exchange(other.send_sizes_, std::vector<comm_index_type>{});
send_offsets_ =
std::exchange(other.send_offsets_, std::vector<comm_index_type>{0});
recv_sizes_ =
std::exchange(other.recv_sizes_, std::vector<comm_index_type>{});
recv_offsets_ =
std::exchange(other.recv_offsets_, std::vector<comm_index_type>{0});
}
return *this;
}


bool operator==(const NeighborhoodCommunicator& a,
const NeighborhoodCommunicator& b)
{
return (a.comm_.is_identical(b.comm_) || a.comm_.is_congruent(b.comm_)) &&
a.send_sizes_ == b.send_sizes_ && a.recv_sizes_ == b.recv_sizes_ &&
a.send_offsets_ == b.send_offsets_ &&
a.recv_offsets_ == b.recv_offsets_;
}


bool operator!=(const NeighborhoodCommunicator& a,
const NeighborhoodCommunicator& b)
{
return !(a == b);
}


template <typename LocalIndexType, typename GlobalIndexType>
NeighborhoodCommunicator::NeighborhoodCommunicator(
communicator base,
const distributed::index_map<LocalIndexType, GlobalIndexType>& imap)
: CollectiveCommunicator(base),
comm_(MPI_COMM_SELF),
comm_(MPI_COMM_NULL),
recv_sizes_(imap.get_remote_target_ids().get_size()),
recv_offsets_(recv_sizes_.size() + 1),
send_offsets_(1)