Skip to content

Commit

Permalink
Order preserving multi-threading
Browse files Browse the repository at this point in the history
  • Loading branch information
Jean-Romain committed Mar 4, 2024
1 parent 289e9c3 commit 8b94ae9
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "LAScatalog.h"
#include "openmp.h"

// Parallel processing strategies
#define SEQUENTIAL 1
#define CONCURRENTPOINTS 2
#define CONCURRENTFILES 3
Expand Down Expand Up @@ -126,6 +127,12 @@ SEXP process(SEXP sexppipeline, SEXP sexpprogrss, SEXP sexpncpu, SEXP sexpmode,

bool failure = false;
int k = 0;

// Records a pointer to the private pipeline of each threads
// This allows to know the order of the results
std::vector<Pipeline*> private_pipelines_ptr;
private_pipelines_ptr.reserve(ncpu_outer_loop);

#pragma omp parallel num_threads(ncpu_outer_loop)
{
try
Expand All @@ -134,6 +141,7 @@ SEXP process(SEXP sexppipeline, SEXP sexpprogrss, SEXP sexpncpu, SEXP sexpmode,
// ensure that shared resources are protected (such as connection to output files)
// and private data are copied.
Pipeline private_pipeline(pipeline);
private_pipelines_ptr[omp_get_thread_num()] = &private_pipeline;

#pragma omp for
for (int i = 0 ; i < n ; ++i)
Expand Down Expand Up @@ -200,13 +208,16 @@ SEXP process(SEXP sexppipeline, SEXP sexpprogrss, SEXP sexpncpu, SEXP sexpmode,
// We are outside the main loop. We can clear the pipeline with last = true;
private_pipeline.clear(true);

// We have multiple pipelines that each process some chunks and each have a partial
// output. We reduce in the main pipeline
#pragma omp critical
// We have multiple pipelines and each processed some chunks and each have a partial
// output. We reduce in the main pipeline. To preserve the ordering of the output, first
// we put a barrier, then each thread is merged in order
#pragma omp barrier
#pragma omp single
{
if (!failure)
{
pipeline.merge(private_pipeline);
for (int i = 0 ; i < ncpu_outer_loop ; i++)
pipeline.merge(*private_pipelines_ptr[i]);
}
}
}
Expand Down

0 comments on commit 8b94ae9

Please sign in to comment.