diff options
Diffstat (limited to 'src/runtime/CPP/CPPScheduler.cpp')
-rw-r--r-- | src/runtime/CPP/CPPScheduler.cpp | 133 |
1 files changed, 74 insertions, 59 deletions
diff --git a/src/runtime/CPP/CPPScheduler.cpp b/src/runtime/CPP/CPPScheduler.cpp index f112d456c7..9fbdc3a4dd 100644 --- a/src/runtime/CPP/CPPScheduler.cpp +++ b/src/runtime/CPP/CPPScheduler.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 Arm Limited. + * Copyright (c) 2016-2023 Arm Limited. * * SPDX-License-Identifier: MIT * @@ -29,6 +29,7 @@ #include "arm_compute/core/Log.h" #include "arm_compute/core/Utils.h" #include "arm_compute/core/utils/misc/Utility.h" + #include "support/Mutex.h" #include <atomic> @@ -53,8 +54,7 @@ public: * @param[in] start First value that will be returned by the feeder * @param[in] end End condition (The last value returned by get_next() will be end - 1) */ - explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0) - : _atomic_counter(start), _end(end) + explicit ThreadFeeder(unsigned int start = 0, unsigned int end = 0) : _atomic_counter(start), _end(end) { } /** Return the next element in the range if there is one. @@ -89,8 +89,7 @@ void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeede { ARM_COMPUTE_ERROR_ON(workload_index >= workloads.size()); workloads[workload_index](info); - } - while(feeder.get_next(workload_index)); + } while (feeder.get_next(workload_index)); } /** Set thread affinity. Pin current thread to a particular core @@ -99,17 +98,17 @@ void process_workloads(std::vector<IScheduler::Workload> &workloads, ThreadFeede */ void set_thread_affinity(int core_id) { - if(core_id < 0) + if (core_id < 0) { return; } -#if !defined(__APPLE__) +#if !defined(_WIN64) && !defined(__APPLE__) && !defined(__OpenBSD__) cpu_set_t set; CPU_ZERO(&set); CPU_SET(core_id, &set); ARM_COMPUTE_EXIT_ON_MSG(sched_setaffinity(0, sizeof(set), &set), "Error setting thread affinity"); -#endif /* !defined(__APPLE__) */ +#endif /* !defined(__APPLE__) && !defined(__OpenBSD__) */ } /** There are currently 2 scheduling modes supported by CPPScheduler @@ -150,10 +149,10 @@ public: */ explicit Thread(int core_pin = -1); - Thread(const Thread &) = delete; + Thread(const Thread &) = delete; Thread &operator=(const Thread &) = delete; Thread(Thread &&) = delete; - Thread &operator=(Thread &&) = delete; + Thread &operator=(Thread &&) = delete; /** Destructor. Make the thread join. */ ~Thread(); @@ -172,7 +171,7 @@ public: void start(); /** Wait for the current kernel execution to complete. */ - void wait(); + std::exception_ptr wait(); /** Function ran by the worker thread. */ void worker_thread(); @@ -196,21 +195,20 @@ public: private: std::thread _thread{}; ThreadInfo _info{}; - std::vector<IScheduler::Workload> *_workloads{ nullptr }; - ThreadFeeder *_feeder{ nullptr }; + std::vector<IScheduler::Workload> *_workloads{nullptr}; + ThreadFeeder *_feeder{nullptr}; std::mutex _m{}; std::condition_variable _cv{}; - bool _wait_for_work{ false }; - bool _job_complete{ true }; - std::exception_ptr _current_exception{ nullptr }; - int _core_pin{ -1 }; - std::list<Thread> *_thread_pool{ nullptr }; - unsigned int _wake_beg{ 0 }; - unsigned int _wake_end{ 0 }; + bool _wait_for_work{false}; + bool _job_complete{true}; + std::exception_ptr _current_exception{nullptr}; + int _core_pin{-1}; + std::list<Thread> *_thread_pool{nullptr}; + unsigned int _wake_beg{0}; + unsigned int _wake_end{0}; }; -Thread::Thread(int core_pin) - : _core_pin(core_pin) +Thread::Thread(int core_pin) : _core_pin(core_pin) { _thread = std::thread(&Thread::worker_thread, this); } @@ -218,7 +216,7 @@ Thread::Thread(int core_pin) Thread::~Thread() { // Make sure worker thread has ended - if(_thread.joinable()) + if (_thread.joinable()) { ThreadFeeder feeder; set_workload(nullptr, feeder, ThreadInfo()); @@ -244,24 +242,20 @@ void Thread::start() _cv.notify_one(); } -void Thread::wait() +std::exception_ptr Thread::wait() { { std::unique_lock<std::mutex> lock(_m); _cv.wait(lock, [&] { return _job_complete; }); } - - if(_current_exception) - { - std::rethrow_exception(_current_exception); - } + return _current_exception; } void Thread::worker_thread() { set_thread_affinity(_core_pin); - while(true) + while (true) { std::unique_lock<std::mutex> lock(_m); _cv.wait(lock, [&] { return _wait_for_work; }); @@ -270,18 +264,18 @@ void Thread::worker_thread() _current_exception = nullptr; // Exit if the worker thread has not been fed with workloads - if(_workloads == nullptr || _feeder == nullptr) + if (_workloads == nullptr || _feeder == nullptr) { return; } // Wake up more peer threads from thread pool if this job has been delegated to the current thread - if(_thread_pool != nullptr) + if (_thread_pool != nullptr) { auto thread_it = _thread_pool->begin(); std::advance(thread_it, std::min(static_cast<unsigned int>(_thread_pool->size()), _wake_beg)); auto wake_end = std::min(_wake_end, static_cast<unsigned int>(_info.num_threads - 1)); - for(unsigned int t = _wake_beg; t < wake_end; ++t, ++thread_it) + for (unsigned int t = _wake_beg; t < wake_end; ++t, ++thread_it) { thread_it->start(); } @@ -295,7 +289,7 @@ void Thread::worker_thread() #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED } - catch(...) + catch (...) { _current_exception = std::current_exception(); } @@ -326,11 +320,11 @@ struct CPPScheduler::Impl final : _num_threads(thread_hint), _threads(_num_threads - 1), _mode(Mode::Linear), _wake_fanout(0U) { const auto mode_env_v = utility::tolower(utility::getenv("ARM_COMPUTE_CPP_SCHEDULER_MODE")); - if(mode_env_v == "linear") + if (mode_env_v == "linear") { _forced_mode = ModeToggle::Linear; } - else if(mode_env_v == "fanout") + else if (mode_env_v == "fanout") { _forced_mode = ModeToggle::Fanout; } @@ -354,7 +348,7 @@ struct CPPScheduler::Impl final // Set affinity on worked threads _threads.clear(); - for(auto i = 1U; i < _num_threads; ++i) + for (auto i = 1U; i < _num_threads; ++i) { _threads.emplace_back(func(i, thread_hint)); } @@ -363,20 +357,23 @@ struct CPPScheduler::Impl final void auto_switch_mode(unsigned int num_threads_to_use) { // If the environment variable is set to any of the modes, it overwrites the mode selected over num_threads_to_use - if(_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8)) + if (_forced_mode == ModeToggle::Fanout || (_forced_mode == ModeToggle::None && num_threads_to_use > 8)) { set_fanout_mode(m_default_wake_fanout, num_threads_to_use); - ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Fanout mode, with wake up fanout : %d and %d threads to use\n", this->wake_fanout(), num_threads_to_use); + ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE( + "Set CPPScheduler to Fanout mode, with wake up fanout : %d and %d threads to use\n", + this->wake_fanout(), num_threads_to_use); } else // Equivalent to (_forced_mode == ModeToggle::Linear || (_forced_mode == ModeToggle::None && num_threads_to_use <= 8)) { set_linear_mode(); - ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Linear mode, with %d threads to use\n", num_threads_to_use); + ARM_COMPUTE_LOG_INFO_MSG_WITH_FORMAT_CORE("Set CPPScheduler to Linear mode, with %d threads to use\n", + num_threads_to_use); } } void set_linear_mode() { - for(auto &thread : _threads) + for (auto &thread : _threads) { thread.set_linear_mode(); } @@ -388,14 +385,14 @@ struct CPPScheduler::Impl final ARM_COMPUTE_ERROR_ON(num_threads_to_use > _threads.size() + 1); const auto actual_wake_fanout = std::max(2U, std::min(wake_fanout, num_threads_to_use - 1)); auto thread_it = _threads.begin(); - for(auto i = 1U; i < num_threads_to_use; ++i, ++thread_it) + for (auto i = 1U; i < num_threads_to_use; ++i, ++thread_it) { const auto wake_begin = i * actual_wake_fanout - 1; const auto wake_end = std::min((i + 1) * actual_wake_fanout - 1, num_threads_to_use - 1); thread_it->set_fanout_mode(&_threads, wake_begin, wake_end); } // Reset the remaining threads's wake up schedule - while(thread_it != _threads.end()) + while (thread_it != _threads.end()) { thread_it->set_fanout_mode(&_threads, 0U, 0U); ++thread_it; @@ -421,9 +418,9 @@ struct CPPScheduler::Impl final unsigned int _num_threads; std::list<Thread> _threads; arm_compute::Mutex _run_workloads_mutex{}; - Mode _mode{ Mode::Linear }; - ModeToggle _forced_mode{ ModeToggle::None }; - unsigned int _wake_fanout{ 0 }; + Mode _mode{Mode::Linear}; + ModeToggle _forced_mode{ModeToggle::None}; + unsigned int _wake_fanout{0}; }; /* @@ -435,8 +432,7 @@ CPPScheduler &CPPScheduler::get() return scheduler; } -CPPScheduler::CPPScheduler() - : _impl(std::make_unique<Impl>(num_threads_hint())) +CPPScheduler::CPPScheduler() : _impl(std::make_unique<Impl>(num_threads_hint())) { } @@ -469,15 +465,15 @@ void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads) // This is not great because different threads workloads won't run in parallel but at least they // won't interfere each other and deadlock. arm_compute::lock_guard<std::mutex> lock(_impl->_run_workloads_mutex); - const unsigned int num_threads_to_use = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size())); - if(num_threads_to_use < 1) + const unsigned int num_threads_to_use = std::min(_impl->num_threads(), static_cast<unsigned int>(workloads.size())); + if (num_threads_to_use < 1) { return; } // Re-adjust the mode if the actual number of threads to use is different from the number of threads created _impl->auto_switch_mode(num_threads_to_use); int num_threads_to_start = 0; - switch(_impl->mode()) + switch (_impl->mode()) { case CPPScheduler::Impl::Mode::Fanout: { @@ -493,35 +489,54 @@ void CPPScheduler::run_workloads(std::vector<IScheduler::Workload> &workloads) } ThreadFeeder feeder(num_threads_to_use, workloads.size()); ThreadInfo info; - info.cpu_info = &_cpu_info; + info.cpu_info = &cpu_info(); info.num_threads = num_threads_to_use; unsigned int t = 0; auto thread_it = _impl->_threads.begin(); // Set num_threads_to_use - 1 workloads to the threads as the remaining 1 is left to the main thread - for(; t < num_threads_to_use - 1; ++t, ++thread_it) + for (; t < num_threads_to_use - 1; ++t, ++thread_it) { info.thread_id = t; thread_it->set_workload(&workloads, feeder, info); } thread_it = _impl->_threads.begin(); - for(int i = 0; i < num_threads_to_start; ++i, ++thread_it) + for (int i = 0; i < num_threads_to_start; ++i, ++thread_it) { thread_it->start(); } - info.thread_id = t; // Set main thread's thread_id - process_workloads(workloads, feeder, info); // Main thread processes workloads + info.thread_id = t; // Set main thread's thread_id + std::exception_ptr last_exception = nullptr; +#ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED + try + { +#endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ + process_workloads(workloads, feeder, info); // Main thread processes workloads #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED + } + catch (...) + { + last_exception = std::current_exception(); + } + try { #endif /* ARM_COMPUTE_EXCEPTIONS_DISABLED */ thread_it = _impl->_threads.begin(); - for(unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it) + for (unsigned int i = 0; i < num_threads_to_use - 1; ++i, ++thread_it) + { + std::exception_ptr current_exception = thread_it->wait(); + if (current_exception) + { + last_exception = current_exception; + } + } + if (last_exception) { - thread_it->wait(); + std::rethrow_exception(last_exception); } #ifndef ARM_COMPUTE_EXCEPTIONS_DISABLED } - catch(const std::system_error &e) + catch (const std::system_error &e) { std::cerr << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n'; } |