[DAPHNE-daphne-eu#758] MetaDataObject for CSRMatrix
* This commit introduces the meta data object to the CSR data type

* Memory pinning

To prevent excessive allocation ID lookups in the hot path when using --vec, this change "pins" memory by caching the allocation type of previous accesses.
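A minimal sketch of the pinning idea, under the assumption that it works roughly like a one-entry cache (the pinning code itself is not among the two files shown on this page, and all names below are hypothetical):

#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical sketch of allocation pinning; AllocType, PinnedBuffer and
// the pin state are illustrative names, not the actual DAPHNE API.
enum class AllocType { HOST, CUDA };

struct Allocation {
    AllocType type;
    std::vector<std::byte> data; // stand-in for the real buffer handle
};

class PinnedBuffer {
    std::vector<Allocation> allocations; // normally owned by the meta data object
    Allocation *pinned = nullptr;        // allocation that served the last access
    AllocType pinnedType{};

public:
    explicit PinnedBuffer(std::vector<Allocation> allocs)
        : allocations(std::move(allocs)) {}

    std::byte *get(AllocType t) {
        // Fast path: the previous access used the same allocation type,
        // so skip the lookup entirely.
        if (pinned && pinnedType == t)
            return pinned->data.data();
        // Slow path: search the placements, then pin the hit.
        for (auto &a : allocations) {
            if (a.type == t) {
                pinned = &a;
                pinnedType = t;
                return a.data.data();
            }
        }
        return nullptr; // no allocation of the requested type
    }

    // Invalidate the pin, e.g., when the set of placements changes.
    void unpin() { pinned = nullptr; }
};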
corepointer committed Aug 19, 2024
1 parent dc61221 commit 59944e7
Showing 41 changed files with 634 additions and 539 deletions.
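The change running through both files below is mechanical: DataPlacement no longer exposes its allocation descriptor as a public member (dp->allocation, accessed by reference) but through accessors on the meta data object (dp->getAllocation(0), returning a pointer); likewise dp->dp_id becomes dp->getID() and dp->range becomes dp->getRange(). A simplified before/after sketch of the shape of this change (member layout and constructor are assumptions, not the actual class):

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct IAllocationDescriptor { virtual ~IAllocationDescriptor() = default; };
struct Range { std::size_t r_start, r_len; };

// Before: a single allocation descriptor, exposed as a public member.
struct DataPlacementOld {
    std::size_t dp_id;
    std::unique_ptr<IAllocationDescriptor> allocation;
    std::unique_ptr<Range> range;
};

// After: a list of descriptors behind accessors, so one placement can
// track several allocations (e.g., host and device) of the same data.
class DataPlacementNew {
    std::size_t dp_id;
    std::vector<std::unique_ptr<IAllocationDescriptor>> allocations;
    std::unique_ptr<Range> range;

public:
    DataPlacementNew(std::size_t id, std::unique_ptr<Range> r)
        : dp_id(id), range(std::move(r)) {}

    void addAllocation(std::unique_ptr<IAllocationDescriptor> a) {
        allocations.push_back(std::move(a));
    }
    IAllocationDescriptor *getAllocation(std::size_t n) { return allocations[n].get(); }
    std::size_t getID() const { return dp_id; }
    Range *getRange() { return range.get(); }
};

The pointer return also explains why the cast spelling changes throughout the diff: from a reference cast, dynamic_cast<T&>(*(dp->allocation)), to a pointer cast, dynamic_cast<T*>(dp->getAllocation(0)).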
26 changes: 13 additions & 13 deletions src/runtime/distributed/coordinator/kernels/Broadcast.h
@@ -82,9 +82,9 @@ struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT>
LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::BROADCAST, mat, dctx);
while (partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
- auto rank = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getRank();
+ auto rank = dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0))->getRank();

- if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+ if (dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

// Minimum chunk size
@@ -120,13 +120,13 @@ struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT>
WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank);
std::string address=std::to_string(rank);
DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
- auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
+ auto data = dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0))->getDistributedData();
data.identifier = dataAcknowledgement.identifier;
data.numRows = dataAcknowledgement.numRows;
data.numCols = dataAcknowledgement.numCols;
data.isPlacedAtWorker = true;
- dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
+ auto alloc = dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0));
+ alloc->updateDistributedData(data);
}
}
};
@@ -160,12 +160,12 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT>

while(partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
- if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+ if (dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

- auto address = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();
+ auto address = dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getLocation();

- StoredInfo storedInfo({dp->dp_id});
+ StoredInfo storedInfo({dp->getID()});
caller.asyncStoreCall(address, storedInfo);
// Minimum chunk size
auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(mat) ?
@@ -196,15 +196,15 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT>
auto dp_id = response.storedInfo.dp_id;
auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id);

- auto data = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData();
+ auto data = dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getDistributedData();

auto storedData = response.result;
data.identifier = storedData.identifier();
data.numRows = storedData.num_rows();
data.numCols = storedData.num_cols();
data.isPlacedAtWorker = true;

- dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
+ dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->updateDistributedData(data);
}
};
};
@@ -234,10 +234,10 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT>

while(partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
- if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+ if (dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

- auto workerAddr = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();
+ auto workerAddr = dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getLocation();
std::thread t([=, &mat]()
{
// TODO Consider saving channels inside DaphneContext
@@ -272,7 +272,7 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT>
newData.numRows = storedData.num_rows();
newData.numCols = storedData.num_cols();
newData.isPlacedAtWorker = true;
- dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(newData);
+ dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->updateDistributedData(newData);
});
threads_vector.push_back(move(t));
}
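Note the read-modify-write pattern repeated after every transfer in this commit: fetch the placement, read its DistributedData through the allocation descriptor, update identifier, shape, and isPlacedAtWorker, and write it back via updateDistributedData. A hypothetical consolidation of that pattern (not part of this commit; DataPlacement and the descriptor types are those used in the surrounding code):

#include <cstddef>
#include <string>

template <class AllocDesc>
void acknowledgeStore(DataPlacement *dp, const std::string &identifier,
                      std::size_t numRows, std::size_t numCols) {
    auto *alloc = dynamic_cast<AllocDesc *>(dp->getAllocation(0));
    auto data = alloc->getDistributedData(); // read current bookkeeping
    data.identifier = identifier;
    data.numRows = numRows;
    data.numCols = numCols;
    data.isPlacedAtWorker = true;            // now materialized at a worker
    alloc->updateDistributedData(data);      // write it back
}

A call site such as the MPI acknowledgement loop above would then shrink to acknowledgeStore<AllocationDescriptorMPI>(dp, dataAcknowledgement.identifier, dataAcknowledgement.numRows, dataAcknowledgement.numCols).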
42 changes: 21 additions & 21 deletions src/runtime/distributed/coordinator/kernels/Distribute.h
@@ -16,14 +16,14 @@

#pragma once

- #include <runtime/local/context/DistributedContext.h>
- #include <runtime/local/datastructures/DataObjectFactory.h>
- #include <runtime/local/datastructures/DenseMatrix.h>
#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>

- #include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
#include <runtime/distributed/proto/DistributedGRPCCaller.h>
#include <runtime/distributed/worker/WorkerImpl.h>
+ #include <runtime/local/context/DistributedContext.h>
+ #include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
+ #include <runtime/local/datastructures/DataObjectFactory.h>
+ #include <runtime/local/datastructures/DenseMatrix.h>
+ #include <runtime/local/io/DaphneSerializer.h>

#ifdef USE_MPI
#include <runtime/distributed/worker/MPIHelper.h>
@@ -71,12 +71,12 @@ struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT>

while (partioner.HasNextChunk()){
DataPlacement *dp = partioner.GetNextChunk();
- auto rank = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getRank();
+ auto rank = dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0))->getRank();

- if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+ if (dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

- auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
+ auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);

// Minimum chunk size
auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(slicedMat) ?
@@ -99,12 +99,12 @@ struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT>
WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank);
std::string address = std::to_string(rank);
DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
- auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
+ auto data = dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0))->getDistributedData();
data.identifier = dataAcknowledgement.identifier ;
data.numRows = dataAcknowledgement.numRows;
data.numCols = dataAcknowledgement.numCols;
data.isPlacedAtWorker = true;
- dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
+ dynamic_cast<AllocationDescriptorMPI*>(dp->getAllocation(0))->updateDistributedData(data);
}

}
@@ -132,17 +132,17 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT>
while (partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
// Skip if already placed at workers
- if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+ if (dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;
distributed::Data protoMsg;

std::vector<char> buffer;

- auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
+ auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);

- StoredInfo storedInfo({dp->dp_id});
+ StoredInfo storedInfo({dp->getID()});

- auto address = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();
+ auto address = dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getLocation();

caller.asyncStoreCall(address, storedInfo);
// Minimum chunk size
@@ -172,12 +172,12 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT>

auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id);

- auto data = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData();
+ auto data = dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getDistributedData();
data.identifier = storedData.identifier();
data.numRows = storedData.num_rows();
data.numCols = storedData.num_cols();
data.isPlacedAtWorker = true;
- dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
+ dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->updateDistributedData(data);
}

}
@@ -202,22 +202,22 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT>
while (partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
// Skip if already placed at workers
- if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
+ if (dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;


std::vector<char> buffer;


- auto workerAddr = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();
+ auto workerAddr = dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->getLocation();
std::thread t([=, &mat]()
{
auto stub = ctx->stubs[workerAddr].get();

distributed::StoredData storedData;
grpc::ClientContext grpc_ctx;

- auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
+ auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);
auto serializer = DaphneSerializerChunks<DT>(slicedMat, dctx->config.max_distributed_serialization_chunk_size);

distributed::Data protoMsg;
@@ -238,10 +238,10 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT>
newData.numRows = storedData.num_rows();
newData.numCols = storedData.num_cols();
newData.isPlacedAtWorker = true;
- dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(newData);
+ dynamic_cast<AllocationDescriptorGRPC*>(dp->getAllocation(0))->updateDistributedData(newData);
DataObjectFactory::destroy(slicedMat);
});
- threads_vector.push_back(move(t));
+ threads_vector.push_back(std::move(t));
}
for (auto &thread : threads_vector)
thread.join();