aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiCongLi <sicong.li@arm.com>2021-03-12 12:31:17 +0000
committerSiCong Li <sicong.li@arm.com>2021-04-07 17:38:16 +0000
commit3b5981ce898569aafa98abdf220c73f1a80685b9 (patch)
tree28aca1bea2e1a10e8f74f8ff79cabc99b3131319
parentdb0e2c8de03eb78811be8b0b6e41d8c40dcd097e (diff)
downloadComputeLibrary-3b5981ce898569aafa98abdf220c73f1a80685b9.tar.gz
Implement Fanout mode in CPPScheduler
This new scheduler mode is implemented to reduce runtime overhead on high thread counts by distributing the scheduling work to all threads. The fanout mode should only be enabled on high thread counts (e.g. > 8 threads). Alternatively the mode can be forced by setting the environment variable ARM_COMPUTE_CPP_SCHEDULER_MODE to be either "linear" (default) or "fanout". Note that on bare-metal this functionality is turned off but it does not matter as only multi-threading is not supported on bare-metal. Resolves COMPMID-4349 Signed-off-by: SiCongLi <sicong.li@arm.com> Change-Id: I46e2fab83ea24e616c82ae94dca7b2e72a73c7b8 Reviewed-on: https://review.mlplatform.org/c/ml/ComputeLibrary/+/5352 Reviewed-by: Michele Di Giorgio <michele.digiorgio@arm.com> Reviewed-by: Georgios Pinitas <georgios.pinitas@arm.com> Tested-by: Arm Jenkins <bsgcomp@arm.com> Comments-Addressed: Arm Jenkins <bsgcomp@arm.com>
-rw-r--r--arm_compute/core/utils/misc/Utility.h23
-rw-r--r--arm_compute/runtime/CPP/CPPScheduler.h9
-rw-r--r--src/runtime/CPP/CPPScheduler.cpp212
3 files changed, 225 insertions, 19 deletions
diff --git a/arm_compute/core/utils/misc/Utility.h b/arm_compute/core/utils/misc/Utility.h
index 646d66567a..648758ca07 100644
--- a/arm_compute/core/utils/misc/Utility.h
+++ b/arm_compute/core/utils/misc/Utility.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Arm Limited.
+ * Copyright (c) 2017-2021 Arm Limited.
*
* SPDX-License-Identifier: MIT
*
@@ -24,6 +24,8 @@
#ifndef ARM_COMPUTE_MISC_UTILITY_H
#define ARM_COMPUTE_MISC_UTILITY_H
+#include "arm_compute/core/Error.h"
+
#include <algorithm>
#include <array>
#include <limits>
@@ -208,6 +210,25 @@ inline std::string tolower(std::string string)
});
return string;
}
+
+/** Get environment variable as a string
+ *
+ * @note Return empty string on bare-metal
+ *
+ * @param[in] env_name Name of the Environment variable to retrieve
+ *
+ * @return Environment variable content, or empty string if the variable is undefined or on bare-metal
+ */
+inline std::string getenv(const std::string &env_name)
+{
+#ifdef BARE_METAL
+ ARM_COMPUTE_UNUSED(env_name);
+ return std::string{};
+#else // BARE_METAL
+ const auto env_chr = std::getenv(env_name.c_str());
+ return env_chr == nullptr ? std::string{} : std::string{ env_chr };
+#endif // BARE_METAL
+}
} // namespace utility
} // namespace arm_compute
#endif /* ARM_COMPUTE_MISC_UTILITY_H */
diff --git a/arm_compute/runtime/CPP/CPPScheduler.h b/arm_compute/runtime/CPP/CPPScheduler.h
index f4f6a1311e..a5932d6301 100644
--- a/arm_compute/runtime/CPP/CPPScheduler.h
+++ b/arm_compute/runtime/CPP/CPPScheduler.h
@@ -31,7 +31,14 @@
namespace arm_compute
{
-/** C++11 implementation of a pool of threads to automatically split a kernel's execution among several threads. */
+/** C++11 implementation of a pool of threads to automatically split a kernel's execution among several threads.
+ *
+ * It has 2 scheduling modes: Linear or Fanout (please refer to the implementation for details)
+ * The mode is selected automatically based on the runtime environment. However it can be forced via an environment
+ * variable ARM_COMPUTE_CPP_SCHEDULER_MODE. e.g.:
+ * ARM_COMPUTE_CPP_SCHEDULER_MODE=linear # Force select the linear scheduling mode
+ * ARM_COMPUTE_CPP_SCHEDULER_MODE=fanout # Force select the fanout scheduling mode
+*/
class CPPScheduler final : public IScheduler
{
public:
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp
index a26b580b97..3bd80eb51d 100644
--- a/src/runtime/CPP/CPPScheduler.cpp
+++ b/src/runtime/CPP/CPPScheduler.cpp
@@ -26,7 +26,9 @@
#include "arm_compute/core/CPP/ICPPKernel.h"
#include "arm_compute/core/Error.h"
#include "arm_compute/core/Helpers.h"
+#include "arm_compute/core/Log.h"
#include "arm_compute/core/Utils.h"
+#include "arm_compute/core/utils/misc/Utility.h"
#include "src/runtime/CPUUtils.h"
#include "support/Mutex.h"
@@ -38,6 +40,7 @@
#include <mutex>
#include <system_error>
#include <thread>
+#include <vector>
namespace arm_compute
{
@@ -91,6 +94,10 @@ void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeede
while(feeder.get_next(workload_index));
}
+/** Set thread affinity. Pin current thread to a particular core
+ *
+ * @param[in] core_id ID of the core to which the current thread is pinned
+ */
void set_thread_affinity(int core_id)
{
if(core_id < 0)
@@ -106,6 +113,33 @@ void set_thread_affinity(int core_id)
#endif /* !defined(__APPLE__) */
}
+/** There are currently 2 scheduling modes supported by CPPScheduler
+ *
+ * Linear:
+ * The default mode where all the scheduling is carried out by the main thread linearly (in a loop).
+ * E.G. If there are 8 threads in total, there will be 1 main thread + 7 threads in the thread pool, and it is main
+ * thread's responsibility to start all the other threads in the thread pool.
+ *
+ * Fanout:
+ * In fanout mode, the scheduling (starting other threads) task is distributed across many threads instead of just
+ * the main thread.
+ *
+ * The scheduler has a fixed parameter: wake_fanout, and the scheduling sequence goes like this:
+ * 1. Main thread wakes the first wake_fanout - 1 number of FanoutThreads from the thread pool
+ * From thread: 0
+ * To thread (non-inclusive): Wake_fanout - 1
+ * 2. Each FanoutThread then wakes wake_fanout number of FanoutThreads from the thread pool:
+ * From thread: (i + 1) * wake_fanout - 1
+ * To thread (non-inclusive): (i + 2) * wake_fanout - 1
+ * where i is the current thread's thread id
+ * The end is clamped at the size of the thread pool / the number of threads in use - 1
+ *
+ * E.G. for a total number of 8 threads (1 main thread, 7 FanoutThreads in thread pool) with a fanout of 3
+ * 1. Main thread wakes FanoutThread 0, 1
+ * 2. FanoutThread 0 wakes FanoutThread 2, 3, 4
+ * 3. FanoutThread 1 wakes FanoutThread 5, 6
+ */
+
class Thread final
{
public:
@@ -125,6 +159,9 @@ public:
/** Destructor. Make the thread join. */
~Thread();
+ /** Set workloads */
+ void set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);
+
/** Request the worker thread to start executing workloads.
*
* The thread will start by executing workloads[info.thread_id] and will then call the feeder to
@@ -133,7 +170,7 @@ public:
* @note This function will return as soon as the workloads have been sent to the worker thread.
* wait() needs to be called to ensure the execution is complete.
*/
- void start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info);
+ void start();
/** Wait for the current kernel execution to complete. */
void wait();
@@ -141,6 +178,22 @@ public:
/** Function ran by the worker thread. */
void worker_thread();
+ /** Set the scheduling strategy to be linear */
+ void set_linear_mode()
+ {
+ _thread_pool = nullptr;
+ _wake_beg = 0;
+ _wake_end = 0;
+ }
+
+ /** Set the scheduling strategy to be fanout */
+ void set_fanout_mode(std::list<Thread> *thread_pool, unsigned int wake_beg, unsigned int wake_end)
+ {
+ _thread_pool = thread_pool;
+ _wake_beg = wake_beg;
+ _wake_end = wake_end;
+ }
+
private:
std::thread _thread{};
ThreadInfo _info{};
@@ -152,6 +205,9 @@ private:
bool _job_complete{ true };
std::exception_ptr _current_exception{ nullptr };
int _core_pin{ -1 };
+ std::list<Thread> *_thread_pool{ nullptr };
+ unsigned int _wake_beg{ 0 };
+ unsigned int _wake_end{ 0 };
};
Thread::Thread(int core_pin)
@@ -166,16 +222,21 @@ Thread::~Thread()
if(_thread.joinable())
{
ThreadFeeder feeder;
- start(nullptr, feeder, ThreadInfo());
+ set_workload(nullptr, feeder, ThreadInfo());
+ start();
_thread.join();
}
}
-void Thread::start(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
+void Thread::set_workload(std::vector<IScheduler::Workload> *workloads, ThreadFeeder &feeder, const ThreadInfo &info)
{
_workloads = workloads;
_feeder = &feeder;
_info = info;
+}
+
+void Thread::start()
+{
{
std::lock_guard<std::mutex> lock(_m);
_wait_for_work = true;
@@ -209,12 +270,24 @@ void Thread::worker_thread()
_current_exception = nullptr;
- // Time to exit
- if(_workloads == nullptr)
+ // Exit if the worker thread has not been fed with workloads
+ if(_workloads == nullptr || _feeder == nullptr)
{
return;
}
+ // Wake up more peer threads from thread pool if this job has been delegated to the current thread
+ if(_thread_pool != nullptr)
+ {
+ auto thread_it = _thread_pool->begin();
+ std::advance(thread_it, std::min(static_cast<unsigned int>(_thread_pool->size()), _wake_beg));
+ auto wake_end = std::min(_wake_end, static_cast<unsigned int>(_info.num_threads - 1));
+ for(unsigned int t = _wake_beg; t < wake_end; ++t, ++thread_it)
+ {
+ thread_it->start();
+ }
+ }
+
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
try
{
@@ -228,6 +301,7 @@ void Thread::worker_thread()
_current_exception = std::current_exception();
}
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
+ _workloads = nullptr;
_job_complete = true;
lock.unlock();
_cv.notify_one();
@@ -237,14 +311,40 @@ void Thread::worker_thread()
struct CPPScheduler::Impl final
{
+ constexpr static unsigned int m_default_wake_fanout = 4;
+ enum class Mode
+ {
+ Linear,
+ Fanout
+ };
+ enum class ModeToggle
+ {
+ None,
+ Linear,
+ Fanout
+ };
explicit Impl(unsigned int thread_hint)
- : _num_threads(thread_hint), _threads(_num_threads - 1)
+ : _num_threads(thread_hint), _threads(_num_threads - 1), _mode(Mode::Linear), _wake_fanout(0U)
{
+ const auto mode_env_v = utility::tolower(utility::getenv("ARM_COMPUTE_CPP_SCHEDULER_MODE"));
+ if(mode_env_v == "linear")
+ {
+ _forced_mode = ModeToggle::Linear;
+ }
+ else if(mode_env_v == "fanout")
+ {
+ _forced_mode = ModeToggle::Fanout;
+ }
+ else
+ {
+ _forced_mode = ModeToggle::None;
+ }
}
void set_num_threads(unsigned int num_threads, unsigned int thread_hint)
{
_num_threads = num_threads == 0 ? thread_hint : num_threads;
_threads.resize(_num_threads - 1);
+ auto_switch_mode(_num_threads);
}
void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func)
{
@@ -259,17 +359,72 @@ struct CPPScheduler::Impl final
{
_threads.emplace_back(func(i, thread_hint));
}
+ auto_switch_mode(_num_threads);
+ }
+ void auto_switch_mode(unsigned int num_threads_to_use)
+ {
+ // If the environment variable is set to any of the modes, it overwrites the mode selected over num_threads_to_use
+ if(_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8))
+ {
+ set_fanout_mode(m_default_wake_fanout, num_threads_to_use);
+ ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Fanout mode, with wake up fanout : %d and %d threads to use\n", this->wake_fanout(), num_threads_to_use);
+ }
+ else // Equivalent to (_forced_mode == ModeToggle::Linear || (_forced_mode == ModeToggle::None && num_threads_to_use <= 8))
+ {
+ set_linear_mode();
+ ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Linear mode, with %d threads to use\n", num_threads_to_use);
+ }
+ }
+ void set_linear_mode()
+ {
+ for(auto &thread : _threads)
+ {
+ thread.set_linear_mode();
+ }
+ _mode = Mode::Linear;
+ _wake_fanout = 0U;
+ }
+ void set_fanout_mode(unsigned int wake_fanout, unsigned int num_threads_to_use)
+ {
+ ARM_COMPUTE_ERROR_ON(num_threads_to_use > _threads.size() + 1);
+ const auto actual_wake_fanout = std::max(2U, std::min(wake_fanout, num_threads_to_use - 1));
+ auto thread_it = _threads.begin();
+ for(auto i = 1U; i < num_threads_to_use; ++i, ++thread_it)
+ {
+ const auto wake_begin = i * actual_wake_fanout - 1;
+ const auto wake_end = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1);
+ thread_it->set_fanout_mode(&_threads, wake_begin, wake_end);
+ }
+ // Reset the remaining threads's wake up schedule
+ while(thread_it != _threads.end())
+ {
+ thread_it->set_fanout_mode(&_threads, 0U, 0U);
+ ++thread_it;
+ }
+ _mode = Mode::Fanout;
+ _wake_fanout = actual_wake_fanout;
}
unsigned int num_threads() const
{
return _num_threads;
}
+ unsigned int wake_fanout() const
+ {
+ return _wake_fanout;
+ }
+ Mode mode() const
+ {
+ return _mode;
+ }
void run_workloads(std::vector<IScheduler::Workload> &workloads);
unsigned int _num_threads;
std::list<Thread> _threads;
arm_compute::Mutex _run_workloads_mutex{};
+ Mode _mode{ Mode::Linear };
+ ModeToggle _forced_mode{ ModeToggle::None };
+ unsigned int _wake_fanout{ 0 };
};
/*
@@ -315,32 +470,55 @@ void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads)
// This is not great because different threads workloads won't run in parallel but at least they
// won't interfere each other and deadlock.
arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex);
- const unsigned int num_threads = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size()));
- if(num_threads < 1)
+ const unsigned int num_threads_to_use = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size()));
+ if(num_threads_to_use < 1)
{
return;
}
- ThreadFeeder feeder(num_threads, workloads.size());
+ // Re-adjust the mode if the actual number of threads to use is different from the number of threads created
+ _impl->auto_switch_mode(num_threads_to_use);
+ int num_threads_to_start = 0;
+ switch(_impl->mode())
+ {
+ case CPPScheduler::Impl::Mode::Fanout:
+ {
+ num_threads_to_start = static_cast<int>(_impl->wake_fanout()) - 1;
+ break;
+ }
+ case CPPScheduler::Impl::Mode::Linear:
+ default:
+ {
+ num_threads_to_start = static_cast<int>(num_threads_to_use) - 1;
+ break;
+ }
+ }
+ ThreadFeeder feeder(num_threads_to_use, workloads.size());
ThreadInfo info;
info.cpu_info = &_cpu_info;
- info.num_threads = num_threads;
+ info.num_threads = num_threads_to_use;
unsigned int t = 0;
auto thread_it = _impl->_threads.begin();
- for(; t < num_threads - 1; ++t, ++thread_it)
+ // Set num_threads_to_use - 1 workloads to the threads as the remaining 1 is left to the main thread
+ for(; t < num_threads_to_use - 1; ++t, ++thread_it)
{
info.thread_id = t;
- thread_it->start(&workloads, feeder, info);
+ thread_it->set_workload(&workloads, feeder, info);
}
-
- info.thread_id = t;
- process_workloads(workloads, feeder, info);
+ thread_it = _impl->_threads.begin();
+ for(int i = 0; i < num_threads_to_start; ++i, ++thread_it)
+ {
+ thread_it->start();
+ }
+ info.thread_id = t; // Set main thread's thread_id
+ process_workloads(workloads, feeder, info); // Main thread processes workloads
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
try
{
#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */
- for(auto &thread : _impl->_threads)
+ thread_it = _impl->_threads.begin();
+ for(unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it)
{
- thread.wait();
+ thread_it->wait();
}
#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED
}