diff options
Diffstat (limited to 'src/runtime')
-rw-r--r-- | src/runtime/CPP/CPPScheduler.cpp | 212 |
1 files changed, 195 insertions, 17 deletions
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp index a26b580b97..3bd80eb51d 100644 --- a/src/runtime/CPP/CPPScheduler.cpp +++ b/src/runtime/CPP/CPPScheduler.cpp @@ -26,7 +26,9 @@ #include "arm_compute/core/CPP/ICPPKernel.h" #include "arm_compute/core/Error.h" #include "arm_compute/core/Helpers.h" +#include "arm_compute/core/Log.h" #include "arm_compute/core/Utils.h" +#include "arm_compute/core/utils/misc/Utility.h" #include "src/runtime/CPUUtils.h" #include "support/Mutex.h" @@ -38,6 +40,7 @@ #include <mutex> #include <system_error> #include <thread> +#include <vector> namespace arm_compute { @@ -91,6 +94,10 @@ void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeede while(feeder.get_next(workload_index)); } +/** Set thread affinity. Pin current thread to a particular core + * + * @param[in] core_id ID of the core to which the current thread is pinned + */ void set_thread_affinity(int core_id) { if(core_id < 0) @@ -106,6 +113,33 @@ void set_thread_affinity(int core_id) #endif /* !defined(__APPLE__) */ } +/** There are currently 2 scheduling modes supported by CPPScheduler + * + * Linear: + * The default mode where all the scheduling is carried out by the main thread linearly (in a loop). + * E.G. If there are 8 threads in total, there will be 1 main thread + 7 threads in the thread pool, and it is the main + * thread's responsibility to start all the other threads in the thread pool. + * + * Fanout: + * In fanout mode, the scheduling (starting other threads) task is distributed across many threads instead of just + * the main thread. + * + * The scheduler has a fixed parameter: wake_fanout, and the scheduling sequence goes like this: + * 1. Main thread wakes the first (wake_fanout - 1) FanoutThreads from the thread pool + * From thread: 0 + * To thread (non-inclusive): wake_fanout - 1 + * 2. 
Each FanoutThread then wakes wake_fanout number of FanoutThreads from the thread pool: + * From thread: (i + 1) * wake_fanout - 1 + * To thread (non-inclusive): (i + 2) * wake_fanout - 1 + * where i is the current thread's thread id + * The end is clamped at the size of the thread pool / the number of threads in use - 1 + * + * E.G. for a total number of 8 threads (1 main thread, 7 FanoutThreads in thread pool) with a fanout of 3 + * 1. Main thread wakes FanoutThread 0, 1 + * 2. FanoutThread 0 wakes FanoutThread 2, 3, 4 + * 3. FanoutThread 1 wakes FanoutThread 5, 6 + */ + class Thread final { public: @@ -125,6 +159,9 @@ public: /** Destructor. Make the thread join. */ ~Thread(); + /** Set workloads */ + void set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info); + /** Request the worker thread to start executing workloads. * * The thread will start by executing workloads[info.thread_id] and will then call the feeder to @@ -133,7 +170,7 @@ public: * @note This function will return as soon as the workloads have been sent to the worker thread. * wait() needs to be called to ensure the execution is complete. */ - void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info); + void start(); /** Wait for the current kernel execution to complete. */ void wait(); @@ -141,6 +178,22 @@ public: /** Function ran by the worker thread. 
*/ void worker_thread(); + /** Set the scheduling strategy to be linear */ + void set_linear_mode() + { + _thread_pool = nullptr; + _wake_beg = 0; + _wake_end = 0; + } + + /** Set the scheduling strategy to be fanout */ + void set_fanout_mode(std::list<Thread> *thread_pool, unsigned int wake_beg, unsigned int wake_end) + { + _thread_pool = thread_pool; + _wake_beg = wake_beg; + _wake_end = wake_end; + } + private: std::thread _thread{}; ThreadInfo _info{}; @@ -152,6 +205,9 @@ private: bool _job_complete{ true }; std::exception_ptr _current_exception{ nullptr }; int _core_pin{ -1 }; + std::list<Thread> *_thread_pool{ nullptr }; + unsigned int _wake_beg{ 0 }; + unsigned int _wake_end{ 0 }; }; Thread::Thread(int core_pin) @@ -166,16 +222,21 @@ Thread::~Thread() if(_thread.joinable()) { ThreadFeeder feeder; - start(nullptr, feeder, ThreadInfo()); + set_workload(nullptr, feeder, ThreadInfo()); + start(); _thread.join(); } } -void Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info) +void Thread::set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info) { _workloads = workloads; _feeder = &feeder; _info = info; +} + +void Thread::start() +{ { std::lock_guard<std::mutex> lock(_m); _wait_for_work = true; @@ -209,12 +270,24 @@ void Thread::worker_thread() _current_exception = nullptr; - // Time to exit - if(_workloads == nullptr) + // Exit if the worker thread has not been fed with workloads + if(_workloads == nullptr || _feeder == nullptr) { return; } + // Wake up more peer threads from thread pool if this job has been delegated to the current thread + if(_thread_pool != nullptr) + { + auto thread_it = _thread_pool->begin(); + std::advance(thread_it, std::min(static_cast<unsigned int>(_thread_pool->size()), _wake_beg)); + auto wake_end = std::min(_wake_end, static_cast<unsigned int>(_info.num_threads - 1)); + for(unsigned int t = _wake_beg; t < wake_end; ++t, ++thread_it) + 
{ + thread_it->start(); + } + } + #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED try { @@ -228,6 +301,7 @@ void Thread::worker_thread() _current_exception = std::current_exception(); } #endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ + _workloads = nullptr; _job_complete = true; lock.unlock(); _cv.notify_one(); @@ -237,14 +311,40 @@ void Thread::worker_thread() struct CPPScheduler::Impl final { + constexpr static unsigned int m_default_wake_fanout = 4; + enum class Mode + { + Linear, + Fanout + }; + enum class ModeToggle + { + None, + Linear, + Fanout + }; explicit Impl(unsigned int thread_hint) - : _num_threads(thread_hint), _threads(_num_threads - 1) + : _num_threads(thread_hint), _threads(_num_threads - 1), _mode(Mode::Linear), _wake_fanout(0U) { + const auto mode_env_v = utility::tolower(utility::getenv("ARM_COMPUTE_CPP_SCHEDULER_MODE")); + if(mode_env_v == "linear") + { + _forced_mode = ModeToggle::Linear; + } + else if(mode_env_v == "fanout") + { + _forced_mode = ModeToggle::Fanout; + } + else + { + _forced_mode = ModeToggle::None; + } } void set_num_threads(unsigned int num_threads, unsigned int thread_hint) { _num_threads = num_threads == 0 ? 
thread_hint : num_threads; _threads.resize(_num_threads - 1); + auto_switch_mode(_num_threads); } void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func) { @@ -259,17 +359,72 @@ struct CPPScheduler::Impl final { _threads.emplace_back(func(i, thread_hint)); } + auto_switch_mode(_num_threads); + } + void auto_switch_mode(unsigned int num_threads_to_use) + { + // If the environment variable is set to any of the modes, it overrides the mode selected from num_threads_to_use + if(_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8)) + { + set_fanout_mode(m_default_wake_fanout, num_threads_to_use); + ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Fanout mode, with wake up fanout : %d and %d threads to use\n", this->wake_fanout(), num_threads_to_use); + } + else // Equivalent to (_forced_mode == ModeToggle::Linear || (_forced_mode == ModeToggle::None && num_threads_to_use <= 8)) + { + set_linear_mode(); + ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Linear mode, with %d threads to use\n", num_threads_to_use); + } + } + void set_linear_mode() + { + for(auto &thread : _threads) + { + thread.set_linear_mode(); + } + _mode = Mode::Linear; + _wake_fanout = 0U; + } + void set_fanout_mode(unsigned int wake_fanout, unsigned int num_threads_to_use) + { + ARM_COMPUTE_ERROR_ON(num_threads_to_use > _threads.size() + 1); + const auto actual_wake_fanout = std::max(2U, std::min(wake_fanout, num_threads_to_use - 1)); + auto thread_it = _threads.begin(); + for(auto i = 1U; i < num_threads_to_use; ++i, ++thread_it) + { + const auto wake_begin = i * actual_wake_fanout - 1; + const auto wake_end = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1); + thread_it->set_fanout_mode(&_threads, wake_begin, wake_end); + } + // Reset the remaining threads' wake-up schedule + while(thread_it != _threads.end()) + { + thread_it->set_fanout_mode(&_threads, 
0U); + ++thread_it; + } + _mode = Mode::Fanout; + _wake_fanout = actual_wake_fanout; } unsigned int num_threads() const { return _num_threads; } + unsigned int wake_fanout() const + { + return _wake_fanout; + } + Mode mode() const + { + return _mode; + } void run_workloads(std::vector<IScheduler::Workload> &workloads); unsigned int _num_threads; std::list<Thread> _threads; arm_compute::Mutex _run_workloads_mutex{}; + Mode _mode{ Mode::Linear }; + ModeToggle _forced_mode{ ModeToggle::None }; + unsigned int _wake_fanout{ 0 }; }; /* @@ -315,32 +470,55 @@ void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads) // This is not great because different threads workloads won't run in parallel but at least they // won't interfere each other and deadlock. arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex); - const unsigned int num_threads = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size())); - if(num_threads < 1) + const unsigned int num_threads_to_use = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size())); + if(num_threads_to_use < 1) { return; } - ThreadFeeder feeder(num_threads, workloads.size()); + // Re-adjust the mode if the actual number of threads to use is different from the number of threads created + _impl->auto_switch_mode(num_threads_to_use); + int num_threads_to_start = 0; + switch(_impl->mode()) + { + case CPPScheduler::Impl::Mode::Fanout: + { + num_threads_to_start = static_cast<int>(_impl->wake_fanout()) - 1; + break; + } + case CPPScheduler::Impl::Mode::Linear: + default: + { + num_threads_to_start = static_cast<int>(num_threads_to_use) - 1; + break; + } + } + ThreadFeeder feeder(num_threads_to_use, workloads.size()); ThreadInfo info; info.cpu_info = &_cpu_info; - info.num_threads = num_threads; + info.num_threads = num_threads_to_use; unsigned int t = 0; auto thread_it = _impl->_threads.begin(); - for(; t < num_threads - 1; ++t, ++thread_it) + // Set num_threads_to_use 
- 1 workloads to the threads as the remaining 1 is left to the main thread + for(; t < num_threads_to_use - 1; ++t, ++thread_it) { info.thread_id = t; - thread_it->start(&workloads, feeder, info); + thread_it->set_workload(&workloads, feeder, info); } - - info.thread_id = t; - process_workloads(workloads, feeder, info); + thread_it = _impl->_threads.begin(); + for(int i = 0; i < num_threads_to_start; ++i, ++thread_it) + { + thread_it->start(); + } + info.thread_id = t; // Set main thread's thread_id + process_workloads(workloads, feeder, info); // Main thread processes workloads #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED try { #endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ - for(auto &thread : _impl->_threads) + thread_it = _impl->_threads.begin(); + for(unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it) { - thread.wait(); + thread_it->wait(); } #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED } |