aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--arm_compute/runtime/CPP/CPPScheduler.h11
-rw-r--r--arm_compute/runtime/IScheduler.h12
-rw-r--r--arm_compute/runtime/SingleThreadScheduler.h8
-rw-r--r--docs/01_library.dox52
-rw-r--r--src/runtime/CPP/CPPScheduler.cpp192
-rw-r--r--src/runtime/CPP/SingleThreadScheduler.cpp9
-rw-r--r--tests/framework/instruments/SchedulerTimer.cpp13
7 files changed, 233 insertions, 64 deletions
diff --git a/arm_compute/runtime/CPP/CPPScheduler.h b/arm_compute/runtime/CPP/CPPScheduler.h
index 17da7aeb78..6462ac6f2c 100644
--- a/arm_compute/runtime/CPP/CPPScheduler.h
+++ b/arm_compute/runtime/CPP/CPPScheduler.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2017 ARM Limited.
+ * Copyright (c) 2016-2018 ARM Limited.
*
* SPDX-License-Identifier: MIT
*
@@ -30,8 +30,6 @@
namespace arm_compute
{
-class Thread;
-
/** C++11 implementation of a pool of threads to automatically split a kernel's execution among several threads. */
class CPPScheduler : public IScheduler
{
@@ -63,7 +61,14 @@ public:
*/
void schedule(ICPPKernel *kernel, unsigned int split_dimension) override;
+ /** Will run the workloads in parallel using num_threads
+ *
+ * @param[in] workloads Workloads to run
+ */
+ void run_workloads(std::vector<Workload> &workloads) override;
+
private:
+ class Thread;
/** Constructor: create a pool of threads. */
CPPScheduler();
diff --git a/arm_compute/runtime/IScheduler.h b/arm_compute/runtime/IScheduler.h
index a0bcada722..76ff5a3de0 100644
--- a/arm_compute/runtime/IScheduler.h
+++ b/arm_compute/runtime/IScheduler.h
@@ -26,6 +26,8 @@
#include "arm_compute/core/CPP/CPPTypes.h"
+#include <functional>
+
namespace arm_compute
{
class ICPPKernel;
@@ -34,6 +36,8 @@ class ICPPKernel;
class IScheduler
{
public:
+ /** Signature for the workloads to execute */
+ using Workload = std::function<void(const ThreadInfo &)>;
/** Default constructor. */
IScheduler();
@@ -59,6 +63,14 @@ public:
*/
virtual void schedule(ICPPKernel *kernel, unsigned int split_dimension) = 0;
+ /** Execute all the passed workloads
+ *
+ * @note there is no guarantee regarding the order in which the workloads will be executed or whether or not they will be executed in parallel.
+ *
+ * @param[in] workloads Array of workloads to run
+ */
+ virtual void run_workloads(std::vector<Workload> &workloads) = 0;
+
/** Get CPU info.
*
* @return CPU info.
diff --git a/arm_compute/runtime/SingleThreadScheduler.h b/arm_compute/runtime/SingleThreadScheduler.h
index a6e1defe7c..5672b622f2 100644
--- a/arm_compute/runtime/SingleThreadScheduler.h
+++ b/arm_compute/runtime/SingleThreadScheduler.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017 ARM Limited.
+ * Copyright (c) 2017-2018 ARM Limited.
*
* SPDX-License-Identifier: MIT
*
@@ -54,6 +54,12 @@ public:
*/
void schedule(ICPPKernel *kernel, unsigned int split_dimension) override;
+ /** Will run the workloads sequentially and in order.
+ *
+ * @param[in] workloads Workloads to run
+ */
+ void run_workloads(std::vector<Workload> &workloads) override;
+
private:
/** Constructor. */
SingleThreadScheduler() = default;
diff --git a/docs/01_library.dox b/docs/01_library.dox
index e3f673df82..c4edad234d 100644
--- a/docs/01_library.dox
+++ b/docs/01_library.dox
@@ -77,11 +77,57 @@ kernel.run( max_window ); // Run the kernel on the full window
The previous section shows how to run a NEON / CPP kernel in the current thread, however if your system has several CPU cores, you will probably want the kernel to use several cores. Here is how this can be done:
-@snippet src/runtime/CPP/CPPScheduler.cpp Scheduler example
+@code{.cpp}
+ ThreadInfo info;
+ info.cpu_info = &_cpu_info;
+
+ const Window &max_window = kernel->window();
+ const unsigned int num_iterations = max_window.num_iterations(split_dimension);
+ info.num_threads = std::min(num_iterations, _num_threads);
+
+ if(num_iterations == 0)
+ {
+ return;
+ }
+
+ if(!kernel->is_parallelisable() || info.num_threads == 1)
+ {
+ kernel->run(max_window, info);
+ }
+ else
+ {
+ int t = 0;
+ auto thread_it = _threads.begin();
+
+ for(; t < info.num_threads - 1; ++t, ++thread_it)
+ {
+ Window win = max_window.split_window(split_dimension, t, info.num_threads);
+ info.thread_id = t;
+ thread_it->start(kernel, win, info);
+ }
+
+ // Run last part on main thread
+ Window win = max_window.split_window(split_dimension, t, info.num_threads);
+ info.thread_id = t;
+ kernel->run(win, info);
+
+ try
+ {
+ for(auto &thread : _threads)
+ {
+ thread.wait();
+ }
+ }
+ catch(const std::system_error &e)
+ {
+ std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
+ }
+ }
+@endcode
-This is the very basic implementation used in the NEON runtime library by all the NEON functions.
+This is a very basic implementation which was originally used in the NEON runtime library by all the NEON functions.
-@sa CPPScheduler.
+@sa CPPScheduler
@note Some kernels like for example @ref NEHistogramKernel need some local temporary buffer to perform their calculations. In order to avoid memory corruption between threads, the local buffer must be of size: ```memory_needed_per_thread * num_threads``` and a unique thread_id between 0 and num_threads must be assigned to the @ref ThreadInfo object passed to the ```run``` function.
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp
index 92dce34c71..ab91b1071c 100644
--- a/src/runtime/CPP/CPPScheduler.cpp
+++ b/src/runtime/CPP/CPPScheduler.cpp
@@ -37,7 +37,66 @@
namespace arm_compute
{
-class Thread
+namespace
+{
+class ThreadFeeder
+{
+public:
+ /** Constructor
+ *
+ * @param[in] start First value that will be returned by the feeder
+ * @param[in] end End condition (The last value returned by get_next() will be end - 1)
+ */
+ explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0)
+ : _current(start), _end(end), _m()
+ {
+ }
+ /** Return the next element in the range if there is one.
+ *
+ * @param[out] next Will contain the next element if there is one.
+ *
+ * @return False if the end of the range has been reached and next wasn't set.
+ */
+ bool get_next(unsigned int &next)
+ {
+ std::lock_guard<std::mutex> lock(_m);
+ if(_current < _end)
+ {
+ next = _current;
+ _current++;
+ return true;
+ }
+ return false;
+ }
+
+private:
+ unsigned int _current;
+ const unsigned int _end;
+ std::mutex _m;
+};
+
+/** Execute workloads[info.thread_id] first, then call the feeder to get the index of the next workload to run.
+ *
+ * Will run workloads until the feeder reaches the end of its range.
+ *
+ * @param[in] workloads The array of workloads
+ * @param[in,out] feeder The feeder indicating which workload to execute next.
+ * @param[in] info Threading and CPU info.
+ */
+void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeeder &feeder, const ThreadInfo &info)
+{
+ unsigned int workload_index = info.thread_id;
+ do
+ {
+ ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size());
+ workloads[workload_index](info);
+ }
+ while(feeder.get_next(workload_index));
+}
+
+} //namespace
+
+class CPPScheduler::Thread
{
public:
/** Start a new thread. */
@@ -51,11 +110,15 @@ public:
/** Destructor. Make the thread join. */
~Thread();
- /** Request the worker thread to start executing the given kernel
- * This function will return as soon as the kernel has been sent to the worker thread.
+ /** Request the worker thread to start executing workloads.
+ *
+ * The thread will start by executing workloads[info.thread_id] and will then call the feeder to
+ * get the index of the following workload to run.
+ *
+ * @note This function will return as soon as the workloads have been sent to the worker thread.
* wait() needs to be called to ensure the execution is complete.
*/
- void start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info);
+ void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);
/** Wait for the current kernel execution to complete. */
void wait();
@@ -64,39 +127,38 @@ public:
void worker_thread();
private:
- std::thread _thread;
- ICPPKernel *_kernel{ nullptr };
- Window _window;
- ThreadInfo _info;
- std::mutex _m;
- std::condition_variable _cv;
- bool _wait_for_work{ false };
- bool _job_complete{ true };
- std::exception_ptr _current_exception;
+ std::thread _thread{};
+ ThreadInfo _info{};
+ std::vector<IScheduler::Workload> *_workloads{ nullptr };
+ ThreadFeeder *_feeder{ nullptr };
+ std::mutex _m{};
+ std::condition_variable _cv{};
+ bool _wait_for_work{ false };
+ bool _job_complete{ true };
+ std::exception_ptr _current_exception{ nullptr };
};
-Thread::Thread()
- : _thread(), _window(), _info(), _m(), _cv(), _current_exception(nullptr)
+CPPScheduler::Thread::Thread()
{
_thread = std::thread(&Thread::worker_thread, this);
}
-Thread::~Thread()
+CPPScheduler::Thread::~Thread()
{
// Make sure worker thread has ended
if(_thread.joinable())
{
- start(nullptr, Window(), ThreadInfo());
+ ThreadFeeder feeder;
+ start(nullptr, feeder, ThreadInfo());
_thread.join();
}
}
-void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &info)
+void CPPScheduler::Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
- _kernel = kernel;
- _window = window;
- _info = info;
-
+ _workloads = workloads;
+ _feeder = &feeder;
+ _info = info;
{
std::lock_guard<std::mutex> lock(_m);
_wait_for_work = true;
@@ -105,7 +167,7 @@ void Thread::start(ICPPKernel *kernel, const Window &window, const ThreadInfo &i
_cv.notify_one();
}
-void Thread::wait()
+void CPPScheduler::Thread::wait()
{
{
std::unique_lock<std::mutex> lock(_m);
@@ -118,7 +180,7 @@ void Thread::wait()
}
}
-void Thread::worker_thread()
+void CPPScheduler::Thread::worker_thread()
{
while(true)
{
@@ -129,15 +191,14 @@ void Thread::worker_thread()
_current_exception = nullptr;
// Time to exit
- if(_kernel == nullptr)
+ if(_workloads == nullptr)
{
return;
}
try
{
- _window.validate();
- _kernel->run(_window, _info);
+ process_workloads(*_workloads, *_feeder, _info);
}
catch(...)
{
@@ -174,56 +235,73 @@ unsigned int CPPScheduler::num_threads() const
return _num_threads;
}
+void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
+{
+ const unsigned int num_threads = std::min(_num_threads, static_cast<unsigned int>(workloads.size()));
+ if(num_threads < 1)
+ {
+ return;
+ }
+ ThreadFeeder feeder(num_threads, workloads.size());
+ ThreadInfo info;
+ info.cpu_info = &_cpu_info;
+ info.num_threads = num_threads;
+ unsigned int t = 0;
+ auto thread_it = _threads.begin();
+ for(; t < num_threads - 1; ++t, ++thread_it)
+ {
+ info.thread_id = t;
+ thread_it->start(&workloads, feeder, info);
+ }
+
+ info.thread_id = t;
+ process_workloads(workloads, feeder, info);
+
+ try
+ {
+ for(auto &thread : _threads)
+ {
+ thread.wait();
+ }
+ }
+ catch(const std::system_error &e)
+ {
+ std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
+ }
+}
+
void CPPScheduler::schedule(ICPPKernel *kernel, unsigned int split_dimension)
{
ARM_COMPUTE_ERROR_ON_MSG(!kernel, "The child class didn't set the kernel");
- /** [Scheduler example] */
- ThreadInfo info;
- info.cpu_info = &_cpu_info;
-
const Window &max_window = kernel->window();
const unsigned int num_iterations = max_window.num_iterations(split_dimension);
- info.num_threads = std::min(num_iterations, _num_threads);
+ const unsigned int num_threads = std::min(num_iterations, _num_threads);
if(num_iterations == 0)
{
return;
}
- if(!kernel->is_parallelisable() || info.num_threads == 1)
+ if(!kernel->is_parallelisable() || num_threads == 1)
{
+ ThreadInfo info;
+ info.cpu_info = &_cpu_info;
kernel->run(max_window, info);
}
else
{
- int t = 0;
- auto thread_it = _threads.begin();
-
- for(; t < info.num_threads - 1; ++t, ++thread_it)
+ std::vector<IScheduler::Workload> workloads(num_threads);
+ for(unsigned int t = 0; t < num_threads; t++)
{
- Window win = max_window.split_window(split_dimension, t, info.num_threads);
- info.thread_id = t;
- thread_it->start(kernel, win, info);
- }
-
- // Run last part on main thread
- Window win = max_window.split_window(split_dimension, t, info.num_threads);
- info.thread_id = t;
- kernel->run(win, info);
-
- try
- {
- for(auto &thread : _threads)
+ workloads[t] = [&](const ThreadInfo & info)
{
- thread.wait();
- }
- }
- catch(const std::system_error &e)
- {
- std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
+ Window win = max_window.split_window(split_dimension, info.thread_id, info.num_threads);
+ win.validate();
+ kernel->run(win, info);
+ };
}
+ run_workloads(workloads);
}
- /** [Scheduler example] */
}
} // namespace arm_compute
diff --git a/src/runtime/CPP/SingleThreadScheduler.cpp b/src/runtime/CPP/SingleThreadScheduler.cpp
index 2adc14ce80..6099e2cab5 100644
--- a/src/runtime/CPP/SingleThreadScheduler.cpp
+++ b/src/runtime/CPP/SingleThreadScheduler.cpp
@@ -49,6 +49,15 @@ void SingleThreadScheduler::schedule(ICPPKernel *kernel, unsigned int split_dime
kernel->run(kernel->window(), info);
}
+void SingleThreadScheduler::run_workloads(std::vector<Workload> &workloads)
+{
+ ThreadInfo info;
+ info.cpu_info = &_cpu_info;
+ for(auto &wl : workloads)
+ {
+ wl(info);
+ }
+}
unsigned int SingleThreadScheduler::num_threads() const
{
return 1;
diff --git a/tests/framework/instruments/SchedulerTimer.cpp b/tests/framework/instruments/SchedulerTimer.cpp
index 1b37b189dd..49d94d76eb 100644
--- a/tests/framework/instruments/SchedulerTimer.cpp
+++ b/tests/framework/instruments/SchedulerTimer.cpp
@@ -76,6 +76,19 @@ public:
_kernels.push_back(std::move(info));
}
+ void run_workloads(std::vector<Workload> &workloads) override
+ {
+ _timer.start();
+ _real_scheduler.run_workloads(workloads);
+ _timer.stop();
+
+ SchedulerTimer::kernel_info info;
+ info.name = "Unknown";
+ info.prefix = _prefix;
+ info.measurements = _timer.measurements();
+ _kernels.push_back(std::move(info));
+ }
+
private:
std::list<SchedulerTimer::kernel_info> &_kernels;
IScheduler &_real_scheduler;