diff --git a/src/process.cpp b/src/process.cpp index b05b6f25..107712f9 100644 --- a/src/process.cpp +++ b/src/process.cpp @@ -20,6 +20,7 @@ #include "LAScatalog.h" #include "openmp.h" +// Parallel processing strategies #define SEQUENTIAL 1 #define CONCURRENTPOINTS 2 #define CONCURRENTFILES 3 @@ -126,6 +127,12 @@ SEXP process(SEXP sexppipeline, SEXP sexpprogrss, SEXP sexpncpu, SEXP sexpmode, bool failure = false; int k = 0; + + // Records a pointer to the private pipeline of each threads + // This allows to know the order of the results + std::vector private_pipelines_ptr; + private_pipelines_ptr.reserve(ncpu_outer_loop); + #pragma omp parallel num_threads(ncpu_outer_loop) { try @@ -134,6 +141,7 @@ SEXP process(SEXP sexppipeline, SEXP sexpprogrss, SEXP sexpncpu, SEXP sexpmode, // ensure that shared resources are protected (such as connection to output files) // and private data are copied. Pipeline private_pipeline(pipeline); + private_pipelines_ptr[omp_get_thread_num()] = &private_pipeline; #pragma omp for for (int i = 0 ; i < n ; ++i) @@ -200,13 +208,16 @@ SEXP process(SEXP sexppipeline, SEXP sexpprogrss, SEXP sexpncpu, SEXP sexpmode, // We are outside the main loop. We can clear the pipeline with last = true; private_pipeline.clear(true); - // We have multiple pipelines that each process some chunks and each have a partial - // output. We reduce in the main pipeline - #pragma omp critical + // We have multiple pipelines and each processed some chunks and each have a partial + // output. We reduce in the main pipeline. To preserve the ordering of the output, first + // we put a barrier, then each thread is merged in order + #pragma omp barrier + #pragma omp single { if (!failure) { - pipeline.merge(private_pipeline); + for (int i = 0 ; i < ncpu_outer_loop ; i++) + pipeline.merge(*private_pipelines_ptr[i]); } } }