aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/CPP
diff options
context:
space:
mode:
authorMoritz Pflanzer <moritz.pflanzer@arm.com>2017-09-08 13:48:23 +0100
committerAnthony Barbier <anthony.barbier@arm.com>2018-11-02 16:35:24 +0000
commitff06f202b94fa131861a7ff118e1b970e1351770 (patch)
tree76b069690d83ac805dd2303caa0934158add7a69 /src/runtime/CPP
parent24486d6554b30df4f9d2bcdf87f408baa742b42f (diff)
downloadComputeLibrary-ff06f202b94fa131861a7ff118e1b970e1351770.tar.gz
COMPMID-417: Cleanup CPPScheduler
Change-Id: I45028dc90db5c8c0ed1eba795d4652aa95305b48 Reviewed-on: http://mpd-gerrit.cambridge.arm.com/87053 Tested-by: Kaizen <jeremy.johnson+kaizengerrit@arm.com> Reviewed-by: Georgios Pinitas <georgios.pinitas@arm.com> Reviewed-by: Anthony Barbier <anthony.barbier@arm.com>
Diffstat (limited to 'src/runtime/CPP')
-rw-r--r--src/runtime/CPP/CPPScheduler.cpp145
1 files changed, 69 insertions, 76 deletions
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp
index a8382b42bf..137c18bd50 100644
--- a/src/runtime/CPP/CPPScheduler.cpp
+++ b/src/runtime/CPP/CPPScheduler.cpp
@@ -28,76 +28,67 @@
#include "arm_compute/core/Helpers.h"
#include "arm_compute/core/Utils.h"
+#include <condition_variable>
#include <iostream>
+#include <mutex>
#include <semaphore.h>
#include <system_error>
#include <thread>
-using namespace arm_compute;
-
-class arm_compute::Thread
+namespace arm_compute
+{
+class Thread
{
public:
- /** Start a new thread
- */
+ /** Start a new thread. */
Thread();
+
Thread(const Thread &) = delete;
Thread &operator=(const Thread &) = delete;
Thread(Thread &&) = delete;
Thread &operator=(Thread &&) = delete;
- /** Make the thread join
- */
+
+ /** Destructor. Make the thread join. */
~Thread();
+
/** Request the worker thread to start executing the given kernel
* This function will return as soon as the kernel has been sent to the worker thread.
* wait() needs to be called to ensure the execution is complete.
*/
void start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info);
- /** Wait for the current kernel execution to complete
- */
+
+ /** Wait for the current kernel execution to complete. */
void wait();
- /** Function ran by the worker thread
- */
+
+ /** Function ran by the worker thread. */
void worker_thread();
private:
- std::thread _thread;
- ICPPKernel *_kernel{ nullptr };
- Window _window;
- ThreadInfo _info;
- sem_t _wait_for_work;
- sem_t _job_complete;
- std::exception_ptr _current_exception;
+ std::thread _thread;
+ ICPPKernel *_kernel{ nullptr };
+ Window _window;
+ ThreadInfo _info;
+ std::mutex _m;
+ std::condition_variable _cv;
+ bool _wait_for_work{ false };
+ bool _job_complete{ true };
+ std::exception_ptr _current_exception;
};
Thread::Thread()
- : _thread(), _window(), _info(), _wait_for_work(), _job_complete(), _current_exception(nullptr)
+ : _thread(), _window(), _info(), _m(), _cv(), _current_exception(nullptr)
{
- int ret = sem_init(&_wait_for_work, 0, 0);
- ARM_COMPUTE_ERROR_ON(ret < 0);
- ARM_COMPUTE_UNUSED(ret);
-
- ret = sem_init(&_job_complete, 0, 0);
- ARM_COMPUTE_ERROR_ON(ret < 0);
- ARM_COMPUTE_UNUSED(ret);
-
_thread = std::thread(&Thread::worker_thread, this);
}
Thread::~Thread()
{
- ARM_COMPUTE_ERROR_ON(!_thread.joinable());
-
- start(nullptr, Window(), ThreadInfo());
- _thread.join();
-
- int ret = sem_destroy(&_wait_for_work);
- ARM_COMPUTE_ERROR_ON(ret < 0);
- ARM_COMPUTE_UNUSED(ret);
-
- ret = sem_destroy(&_job_complete);
- ARM_COMPUTE_ERROR_ON(ret < 0);
- ARM_COMPUTE_UNUSED(ret);
+ // Make sure worker thread has ended
+ if(_thread.joinable())
+ {
+ start(nullptr, Window(), ThreadInfo());
+ _thread.join();
+ }
}
void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info)
@@ -105,16 +96,22 @@ void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &i
_kernel = kernel;
_window = window;
_info = info;
- int ret = sem_post(&_wait_for_work);
- ARM_COMPUTE_UNUSED(ret);
- ARM_COMPUTE_ERROR_ON(ret < 0);
+
+ {
+ std::lock_guard<std::mutex> lock(_m);
+ _wait_for_work = true;
+ _job_complete = false;
+ }
+ _cv.notify_one();
}
void Thread::wait()
{
- int ret = sem_wait(&_job_complete);
- ARM_COMPUTE_UNUSED(ret);
- ARM_COMPUTE_ERROR_ON(ret < 0);
+ {
+ std::unique_lock<std::mutex> lock(_m);
+ _cv.wait(lock, [&] { return _job_complete; });
+ }
+
if(_current_exception)
{
std::rethrow_exception(_current_exception);
@@ -123,9 +120,14 @@ void Thread::wait()
void Thread::worker_thread()
{
- while(sem_wait(&_wait_for_work) >= 0)
+ while(true)
{
+ std::unique_lock<std::mutex> lock(_m);
+ _cv.wait(lock, [&] { return _wait_for_work; });
+ _wait_for_work = false;
+
_current_exception = nullptr;
+
// Time to exit
if(_kernel == nullptr)
{
@@ -141,21 +143,12 @@ void Thread::worker_thread()
{
_current_exception = std::current_exception();
}
- int ret = sem_post(&_job_complete);
- ARM_COMPUTE_UNUSED(ret);
- ARM_COMPUTE_ERROR_ON(ret < 0);
- }
- ARM_COMPUTE_ERROR("Wait failed");
-}
-
-namespace
-{
-void delete_threads(Thread *t)
-{
- delete[] t;
+ _job_complete = true;
+ lock.unlock();
+ _cv.notify_one();
+ }
}
-} // namespace
CPPScheduler &CPPScheduler::get()
{
@@ -165,15 +158,14 @@ CPPScheduler &CPPScheduler::get()
CPPScheduler::CPPScheduler()
: _num_threads(std::thread::hardware_concurrency()),
- _threads(std::unique_ptr<Thread[], void(*)(Thread *)>(new Thread[_num_threads - 1], delete_threads))
+ _threads(_num_threads - 1)
{
}
void CPPScheduler::set_num_threads(unsigned int num_threads)
{
- const unsigned int num_cores = std::thread::hardware_concurrency();
- _num_threads = num_threads == 0 ? num_cores : num_threads;
- _threads.reset(new Thread[_num_threads - 1]);
+ _num_threads = num_threads == 0 ? std::thread::hardware_concurrency() : num_threads;
+ _threads.resize(_num_threads - 1);
}
unsigned int CPPScheduler::num_threads() const
@@ -199,32 +191,33 @@ void CPPScheduler::schedule(ICPPKernel *kernel, unsigned int split_dimension)
}
else
{
- for(int t = 0; t < info.num_threads; ++t)
+ int t = 0;
+ auto thread_it = _threads.begin();
+
+ for(; t < info.num_threads - 1; ++t, ++thread_it)
{
Window win = max_window.split_window(split_dimension, t, info.num_threads);
info.thread_id = t;
-
- if(t != info.num_threads - 1)
- {
- _threads[t].start(kernel, win, info);
- }
- else
- {
- kernel->run(win, info);
- }
+ thread_it->start(kernel, win, info);
}
+ // Run last part on main thread
+ Window win = max_window.split_window(split_dimension, t, info.num_threads);
+ info.thread_id = t;
+ kernel->run(win, info);
+
try
{
- for(int t = 1; t < info.num_threads; ++t)
+ for(auto &thread : _threads)
{
- _threads[t - 1].wait();
+ thread.wait();
}
}
catch(const std::system_error &e)
{
- std::cout << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
+ std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
}
}
/** [Scheduler example] */
}
+} // namespace arm_compute