Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][Parallelization] Making explicitily schedule(runtime), with dynamic by default, in OMP loops in ParallelUtils #12923

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,27 @@ if (KRATOS_SHARED_MEMORY_PARALLELIZATION STREQUAL "OpenMP")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OpenMP_EXE_LINKER_FLAGS}")

# Check if the environment variable OMP_SCHEDULE is defined
if(DEFINED ENV{OMP_SCHEDULE})
# Set the already defined one
set(KRATOS_OMP_SCHEDULE $ENV{OMP_SCHEDULE})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMP_SCHEDULE is a runtime env variable, it is a extremely bad idea to use it a compilation switch (IMO).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, but this is the following.

During compilation the OMP_SCHEDULE will set KRATOS_OMP_SCHEDULE that will be used as default if actually OMP_SCHEDULE is not defined, but if OMP_SCHEDULE is defined OMP_SCHEDULE will be taken into account. Do you understand me?

else(DEFINED ENV{OMP_SCHEDULE})
# If not defined set the default value
if(NOT DEFINED KRATOS_OMP_SCHEDULE)
message(STATUS "OMP_SCHEDULE is not defined, setting to dynamic. You can also set it with the environment variable OMP_SCHEDULE or with the CMake variable KRATOS_OMP_SCHEDULE (e.g., dynamic,4)")
set(KRATOS_OMP_SCHEDULE "dynamic")
endif(NOT DEFINED KRATOS_OMP_SCHEDULE)
endif((DEFINED ENV{OMP_SCHEDULE}))

# Display the selected schedule in the build output
message(STATUS "KRATOS_OMP_SCHEDULE is set to: ${KRATOS_OMP_SCHEDULE}")

# Define the OMP_SCHEDULE as a preprocessor macro
add_definitions(-DKRATOS_OMP_SCHEDULE="${KRATOS_OMP_SCHEDULE}")

# This is the only way to run OMP loops with dynamic schedule without conflicting the GIL
add_definitions(-DPYBIND11_NO_ASSERT_GIL_HELD_INCREF_DECREF)
else (OPENMP_FOUND)
message(FATAL_ERROR "OpenMP could not be found!")
# fallback solution => in future once better supported we can use the C++11 based parallelization instead
Expand Down
112 changes: 112 additions & 0 deletions kratos/benchmarks/parallel_utilities_benchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// | / |
// ' / __| _` | __| _ \ __|
// . \ | ( | | ( |\__ `
// _|\_\_| \__,_|\__|\___/ ____/
// Multi-Physics
//
// License: BSD License
// Kratos default license: kratos/license.txt
//
// Main authors: Vicente Mataix Ferrandiz
//

// System includes
#include <utility>
#include <numeric>
#include <iostream>
#include <unordered_map>

// External includes
#include <benchmark/benchmark.h>

// Project includes
#include "utilities/parallel_utilities.h"
#include "utilities/reduction_utilities.h"

namespace Kratos
{
// Template class for testing
template<std::size_t TSize>
class RHSElement {
public:
explicit RHSElement(const double Val) : mRHSVal(Val) {}
void CalculateRHS(std::vector<double>& rVector) {
if (rVector.size() != TSize) { rVector.resize(TSize); }
std::fill(rVector.begin(), rVector.end(), mRHSVal);
}
double GetAccumRHSValue() { return mAccumRHSValue; }
void SetAccumRHSValue(double Value) { mAccumRHSValue = Value; }

private:
double mRHSVal;
double mAccumRHSValue = 0.0;
};

// Benchmark for power operation on a vector
static void BM_VectorPower(benchmark::State& state) {
int nsize = state.range(0);
std::vector<double> data_vector(nsize, 5.0);

for (auto _ : state) {
block_for_each(data_vector, [](double& item) {
item = std::pow(item, 0.1);
});
}
}

// Benchmark for reduction
static void BM_VectorReduction(benchmark::State& state) {
int nsize = state.range(0);
std::vector<double> data_vector(nsize, 5.0);

for (auto _ : state) {
auto final_sum = BlockPartition<std::vector<double>::iterator>(data_vector.begin(),
data_vector.end()).for_each<SumReduction<double>>(
[](double& item){
return item;
});
}
}

// Benchmark for element-wise operations with thread-local storage
static void BM_ThreadLocalStorage(benchmark::State& state) {
constexpr std::size_t vec_size = 6;
std::size_t n_elems = state.range(0);

using RHSElementType = RHSElement<vec_size>;

std::vector<double> rhs_vals(n_elems);
for (std::size_t i = 0; i < n_elems; ++i) {
rhs_vals[i] = (i % 12) * 1.889;
}

std::vector<RHSElementType> elements;
for (std::size_t i = 0; i < rhs_vals.size(); ++i) {
elements.push_back(RHSElementType(rhs_vals[i]));
}

auto tls_lambda_manual_reduction = [](RHSElementType& rElem, std::vector<double>& rTLS)
{
rElem.CalculateRHS(rTLS);
double rhs_sum = std::accumulate(rTLS.begin(), rTLS.end(), 0.0);
rElem.SetAccumRHSValue(rhs_sum);
};

for (auto _ : state) {
BlockPartition<std::vector<RHSElementType>::iterator>(elements.begin(),
elements.end()).for_each(std::vector<double>(), tls_lambda_manual_reduction);

const double sum_elem_rhs_vals = std::accumulate(elements.begin(), elements.end(), 0.0, [](double acc, RHSElementType& rElem){
return acc + rElem.GetAccumRHSValue();
});
}
}

// Register benchmarks and provide input size as a command-line option
BENCHMARK(BM_VectorPower)->Arg(1e3)->Arg(1e5)->Arg(1e6);
BENCHMARK(BM_VectorReduction)->Arg(1e3)->Arg(1e5)->Arg(1e6);
BENCHMARK(BM_ThreadLocalStorage)->Arg(1e3)->Arg(1e5)->Arg(1e6);

} // namespace Kratos

