26 #ifdef GETFEM_HAS_OPENMP
31 using bgeot::scalar_type;
35 #ifdef GETFEM_HAS_OPENMP
// Out-of-class definition of omp_guard's static recursive mutex; the
// omp_guard constructor below builds its lock_guard on this object.
37 std::recursive_mutex omp_guard::mutex;
// Constructor of omp_guard. The visible part of the initializer creates a
// std::lock_guard on the class-wide recursive mutex through
// std::make_unique; the mutex stays locked for as long as that lock_guard
// object lives.
// NOTE(review): the remainder of the initializer list and the body are not
// visible in this excerpt - confirm whether the lock is taken
// unconditionally.
39 omp_guard::omp_guard()
41 std::make_unique<std::lock_guard<std::recursive_mutex>>(mutex)
// Constructor of local_guard taking the recursive mutex `m` to guard.
// The visible initializer creates a std::lock_guard on `m` through
// std::make_shared, i.e. the lock object is held by a shared pointer.
// NOTE(review): the member names being initialized and the body are not
// visible in this excerpt.
45 local_guard::local_guard(std::recursive_mutex& m) :
48 std::make_shared<std::lock_guard<std::recursive_mutex>>(m)
// Returns a local_guard constructed from `mutex`.
// The function is const-qualified and returns the guard by value.
52 local_guard lock_factory::get_lock()
const{
53 return local_guard{mutex};
// Thread index under the global policy: forwards to
// get_current_partition() on the object returned by partition_master::get().
56 size_type global_thread_policy::this_thread() {
57 return partition_master::get().get_current_partition();
// Thread count under the global policy: forwards to get_nb_partitions()
// on the object returned by partition_master::get().
60 size_type global_thread_policy::num_threads(){
61 return partition_master::get().get_nb_partitions();
// Thread index under the "true" policy: the value of the OpenMP runtime
// call omp_get_thread_num(), converted to size_type.
64 size_type true_thread_policy::this_thread() {
65 return omp_get_thread_num();
// Thread count under the "true" policy: the value of the OpenMP runtime
// call omp_get_max_threads(), converted to size_type.
68 size_type true_thread_policy::num_threads(){
69 return omp_get_max_threads();
73 omp_set_num_threads(n);
74 partition_master::get().check_threads();
79 if(omp_get_num_threads() == 1 && omp_get_level() == 0)
return false;
81 if(omp_get_num_threads() == 1 && omp_get_level() == 1)
return true;
83 if(omp_in_parallel() == 1)
return true;
89 return omp_get_max_threads() == 1;
93 return std::thread::hardware_concurrency();
98 size_type global_thread_policy::this_thread() {
return 0;}
100 size_type global_thread_policy::num_threads(){
return 1;}
102 size_type true_thread_policy::this_thread() {
return 0;}
104 size_type true_thread_policy::num_threads(){
return 1;}
121 std::vector<std::exception_ptr> exceptions;
// Stores the exception currently being handled (std::current_exception())
// in the `exceptions` slot indexed by true_thread_policy::this_thread().
// Intended to be called from inside a catch handler, as run() does.
123 void captureException(){
124 exceptions[true_thread_policy::this_thread()] = std::current_exception();
129 : exceptions(true_thread_policy::num_threads(),
nullptr)
// Calls f(params...) and, if the call throws anything, records the
// exception through captureException() instead of letting it propagate.
// `f` and every element of `params` are taken by value.
132 template <
typename function,
typename... parameters>
133 void run(
function f, parameters... params){
134 try {f(params...);}
catch (...) {captureException();}
// Returns a new vector holding every non-null entry of `exceptions`,
// in the order in which the entries are stored.
137 std::vector<std::exception_ptr> caughtExceptions()
const{
138 std::vector<std::exception_ptr> non_empty_exceptions;
139 for (
auto &&pException : exceptions){
// Null entries are skipped; only non-null exception_ptr values are copied.
140 if (pException !=
nullptr) non_empty_exceptions.push_back(pException);
142 return non_empty_exceptions;
146 for (
auto &&pException : exceptions){
147 if (pException !=
nullptr) std::rethrow_exception(pException);
// Constructor of partition_iterator: initializes member `master` from `m`
// and member `it` from `it_from_set`.
// NOTE(review): the parameter list itself is not visible in this excerpt;
// the parameter names are taken from the initializer list.
152 partition_iterator::partition_iterator(
154 : master{m}, it{it_from_set}
// Prefix increment. Note that it returns a partition_iterator by value.
// In the visible part, when this iterator differs from master.end(), the
// partition `*it` is pushed to the master as its current partition.
// NOTE(review): the statement that advances `it` and the return statement
// are not visible in this excerpt.
157 partition_iterator partition_iterator::operator++(){
159 if (*
this != master.
end()) master.set_current_partition(*it);
163 bool partition_iterator::operator==(
const partition_iterator &it1)
const {
// Inequality, defined as the negation of operator== on the same operands.
167 bool partition_iterator::operator!=(
const partition_iterator &it1)
const {
168 return !(*
this == it1);
171 size_type partition_iterator::operator*()
const{
// Out-of-class definition of the static partition_master object named
// `instance`.
175 partition_master partition_master::instance;
177 partition_master& partition_master::get(){
// Re-synchronizes the cached thread and partition counts with
// true_thread_policy::num_threads().
// Visible steps:
//  - nb_user_threads is refreshed when it differs from the policy's count;
//  - nb_partitions is raised to nb_user_threads when it is smaller and the
//    user has not fixed the partition count (partitions_set_by_user);
//  - dal::singletons_manager::on_partitions_change() is called.
// NOTE(review): the statements that set `must_update` to true and the
// condition guarding the final notification are not visible in this
// excerpt.
181 void partition_master::check_threads(){
183 auto must_update =
false;
184 if (nb_user_threads != true_thread_policy::num_threads()){
185 nb_user_threads = true_thread_policy::num_threads();
188 if (nb_partitions < nb_user_threads && !partitions_set_by_user){
189 nb_partitions = nb_user_threads;
194 dal::singletons_manager::on_partitions_change();
199 GMM_ASSERT1 (!partitions_set_by_user,
200 "Number of partitions can be set only once.");
201 if (n > nb_partitions){
203 nb_user_threads = true_thread_policy::num_threads();
205 dal::singletons_manager::on_partitions_change();
207 else if (n < nb_partitions){
208 GMM_WARNING1(
"Not reducing number of partitions from "
209 << nb_partitions <<
" to " << n <<
210 " as it might invalidate global storage.");
212 partitions_set_by_user =
true;
216 GMM_ASSERT1(nb_user_threads == true_thread_policy::num_threads(),
217 "The number of omp threads was changed outside partition_master."
218 "Please use getfem::set_num_threads for this.");
219 current_partition = *(std::begin(partitions.thrd_cast()));
230 "Cannot change thread policy in parallel section.");
// Default constructor: starts with one user thread and one partition and
// marks the partition tables as not up to date (partitions_updated = false).
// NOTE(review): any further statements of the body are not visible in this
// excerpt.
236 partition_master::partition_master()
237 : nb_user_threads{1}, nb_partitions{1} {
238 partitions_updated =
false;
244 GMM_ASSERT2(behaviour == thread_behaviour::partition_threads ?
245 true_thread_policy::this_thread() < nb_partitions :
true,
246 "Requesting current partition for thread " <<
247 true_thread_policy::this_thread() <<
248 " while number of partitions is " << nb_partitions
250 return behaviour == thread_behaviour::partition_threads ?
251 current_partition : true_thread_policy::this_thread();
255 return behaviour == thread_behaviour::partition_threads ?
256 nb_partitions : true_thread_policy::num_threads();
// Makes `p` the current partition when `behaviour` is
// thread_behaviour::partition_threads.
// GMM_ASSERT2 first checks that `p` belongs to the set returned by
// partitions.thrd_cast(); the assertion message names `p` and the value of
// true_thread_policy::this_thread().
// NOTE(review): what happens for other values of `behaviour` is not visible
// in this excerpt.
259 void partition_master::set_current_partition(
size_type p){
260 if (behaviour == thread_behaviour::partition_threads){
261 GMM_ASSERT2(partitions.thrd_cast().count(p) != 0,
"Internal error: "
262 << p <<
" is not a valid partitions for thread "
263 << true_thread_policy::this_thread()
265 current_partition = p;
// Resets the current partition back to the first element of the partition
// set. Two forms are visible:
//  - a single assignment using partitions.thrd_cast();
//  - a loop over t in [0, partitions.num_threads()) that resets
//    current_partition(t) from partitions(t).
// NOTE(review): the condition selecting between the two forms is not
// visible in this excerpt.
269 void partition_master::rewind_partitions(){
271 current_partition = *(std::begin(partitions.thrd_cast()));
274 for (
size_type t = 0; t != partitions.num_threads(); ++t){
275 current_partition(t) = *(std::begin(partitions(t)));
// Clears the partitions_updated flag, marking the partition tables as
// stale.
// NOTE(review): the rest of the body is not visible in this excerpt;
// presumably it then triggers the actual rebuild - confirm.
280 void partition_master::update_partitions(){
281 partitions_updated =
false;
285 if (partitions_updated)
return;
287 partitions = decltype(partitions){};
288 current_partition = decltype(current_partition){};
290 auto n_threads = true_thread_policy::num_threads();
291 if(n_threads > nb_partitions){
292 GMM_WARNING0(
"Using " << n_threads <<
293 " threads which is above the maximum number of partitions :" <<
297 if (behaviour == thread_behaviour::partition_threads){
298 for (
size_type t = 0; t != n_threads; ++t){
299 auto partition_size =
static_cast<size_type>
300 (std::ceil(
static_cast<scalar_type
>(nb_partitions) /
301 static_cast<scalar_type
>(n_threads)));
302 auto partition_begin = partition_size * t;
303 if (partition_begin >= nb_partitions)
break;
304 auto partition_end = std::min(partition_size * (t + 1), nb_partitions);
305 auto hint_it = std::begin(partitions(t));
306 for (
size_type i = partition_begin; i != partition_end; ++i){
307 hint_it = partitions(t).insert(hint_it, i);
309 current_partition(t) = partition_begin;
313 for (
size_type t = 0; t != n_threads; ++t){
314 partitions(t).insert(t);
315 current_partition(t) = t;
319 partitions_updated =
true;
322 #if defined _WIN32 && !defined (__GNUC__)
323 #define GETFEM_ON_WIN
// Constructor: allocates a standard_locale (plocale) and a thread_exception
// (pexception) through std::make_unique, then calls
// _configthreadlocale(_ENABLE_PER_THREAD_LOCALE).
// NOTE(review): _configthreadlocale is an MSVC runtime function; the
// preprocessor guard around the call (presumably GETFEM_ON_WIN) is not
// visible in this excerpt - confirm.
326 parallel_boilerplate::
327 parallel_boilerplate()
328 : plocale{std::make_unique<standard_locale>()},
329 pexception{std::make_unique<thread_exception>()} {
331 _configthreadlocale(_ENABLE_PER_THREAD_LOCALE);
// Forwards `lambda` to pexception->run().
// The std::function is taken by value and passed on by value.
335 void parallel_boilerplate::run_lambda(std::function<
void(
void)> lambda){
336 pexception->run(lambda);
// Destructor: calls _configthreadlocale(_DISABLE_PER_THREAD_LOCALE) and
// then pexception->rethrow().
// NOTE(review): if rethrow() can throw, this destructor throws; destructors
// are implicitly noexcept unless declared noexcept(false), in which case a
// throw terminates the program - confirm the declaration in the header.
// NOTE(review): the preprocessor guard around _configthreadlocale is not
// visible in this excerpt.
339 parallel_boilerplate::~parallel_boilerplate(){
341 _configthreadlocale(_DISABLE_PER_THREAD_LOCALE);
343 pexception->rethrow();
// Runs `lambda` inside an OpenMP parallel region (default(shared)).
//
// lambda                   callable executed through
//                          parallel_boilerplate::run_lambda.
// iterate_over_partitions  when true, `lambda` is invoked once per element
//                          obtained by iterating partition_master::get(),
//                          and the partitions are rewound afterwards.
//
// Before the region starts, the number of partitions is raised to
// true_thread_policy::num_threads() if it is smaller.
// NOTE(review): several lines (region braces, the branch structure around
// the second run_lambda call) are not visible in this excerpt; the second
// call presumably covers the case without partition iteration - confirm.
346 void parallel_execution(std::function<
void(
void)> lambda,
347 bool iterate_over_partitions){
352 parallel_boilerplate boilerplate;
353 auto &pm = partition_master::get();
354 if (pm.get_nb_partitions() < true_thread_policy::num_threads()){
355 pm.set_nb_partitions(true_thread_policy::num_threads());
357 #pragma omp parallel default(shared)
359 if (iterate_over_partitions) {
360 for (
auto &&partitions : partition_master::get()) {
362 boilerplate.run_lambda(lambda);
366 boilerplate.run_lambda(lambda);
369 if (iterate_over_partitions) partition_master::get().rewind_partitions();
373 #ifdef GETFEM_FORCE_SINGLE_THREAD_BLAS
// BLAS_FORCE_SINGLE_THREAD: declares `openblas_get_num_threads_res`
// (initially 1), then looks up "openblas_get_num_threads" and
// "openblas_set_num_threads" at run time with dlsym. If the getter is
// found, its result is saved in openblas_get_num_threads_res; if the setter
// is found, it is called with 1. Nothing happens for a symbol that cannot
// be resolved.
375 # define BLAS_FORCE_SINGLE_THREAD \
376 int openblas_get_num_threads_res = 1; \
378 typedef int (* ptrfunc1)(); \
379 ptrfunc1 func1 = ptrfunc1(dlsym(NULL, "openblas_get_num_threads")); \
380 if (func1) openblas_get_num_threads_res = (*func1)(); \
381 typedef void (* ptrfunc2)(int); \
382 ptrfunc2 func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads")); \
383 if (func2) (*func2)(1); \
// BLAS_RESTORE_NUM_THREAD: looks up "openblas_set_num_threads" at run time
// with dlsym and, if the symbol is found, calls it with
// `openblas_get_num_threads_res`. That variable must be in scope at the
// point of expansion; BLAS_FORCE_SINGLE_THREAD declares it.
// The expansion is wrapped in its own block so the typedef and the local
// function pointer do not leak into, or clash with, the enclosing scope.
# define BLAS_RESTORE_NUM_THREAD                                        \
  {                                                                     \
    typedef void (* ptrfunc2)(int);                                     \
    ptrfunc2 func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads")); \
    if (func2) (*func2)(openblas_get_num_threads_res);                  \
  }
392 # define BLAS_FORCE_SINGLE_THREAD
393 # define BLAS_RESTORE_NUM_THREAD
// Helper type whose only visible member is a constructor that expands
// BLAS_FORCE_SINGLE_THREAD. When that macro is defined as empty, the
// constructor does nothing.
397 struct dummy_class_for_blas_nbthread_init {
398 dummy_class_for_blas_nbthread_init(
void)
399 { BLAS_FORCE_SINGLE_THREAD; }
// File-local instance: constructing it runs the constructor above once
// during static initialization of this translation unit.
402 static dummy_class_for_blas_nbthread_init dcfbnti;