diff options
Diffstat (limited to 'src/runtime/CPP/CPPScheduler.cpp')
-rw-r--r-- | src/runtime/CPP/CPPScheduler.cpp | 532 |
1 files changed, 313 insertions, 219 deletions
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp index 0a03497cb9..9fbdc3a4dd 100644 --- a/src/runtime/CPP/CPPScheduler.cpp +++ b/src/runtime/CPP/CPPScheduler.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2020 ARM Limited. + * Copyright (c) 2016-2023 Arm Limited. * * SPDX-License-Identifier: MIT * @@ -26,17 +26,21 @@ #include "arm_compute/core/CPP/ICPPKernel.h" #include "arm_compute/core/Error.h" #include "arm_compute/core/Helpers.h" +#include "arm_compute/core/Log.h" #include "arm_compute/core/Utils.h" -#include "arm_compute/runtime/CPUUtils.h" +#include "arm_compute/core/utils/misc/Utility.h" + #include "support/Mutex.h" #include <atomic> #include <condition_variable> #include <iostream> #include <list> +#include <memory> #include <mutex> #include <system_error> #include <thread> +#include <vector> namespace arm_compute { @@ -50,8 +54,7 @@ public: * @param[in] start First value that will be returned by the feeder * @param[in] end End condition (The last value returned by get_next() will be end - 1) */ - explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0) - : _atomic_counter(start), _end(end) + explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0) : _atomic_counter(start), _end(end) { } /** Return the next element in the range if there is one. @@ -71,61 +74,6 @@ private: const unsigned int _end; }; -/** Given two dimensions and a maxium number of threads to utilise, calcualte the best - * combination of threads that fit in (mutliplied together) max_threads. - * - * This algorithm assumes that work in either of the dimensions is equally difficult - * to compute - * - * @returns [m_nthreads, n_nthreads] A pair of the threads that should be used in each dimension - */ -std::pair<unsigned, unsigned> split_2d(unsigned max_threads, std::size_t m, std::size_t n) -{ - /* - * We want the same ratio of threads in M & N to the ratio of m and n problem size - * - * Therefore: mt/nt == m/n where mt*nt == max_threads - * - * max_threads/nt = mt & (max_threads/nt) * (m/n) = nt - * nt^2 = max_threads * (m/n) - * nt = sqrt( max_threads * (m/n) ) - */ - //ratio of m to n in problem dimensions - double ratio = m / static_cast<double>(n); - - // nt = sqrt(max_threads * (m / n) ) - const unsigned adjusted = std::round( - std::sqrt(max_threads * ratio)); - - //find the nearest factor of max_threads - for(unsigned i = 0; i!= adjusted; ++i) - { - //try down - const unsigned adj_down = adjusted - i; - if(max_threads % adj_down == 0) - { - return { adj_down, max_threads / adj_down }; - } - - //try up - const unsigned adj_up = adjusted + i; - if(max_threads % adj_up == 0) - { - return { adj_up, max_threads / adj_up }; - } - } - - //we didn't find anything so lets bail out with maxes biased to the largest dimension - if(m > n) - { - return{ std::min<unsigned>(m, max_threads), 1 }; - } - else - { - return{ 1, std::min<unsigned>(n, max_threads) }; - } -} - /** Execute workloads[info.thread_id] first, then call the feeder to get the index of the next workload to run. * * Will run workloads until the feeder reaches the end of its range. @@ -141,51 +89,77 @@ void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeede { ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size()); workloads[workload_index](info); - } - while(feeder.get_next(workload_index)); + } while (feeder.get_next(workload_index)); } -} //namespace - -struct CPPScheduler::Impl final +/** Set thread affinity. Pin current thread to a particular core + * + * @param[in] core_id ID of the core to which the current thread is pinned + */ +void set_thread_affinity(int core_id) { - explicit Impl(unsigned int thread_hint) - : _num_threads(thread_hint), _threads(_num_threads - 1) - { - } - void set_num_threads(unsigned int num_threads, unsigned int thead_hint) + if (core_id < 0) { - _num_threads = num_threads == 0 ? thead_hint : num_threads; - _threads.resize(_num_threads - 1); - } - unsigned int num_threads() const - { - return _num_threads; + return; } - void run_workloads(std::vector<IScheduler::Workload> &workloads); - - class Thread; +#if !defined(_WIN64) && !defined(__APPLE__) && !defined(__OpenBSD__) + cpu_set_t set; + CPU_ZERO(&set); + CPU_SET(core_id, &set); + ARM_COMPUTE_EXIT_ON_MSG(sched_setaffinity(0, sizeof(set), &set), "Error setting thread affinity"); +#endif /* !defined(__APPLE__) && !defined(__OpenBSD__) */ +} - unsigned int _num_threads; - std::list<Thread> _threads; - arm_compute::Mutex _run_workloads_mutex{}; -}; +/** There are currently 2 scheduling modes supported by CPPScheduler + * + * Linear: + * The default mode where all the scheduling is carried out by the main thread linearly (in a loop). + * E.G. If there are 8 threads in total, there will be 1 main thread + 7 threads in the thread pool, and it is main + * thread's responsibility to start all the other threads in the thread pool. + * + * Fanout: + * In fanout mode, the scheduling (starting other threads) task is distributed across many threads instead of just + * the main thread. + * + * The scheduler has a fixed parameter: wake_fanout, and the scheduling sequence goes like this: + * 1. Main thread wakes the first wake_fanout - 1 number of FanoutThreads from the thread pool + * From thread: 0 + * To thread (non-inclusive): Wake_fanout - 1 + * 2. Each FanoutThread then wakes wake_fanout number of FanoutThreads from the thread pool: + * From thread: (i + 1) * wake_fanout - 1 + * To thread (non-inclusive): (i + 2) * wake_fanout - 1 + * where i is the current thread's thread id + * The end is clamped at the size of the thread pool / the number of threads in use - 1 + * + * E.G. for a total number of 8 threads (1 main thread, 7 FanoutThreads in thread pool) with a fanout of 3 + * 1. Main thread wakes FanoutThread 0, 1 + * 2. FanoutThread 0 wakes FanoutThread 2, 3, 4 + * 3. FanoutThread 1 wakes FanoutThread 5, 6 + */ -class CPPScheduler::Impl::Thread final +class Thread final { public: - /** Start a new thread. */ - Thread(); + /** Start a new thread + * + * Thread will be pinned to a given core id if value is non-negative + * + * @param[in] core_pin Core id to pin the thread on. If negative no thread pinning will take place + */ + explicit Thread(int core_pin = -1); - Thread(const Thread &) = delete; + Thread(const Thread &) = delete; Thread &operator=(const Thread &) = delete; Thread(Thread &&) = delete; - Thread &operator=(Thread &&) = delete; + Thread &operator=(Thread &&) = delete; /** Destructor. Make the thread join. */ ~Thread(); + /** Set workloads */ + void set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info); + /** Request the worker thread to start executing workloads. * * The thread will start by executing workloads[info.thread_id] and will then call the feeder to @@ -194,47 +168,72 @@ public: * @note This function will return as soon as the workloads have been sent to the worker thread. * wait() needs to be called to ensure the execution is complete. */ - void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info); + void start(); /** Wait for the current kernel execution to complete. */ - void wait(); + std::exception_ptr wait(); /** Function ran by the worker thread. */ void worker_thread(); + /** Set the scheduling strategy to be linear */ + void set_linear_mode() + { + _thread_pool = nullptr; + _wake_beg = 0; + _wake_end = 0; + } + + /** Set the scheduling strategy to be fanout */ + void set_fanout_mode(std::list<Thread> *thread_pool, unsigned int wake_beg, unsigned int wake_end) + { + _thread_pool = thread_pool; + _wake_beg = wake_beg; + _wake_end = wake_end; + } + private: std::thread _thread{}; ThreadInfo _info{}; - std::vector<IScheduler::Workload> *_workloads{ nullptr }; - ThreadFeeder *_feeder{ nullptr }; + std::vector<IScheduler::Workload> *_workloads{nullptr}; + ThreadFeeder *_feeder{nullptr}; std::mutex _m{}; std::condition_variable _cv{}; - bool _wait_for_work{ false }; - bool _job_complete{ true }; - std::exception_ptr _current_exception{ nullptr }; + bool _wait_for_work{false}; + bool _job_complete{true}; + std::exception_ptr _current_exception{nullptr}; + int _core_pin{-1}; + std::list<Thread> *_thread_pool{nullptr}; + unsigned int _wake_beg{0}; + unsigned int _wake_end{0}; }; -CPPScheduler::Impl::Thread::Thread() +Thread::Thread(int core_pin) : _core_pin(core_pin) { _thread = std::thread(&Thread::worker_thread, this); } -CPPScheduler::Impl::Thread::~Thread() +Thread::~Thread() { // Make sure worker thread has ended - if(_thread.joinable()) + if (_thread.joinable()) { ThreadFeeder feeder; - start(nullptr, feeder, ThreadInfo()); + set_workload(nullptr, feeder, ThreadInfo()); + start(); _thread.join(); } } -void CPPScheduler::Impl::Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info) +void Thread::set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info) { _workloads = workloads; _feeder = &feeder; _info = info; +} + +void Thread::start() +{ { std::lock_guard<std::mutex> lock(_m); _wait_for_work = true; @@ -243,22 +242,20 @@ void CPPScheduler::Impl::Thread::start(std::vector<IScheduler::Workload> *worklo _cv.notify_one(); } -void CPPScheduler::Impl::Thread::wait() +std::exception_ptr Thread::wait() { { std::unique_lock<std::mutex> lock(_m); _cv.wait(lock, [&] { return _job_complete; }); } - - if(_current_exception) - { - std::rethrow_exception(_current_exception); - } + return _current_exception; } -void CPPScheduler::Impl::Thread::worker_thread() +void Thread::worker_thread() { - while(true) + set_thread_affinity(_core_pin); + + while (true) { std::unique_lock<std::mutex> lock(_m); _cv.wait(lock, [&] { return _wait_for_work; }); @@ -266,12 +263,24 @@ void CPPScheduler::Impl::Thread::worker_thread() _current_exception = nullptr; - // Time to exit - if(_workloads == nullptr) + // Exit if the worker thread has not been fed with workloads + if (_workloads == nullptr || _feeder == nullptr) { return; } + // Wake up more peer threads from thread pool if this job has been delegated to the current thread + if (_thread_pool != nullptr) + { + auto thread_it = _thread_pool->begin(); + std::advance(thread_it, std::min(static_cast<unsigned int>(_thread_pool->size()), _wake_beg)); + auto wake_end = std::min(_wake_end, static_cast<unsigned int>(_info.num_threads - 1)); + for (unsigned int t = _wake_beg; t < wake_end; ++t, ++thread_it) + { + thread_it->start(); + } + } + #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED try { @@ -280,19 +289,142 @@ void CPPScheduler::Impl::Thread::worker_thread() #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED } - catch(...) + catch (...) { _current_exception = std::current_exception(); } #endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ + _workloads = nullptr; _job_complete = true; lock.unlock(); _cv.notify_one(); } } +} //namespace + +struct CPPScheduler::Impl final +{ + constexpr static unsigned int m_default_wake_fanout = 4; + enum class Mode + { + Linear, + Fanout + }; + enum class ModeToggle + { + None, + Linear, + Fanout + }; + explicit Impl(unsigned int thread_hint) + : _num_threads(thread_hint), _threads(_num_threads - 1), _mode(Mode::Linear), _wake_fanout(0U) + { + const auto mode_env_v = utility::tolower(utility::getenv("ARM_COMPUTE_CPP_SCHEDULER_MODE")); + if (mode_env_v == "linear") + { + _forced_mode = ModeToggle::Linear; + } + else if (mode_env_v == "fanout") + { + _forced_mode = ModeToggle::Fanout; + } + else + { + _forced_mode = ModeToggle::None; + } + } + void set_num_threads(unsigned int num_threads, unsigned int thread_hint) + { + _num_threads = num_threads == 0 ? thread_hint : num_threads; + _threads.resize(_num_threads - 1); + auto_switch_mode(_num_threads); + } + void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func) + { + _num_threads = num_threads == 0 ? thread_hint : num_threads; + + // Set affinity on main thread + set_thread_affinity(func(0, thread_hint)); + + // Set affinity on worked threads + _threads.clear(); + for (auto i = 1U; i < _num_threads; ++i) + { + _threads.emplace_back(func(i, thread_hint)); + } + auto_switch_mode(_num_threads); + } + void auto_switch_mode(unsigned int num_threads_to_use) + { + // If the environment variable is set to any of the modes, it overwrites the mode selected over num_threads_to_use + if (_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8)) + { + set_fanout_mode(m_default_wake_fanout, num_threads_to_use); + ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE( + "Set CPPScheduler to Fanout mode, with wake up fanout : %d and %d threads to use\n", + this->wake_fanout(), num_threads_to_use); + } + else // Equivalent to (_forced_mode == ModeToggle::Linear || (_forced_mode == ModeToggle::None && num_threads_to_use <= 8)) + { + set_linear_mode(); + ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Linear mode, with %d threads to use\n", + num_threads_to_use); + } + } + void set_linear_mode() + { + for (auto &thread : _threads) + { + thread.set_linear_mode(); + } + _mode = Mode::Linear; + _wake_fanout = 0U; + } + void set_fanout_mode(unsigned int wake_fanout, unsigned int num_threads_to_use) + { + ARM_COMPUTE_ERROR_ON(num_threads_to_use > _threads.size() + 1); + const auto actual_wake_fanout = std::max(2U, std::min(wake_fanout, num_threads_to_use - 1)); + auto thread_it = _threads.begin(); + for (auto i = 1U; i < num_threads_to_use; ++i, ++thread_it) + { + const auto wake_begin = i * actual_wake_fanout - 1; + const auto wake_end = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1); + thread_it->set_fanout_mode(&_threads, wake_begin, wake_end); + } + // Reset the remaining threads's wake up schedule + while (thread_it != _threads.end()) + { + thread_it->set_fanout_mode(&_threads, 0U, 0U); + ++thread_it; + } + _mode = Mode::Fanout; + _wake_fanout = actual_wake_fanout; + } + unsigned int num_threads() const + { + return _num_threads; + } + unsigned int wake_fanout() const + { + return _wake_fanout; + } + Mode mode() const + { + return _mode; + } + + void run_workloads(std::vector<IScheduler::Workload> &workloads); + + unsigned int _num_threads; + std::list<Thread> _threads; + arm_compute::Mutex _run_workloads_mutex{}; + Mode _mode{Mode::Linear}; + ModeToggle _forced_mode{ModeToggle::None}; + unsigned int _wake_fanout{0}; +}; /* - * This singleton has been deprecated and will be removed in the next release + * This singleton has been deprecated and will be removed in future releases */ CPPScheduler &CPPScheduler::get() { @@ -300,8 +432,7 @@ CPPScheduler &CPPScheduler::get() return scheduler; } -CPPScheduler::CPPScheduler() - : _impl(support::cpp14::make_unique<Impl>(num_threads_hint())) +CPPScheduler::CPPScheduler() : _impl(std::make_unique<Impl>(num_threads_hint())) { } @@ -314,6 +445,13 @@ void CPPScheduler::set_num_threads(unsigned int num_threads) _impl->set_num_threads(num_threads, num_threads_hint()); } +void CPPScheduler::set_num_threads_with_affinity(unsigned int num_threads, BindFunc func) +{ + // No changes in the number of threads while current workloads are running + arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex); + _impl->set_num_threads_with_affinity(num_threads, num_threads_hint(), func); +} + unsigned int CPPScheduler::num_threads() const { return _impl->num_threads(); @@ -327,137 +465,93 @@ void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads) // This is not great because different threads workloads won't run in parallel but at least they // won't interfere each other and deadlock. arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex); - const unsigned int num_threads = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size())); - if(num_threads < 1) + const unsigned int num_threads_to_use = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size())); + if (num_threads_to_use < 1) { return; } - ThreadFeeder feeder(num_threads, workloads.size()); + // Re-adjust the mode if the actual number of threads to use is different from the number of threads created + _impl->auto_switch_mode(num_threads_to_use); + int num_threads_to_start = 0; + switch (_impl->mode()) + { + case CPPScheduler::Impl::Mode::Fanout: + { + num_threads_to_start = static_cast<int>(_impl->wake_fanout()) - 1; + break; + } + case CPPScheduler::Impl::Mode::Linear: + default: + { + num_threads_to_start = static_cast<int>(num_threads_to_use) - 1; + break; + } + } + ThreadFeeder feeder(num_threads_to_use, workloads.size()); ThreadInfo info; - info.cpu_info = &_cpu_info; - info.num_threads = num_threads; + info.cpu_info = &cpu_info(); + info.num_threads = num_threads_to_use; unsigned int t = 0; auto thread_it = _impl->_threads.begin(); - for(; t < num_threads - 1; ++t, ++thread_it) + // Set num_threads_to_use - 1 workloads to the threads as the remaining 1 is left to the main thread + for (; t < num_threads_to_use - 1; ++t, ++thread_it) { info.thread_id = t; - thread_it->start(&workloads, feeder, info); + thread_it->set_workload(&workloads, feeder, info); } - - info.thread_id = t; - process_workloads(workloads, feeder, info); + thread_it = _impl->_threads.begin(); + for (int i = 0; i < num_threads_to_start; ++i, ++thread_it) + { + thread_it->start(); + } + info.thread_id = t; // Set main thread's thread_id + std::exception_ptr last_exception = nullptr; #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED try { -#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ - for(auto &thread : _impl->_threads) - { - thread.wait(); - } +#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ + process_workloads(workloads, feeder, info); // Main thread processes workloads #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED } - catch(const std::system_error &e) + catch (...) { - std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n'; + last_exception = std::current_exception(); } -#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ -} -#endif /* DOXYGEN_SKIP_THIS */ -void CPPScheduler::schedule(ICPPKernel *kernel, const Hints &hints) -{ - ARM_COMPUTE_ERROR_ON_MSG(!kernel, "The child class didn't set the kernel"); - - const Window &max_window = kernel->window(); - - if(hints.split_dimension() == IScheduler::split_dimensions_all) + try { - /* - * if the split dim is size_t max then this signals we should parallelise over - * all dimensions - */ - const std::size_t m = max_window.num_iterations(Window::DimX); - const std::size_t n = max_window.num_iterations(Window::DimY); - - //in c++17 this can be swapped for auto [ m_threads, n_threads ] = split_2d(... - unsigned m_threads, n_threads; - std::tie(m_threads, n_threads) = split_2d(_impl->_num_threads, m, n); - - std::vector<IScheduler::Workload> workloads; - for(unsigned int ni = 0; ni != n_threads; ++ni) +#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ + thread_it = _impl->_threads.begin(); + for (unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it) { - for(unsigned int mi = 0; mi != m_threads; ++mi) + std::exception_ptr current_exception = thread_it->wait(); + if (current_exception) { - workloads.push_back( - [ ni, mi, m_threads, n_threads, &max_window, &kernel ] - (const ThreadInfo & info) - { - //narrow the window to our mi-ni workload - Window win = max_window.split_window(Window::DimX, mi, m_threads) - .split_window(Window::DimY, ni, n_threads); - - win.validate(); - - Window thread_locator; - thread_locator.set(Window::DimX, Window::Dimension(mi, m_threads)); - thread_locator.set(Window::DimY, Window::Dimension(ni, n_threads)); - - thread_locator.validate(); - - kernel->run_nd(win, info, thread_locator); - } - ); + last_exception = current_exception; } } - run_workloads(workloads); + if (last_exception) + { + std::rethrow_exception(last_exception); + } +#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED } - else + catch (const std::system_error &e) { - const unsigned int num_iterations = max_window.num_iterations(hints.split_dimension()); - const unsigned int num_threads = std::min(num_iterations, _impl->_num_threads); + std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n'; + } +#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ +} +#endif /* DOXYGEN_SKIP_THIS */ - if(num_iterations == 0) - { - return; - } +void CPPScheduler::schedule_op(ICPPKernel *kernel, const Hints &hints, const Window &window, ITensorPack &tensors) +{ + schedule_common(kernel, hints, window, tensors); +} - if(!kernel->is_parallelisable() || num_threads == 1) - { - ThreadInfo info; - info.cpu_info = &_cpu_info; - kernel->run(max_window, info); - } - else - { - unsigned int num_windows = 0; - switch(hints.strategy()) - { - case StrategyHint::STATIC: - num_windows = num_threads; - break; - case StrategyHint::DYNAMIC: - { - const unsigned int granule_threshold = (hints.threshold() <= 0) ? num_threads : static_cast<unsigned int>(hints.threshold()); - // Make sure we don't use some windows which are too small as this might create some contention on the ThreadFeeder - num_windows = num_iterations > granule_threshold ? granule_threshold : num_iterations; - break; - } - default: - ARM_COMPUTE_ERROR("Unknown strategy"); - } - std::vector<IScheduler::Workload> workloads(num_windows); - for(unsigned int t = 0; t < num_windows; t++) - { - //Capture 't' by copy, all the other variables by reference: - workloads[t] = [t, &hints, &max_window, &num_windows, &kernel](const ThreadInfo & info) - { - Window win = max_window.split_window(hints.split_dimension(), t, num_windows); - win.validate(); - kernel->run(win, info); - }; - } - run_workloads(workloads); - } - } +void CPPScheduler::schedule(ICPPKernel *kernel, const Hints &hints) +{ + ITensorPack tensors; + schedule_common(kernel, hints, kernel->window(), tensors); } } // namespace arm_compute |