BENCHMARK_MAIN();
62 changes: 45 additions & 17 deletions kratos/sources/kernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
// _|\_\_| \__,_|\__|\___/ ____/
// Multi-Physics
//
// License: BSD License
// Kratos default license: kratos/license.txt
// License: BSD License
// Kratos default license: kratos/license.txt
//
// Main authors: Pooyan Dadvand
//
Expand Down Expand Up @@ -141,33 +141,62 @@ void Kernel::SetPythonVersion(std::string pyVersion) {

void Kernel::PrintParallelismSupportInfo() const
{
#ifdef KRATOS_SMP_NONE
#ifdef KRATOS_SMP_NONE
constexpr bool threading_support = false;
#else
constexpr auto smp = "None";
#else
constexpr bool threading_support = true;
std::string scheduling_str;
#if defined(KRATOS_SMP_OPENMP)
// Check if the environment variable is defined
const char* var_name = "OMP_SCHEDULE";
const char* scheduling = getenv(var_name);

if (scheduling != nullptr) { // Correct variable name and nullptr comparison
scheduling_str = scheduling;
} else {
#ifdef KRATOS_OMP_SCHEDULE
scheduling_str = KRATOS_OMP_SCHEDULE; // Use the preprocessor-defined value
#else
scheduling_str = "dynamic"; // NOTE: This should not happen as defined in compiling time
#endif
#ifdef KRATOS_COMPILED_IN_WINDOWS
const int output_setenv = _putenv_s(var_name, scheduling_str.c_str());
#else
const int overwrite = 1; // Overwrite if it exists, a priori not, that's why we are setting it
const int output_setenv = setenv(var_name, scheduling_str.c_str(), overwrite);
#endif
KRATOS_ERROR_IF_NOT(output_setenv == 0) << "Error setting environment variable " << var_name << std::endl;
scheduling_str = "\"" + scheduling_str + "\"";
scheduling_str += " (retrieving from KRATOS_OMP_SCHEDULE)";
}

const auto smp = "OpenMP, scheduling type is " + scheduling_str; // Use `std::string` for concatenation
#elif defined(KRATOS_SMP_CXX11)
constexpr auto smp = "C++11";
#else
constexpr auto smp = "Unknown";
#endif
#endif

#ifdef KRATOS_USING_MPI
#ifdef KRATOS_USING_MPI
constexpr bool mpi_support = true;
#else
#else
constexpr bool mpi_support = false;
#endif
#endif

Logger logger("");
logger << LoggerMessage::Severity::INFO;

if (threading_support) {
if (mpi_support) {
logger << "Compiled with threading and MPI support." << std::endl;
}
else {
logger << "Compiled with threading support." << std::endl;
logger << "Compiled with threading and MPI support. Threading support with " << smp << "." << std::endl;
} else {
logger << "Compiled with threading support. Threading support with " << smp << "." << std::endl;
}
}
else if (mpi_support) {
} else if (mpi_support) {
logger << "Compiled with MPI support." << std::endl;
}
else {
} else {
logger << "Serial compilation." << std::endl;
}

Expand All @@ -179,8 +208,7 @@ void Kernel::PrintParallelismSupportInfo() const
if (mIsDistributedRun) {
const DataCommunicator& r_world = ParallelEnvironment::GetDataCommunicator("World");
logger << "MPI world size: " << r_world.Size() << "." << std::endl;
}
else {
} else {
logger << "Running without MPI." << std::endl;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
// _|\_\_| \__,_|\__|\___/ ____/
// Multi-Physics
//
// License: BSD License
// Kratos default license: kratos/license.txt
// License: BSD License
// Kratos default license: kratos/license.txt
//
// Main authors: Riccardo Rossi
// Philipp Bucher (https://github.com/philbucher)
//

// System includes
#include <utility>
Expand Down
17 changes: 9 additions & 8 deletions kratos/utilities/parallel_utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// Main authors: Riccardo Rossi
// Denis Demidov
// Philipp Bucher (https://github.com/philbucher)
// Vicente Mataix Ferrandiz
//

#pragma once
Expand Down Expand Up @@ -183,7 +184,7 @@ class BlockPartition
{
KRATOS_PREPARE_CATCH_THREAD_EXCEPTION

#pragma omp parallel for
#pragma omp parallel for schedule(runtime)
for (int i=0; i<mNchunks; ++i) {
KRATOS_TRY
for (auto it = mBlockPartition[i]; it != mBlockPartition[i+1]; ++it) {
Expand All @@ -206,7 +207,7 @@ class BlockPartition
KRATOS_PREPARE_CATCH_THREAD_EXCEPTION

TReducer global_reducer;
#pragma omp parallel for
#pragma omp parallel for schedule(runtime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@loumalouomega as i am telling, take a look to line 154. It does not make sense to change this unless we change what happens there.

also to my understanding the runtime behaviour has potentially a very high overhead due to the need of making a syscall to fetch an env variable.

https://stackoverflow.com/questions/7460552/reading-environment-variables-is-slow-operation/7460612#7460612

not sure if that matters...but at least we need to beware of this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the benchmark to check that it affects significantly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of the runtime is to give flexibility, if you prefer we can define it on compiling time...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@loumalouomega aside of the comments on the opportunity of using the OMP_SCHEDULE did u take a look at what i am writing?

we are doing "by hand" the chunking. If we don't change that, it makes no sense to use a different scheduling, as everyone will be working on its chunk (as of now we dot have more chunks than threads!)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay...let me think this...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Centrantly there is no effect (significative), I need to rethink this...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we may need to rethink the chunging (to be dependent of the CPU architecture)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RiccardoRossi what do you suggest exactly, because I has been studying this and our chunking conflicts with the OMP scheduling, and a priori the most efficient would be to let OMP to do the chunking. The problem is that with that we lose the parallel_utilities design and reduction utilities.

for (int i=0; i<mNchunks; ++i) {
KRATOS_TRY
TReducer local_reducer;
Expand Down Expand Up @@ -238,7 +239,7 @@ class BlockPartition
// copy the prototype to create the thread local storage
TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);

#pragma omp for
#pragma omp for schedule(runtime)
for(int i=0; i<mNchunks; ++i){
KRATOS_TRY
for (auto it = mBlockPartition[i]; it != mBlockPartition[i+1]; ++it){
Expand Down Expand Up @@ -270,7 +271,7 @@ class BlockPartition
// copy the prototype to create the thread local storage
TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);

#pragma omp for
#pragma omp for schedule(runtime)
for (int i=0; i<mNchunks; ++i) {
KRATOS_TRY
TReducer local_reducer;
Expand Down Expand Up @@ -519,7 +520,7 @@ class IndexPartition
{
KRATOS_PREPARE_CATCH_THREAD_EXCEPTION

#pragma omp parallel for
#pragma omp parallel for schedule(runtime)
for (int i=0; i<mNchunks; ++i) {
KRATOS_TRY
for (auto k = mBlockPartition[i]; k < mBlockPartition[i+1]; ++k) {
Expand All @@ -541,7 +542,7 @@ class IndexPartition
KRATOS_PREPARE_CATCH_THREAD_EXCEPTION

TReducer global_reducer;
#pragma omp parallel for
#pragma omp parallel for schedule(runtime)
for (int i=0; i<mNchunks; ++i) {
KRATOS_TRY
TReducer local_reducer;
Expand Down Expand Up @@ -572,7 +573,7 @@ class IndexPartition
// copy the prototype to create the thread local storage
TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);

#pragma omp for
#pragma omp for schedule(runtime)
for (int i=0; i<mNchunks; ++i) {
KRATOS_TRY
for (auto k = mBlockPartition[i]; k < mBlockPartition[i+1]; ++k) {
Expand Down Expand Up @@ -604,7 +605,7 @@ class IndexPartition
// copy the prototype to create the thread local storage
TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);

#pragma omp for
#pragma omp for schedule(runtime)
for (int i=0; i<mNchunks; ++i) {
KRATOS_TRY
TReducer local_reducer;
Expand Down
Loading