Viewing file: workstealing.h (9.39 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
// -*- C++ -*-
// Copyright (C) 2007-2018 Free Software Foundation, Inc. // // This file is part of the GNU ISO C++ Library. This library is free // software; you can redistribute it and/or modify it under the terms // of the GNU General Public License as published by the Free Software // Foundation; either version 3, or (at your option) any later // version.
// This library is distributed in the hope that it will be useful, but // WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // General Public License for more details.
// Under Section 7 of GPL version 3, you are granted additional // permissions described in the GCC Runtime Library Exception, version // 3.1, as published by the Free Software Foundation.
// You should have received a copy of the GNU General Public License and // a copy of the GCC Runtime Library Exception along with this program; // see the files COPYING3 and COPYING.RUNTIME respectively. If not, see // <http://www.gnu.org/licenses/>.
/** @file parallel/workstealing.h * @brief Parallelization of embarrassingly parallel execution by * means of work-stealing. * * Work stealing is described in * * R. D. Blumofe and C. E. Leiserson. * Scheduling multithreaded computations by work stealing. * Journal of the ACM, 46(5):720–748, 1999. * * This file is a GNU parallel extension to the Standard C++ Library. */
// Written by Felix Putze.
#ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H #define _GLIBCXX_PARALLEL_WORKSTEALING_H 1
#include <parallel/parallel.h> #include <parallel/random_number.h> #include <parallel/compatibility.h>
namespace __gnu_parallel {
#define _GLIBCXX_JOB_VOLATILE volatile
/** @brief One __job for a certain thread. */ template<typename _DifferenceTp> struct _Job { typedef _DifferenceTp _DifferenceType;
/** @brief First element. * * Changed by owning and stealing thread. By stealing thread, * always incremented. */ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first;
/** @brief Last element. * * Changed by owning thread only. */ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last;
/** @brief Number of elements, i.e. @c _M_last-_M_first+1. * * Changed by owning thread only. */ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load; };
/** @brief Work stealing algorithm for random access iterators. * * Uses O(1) additional memory. Synchronization at job lists is * done with atomic operations. * @param __begin Begin iterator of element sequence. * @param __end End iterator of element sequence. * @param __op User-supplied functor (comparator, predicate, adding * functor, ...). * @param __f Functor to @a process an element with __op (depends on * desired functionality, e. g. for std::for_each(), ...). * @param __r Functor to @a add a single __result to the already * processed elements (depends on functionality). * @param __base Base value for reduction. * @param __output Pointer to position where final result is written to * @param __bound Maximum number of elements processed (e. g. for * std::count_n()). * @return User-supplied functor (that may contain a part of the result). */ template<typename _RAIter, typename _Op, typename _Fu, typename _Red, typename _Result> _Op __for_each_template_random_access_workstealing(_RAIter __begin, _RAIter __end, _Op __op, _Fu& __f, _Red __r, _Result __base, _Result& __output, typename std::iterator_traits<_RAIter>::difference_type __bound) { _GLIBCXX_CALL(__end - __begin)
typedef std::iterator_traits<_RAIter> _TraitsType; typedef typename _TraitsType::difference_type _DifferenceType;
const _Settings& __s = _Settings::get();
_DifferenceType __chunk_size = static_cast<_DifferenceType>(__s.workstealing_chunk_size);
// How many jobs? _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound;
// To avoid false sharing in a cache line. const int __stride = (__s.cache_line_size * 10 / sizeof(_Job<_DifferenceType>) + 1);
// Total number of threads currently working. _ThreadIndex __busy = 0;
_Job<_DifferenceType> *__job;
omp_lock_t __output_lock; omp_init_lock(&__output_lock);
// Write base value to output. __output = __base;
// No more threads than jobs, at least one thread. _ThreadIndex __num_threads = __gnu_parallel::max<_ThreadIndex> (1, __gnu_parallel::min<_DifferenceType>(__length, __get_max_threads()));
# pragma omp parallel shared(__busy) num_threads(__num_threads) { # pragma omp single { __num_threads = omp_get_num_threads();
// Create job description array. __job = new _Job<_DifferenceType>[__num_threads * __stride]; }
// Initialization phase.
// Flags for every thread if it is doing productive work. bool __iam_working = false;
// Thread id. _ThreadIndex __iam = omp_get_thread_num();
// This job. _Job<_DifferenceType>& __my_job = __job[__iam * __stride];
// Random number (for work stealing). _ThreadIndex __victim;
// Local value for reduction. _Result __result = _Result();
// Number of elements to steal in one attempt. _DifferenceType __steal;
// Every thread has its own random number generator // (modulo __num_threads). _RandomNumber __rand_gen(__iam, __num_threads);
// This thread is currently working. # pragma omp atomic ++__busy;
__iam_working = true;
// How many jobs per thread? last thread gets the rest. __my_job._M_first = static_cast<_DifferenceType> (__iam * (__length / __num_threads));
__my_job._M_last = (__iam == (__num_threads - 1) ? (__length - 1) : ((__iam + 1) * (__length / __num_threads) - 1)); __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
// Init result with _M_first value (to have a base value for reduction) if (__my_job._M_first <= __my_job._M_last) { // Cannot use volatile variable directly. _DifferenceType __my_first = __my_job._M_first; __result = __f(__op, __begin + __my_first); ++__my_job._M_first; --__my_job._M_load; }
_RAIter __current;
# pragma omp barrier
// Actual work phase // Work on own or stolen current start while (__busy > 0) { // Work until no productive thread left. # pragma omp flush(__busy)
// Thread has own work to do while (__my_job._M_first <= __my_job._M_last) { // fetch-and-add call // Reserve current job block (size __chunk_size) in my queue. _DifferenceType __current_job = __fetch_and_add<_DifferenceType>(&(__my_job._M_first), __chunk_size);
// Update _M_load, to make the three values consistent, // _M_first might have been changed in the meantime __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; for (_DifferenceType __job_counter = 0; __job_counter < __chunk_size && __current_job <= __my_job._M_last; ++__job_counter) { // Yes: process it! __current = __begin + __current_job; ++__current_job;
// Do actual work. __result = __r(__result, __f(__op, __current)); }
# pragma omp flush(__busy) }
// After reaching this point, a thread's __job list is empty. if (__iam_working) { // This thread no longer has work. # pragma omp atomic --__busy;
__iam_working = false; }
_DifferenceType __supposed_first, __supposed_last, __supposed_load; do { // Find random nonempty deque (not own), do consistency check. __yield(); # pragma omp flush(__busy) __victim = __rand_gen(); __supposed_first = __job[__victim * __stride]._M_first; __supposed_last = __job[__victim * __stride]._M_last; __supposed_load = __job[__victim * __stride]._M_load; } while (__busy > 0 && ((__supposed_load <= 0) || ((__supposed_first + __supposed_load - 1) != __supposed_last)));
if (__busy == 0) break;
if (__supposed_load > 0) { // Has work and work to do. // Number of elements to steal (at least one). __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2;
// Push __victim's current start forward. _DifferenceType __stolen_first = __fetch_and_add<_DifferenceType> (&(__job[__victim * __stride]._M_first), __steal); _DifferenceType __stolen_try = (__stolen_first + __steal - _DifferenceType(1));
__my_job._M_first = __stolen_first; __my_job._M_last = __gnu_parallel::min(__stolen_try, __supposed_last); __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
// Has potential work again. # pragma omp atomic ++__busy; __iam_working = true;
# pragma omp flush(__busy) } # pragma omp flush(__busy) } // end while __busy > 0 // Add accumulated result to output. omp_set_lock(&__output_lock); __output = __r(__output, __result); omp_unset_lock(&__output_lock); }
delete[] __job;
// Points to last element processed (needed as return value for // some algorithms like transform) __f._M_finish_iterator = __begin + __length;
omp_destroy_lock(&__output_lock);
return __op; } } // end namespace
#endif /* _GLIBCXX_PARALLEL_WORKSTEALING_H */
|