From ff06f202b94fa131861a7ff118e1b970e1351770 Mon Sep 17 00:00:00 2001 From: Moritz Pflanzer Date: Fri, 8 Sep 2017 13:48:23 +0100 Subject: COMPMID-417: Cleanup CPPScheduler Change-Id: I45028dc90db5c8c0ed1eba795d4652aa95305b48 Reviewed-on: http://mpd-gerrit.cambridge.arm.com/87053 Tested-by: Kaizen Reviewed-by: Georgios Pinitas Reviewed-by: Anthony Barbier --- src/runtime/CPP/CPPScheduler.cpp | 145 +++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 76 deletions(-) (limited to 'src/runtime/CPP/CPPScheduler.cpp') diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp index a8382b42bf..137c18bd50 100644 --- a/src/runtime/CPP/CPPScheduler.cpp +++ b/src/runtime/CPP/CPPScheduler.cpp @@ -28,76 +28,67 @@ #include "arm_compute/core/Helpers.h" #include "arm_compute/core/Utils.h" +#include #include +#include #include #include #include -using namespace arm_compute; - -class arm_compute::Thread +namespace arm_compute +{ +class Thread { public: - /** Start a new thread - */ + /** Start a new thread. */ Thread(); + Thread(const Thread &) = delete; Thread &operator=(const Thread &) = delete; Thread(Thread &&) = delete; Thread &operator=(Thread &&) = delete; - /** Make the thread join - */ + + /** Destructor. Make the thread join. */ ~Thread(); + /** Request the worker thread to start executing the given kernel * This function will return as soon as the kernel has been sent to the worker thread. * wait() needs to be called to ensure the execution is complete. */ void start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info); - /** Wait for the current kernel execution to complete - */ + + /** Wait for the current kernel execution to complete. */ void wait(); - /** Function ran by the worker thread - */ + + /** Function ran by the worker thread. */ void worker_thread(); private: - std::thread _thread; - ICPPKernel *_kernel{ nullptr }; - Window _window; - ThreadInfo _info; - sem_t _wait_for_work; - sem_t _job_complete; - std::exception_ptr _current_exception; + std::thread _thread; + ICPPKernel *_kernel{ nullptr }; + Window _window; + ThreadInfo _info; + std::mutex _m; + std::condition_variable _cv; + bool _wait_for_work{ false }; + bool _job_complete{ true }; + std::exception_ptr _current_exception; }; Thread::Thread() - : _thread(), _window(), _info(), _wait_for_work(), _job_complete(), _current_exception(nullptr) + : _thread(), _window(), _info(), _m(), _cv(), _current_exception(nullptr) { - int ret = sem_init(&_wait_for_work, 0, 0); - ARM_COMPUTE_ERROR_ON(ret < 0); - ARM_COMPUTE_UNUSED(ret); - - ret = sem_init(&_job_complete, 0, 0); - ARM_COMPUTE_ERROR_ON(ret < 0); - ARM_COMPUTE_UNUSED(ret); - _thread = std::thread(&Thread::worker_thread, this); } Thread::~Thread() { - ARM_COMPUTE_ERROR_ON(!_thread.joinable()); - - start(nullptr, Window(), ThreadInfo()); - _thread.join(); - - int ret = sem_destroy(&_wait_for_work); - ARM_COMPUTE_ERROR_ON(ret < 0); - ARM_COMPUTE_UNUSED(ret); - - ret = sem_destroy(&_job_complete); - ARM_COMPUTE_ERROR_ON(ret < 0); - ARM_COMPUTE_UNUSED(ret); + // Make sure worker thread has ended + if(_thread.joinable()) + { + start(nullptr, Window(), ThreadInfo()); + _thread.join(); + } } void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info) @@ -105,16 +96,22 @@ void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &i _kernel = kernel; _window = window; _info = info; - int ret = sem_post(&_wait_for_work); - ARM_COMPUTE_UNUSED(ret); - ARM_COMPUTE_ERROR_ON(ret < 0); + + { + std::lock_guard lock(_m); + _wait_for_work = true; + _job_complete = false; + } + _cv.notify_one(); } void Thread::wait() { - int ret = sem_wait(&_job_complete); - ARM_COMPUTE_UNUSED(ret); - ARM_COMPUTE_ERROR_ON(ret < 0); + { + std::unique_lock lock(_m); + _cv.wait(lock, [&] { return _job_complete; }); + } + if(_current_exception) { std::rethrow_exception(_current_exception); @@ -123,9 +120,14 @@ void Thread::wait() void Thread::worker_thread() { - while(sem_wait(&_wait_for_work) >= 0) + while(true) { + std::unique_lock lock(_m); + _cv.wait(lock, [&] { return _wait_for_work; }); + _wait_for_work = false; + _current_exception = nullptr; + // Time to exit if(_kernel == nullptr) { @@ -141,21 +143,12 @@ void Thread::worker_thread() { _current_exception = std::current_exception(); } - int ret = sem_post(&_job_complete); - ARM_COMPUTE_UNUSED(ret); - ARM_COMPUTE_ERROR_ON(ret < 0); - } - ARM_COMPUTE_ERROR("Wait failed"); -} - -namespace -{ -void delete_threads(Thread *t) -{ - delete[] t; + _job_complete = true; + lock.unlock(); + _cv.notify_one(); + } } -} // namespace CPPScheduler &CPPScheduler::get() { @@ -165,15 +158,14 @@ CPPScheduler &CPPScheduler::get() CPPScheduler::CPPScheduler() : _num_threads(std::thread::hardware_concurrency()), - _threads(std::unique_ptr(new Thread[_num_threads - 1], delete_threads)) + _threads(_num_threads - 1) { } void CPPScheduler::set_num_threads(unsigned int num_threads) { - const unsigned int num_cores = std::thread::hardware_concurrency(); - _num_threads = num_threads == 0 ? num_cores : num_threads; - _threads.reset(new Thread[_num_threads - 1]); + _num_threads = num_threads == 0 ? std::thread::hardware_concurrency() : num_threads; + _threads.resize(_num_threads - 1); } unsigned int CPPScheduler::num_threads() const @@ -199,32 +191,33 @@ void CPPScheduler::schedule(ICPPKernel *kernel, unsigned int split_dimension) } else { - for(int t = 0; t < info.num_threads; ++t) + int t = 0; + auto thread_it = _threads.begin(); + + for(; t < info.num_threads - 1; ++t, ++thread_it) { Window win = max_window.split_window(split_dimension, t, info.num_threads); info.thread_id = t; - - if(t != info.num_threads - 1) - { - _threads[t].start(kernel, win, info); - } - else - { - kernel->run(win, info); - } + thread_it->start(kernel, win, info); } + // Run last part on main thread + Window win = max_window.split_window(split_dimension, t, info.num_threads); + info.thread_id = t; + kernel->run(win, info); + try { - for(int t = 1; t < info.num_threads; ++t) + for(auto &thread : _threads) { - _threads[t - 1].wait(); + thread.wait(); } } catch(const std::system_error &e) { - std::cout << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n'; + std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n'; } } /** [Scheduler example] */ } +} // namespace arm_compute -- cgit v1.2.1