From 52ecb06b5627902a2f4514fba977e98454af4872 Mon Sep 17 00:00:00 2001 From: Anthony Barbier Date: Fri, 25 May 2018 13:32:10 +0100 Subject: COMPMID-1180: Add support for bucket multi-threading (Part 1) - Add an entry point to allow the user to parallelise an arbitrary queue of workloads (Will be used to interleave GEMM / BufferManager) - Added a ThreadFeeder which acts as a thread-safe work distributor Change-Id: I3a84fb7446c453cfcd337e21338c2ccf9f29f7b3 Note: This patch doesn't introduce any change in the default strategy, therefore it shouldn't have any impact on the performance Reviewed-on: https://eu-gerrit-1.euhpc.arm.com/133058 Tested-by: Jenkins Reviewed-by: Georgios Pinitas --- src/runtime/CPP/CPPScheduler.cpp | 192 +++++++++++++++++++++--------- src/runtime/CPP/SingleThreadScheduler.cpp | 9 ++ 2 files changed, 144 insertions(+), 57 deletions(-) (limited to 'src/runtime/CPP') diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp index 92dce34c71..ab91b1071c 100644 --- a/src/runtime/CPP/CPPScheduler.cpp +++ b/src/runtime/CPP/CPPScheduler.cpp @@ -37,7 +37,66 @@ namespace arm_compute { -class Thread +namespace +{ +class ThreadFeeder +{ +public: + /** Constructor + * + * @param[in] start First value that will be returned by the feeder + * @param[in] end End condition (The last value returned by get_next() will be end - 1) + */ + explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0) + : _current(start), _end(end), _m() + { + } + /** Return the next element in the range if there is one. + * + * @param[out] next Will contain the next element if there is one. + * + * @return False if the end of the range has been reached and next wasn't set. + */ + bool get_next(unsigned int &next) + { + std::lock_guard lock(_m); + if(_current < _end) + { + next = _current; + _current++; + return true; + } + return false; + } + +private: + unsigned int _current; + const unsigned int _end; + std::mutex _m; +}; + +/** Execute workloads[info.thread_id] first, then call the feeder to get the index of the next workload to run. + * + * Will run workloads until the feeder reaches the end of its range. + * + * @param[in] workloads The array of workloads + * @param[in,out] feeder The feeder indicating which workload to execute next. + * @param[in] info Threading and CPU info. + */ +void process_workloads(std::vector &workloads, ThreadFeeder &feeder, const ThreadInfo &info) +{ + unsigned int workload_index = info.thread_id; + do + { + ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size()); + workloads[workload_index](info); + } + while(feeder.get_next(workload_index)); +} + +} //namespace + +class CPPScheduler::Thread { public: /** Start a new thread. */ @@ -51,11 +110,15 @@ public: /** Destructor. Make the thread join. */ ~Thread(); - /** Request the worker thread to start executing the given kernel - * This function will return as soon as the kernel has been sent to the worker thread. + /** Request the worker thread to start executing workloads. + * + * The thread will start by executing workloads[info.thread_id] and will then call the feeder to + * get the index of the following workload to run. + * + * @note This function will return as soon as the workloads have been sent to the worker thread. * wait() needs to be called to ensure the execution is complete. */ - void start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info); + void start(std::vector *workloads, ThreadFeeder &feeder, const ThreadInfo &info); /** Wait for the current kernel execution to complete. */ void wait(); @@ -64,39 +127,38 @@ public: void worker_thread(); private: - std::thread _thread; - ICPPKernel *_kernel{ nullptr }; - Window _window; - ThreadInfo _info; - std::mutex _m; - std::condition_variable _cv; - bool _wait_for_work{ false }; - bool _job_complete{ true }; - std::exception_ptr _current_exception; + std::thread _thread{}; + ThreadInfo _info{}; + std::vector *_workloads{ nullptr }; + ThreadFeeder *_feeder{ nullptr }; + std::mutex _m{}; + std::condition_variable _cv{}; + bool _wait_for_work{ false }; + bool _job_complete{ true }; + std::exception_ptr _current_exception{ nullptr }; }; -Thread::Thread() - : _thread(), _window(), _info(), _m(), _cv(), _current_exception(nullptr) +CPPScheduler::Thread::Thread() { _thread = std::thread(&Thread::worker_thread, this); } -Thread::~Thread() +CPPScheduler::Thread::~Thread() { // Make sure worker thread has ended if(_thread.joinable()) { - start(nullptr, Window(), ThreadInfo()); + ThreadFeeder feeder; + start(nullptr, feeder, ThreadInfo()); _thread.join(); } } -void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info) +void CPPScheduler::Thread::start(std::vector *workloads, ThreadFeeder &feeder, const ThreadInfo &info) { - _kernel = kernel; - _window = window; - _info = info; - + _workloads = workloads; + _feeder = &feeder; + _info = info; { std::lock_guard lock(_m); _wait_for_work = true; @@ -105,7 +167,7 @@ void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &i _cv.notify_one(); } -void Thread::wait() +void CPPScheduler::Thread::wait() { { std::unique_lock lock(_m); @@ -118,7 +180,7 @@ void Thread::wait() } } -void Thread::worker_thread() +void CPPScheduler::Thread::worker_thread() { while(true) { @@ -129,15 +191,14 @@ void Thread::worker_thread() _current_exception = nullptr; // Time to exit - if(_kernel == nullptr) + if(_workloads == nullptr) { return; } try { - _window.validate(); - _kernel->run(_window, _info); + process_workloads(*_workloads, *_feeder, _info); } catch(...) { @@ -174,56 +235,73 @@ unsigned int CPPScheduler::num_threads() const return _num_threads; } +void CPPScheduler::run_workloads(std::vector &workloads) +{ + const unsigned int num_threads = std::min(_num_threads, static_cast(workloads.size())); + if(num_threads < 1) + { + return; + } + ThreadFeeder feeder(num_threads, workloads.size()); + ThreadInfo info; + info.cpu_info = &_cpu_info; + info.num_threads = num_threads; + unsigned int t = 0; + auto thread_it = _threads.begin(); + for(; t < num_threads - 1; ++t, ++thread_it) + { + info.thread_id = t; + thread_it->start(&workloads, feeder, info); + } + + info.thread_id = t; + process_workloads(workloads, feeder, info); + + try + { + for(auto &thread : _threads) + { + thread.wait(); + } + } + catch(const std::system_error &e) + { + std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n'; + } +} + void CPPScheduler::schedule(ICPPKernel *kernel, unsigned int split_dimension) { ARM_COMPUTE_ERROR_ON_MSG(!kernel, "The child class didn't set the kernel"); - /** [Scheduler example] */ - ThreadInfo info; - info.cpu_info = &_cpu_info; - const Window &max_window = kernel->window(); const unsigned int num_iterations = max_window.num_iterations(split_dimension); - info.num_threads = std::min(num_iterations, _num_threads); + const unsigned int num_threads = std::min(num_iterations, _num_threads); if(num_iterations == 0) { return; } - if(!kernel->is_parallelisable() || info.num_threads == 1) + if(!kernel->is_parallelisable() || num_threads == 1) { + ThreadInfo info; + info.cpu_info = &_cpu_info; kernel->run(max_window, info); } else { - int t = 0; - auto thread_it = _threads.begin(); - - for(; t < info.num_threads - 1; ++t, ++thread_it) + std::vector workloads(num_threads); + for(unsigned int t = 0; t < num_threads; t++) { - Window win = max_window.split_window(split_dimension, t, info.num_threads); - info.thread_id = t; - thread_it->start(kernel, win, info); - } - - // Run last part on main thread - Window win = max_window.split_window(split_dimension, t, info.num_threads); - info.thread_id = t; - kernel->run(win, info); - - try - { - for(auto &thread : _threads) + workloads[t] = [&](const ThreadInfo & info) { - thread.wait(); - } - } - catch(const std::system_error &e) - { - std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n'; + Window win = max_window.split_window(split_dimension, info.thread_id, info.num_threads); + win.validate(); + kernel->run(win, info); + }; } + run_workloads(workloads); } - /** [Scheduler example] */ } } // namespace arm_compute diff --git a/src/runtime/CPP/SingleThreadScheduler.cpp b/src/runtime/CPP/SingleThreadScheduler.cpp index 2adc14ce80..6099e2cab5 100644 --- a/src/runtime/CPP/SingleThreadScheduler.cpp +++ b/src/runtime/CPP/SingleThreadScheduler.cpp @@ -49,6 +49,15 @@ void SingleThreadScheduler::schedule(ICPPKernel *kernel, unsigned int split_dime kernel->run(kernel->window(), info); } +void SingleThreadScheduler::run_workloads(std::vector &workloads) +{ + ThreadInfo info; + info.cpu_info = &_cpu_info; + for(auto &wl : workloads) + { + wl(info); + } +} unsigned int SingleThreadScheduler::num_threads() const { return 1; -- cgit v1.2.1