From 06e890b1475243145d64c7d56dfb4a262a17b09f Mon Sep 17 00:00:00 2001 From: Georgios Pinitas Date: Thu, 9 Jul 2020 18:38:34 +0100 Subject: COMPMID-3565: Exposes interface to enable thread binding Expose `set_num_threads_with_affinity` as an interface to the `IScheduler` to allow binding of threads to given logical cores. Signed-off-by: Georgios Pinitas Change-Id: I062db7caafb0101972ba45d31ee9e61b26800127 Reviewed-on: https://review.mlplatform.org/c/ml/ComputeLibrary/+/3481 Comments-Addressed: Arm Jenkins Reviewed-by: Michalis Spyrou Tested-by: Arm Jenkins --- arm_compute/runtime/CPP/CPPScheduler.h | 35 ++------- arm_compute/runtime/IScheduler.h | 14 ++++ src/runtime/CPP/CPPScheduler.cpp | 100 ++++++++++++++++++------- src/runtime/IScheduler.cpp | 8 +- tests/framework/instruments/SchedulerTimer.cpp | 5 ++ 5 files changed, 102 insertions(+), 60 deletions(-) diff --git a/arm_compute/runtime/CPP/CPPScheduler.h b/arm_compute/runtime/CPP/CPPScheduler.h index 855535ebce..9d55ed448e 100644 --- a/arm_compute/runtime/CPP/CPPScheduler.h +++ b/arm_compute/runtime/CPP/CPPScheduler.h @@ -39,16 +39,6 @@ public: CPPScheduler(); /** Default destructor */ ~CPPScheduler(); - /** Sets the number of threads the scheduler will use to run the kernels. - * - * @param[in] num_threads If set to 0, then the maximum number of threads supported by C++11 will be used, otherwise the number of threads specified. - */ - void set_num_threads(unsigned int num_threads) override; - /** Returns the number of threads that the CPPScheduler has in his pool. - * - * @return Number of threads available in CPPScheduler. - */ - unsigned int num_threads() const override; /** Access the scheduler singleton * @@ -56,27 +46,12 @@ public: * @return The scheduler */ static CPPScheduler &get(); - /** Multithread the execution of the passed kernel if possible. - * - * The kernel will run on a single thread if any of these conditions is true: - * - ICPPKernel::is_parallelisable() returns false - * - The scheduler has been initialized with only one thread. - * - * @param[in] kernel Kernel to execute. - * @param[in] hints Hints for the scheduler. - */ + + // Inherited functions overridden + void set_num_threads(unsigned int num_threads) override; + void set_num_threads_with_affinity(unsigned int num_threads, BindFunc func) override; + unsigned int num_threads() const override; void schedule(ICPPKernel *kernel, const Hints &hints) override; - /** Multithread the execution of the passed kernel if possible. - * - * The kernel will run on a single thread if any of these conditions is true: - * - ICPPKernel::is_parallelisable() returns false - * - The scheduler has been initialized with only one thread. - * - * @param[in] kernel Kernel to execute. - * @param[in] hints Hints for the scheduler. - * @param[in] inputs Vector that contains the input tensors. - * @param[in] outputs Vector that contains the output tensors. - */ void schedule_op(ICPPKernel *kernel, const Hints &hints, const InputTensorMap &inputs, const OutputTensorMap &outputs) override; protected: diff --git a/arm_compute/runtime/IScheduler.h b/arm_compute/runtime/IScheduler.h index 9382c20b29..fff77274bd 100644 --- a/arm_compute/runtime/IScheduler.h +++ b/arm_compute/runtime/IScheduler.h @@ -47,6 +47,13 @@ public: DYNAMIC, /**< Split the workload dynamically using a bucket system */ }; + /** Function to be used and map a given thread id to a logical core id + * + * Mapping function expects the thread index and total number of cores as input, + * and returns the logical core index to bind against + */ + using BindFunc = std::function; + /** When arm_compute::ISchedular::Hints::_split_dimension is initialized with this value * then the schedular is free to break down the problem space over as many dimensions * as it wishes @@ -137,6 +144,13 @@ public: */ virtual void set_num_threads(unsigned int num_threads) = 0; + /** Sets the number of threads the scheduler will use to run the kernels but also using a binding function to pin the threads to given logical cores + * + * @param[in] num_threads If set to 0, then one thread per CPU core available on the system will be used, otherwise the number of threads specified. + * @param[in] func Binding function to use. + */ + virtual void set_num_threads_with_affinity(unsigned int num_threads, BindFunc func); + /** Returns the number of threads that the SingleThreadScheduler has in his pool. * * @return Number of threads available in SingleThreadScheduler. diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp index 9feee6e275..b07aa8ce18 100644 --- a/src/runtime/CPP/CPPScheduler.cpp +++ b/src/runtime/CPP/CPPScheduler.cpp @@ -144,38 +144,31 @@ void process_workloads(std::vector &workloads, ThreadFeede } while(feeder.get_next(workload_index)); } -} //namespace -struct CPPScheduler::Impl final +void set_thread_affinity(int core_id) { - explicit Impl(unsigned int thread_hint) - : _num_threads(thread_hint), _threads(_num_threads - 1) + if(core_id < 0) { + return; } - void set_num_threads(unsigned int num_threads, unsigned int thead_hint) - { - _num_threads = num_threads == 0 ? thead_hint : num_threads; - _threads.resize(_num_threads - 1); - } - unsigned int num_threads() const - { - return _num_threads; - } - - void run_workloads(std::vector &workloads); - - class Thread; - unsigned int _num_threads; - std::list _threads; - arm_compute::Mutex _run_workloads_mutex{}; -}; + cpu_set_t set; + CPU_ZERO(&set); + CPU_SET(core_id, &set); + ARM_COMPUTE_EXIT_ON_MSG(sched_setaffinity(0, sizeof(set), &set), + "Error setting thread affinity"); +} -class CPPScheduler::Impl::Thread final +class Thread final { public: - /** Start a new thread. */ - Thread(); + /** Start a new thread + * + * Thread will be pinned to a given core id if value is non-negative + * + * @param[in] core_pin Core id to pin the thread on. If negative no thread pinning will take place + */ + explicit Thread(int core_pin = -1); Thread(const Thread &) = delete; Thread &operator=(const Thread &) = delete; @@ -211,14 +204,16 @@ private: bool _wait_for_work{ false }; bool _job_complete{ true }; std::exception_ptr _current_exception{ nullptr }; + int _core_pin{ -1 }; }; -CPPScheduler::Impl::Thread::Thread() +Thread::Thread(int core_pin) + : _core_pin(core_pin) { _thread = std::thread(&Thread::worker_thread, this); } -CPPScheduler::Impl::Thread::~Thread() +Thread::~Thread() { // Make sure worker thread has ended if(_thread.joinable()) @@ -229,7 +224,7 @@ CPPScheduler::Impl::Thread::~Thread() } } -void CPPScheduler::Impl::Thread::start(std::vector *workloads, ThreadFeeder &feeder, const ThreadInfo &info) +void Thread::start(std::vector *workloads, ThreadFeeder &feeder, const ThreadInfo &info) { _workloads = workloads; _feeder = &feeder; @@ -242,7 +237,7 @@ void CPPScheduler::Impl::Thread::start(std::vector *worklo _cv.notify_one(); } -void CPPScheduler::Impl::Thread::wait() +void Thread::wait() { { std::unique_lock lock(_m); @@ -255,8 +250,10 @@ void CPPScheduler::Impl::Thread::wait() } } -void CPPScheduler::Impl::Thread::worker_thread() +void Thread::worker_thread() { + set_thread_affinity(_core_pin); + while(true) { std::unique_lock lock(_m); @@ -289,6 +286,44 @@ void CPPScheduler::Impl::Thread::worker_thread() _cv.notify_one(); } } +} //namespace + +struct CPPScheduler::Impl final +{ + explicit Impl(unsigned int thread_hint) + : _num_threads(thread_hint), _threads(_num_threads - 1) + { + } + void set_num_threads(unsigned int num_threads, unsigned int thread_hint) + { + _num_threads = num_threads == 0 ? thread_hint : num_threads; + _threads.resize(_num_threads - 1); + } + void set_num_threads_with_affinity(unsigned int num_threads, unsigned int thread_hint, BindFunc func) + { + _num_threads = num_threads == 0 ? thread_hint : num_threads; + + // Set affinity on main thread + set_thread_affinity(func(0, thread_hint)); + + // Set affinity on worked threads + _threads.clear(); + for(auto i = 1U; i < _num_threads; ++i) + { + _threads.emplace_back(func(i, thread_hint)); + } + } + unsigned int num_threads() const + { + return _num_threads; + } + + void run_workloads(std::vector &workloads); + + unsigned int _num_threads; + std::list _threads; + arm_compute::Mutex _run_workloads_mutex{}; +}; /* * This singleton has been deprecated and will be removed in the next release @@ -313,6 +348,13 @@ void CPPScheduler::set_num_threads(unsigned int num_threads) _impl->set_num_threads(num_threads, num_threads_hint()); } +void CPPScheduler::set_num_threads_with_affinity(unsigned int num_threads, BindFunc func) +{ + // No changes in the number of threads while current workloads are running + arm_compute::lock_guard lock(_impl->_run_workloads_mutex); + _impl->set_num_threads_with_affinity(num_threads, num_threads_hint(), func); +} + unsigned int CPPScheduler::num_threads() const { return _impl->num_threads(); diff --git a/src/runtime/IScheduler.cpp b/src/runtime/IScheduler.cpp index 921e436559..6b961d7dfc 100644 --- a/src/runtime/IScheduler.cpp +++ b/src/runtime/IScheduler.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2018 Arm Limited. + * Copyright (c) 2016-2020 Arm Limited. * * SPDX-License-Identifier: MIT * @@ -41,6 +41,12 @@ CPUInfo &IScheduler::cpu_info() return _cpu_info; } +void IScheduler::set_num_threads_with_affinity(unsigned int num_threads, BindFunc func) +{ + ARM_COMPUTE_UNUSED(num_threads, func); + ARM_COMPUTE_ERROR("Feature for affinity setting is not implemented"); +} + unsigned int IScheduler::num_threads_hint() const { return _num_threads_hint; diff --git a/tests/framework/instruments/SchedulerTimer.cpp b/tests/framework/instruments/SchedulerTimer.cpp index ab1dbbfb4c..75b128591a 100644 --- a/tests/framework/instruments/SchedulerTimer.cpp +++ b/tests/framework/instruments/SchedulerTimer.cpp @@ -63,6 +63,11 @@ public: _real_scheduler.set_num_threads(num_threads); } + void set_num_threads_with_affinity(unsigned int num_threads, BindFunc func) override + { + _real_scheduler.set_num_threads_with_affinity(num_threads, func); + } + unsigned int num_threads() const override { return _real_scheduler.num_threads(); -- cgit v1.2.1