aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--arm_compute/core/utils/misc/Utility.h23
-rw-r--r--arm_compute/runtime/CPP/CPPScheduler.h9
-rw-r--r--src/runtime/CPP/CPPScheduler.cpp212
3 files changed, 225 insertions, 19 deletions
diff --git a/arm_compute/core/utils/misc/Utility.h b/arm_compute/core/utils/misc/Utility.h
index 646d66567a..648758ca07 100644
--- a/arm_compute/core/utils/misc/Utility.h
+++ b/arm_compute/core/utils/misc/Utility.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Arm Limited.
+ * Copyright (c) 2017-2021 Arm Limited.
*
* SPDX-License-Identifier: MIT
*
@@ -24,6 +24,8 @@
#ifndef ARM_COMPUTE_MISC_UTILITY_H
#define ARM_COMPUTE_MISC_UTILITY_H
+#include "arm_compute/core/Error.h"
+
#include <algorithm>
#include <array>
#include <limits>
@@ -208,6 +210,25 @@ inline std::string tolower(std::string string)
});
return string;
}
+
+/** Get environment variable as a string
+ *
+ * @note Return empty string on bare-metal
+ *
+ * @param[in] env_name Name of the Environment variable to retrieve
+ *
+ * @return Environment variable content, or empty string if the variable is undefined or on bare-metal
+ */
+inline std::string getenv(const std::string &env_name)
+{
+#ifdef BARE_METAL
+ ARM_COMPUTE_UNUSED(env_name);
+ return std::string{};
+#else // BARE_METAL
+ const auto env_chr = std::getenv(env_name.c_str());
+ return env_chr == nullptr ? std::string{} : std::string{ env_chr };
+#endif // BARE_METAL
+}
} // namespace utility
} // namespace arm_compute
#endif /* ARM_COMPUTE_MISC_UTILITY_H */
diff --git a/arm_compute/runtime/CPP/CPPScheduler.h b/arm_compute/runtime/CPP/CPPScheduler.h
index f4f6a1311e..a5932d6301 100644
--- a/arm_compute/runtime/CPP/CPPScheduler.h
+++ b/arm_compute/runtime/CPP/CPPScheduler.h
@@ -31,7 +31,14 @@
namespace arm_compute
{
-/** C++11 implementation of a pool of threads to automatically split a kernel's execution among several threads. */
+/** C++11 implementation of a pool of threads to automatically split a kernel's execution among several threads.
+ *
+ * It has 2 scheduling modes: Linear or Fanout (please refer to the implementation for details)
+ * The mode is selected automatically based on the runtime environment. However it can be forced via an environment
+ * variable ARM_COMPUTE_CPP_SCHEDULER_MODE. e.g.:
+ * ARM_COMPUTE_CPP_SCHEDULER_MODE=linear # Force select the linear scheduling mode
+ * ARM_COMPUTE_CPP_SCHEDULER_MODE=fanout # Force select the fanout scheduling mode
+*/
class CPPScheduler final : public IScheduler
{
public:
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp
index a26b580b97..3bd80eb51d 100644
--- a/src/runtime/CPP/CPPScheduler.cpp
+++ b/src/runtime/CPP/CPPScheduler.cpp
@@ -26,7 +26,9 @@
#include "arm_compute/core/CPP/ICPPKernel.h"
#include "arm_compute/core/Error.h"
#include "arm_compute/core/Helpers.h"
+#include "arm_compute/core/Log.h"
#include "arm_compute/core/Utils.h"
+#include "arm_compute/core/utils/misc/Utility.h"
#include "src/runtime/CPUUtils.h"
#include "support/Mutex.h"
@@ -38,6 +40,7 @@
#include <mutex>
#include <system_error>
#include <thread>
+#include <vector>
namespace arm_compute
{
@@ -91,6 +94,10 @@ void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeede
while(feeder.get_next(workload_index));
}
+/** Set thread affinity. Pin current thread to a particular core
+ *
+ * @param[in] core_id ID of the core to which the current thread is pinned
+ */
void set_thread_affinity(int core_id)
{
if(core_id < 0)
@@ -106,6 +113,33 @@ void set_thread_affinity(int core_id)
#endif /* !defined(__APPLE__) */
}
+/** There are currently 2 scheduling modes supported by CPPScheduler
+ *
+ * Linear:
+ * The default mode where all the scheduling is carried out by the main thread linearly (in a loop).
+ * E.G. If there are 8 threads in total, there will be 1 main thread + 7 threads in the thread pool, and it is main
+ * thread's responsibility to start all the other threads in the thread pool.
+ *
+ * Fanout:
+ * In fanout mode, the scheduling (starting other threads) task is distributed across many threads instead of just
+ * the main thread.
+ *
+ * The scheduler has a fixed parameter: wake_fanout, and the scheduling sequence goes like this:
+ * 1. Main thread wakes the first wake_fanout - 1 number of FanoutThreads from the thread pool
+ * From thread: 0
+ * To thread (non-inclusive): Wake_fanout - 1
+ * 2. Each FanoutThread then wakes wake_fanout number of FanoutThreads from the thread pool:
+ * From thread: (i + 1) * wake_fanout - 1
+ * To thread (non-inclusive): (i + 2) * wake_fanout - 1
+ * where i is the current thread's thread id
+ * The end is clamped at the size of the thread pool / the number of threads in use - 1
+ *
+ * E.G. for a total number of 8 threads (1 main thread, 7 FanoutThreads in thread pool) with a fanout of 3
+ * 1. Main thread wakes FanoutThread 0, 1
+ * 2. FanoutThread 0 wakes FanoutThread 2, 3, 4
+ * 3. FanoutThread 1 wakes FanoutThread 5, 6
+ */
+
class Thread final
{
public:
@@ -125,6 +159,9 @@ public:
/** Destructor. Make the thread join. */
~Thread();
+ /** Set workloads */
+ void set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);
+
/** Request the worker thread to start executing workloads.
*
* The thread will start by executing workloads[info.thread_id] and will then call the feeder to
@@ -133,7 +170,7 @@ public:
* @note This function will return as soon as the workloads have been sent to the worker thread.
* wait() needs to be called to ensure the execution is complete.
*/
- void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);
+ void start();
/** Wait for the current kernel execution to complete. */
void wait();
@@ -141,6 +178,22 @@ public:
/** Function ran by the worker thread. */
void worker_thread();
+ /** Set the scheduling strategy to be linear */
+ void set_linear_mode()
+ {
+ _thread_pool = nullptr;
+ _wake_beg = 0;
+ _wake_end = 0;
+ }
+
+ /** Set the scheduling strategy to be fanout */
+ void set_fanout_mode(std::list<Thread> *thread_pool, unsigned int wake_beg, unsigned int wake_end)
+ {
+ _thread_pool = thread_pool;
+ _wake_beg = wake_beg;
+ _wake_end = wake_end;
+ }
+
private:
std::thread _thread{};
ThreadInfo _info{};
@@ -152,6 +205,9 @@ private:
bool _job_complete{ true };
std::exception_ptr _current_exception{ nullptr };
int _core_pin{ -1 };
+ std::list<Thread> *_thread_pool{ nullptr };
+ unsigned int _wake_beg{ 0 };
+ unsigned int _wake_end{ 0 };
};
Thread::Thread(int core_pin)
@@ -166,16 +222,21 @@ Thread::~Thread()
if(_thread.joinable())
{
ThreadFeeder feeder;
- start(nullptr, feeder, ThreadInfo());
+ set_workload(nullptr, feeder, ThreadInfo());
+ start();
_thread.join();
}
}
-void Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
+void Thread::set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
_workloads = workloads;
_feeder = &feeder;
_info = info;
+}
+
+void Thread::start()
+{
{
std::lock_guard<std::mutex> lock(_m);
_wait_for_work = true;
@@ -209,12 +270,24 @@ void Thread::worker_thread()
_current_exception = nullptr;
- // Time to exit
- if(_workloads == nullptr)
+ // Exit if the worker thread has not been fed with workloads
+ if(_workloads == nullptr || _feeder == nullptr)
{
return;
}
+ // Wake up more peer threads from thread pool if this job has been delegated to the current thread
+ if(_thread_pool != nullptr)
+ {
+ auto thread_it = _thread_pool->begin();
+ std::advance(thread_it, std::min(static_cast<unsigned int>(_thread_pool->size()), _wake_beg));
+ auto wake_end = std::min(_wake_end, static_cast<unsigned int>(_info.num_threads - 1));
+ for(unsigned int t = _wake_beg; t < wake_end; ++t, ++thread_it)
+ {
+ thread_it->start();
+ }
+ }
+
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
try
{
@@ -228,6 +301,7 @@ void Thread::worker_thread()
_current_exception = std::current_exception();
}
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
+ _workloads = nullptr;
_job_complete = true;
lock.unlock();
_cv.notify_one();
@@ -237,14 +311,40 @@ void Thread::worker_thread()
struct CPPScheduler::Impl final
{
+ constexpr static unsigned int m_default_wake_fanout = 4;
+ enum class Mode
+ {
+ Linear,
+ Fanout
+ };
+ enum class ModeToggle
+ {
+ None,
+ Linear,
+ Fanout
+ };
explicit Impl(unsigned int thread_hint)
- : _num_threads(thread_hint), _threads(_num_threads - 1)
+ : _num_threads(thread_hint), _threads(_num_threads - 1), _mode(Mode::Linear), _wake_fanout(0U)
{
+ const auto mode_env_v = utility::tolower(utility::getenv("ARM_COMPUTE_CPP_SCHEDULER_MODE"));
+ if(mode_env_v == "linear")
+ {
+ _forced_mode = ModeToggle::Linear;
+ }
+ else if(mode_env_v == "fanout")
+ {
+ _forced_mode = ModeToggle::Fanout;
+ }
+ else
+ {
+ _forced_mode = ModeToggle::None;
+ }
}
void set_num_threads(unsigned int num_threads, unsigned int thread_hint)
{
_num_threads = num_threads == 0 ? thread_hint : num_threads;
_threads.resize(_num_threads - 1);
+ auto_switch_mode(_num_threads);
}
void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func)
{
@@ -259,17 +359,72 @@ struct CPPScheduler::Impl final
{
_threads.emplace_back(func(i, thread_hint));
}
+ auto_switch_mode(_num_threads);
+ }
+ void auto_switch_mode(unsigned int num_threads_to_use)
+ {
+ // If the environment variable is set to any of the modes, it overwrites the mode selected over num_threads_to_use
+ if(_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8))
+ {
+ set_fanout_mode(m_default_wake_fanout, num_threads_to_use);
+ ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Fanout mode, with wake up fanout : %d and %d threads to use\n", this->wake_fanout(), num_threads_to_use);
+ }
+ else // Equivalent to (_forced_mode == ModeToggle::Linear || (_forced_mode == ModeToggle::None && num_threads_to_use <= 8))
+ {
+ set_linear_mode();
+ ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Linear mode, with %d threads to use\n", num_threads_to_use);
+ }
+ }
+ void set_linear_mode()
+ {
+ for(auto &thread : _threads)
+ {
+ thread.set_linear_mode();
+ }
+ _mode = Mode::Linear;
+ _wake_fanout = 0U;
+ }
+ void set_fanout_mode(unsigned int wake_fanout, unsigned int num_threads_to_use)
+ {
+ ARM_COMPUTE_ERROR_ON(num_threads_to_use > _threads.size() + 1);
+ const auto actual_wake_fanout = std::max(2U, std::min(wake_fanout, num_threads_to_use - 1));
+ auto thread_it = _threads.begin();
+ for(auto i = 1U; i < num_threads_to_use; ++i, ++thread_it)
+ {
+ const auto wake_begin = i * actual_wake_fanout - 1;
+ const auto wake_end = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1);
+ thread_it->set_fanout_mode(&_threads, wake_begin, wake_end);
+ }
+ // Reset the remaining threads's wake up schedule
+ while(thread_it != _threads.end())
+ {
+ thread_it->set_fanout_mode(&_threads, 0U, 0U);
+ ++thread_it;
+ }
+ _mode = Mode::Fanout;
+ _wake_fanout = actual_wake_fanout;
}
unsigned int num_threads() const
{
return _num_threads;
}
+ unsigned int wake_fanout() const
+ {
+ return _wake_fanout;
+ }
+ Mode mode() const
+ {
+ return _mode;
+ }
void run_workloads(std::vector<IScheduler::Workload> &workloads);
unsigned int _num_threads;
std::list<Thread> _threads;
arm_compute::Mutex _run_workloads_mutex{};
+ Mode _mode{ Mode::Linear };
+ ModeToggle _forced_mode{ ModeToggle::None };
+ unsigned int _wake_fanout{ 0 };
};
/*
@@ -315,32 +470,55 @@ void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
// This is not great because different threads workloads won't run in parallel but at least they
// won't interfere each other and deadlock.
arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
- const unsigned int num_threads = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size()));
- if(num_threads < 1)
+ const unsigned int num_threads_to_use = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size()));
+ if(num_threads_to_use < 1)
{
return;
}
- ThreadFeeder feeder(num_threads, workloads.size());
+ // Re-adjust the mode if the actual number of threads to use is different from the number of threads created
+ _impl->auto_switch_mode(num_threads_to_use);
+ int num_threads_to_start = 0;
+ switch(_impl->mode())
+ {
+ case CPPScheduler::Impl::Mode::Fanout:
+ {
+ num_threads_to_start = static_cast<int>(_impl->wake_fanout()) - 1;
+ break;
+ }
+ case CPPScheduler::Impl::Mode::Linear:
+ default:
+ {
+ num_threads_to_start = static_cast<int>(num_threads_to_use) - 1;
+ break;
+ }
+ }
+ ThreadFeeder feeder(num_threads_to_use, workloads.size());
ThreadInfo info;
info.cpu_info = &_cpu_info;
- info.num_threads = num_threads;
+ info.num_threads = num_threads_to_use;
unsigned int t = 0;
auto thread_it = _impl->_threads.begin();
- for(; t < num_threads - 1; ++t, ++thread_it)
+ // Set num_threads_to_use - 1 workloads to the threads as the remaining 1 is left to the main thread
+ for(; t < num_threads_to_use - 1; ++t, ++thread_it)
{
info.thread_id = t;
- thread_it->start(&workloads, feeder, info);
+ thread_it->set_workload(&workloads, feeder, info);
}
-
- info.thread_id = t;
- process_workloads(workloads, feeder, info);
+ thread_it = _impl->_threads.begin();
+ for(int i = 0; i < num_threads_to_start; ++i, ++thread_it)
+ {
+ thread_it->start();
+ }
+ info.thread_id = t; // Set main thread's thread_id
+ process_workloads(workloads, feeder, info); // Main thread processes workloads
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
try
{
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
- for(auto &thread : _impl->_threads)
+ thread_it = _impl->_threads.begin();
+ for(unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it)
{
- thread.wait();
+ thread_it->wait();
}
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
}