aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/CPP
diff options
context:
space:
mode:
authorAnthony Barbier <anthony.barbier@arm.com>2018-05-25 13:32:10 +0100
committerAnthony Barbier <anthony.barbier@arm.com>2018-11-02 16:52:54 +0000
commit52ecb06b5627902a2f4514fba977e98454af4872 (patch)
tree7e66afcd6a81d2a5d7f886d5d2d0f4e27cc6c4d1 /src/runtime/CPP
parentdf473eab0ab8a52e6b58e0f6442b39ba4c1d68ea (diff)
downloadComputeLibrary-52ecb06b5627902a2f4514fba977e98454af4872.tar.gz
COMPMID-1180: Add support for bucket multi-threading (Part 1)
- Add an entry point to allow the user to parallelise an arbitrary queue of workloads (Will be used to interleave GEMM / BufferManager) - Added a ThreadFeeder which acts as a thread-safe work distributor Change-Id: I3a84fb7446c453cfcd337e21338c2ccf9f29f7b3 Note: This patch doesn't introduce any change in the default strategy, therefore it shouldn't have any impact on the performance Reviewed-on: https://eu-gerrit-1.euhpc.arm.com/133058 Tested-by: Jenkins <bsgcomp@arm.com> Reviewed-by: Georgios Pinitas <georgios.pinitas@arm.com>
Diffstat (limited to 'src/runtime/CPP')
-rw-r--r--src/runtime/CPP/CPPScheduler.cpp192
-rw-r--r--src/runtime/CPP/SingleThreadScheduler.cpp9
2 files changed, 144 insertions, 57 deletions
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp
index 92dce34c71..ab91b1071c 100644
--- a/src/runtime/CPP/CPPScheduler.cpp
+++ b/src/runtime/CPP/CPPScheduler.cpp
@@ -37,7 +37,66 @@
namespace arm_compute
{
-class Thread
+namespace
+{
+class ThreadFeeder
+{
+public:
+ /** Constructor
+ *
+ * @param[in] start First value that will be returned by the feeder
+ * @param[in] end End condition (The last value returned by get_next() will be end - 1)
+ */
+ explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0)
+ : _current(start), _end(end), _m()
+ {
+ }
+ /** Return the next element in the range if there is one.
+ *
+ * @param[out] next Will contain the next element if there is one.
+ *
+ * @return False if the end of the range has been reached and next wasn't set.
+ */
+ bool get_next(unsigned int &next)
+ {
+ std::lock_guard<std::mutex> lock(_m);
+ if(_current < _end)
+ {
+ next = _current;
+ _current++;
+ return true;
+ }
+ return false;
+ }
+
+private:
+ unsigned int _current;
+ const unsigned int _end;
+ std::mutex _m;
+};
+
+/** Execute workloads[info.thread_id] first, then call the feeder to get the index of the next workload to run.
+ *
+ * Will run workloads until the feeder reaches the end of its range.
+ *
+ * @param[in] workloads The array of workloads
+ * @param[in,out] feeder The feeder indicating which workload to execute next.
+ * @param[in] info Threading and CPU info.
+ */
+void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeeder &feeder, const ThreadInfo &info)
+{
+ unsigned int workload_index = info.thread_id;
+ do
+ {
+ ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size());
+ workloads[workload_index](info);
+ }
+ while(feeder.get_next(workload_index));
+}
+
+} //namespace
+
+class CPPScheduler::Thread
{
public:
/** Start a new thread. */
@@ -51,11 +110,15 @@ public:
/** Destructor. Make the thread join. */
~Thread();
- /** Request the worker thread to start executing the given kernel
- * This function will return as soon as the kernel has been sent to the worker thread.
+ /** Request the worker thread to start executing workloads.
+ *
+ * The thread will start by executing workloads[info.thread_id] and will then call the feeder to
+ * get the index of the following workload to run.
+ *
+ * @note This function will return as soon as the workloads have been sent to the worker thread.
* wait() needs to be called to ensure the execution is complete.
*/
- void start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info);
+ void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);
/** Wait for the current kernel execution to complete. */
void wait();
@@ -64,39 +127,38 @@ public:
void worker_thread();
private:
- std::thread _thread;
- ICPPKernel *_kernel{ nullptr };
- Window _window;
- ThreadInfo _info;
- std::mutex _m;
- std::condition_variable _cv;
- bool _wait_for_work{ false };
- bool _job_complete{ true };
- std::exception_ptr _current_exception;
+ std::thread _thread{};
+ ThreadInfo _info{};
+ std::vector<IScheduler::Workload> *_workloads{ nullptr };
+ ThreadFeeder *_feeder{ nullptr };
+ std::mutex _m{};
+ std::condition_variable _cv{};
+ bool _wait_for_work{ false };
+ bool _job_complete{ true };
+ std::exception_ptr _current_exception{ nullptr };
};
-Thread::Thread()
- : _thread(), _window(), _info(), _m(), _cv(), _current_exception(nullptr)
+CPPScheduler::Thread::Thread()
{
_thread = std::thread(&Thread::worker_thread, this);
}
-Thread::~Thread()
+CPPScheduler::Thread::~Thread()
{
// Make sure worker thread has ended
if(_thread.joinable())
{
- start(nullptr, Window(), ThreadInfo());
+ ThreadFeeder feeder;
+ start(nullptr, feeder, ThreadInfo());
_thread.join();
}
}
-void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info)
+void CPPScheduler::Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
- _kernel = kernel;
- _window = window;
- _info = info;
-
+ _workloads = workloads;
+ _feeder = &feeder;
+ _info = info;
{
std::lock_guard<std::mutex> lock(_m);
_wait_for_work = true;
@@ -105,7 +167,7 @@ void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &i
_cv.notify_one();
}
-void Thread::wait()
+void CPPScheduler::Thread::wait()
{
{
std::unique_lock<std::mutex> lock(_m);
@@ -118,7 +180,7 @@ void Thread::wait()
}
}
-void Thread::worker_thread()
+void CPPScheduler::Thread::worker_thread()
{
while(true)
{
@@ -129,15 +191,14 @@ void Thread::worker_thread()
_current_exception = nullptr;
// Time to exit
- if(_kernel == nullptr)
+ if(_workloads == nullptr)
{
return;
}
try
{
- _window.validate();
- _kernel->run(_window, _info);
+ process_workloads(*_workloads, *_feeder, _info);
}
catch(...)
{
@@ -174,56 +235,73 @@ unsigned int CPPScheduler::num_threads() const
return _num_threads;
}
+void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
+{
+ const unsigned int num_threads = std::min(_num_threads, static_cast<unsigned int>(workloads.size()));
+ if(num_threads < 1)
+ {
+ return;
+ }
+ ThreadFeeder feeder(num_threads, workloads.size());
+ ThreadInfo info;
+ info.cpu_info = &_cpu_info;
+ info.num_threads = num_threads;
+ unsigned int t = 0;
+ auto thread_it = _threads.begin();
+ for(; t < num_threads - 1; ++t, ++thread_it)
+ {
+ info.thread_id = t;
+ thread_it->start(&workloads, feeder, info);
+ }
+
+ info.thread_id = t;
+ process_workloads(workloads, feeder, info);
+
+ try
+ {
+ for(auto &thread : _threads)
+ {
+ thread.wait();
+ }
+ }
+ catch(const std::system_error &e)
+ {
+ std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
+ }
+}
+
void CPPScheduler::schedule(ICPPKernel *kernel, unsigned int split_dimension)
{
ARM_COMPUTE_ERROR_ON_MSG(!kernel, "The child class didn't set the kernel");
- /** [Scheduler example] */
- ThreadInfo info;
- info.cpu_info = &_cpu_info;
-
const Window &max_window = kernel->window();
const unsigned int num_iterations = max_window.num_iterations(split_dimension);
- info.num_threads = std::min(num_iterations, _num_threads);
+ const unsigned int num_threads = std::min(num_iterations, _num_threads);
if(num_iterations == 0)
{
return;
}
- if(!kernel->is_parallelisable() || info.num_threads == 1)
+ if(!kernel->is_parallelisable() || num_threads == 1)
{
+ ThreadInfo info;
+ info.cpu_info = &_cpu_info;
kernel->run(max_window, info);
}
else
{
- int t = 0;
- auto thread_it = _threads.begin();
-
- for(; t < info.num_threads - 1; ++t, ++thread_it)
+ std::vector<IScheduler::Workload> workloads(num_threads);
+ for(unsigned int t = 0; t < num_threads; t++)
{
- Window win = max_window.split_window(split_dimension, t, info.num_threads);
- info.thread_id = t;
- thread_it->start(kernel, win, info);
- }
-
- // Run last part on main thread
- Window win = max_window.split_window(split_dimension, t, info.num_threads);
- info.thread_id = t;
- kernel->run(win, info);
-
- try
- {
- for(auto &thread : _threads)
+ workloads[t] = [&](const ThreadInfo & info)
{
- thread.wait();
- }
- }
- catch(const std::system_error &e)
- {
- std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
+ Window win = max_window.split_window(split_dimension, info.thread_id, info.num_threads);
+ win.validate();
+ kernel->run(win, info);
+ };
}
+ run_workloads(workloads);
}
- /** [Scheduler example] */
}
} // namespace arm_compute
diff --git a/src/runtime/CPP/SingleThreadScheduler.cpp b/src/runtime/CPP/SingleThreadScheduler.cpp
index 2adc14ce80..6099e2cab5 100644
--- a/src/runtime/CPP/SingleThreadScheduler.cpp
+++ b/src/runtime/CPP/SingleThreadScheduler.cpp
@@ -49,6 +49,15 @@ void SingleThreadScheduler::schedule(ICPPKernel *kernel, unsigned int split_dime
kernel->run(kernel->window(), info);
}
+void SingleThreadScheduler::run_workloads(std::vector<Workload> &workloads)
+{
+ ThreadInfo info;
+ info.cpu_info = &_cpu_info;
+ for(auto &wl : workloads)
+ {
+ wl(info);
+ }
+}
unsigned int SingleThreadScheduler::num_threads() const
{
return 1;