From abee0016b5b5c4129bc8d30f440449d835a6404a Mon Sep 17 00:00:00 2001 From: Keith Davis Date: Thu, 22 Apr 2021 10:10:34 +0100 Subject: IVGCVSW-5813 Add Async Queue to IRuntime Signed-off-by: Keith Davis Change-Id: Icc0d131c8ee2e9748e2f14762a75962b39c10f9d --- CMakeLists.txt | 3 + include/armnn/IAsyncExecutionCallback.hpp | 43 ++++++ include/armnn/IRuntime.hpp | 23 ++- include/armnn/Types.hpp | 22 ++- src/armnn/AsyncExecutionCallback.cpp | 57 ++++++++ src/armnn/AsyncExecutionCallback.hpp | 48 +++++++ src/armnn/LoadedNetwork.cpp | 160 ++++++++++++++++++++- src/armnn/LoadedNetwork.hpp | 75 ++++++++-- src/armnn/Runtime.cpp | 48 +++++-- src/armnn/Runtime.hpp | 14 ++ .../test/StridedSliceAsyncEndToEndTest.hpp | 159 ++++++++++++++++++-- src/backends/reference/test/RefEndToEndTests.cpp | 11 ++ 12 files changed, 616 insertions(+), 47 deletions(-) create mode 100644 include/armnn/IAsyncExecutionCallback.hpp create mode 100644 src/armnn/AsyncExecutionCallback.cpp create mode 100644 src/armnn/AsyncExecutionCallback.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 86dad31a21..51f08bade9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -136,6 +136,7 @@ list(APPEND armnn_sources include/armnn/DescriptorsFwd.hpp include/armnn/Exceptions.hpp include/armnn/backends/ILayerSupport.hpp + include/armnn/IAsyncExecutionCallback.hpp include/armnn/ILayerVisitor.hpp include/armnn/INetwork.hpp include/armnn/IProfiler.hpp @@ -302,6 +303,8 @@ list(APPEND armnn_sources src/armnn/layers/TransposeLayer.cpp src/armnn/layers/UnmapLayer.cpp src/armnn/layers/UnmapLayer.hpp + src/armnn/AsyncExecutionCallback.cpp + src/armnn/AsyncExecutionCallback.hpp src/armnn/BackendRegistry.cpp src/armnn/BackendSettings.hpp src/armnn/BackendHelper.cpp diff --git a/include/armnn/IAsyncExecutionCallback.hpp b/include/armnn/IAsyncExecutionCallback.hpp new file mode 100644 index 0000000000..045ec4581f --- /dev/null +++ b/include/armnn/IAsyncExecutionCallback.hpp @@ -0,0 +1,43 @@ +// +// Copyright © 2021 Arm Ltd 
and Contributors. All rights reserved. +// SPDX-License-Identifier: MIT +// + +#pragma once + +#include "Types.hpp" + +namespace armnn +{ + +namespace experimental +{ + +class IAsyncExecutionCallback; +using IAsyncExecutionCallbackPtr = std::shared_ptr; + +class IAsyncExecutionCallback +{ +public: + virtual ~IAsyncExecutionCallback() {}; + + // Notify the AsyncExecutionCallback object of the armnn execution status + virtual void Notify(armnn::Status status, InferenceTimingPair timeTaken) = 0; + + // Block the calling thread until the AsyncExecutionCallback object allows it to proceed + virtual void Wait() const = 0; + + // Retrieve the ArmNN Status from the AsyncExecutionCallback that has been notified + virtual armnn::Status GetStatus() const = 0; + + // Retrieve the start time before executing the inference + virtual HighResolutionClock GetStartTime() const = 0; + + // Retrieve the time after executing the inference + virtual HighResolutionClock GetEndTime() const = 0; + +}; + +} // experimental + +} // namespace armnn diff --git a/include/armnn/IRuntime.hpp b/include/armnn/IRuntime.hpp index 55c57974dc..f296a5f564 100644 --- a/include/armnn/IRuntime.hpp +++ b/include/armnn/IRuntime.hpp @@ -8,6 +8,7 @@ #include "INetwork.hpp" #include "IProfiler.hpp" #include "IWorkingMemHandle.hpp" +#include "IAsyncExecutionCallback.hpp" #include "Tensor.hpp" #include "Types.hpp" #include "TypesUtils.hpp" @@ -31,20 +32,24 @@ struct INetworkProperties ARMNN_DEPRECATED_MSG("Please use INetworkProperties constructor with MemorySource argument") INetworkProperties(bool importEnabled = false, bool exportEnabled = false, - bool asyncEnabled = false) + bool asyncEnabled = false, + size_t numThreads = 0) : m_ImportEnabled(importEnabled) , m_ExportEnabled(exportEnabled) , m_AsyncEnabled(asyncEnabled) + , m_NumThreads(numThreads) , m_InputSource(MemorySource::Undefined) , m_OutputSource(MemorySource::Undefined) {} INetworkProperties(bool asyncEnabled, MemorySource m_InputSource, - 
MemorySource m_OutputSource) + MemorySource m_OutputSource, + size_t numThreads = 0) : m_ImportEnabled(m_InputSource != MemorySource::Undefined) , m_ExportEnabled(m_OutputSource != MemorySource::Undefined) , m_AsyncEnabled(asyncEnabled) + , m_NumThreads(numThreads) , m_InputSource(m_InputSource) , m_OutputSource(m_OutputSource) {} @@ -54,7 +59,9 @@ struct INetworkProperties /// Deprecated and will be removed in future release. const bool m_ExportEnabled; - const bool m_AsyncEnabled; + const bool m_AsyncEnabled; + const size_t m_NumThreads; + const MemorySource m_InputSource; const MemorySource m_OutputSource; @@ -184,6 +191,16 @@ public: const InputTensors& inputTensors, const OutputTensors& outputTensors); + /// This is an experimental function + /// Schedule a thread safe execution by taking the input tensors and an execution priority for Quality of Service. + /// The output tensors will then be filled and the callback object will notify that the execution has either + /// succeeded or failed. + void Schedule(NetworkId networkId, + const InputTensors& inputTensors, + const OutputTensors& outputTensors, + const QosExecPriority priority, + std::shared_ptr callback); + /// Unloads a network from the IRuntime. /// At the moment this only removes the network from the m_Impl->m_Network. /// This might need more work in the future to be AndroidNN compliant. diff --git a/include/armnn/Types.hpp b/include/armnn/Types.hpp index bc41003c57..9e46d08501 100644 --- a/include/armnn/Types.hpp +++ b/include/armnn/Types.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include "BackendId.hpp" #include "Exceptions.hpp" #include "Deprecated.hpp" @@ -20,6 +21,9 @@ constexpr unsigned int MaxNumOfTensorDimensions = 5U; /// The lowest performance data capture interval we support is 10 miliseconds. 
constexpr unsigned int LOWEST_CAPTURE_PERIOD = 10000u; +/// Variable to control expire rate of priority queue +constexpr unsigned int EXPIRE_RATE = 3U; + /// @enum Status enumeration /// @var Status::Successful /// @var Status::Failure @@ -31,14 +35,14 @@ enum class Status enum class DataType { - Float16 = 0, - Float32 = 1, + Float16 = 0, + Float32 = 1, QAsymmU8 = 2, Signed32 = 3, - Boolean = 4, + Boolean = 4, QSymmS16 = 5, QuantizedSymm8PerAxis ARMNN_DEPRECATED_ENUM_MSG("Per Axis property inferred by number of scales in TensorInfo") = 6, - QSymmS8 = 7, + QSymmS8 = 7, QAsymmS8 = 8, BFloat16 = 9, Signed64 = 10, @@ -53,6 +57,13 @@ enum class DataLayout NHWC = 2 }; +enum class QosExecPriority +{ + Low = 0, + Medium = 1, + High = 2 +}; + enum class ActivationFunction { Sigmoid = 0, @@ -304,6 +315,9 @@ class ITensorHandle; /// @param tensorHandle - TensorHandle for the input tensor to the Debug layer using DebugCallbackFunction = std::function; +/// Define a timer and associated inference ID for recording execution times +using HighResolutionClock = std::chrono::high_resolution_clock::time_point; +using InferenceTimingPair = std::pair; namespace profiling { diff --git a/src/armnn/AsyncExecutionCallback.cpp b/src/armnn/AsyncExecutionCallback.cpp new file mode 100644 index 0000000000..c44808918d --- /dev/null +++ b/src/armnn/AsyncExecutionCallback.cpp @@ -0,0 +1,57 @@ +// +// Copyright © 2021 Arm Ltd and Contributors. All rights reserved. 
+// SPDX-License-Identifier: MIT +// + +#include + +namespace armnn +{ + +namespace experimental +{ + +void AsyncExecutionCallback::Notify(armnn::Status status, InferenceTimingPair timeTaken) +{ + { + std::lock_guard hold(m_Mutex); + if (m_Notified) + { + return; + } + // store results and mark as notified + m_Status = status; + m_StartTime = timeTaken.first; + m_EndTime = timeTaken.second; + m_Notified = true; + } + m_Condition.notify_all(); +} + +void AsyncExecutionCallback::Wait() const +{ + std::unique_lock lock(m_Mutex); + m_Condition.wait(lock, [this] { return m_Notified; }); +} + +armnn::Status AsyncExecutionCallback::GetStatus() const +{ + Wait(); + return m_Status; +} + +HighResolutionClock AsyncExecutionCallback::GetStartTime() const +{ + Wait(); + return m_StartTime; +} + +HighResolutionClock AsyncExecutionCallback::GetEndTime() const +{ + Wait(); + return m_EndTime; +} + +} // namespace experimental + +} // namespace armnn \ No newline at end of file diff --git a/src/armnn/AsyncExecutionCallback.hpp b/src/armnn/AsyncExecutionCallback.hpp new file mode 100644 index 0000000000..c17b839748 --- /dev/null +++ b/src/armnn/AsyncExecutionCallback.hpp @@ -0,0 +1,48 @@ +// +// Copyright © 2021 Arm Ltd and Contributors. All rights reserved. 
+// SPDX-License-Identifier: MIT +// + +#pragma once + +#include +#include +#include + +#include +#include + +namespace armnn +{ + +namespace experimental +{ + +class AsyncExecutionCallback final : public IAsyncExecutionCallback +{ +public: + AsyncExecutionCallback() + {} + ~AsyncExecutionCallback() + {} + + void Notify(armnn::Status status, InferenceTimingPair timeTaken); + void Wait() const; + + armnn::Status GetStatus() const; + HighResolutionClock GetStartTime() const; + HighResolutionClock GetEndTime() const; + +private: + mutable std::mutex m_Mutex; + mutable std::condition_variable m_Condition; + + HighResolutionClock m_StartTime; + HighResolutionClock m_EndTime; + armnn::Status m_Status = Status::Failure; + bool m_Notified = false; +}; + +} // namespace experimental + +} // namespace armnn \ No newline at end of file diff --git a/src/armnn/LoadedNetwork.cpp b/src/armnn/LoadedNetwork.cpp index 46eb9883fb..67de00f0f3 100644 --- a/src/armnn/LoadedNetwork.cpp +++ b/src/armnn/LoadedNetwork.cpp @@ -24,6 +24,7 @@ #include #include +#include namespace armnn { @@ -84,7 +85,8 @@ void AddWorkloadStructure(std::unique_ptr& timelineUtils std::unique_ptr LoadedNetwork::MakeLoadedNetwork(std::unique_ptr net, std::string& errorMessage, const INetworkProperties& networkProperties, - profiling::ProfilingService& profilingService) + profiling::ProfilingService& profilingService, + const NetworkId networkIdOut) { std::unique_ptr loadedNetwork; @@ -98,7 +100,7 @@ std::unique_ptr LoadedNetwork::MakeLoadedNetwork(std::unique_ptr< try { - loadedNetwork.reset(new LoadedNetwork(std::move(net), networkProperties, profilingService)); + loadedNetwork.reset(new LoadedNetwork(std::move(net), networkProperties, profilingService, networkIdOut)); } catch (const armnn::RuntimeException& error) { @@ -118,9 +120,11 @@ std::unique_ptr LoadedNetwork::MakeLoadedNetwork(std::unique_ptr< LoadedNetwork::LoadedNetwork(std::unique_ptr net, const INetworkProperties& networkProperties, - 
profiling::ProfilingService& profilingService) : + profiling::ProfilingService& profilingService, + const NetworkId networkId) : m_OptimizedNetwork(std::move(net)), m_NetworkProperties(networkProperties), + m_NetworkId(networkId), m_TensorHandleFactoryRegistry(), m_ProfilingService(profilingService) { @@ -161,6 +165,14 @@ LoadedNetwork::LoadedNetwork(std::unique_ptr net, } } } + + // Create the thread pool which will have working memory handles assigned to each thread + // Should occur after factories are registered so that the WorkingMemHandles can be created + if (m_NetworkProperties.m_NumThreads > 0 && networkProperties.m_AsyncEnabled) + { + CreateThreadPool(m_NetworkProperties.m_NumThreads); + } + if (!networkProperties.m_AsyncEnabled) { for (auto &&layer : order) @@ -846,6 +858,147 @@ bool LoadedNetwork::Execute(std::unique_ptr& timelineUti return success; } +void LoadedNetwork::CreateThreadPool(std::size_t numThreads) +{ + + for (auto i = 0u; i < numThreads; ++i) + { + std::unique_ptr workingMemHandle = CreateWorkingMemHandle(m_NetworkId); + m_Threads.emplace_back( + std::make_unique( + &LoadedNetwork::ProcessExecPriorities, + this, + std::move(workingMemHandle) + ) + ); + } +} + +void LoadedNetwork::TerminateThreadPool() noexcept +{ + { + std::unique_lock threadPoolLock(m_ThreadPoolMutex); + m_TerminatePool = true; + } + + m_ThreadPoolEvent.notify_all(); + + for (auto &thread : m_Threads) + { + thread->join(); + } +} + +void LoadedNetwork::Schedule(const InputTensors& inputTensors, + const OutputTensors& outputTensors, + const QosExecPriority priority, + std::shared_ptr cb) +{ + // Group execution parameters so that they can be easily added to the queue + ExecutionTuple groupExecParams = std::make_tuple(inputTensors, outputTensors, cb); + std::shared_ptr operation = make_shared(groupExecParams); + + // Add a message to the queue and notify the request thread + std::unique_lock lock(m_ThreadPoolMutex); + switch (priority) { + case QosExecPriority::High: + 
m_HighPriorityQueue.push(operation); + break; + case QosExecPriority::Low: + m_LowPriorityQueue.push(operation); + break; + case QosExecPriority::Medium: + default: + m_MediumPriorityQueue.push(operation); + } + m_ThreadPoolEvent.notify_one(); +} + +void LoadedNetwork::ProcessExecPriorities(std::unique_ptr workingMemHandle) +{ + int expireRate = EXPIRE_RATE; + int highPriorityCount = 0; + int mediumPriorityCount = 0; + + IWorkingMemHandle& workingMemHandleRef = *workingMemHandle.get(); + + while (true) + { + std::shared_ptr currentExecInProgress(nullptr); + { + // Wait for a message to be added to the queue + // This is in a separate scope to minimise the lifetime of the lock + std::unique_lock lock(m_ThreadPoolMutex); + + m_ThreadPoolEvent.wait(lock, + [=] { + return m_TerminatePool || !m_HighPriorityQueue.empty() || + !m_MediumPriorityQueue.empty() || !m_LowPriorityQueue.empty(); + }); + + if (m_TerminatePool && m_HighPriorityQueue.empty() && m_MediumPriorityQueue.empty() && + m_LowPriorityQueue.empty()) + { + break; + } + + // Get the message to process from the front of each queue based on priority from high to low + // Get high priority first if it does not exceed the expire rate + if (!m_HighPriorityQueue.empty() && highPriorityCount < expireRate) + { + currentExecInProgress = m_HighPriorityQueue.front(); + m_HighPriorityQueue.pop(); + highPriorityCount += 1; + } + // If high priority queue is empty or the count exceeds the expire rate, get medium priority message + else if (!m_MediumPriorityQueue.empty() && mediumPriorityCount < expireRate) + { + currentExecInProgress = m_MediumPriorityQueue.front(); + m_MediumPriorityQueue.pop(); + mediumPriorityCount += 1; + // Reset high priority count + highPriorityCount = 0; + } + // If medium priority queue is empty or the count exceeds the expire rate, get low priority message + else if (!m_LowPriorityQueue.empty()) + { + currentExecInProgress = m_LowPriorityQueue.front(); + m_LowPriorityQueue.pop(); + // Reset high 
and medium priority count + highPriorityCount = 0; + mediumPriorityCount = 0; + } + else + { + // Reset high and medium priority count + highPriorityCount = 0; + mediumPriorityCount = 0; + continue; + } + } + + // invoke the asynchronous execution method + auto inputTensors = std::get<0>(*currentExecInProgress); + auto outputTensors = std::get<1>(*currentExecInProgress); + auto cb = std::get<2>(*currentExecInProgress); + + // Get time at start of inference + HighResolutionClock startTime = armnn::GetTimeNow(); + + try // executing the inference + { + // Execute and populate the time at end of inference in the callback + Execute(inputTensors, outputTensors, workingMemHandleRef) == Status::Success ? + cb->Notify(Status::Success, std::make_pair(startTime, armnn::GetTimeNow())) : + cb->Notify(Status::Failure, std::make_pair(startTime, armnn::GetTimeNow())); + } + catch (const RuntimeException& error) + { + cb->Notify(Status::Failure, std::make_pair(startTime, armnn::GetTimeNow())); + } + } +} + void LoadedNetwork::EnqueueInput(const BindableLayer& layer, const ConstTensor& inputTensor, WorkingMemHandle& context) @@ -1096,6 +1249,7 @@ Status LoadedNetwork::Execute(const InputTensors& inputTensors, EnqueueOutput(*outputLayer, GetOutputTensor(outputLayer->GetBindingId(), outputTensors), workingMemHandle); } } + return executionSucceeded ? 
Status::Success : Status::Failure; } diff --git a/src/armnn/LoadedNetwork.hpp b/src/armnn/LoadedNetwork.hpp index 51092c744e..b5474db294 100644 --- a/src/armnn/LoadedNetwork.hpp +++ b/src/armnn/LoadedNetwork.hpp @@ -19,13 +19,14 @@ #include #include +#include #include namespace cl { - class Context; - class CommandQueue; - class Device; +class Context; +class CommandQueue; +class Device; } namespace armnn @@ -34,8 +35,19 @@ namespace armnn class LoadedNetwork { public: - using WorkloadQueue = std::vector< std::unique_ptr >; - ~LoadedNetwork(){ FreeWorkingMemory(); } + using WorkloadQueue = std::vector>; + + using ExecutionTuple = std::tuple>; + + using ExecutionQueue = std::queue>; + + ~LoadedNetwork() + { + FreeWorkingMemory(); + TerminateThreadPool(); + } /// Create a new unique WorkingMemHandle object. Create multiple handles if you wish to have /// overlapped Execution by calling this function from different threads. @@ -44,16 +56,25 @@ public: TensorInfo GetInputTensorInfo(LayerBindingId layerId) const; TensorInfo GetOutputTensorInfo(LayerBindingId layerId) const; + /// Single thread execution of the loaded network Status EnqueueWorkload(const InputTensors& inputTensors, const OutputTensors& outputTensors); + /// Thread safe execution of the loaded network Status Execute(const InputTensors& inputTensors, const OutputTensors& outputTensors, IWorkingMemHandle& workingMemHandle); + /// Schedule an asynchronous execution on the loaded network + void Schedule(const InputTensors& inputTensors, + const OutputTensors& outputTensors, + const QosExecPriority priority, + std::shared_ptr cb); + static std::unique_ptr MakeLoadedNetwork(std::unique_ptr net, - std::string & errorMessage, + std::string& errorMessage, const INetworkProperties& networkProperties, - profiling::ProfilingService& profilingService); + profiling::ProfilingService& profilingService, + const NetworkId networkIdOut); // NOTE we return by reference as the purpose of this method is only to provide // 
access to the private m_Profiler and in theory we should not need to increment @@ -87,7 +108,8 @@ private: LoadedNetwork(std::unique_ptr net, const INetworkProperties& networkProperties, - profiling::ProfilingService& profilingService); + profiling::ProfilingService& profilingService, + const NetworkId networkIdOut); void EnqueueInput(const BindableLayer& layer, ITensorHandle* tensorHandle, const TensorInfo& tensorInfo); @@ -97,9 +119,15 @@ private: void EnqueueOutput(const BindableLayer& layer, const Tensor& outputTensor, WorkingMemHandle& handle); + void ProcessExecPriorities(std::unique_ptr workingMemHandle); + bool Execute(std::unique_ptr& timelineUtils, profiling::ProfilingGuid inferenceGuid); + void CreateThreadPool(std::size_t numThreads); + + void TerminateThreadPool() noexcept; + const IWorkloadFactory& GetWorkloadFactory(const Layer& layer) const; using BackendPtrMap = std::unordered_map; @@ -108,19 +136,38 @@ private: WorkloadFactoryMap m_WorkloadFactories; std::unique_ptr m_OptimizedNetwork; - WorkloadQueue m_InputQueue; - WorkloadQueue m_WorkloadQueue; - WorkloadQueue m_OutputQueue; - std::shared_ptr m_Profiler; + std::shared_ptr m_Profiler; + + WorkloadQueue m_InputQueue; + WorkloadQueue m_WorkloadQueue; + WorkloadQueue m_OutputQueue; mutable std::mutex m_WorkingMemMutex; - bool m_IsWorkingMemAllocated=false; + bool m_IsWorkingMemAllocated = false; + + std::vector> m_Threads; + std::stack m_WorkingMemHandles; + + ExecutionQueue m_HighPriorityQueue; + ExecutionQueue m_MediumPriorityQueue; + ExecutionQueue m_LowPriorityQueue; + + // Condition Variables require mutex which will guard the shared state. + // Has an event happened? 
Stop signal for example + std::condition_variable m_ThreadPoolEvent; + std::mutex m_ThreadPoolMutex; + + // The shared state for conditional variable + bool m_TerminatePool = false; + INetworkProperties m_NetworkProperties; + const NetworkId m_NetworkId; + TensorHandleFactoryRegistry m_TensorHandleFactoryRegistry; - profiling::ProfilingService& m_ProfilingService; + profiling::ProfilingService& m_ProfilingService; }; } diff --git a/src/armnn/Runtime.cpp b/src/armnn/Runtime.cpp index 1dd86a61ce..e04cf5ddaf 100644 --- a/src/armnn/Runtime.cpp +++ b/src/armnn/Runtime.cpp @@ -88,6 +88,15 @@ Status IRuntime::Execute(IWorkingMemHandle& workingMemHandle, return pRuntimeImpl->Execute(workingMemHandle, inputTensors, outputTensors); } +void IRuntime::Schedule(NetworkId networkId, + const InputTensors& inputTensors, + const OutputTensors& outputTensors, + const QosExecPriority priority, + std::shared_ptr cb) +{ + pRuntimeImpl->Schedule(networkId, inputTensors, outputTensors, priority, cb); +} + Status IRuntime::UnloadNetwork(NetworkId networkId) { return pRuntimeImpl->UnloadNetwork(networkId); @@ -150,7 +159,8 @@ Status RuntimeImpl::LoadNetwork(NetworkId& networkIdOut, std::unique_ptr(rawNetwork), errorMessage, networkProperties, - m_ProfilingService); + m_ProfilingService, + networkIdOut); if (!loadedNetwork) { @@ -439,24 +449,42 @@ Status RuntimeImpl::Execute(IWorkingMemHandle& iWorkingMemHandle, } if (!loadedNetwork->IsAsyncEnabled()) { - ARMNN_LOG(error) << "Network " << networkId << " is not async enabled.\n"; + ARMNN_LOG(error) << "Attempting execute " << networkId << " when it is not async enabled.\n"; return Status::Failure; } ProfilerManager::GetInstance().RegisterProfiler(loadedNetwork->GetProfiler().get()); ARMNN_SCOPED_PROFILING_EVENT(Compute::Undefined, "Execute"); - static thread_local NetworkId lastId = networkId; - if (lastId != networkId) + return loadedNetwork->Execute(inputTensors, outputTensors, iWorkingMemHandle); +} + +void RuntimeImpl::Schedule(NetworkId 
networkId, + const InputTensors& inputTensors, + const OutputTensors& outputTensors, + const QosExecPriority priority, + std::shared_ptr callback) +{ + LoadedNetwork* loadedNetwork = GetLoadedNetworkPtr(networkId); + + if (!loadedNetwork) { - LoadedNetworkFuncSafe(lastId, [](LoadedNetwork* network) - { - network->FreeWorkingMemory(); - }); + throw armnn::Exception( + "Network with ID of " + std::to_string(networkId) + " does not exist \n" + ); + } + if (!loadedNetwork->IsAsyncEnabled()) + { + throw armnn::Exception( + "Attempting to schedule Network " + std::to_string(networkId) + " when it is not async enabled \n" + ); } - lastId=networkId; - return loadedNetwork->Execute(inputTensors, outputTensors, iWorkingMemHandle); + ProfilerManager::GetInstance().RegisterProfiler(loadedNetwork->GetProfiler().get()); + + ARMNN_SCOPED_PROFILING_EVENT(Compute::Undefined, "Schedule"); + + loadedNetwork->Schedule(inputTensors, outputTensors, priority, callback); } /// Create a new unique WorkingMemHandle object. Create multiple handles if you wish to have diff --git a/src/armnn/Runtime.hpp b/src/armnn/Runtime.hpp index da5445383f..55a4accf67 100644 --- a/src/armnn/Runtime.hpp +++ b/src/armnn/Runtime.hpp @@ -60,6 +60,20 @@ public: const InputTensors& inputTensors, const OutputTensors& outputTensors); + /// This is an experimental function. + /// Schedule a thread safe execution by taking the input tensors and an execution priority for Quality of Service. + /// The output tensors will then be filled and the callback object will notify that the execution has either + /// succeeded or failed. + void Schedule(NetworkId networkId, + const InputTensors& inputTensors, + const OutputTensors& outputTensors, + const QosExecPriority priority, + std::shared_ptr callback); + + /// This is an experimental function. + /// Evaluates a network using input in inputTensors and outputs filled into outputTensors. + /// This function performs a thread safe execution of the network. 
Returns once execution is complete. + /// Will block until this and any other thread using the same workingMem object completes. Status Execute(IWorkingMemHandle& workingMemHandle, const InputTensors& inputTensors, const OutputTensors& outputTensors); diff --git a/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp b/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp index b20ff4f142..87cade7dca 100644 --- a/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp +++ b/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp @@ -9,7 +9,9 @@ #include #include +#include +#include #include #include @@ -114,27 +116,29 @@ void AsyncThreadedEndToEndTestImpl(INetworkPtr network, } - - template, typename TOutput = ResolveType > + typename TInput = ResolveType, typename TOutput = ResolveType> void AsyncEndToEndTestImpl(INetworkPtr network, const std::map>& inputTensorData, const std::map>& expectedOutputData, std::vector backends, - float tolerance = 0.000001f) + float tolerance = 0.000001f, + size_t numThreads = 0) { // Create Runtime in which test will run IRuntime::CreationOptions options; - IRuntimePtr runtime(IRuntime::Create(options)); + IRuntimePtr runtime(IRuntime::Create(options)); // Optimize the Network IOptimizedNetworkPtr optNet = Optimize(*network, backends, runtime->GetDeviceSpec()); // Creates AsyncNetwork NetworkId networkId = 0; + std::string errorMessage; - const INetworkProperties networkProperties(true, MemorySource::Undefined, MemorySource::Undefined); + + const INetworkProperties networkProperties(true, MemorySource::Undefined, MemorySource::Undefined, numThreads); + runtime->LoadNetwork(networkId, std::move(optNet), errorMessage, networkProperties); InputTensors inputTensors; @@ -157,17 +161,50 @@ void AsyncEndToEndTestImpl(INetworkPtr network, outputStorage.at(it.first).data())}); } - // Create WorkingMemHandle for this async network - std::unique_ptr workingMemHandle = 
runtime->CreateWorkingMemHandle(networkId); - IWorkingMemHandle& workingMemHandleRef = *workingMemHandle.get(); + if (numThreads == 0) + { + // Create WorkingMemHandle for this async network + std::unique_ptr workingMemHandle = runtime->CreateWorkingMemHandle(networkId); + IWorkingMemHandle& workingMemHandleRef = *workingMemHandle.get(); - // Run the async network - runtime->Execute(workingMemHandleRef, inputTensors, outputTensors); + // Run the async network + runtime->Execute(workingMemHandleRef, inputTensors, outputTensors); + } + else + { + std::vector callbacks; + + // Create 1000 callbacks that will be checked post scheduling + for (size_t i = 0; i < 1000; ++i) + { + callbacks.emplace_back(std::make_shared()); + } + + // For the asynchronous execution, we are adding a pool of working memory handles (1 per thread) in the + // LoadedNetwork with each scheduled inference having a specific priority + for (IAsyncExecutionCallbackPtr cb : callbacks) + { + runtime->Schedule(networkId, + inputTensors, + outputTensors, + static_cast(rand()%3), + cb); + } + + // Wait until the execution signals a notify + for (IAsyncExecutionCallbackPtr cb : callbacks) + { + cb->Wait(); + + // Checks the results. + BOOST_CHECK(cb->GetStatus() == Status::Success); + } + } - // Checks the results. 
for (auto&& it : expectedOutputData) { std::vector out = outputStorage.at(it.first); + for (unsigned int i = 0; i < out.size(); ++i) { BOOST_CHECK(Compare(it.second[i], out[i], tolerance) == true); @@ -263,7 +300,103 @@ void StridedSlicedEndToEndTest(const std::vector& backends) std::map> inputTensorData = {{0, inputData}}; std::map> expectedOutputData = {{0, outputExpected}}; - AsyncEndToEndTestImpl(move(net), inputTensorData, expectedOutputData, backends, 1); + AsyncEndToEndTestImpl(move(net), inputTensorData, expectedOutputData, backends, 0.000001f); +} + +template +void AsyncScheduledStridedSlicedEndToEndTest(const std::vector& backends) +{ + using namespace armnn; + using T = ResolveType; + + const TensorShape& inputShape = {3, 2, 3, 1}; + const TensorShape& outputShape = {1, 2, 3, 1}; + const std::vector& beginData = {1, 0, 0, 0}; + const std::vector& endData = {2, 2, 3, 1}; + const std::vector& stridesData = {1, 1, 1, 1}; + int beginMask = 0; + int endMask = 0; + int shrinkAxisMask = 0; + int ellipsisMask = 0; + int newAxisMask = 0; + + // Builds up the structure of the network + INetworkPtr net = CreateStridedSliceNetwork(inputShape, + outputShape, + beginData, + endData, + stridesData, + beginMask, + endMask, + shrinkAxisMask, + ellipsisMask, + newAxisMask); + + // Creates structures for input & output. 
+ std::vector inputData{ + 1.0f, 1.0f, 1.0f, 2.0f, 2.0f, 2.0f, + + 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f, + + 5.0f, 5.0f, 5.0f, 6.0f, 6.0f, 6.0f + }; + + std::vector outputExpected{ + 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f + }; + + std::map> inputTensorData = {{0, inputData}}; + std::map> expectedOutputData = {{0, outputExpected}}; + + AsyncEndToEndTestImpl(move(net), inputTensorData, expectedOutputData, backends, 0.000001f, 1); +} + +template +void AsyncScheduledStridedSlicedMultiThreadedEndToEndTest(const std::vector& backends) +{ + using namespace armnn; + using T = ResolveType; + + const TensorShape& inputShape = {3, 2, 3, 1}; + const TensorShape& outputShape = {1, 2, 3, 1}; + const std::vector& beginData = {1, 0, 0, 0}; + const std::vector& endData = {2, 2, 3, 1}; + const std::vector& stridesData = {1, 1, 1, 1}; + int beginMask = 0; + int endMask = 0; + int shrinkAxisMask = 0; + int ellipsisMask = 0; + int newAxisMask = 0; + + // Builds up the structure of the network + INetworkPtr net = CreateStridedSliceNetwork(inputShape, + outputShape, + beginData, + endData, + stridesData, + beginMask, + endMask, + shrinkAxisMask, + ellipsisMask, + newAxisMask); + + // Creates structures for input & output. 
+ std::vector inputData{ + 1.0f, 1.0f, 1.0f, 2.0f, 2.0f, 2.0f, + + 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f, + + 5.0f, 5.0f, 5.0f, 6.0f, 6.0f, 6.0f + }; + + std::vector outputExpected{ + 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f + }; + + std::map> inputTensorData = {{0, inputData}}; + std::map> expectedOutputData = {{0, outputExpected}}; + + AsyncEndToEndTestImpl(move(net), inputTensorData, expectedOutputData, backends, 0.000001f, 3); } template diff --git a/src/backends/reference/test/RefEndToEndTests.cpp b/src/backends/reference/test/RefEndToEndTests.cpp index 0839c1c7af..1b96db9678 100644 --- a/src/backends/reference/test/RefEndToEndTests.cpp +++ b/src/backends/reference/test/RefEndToEndTests.cpp @@ -1346,6 +1346,17 @@ BOOST_AUTO_TEST_CASE(RefAsyncFP32StridedSlicedMultiThreadedEndToEndTest) { armnn::experimental::StridedSlicedMultiThreadedEndToEndTest(defaultBackends); } + +BOOST_AUTO_TEST_CASE(RefAsyncScheduledFP32StridedSlicedEndToEndTest) +{ + armnn::experimental::AsyncScheduledStridedSlicedEndToEndTest(defaultBackends); +} + +BOOST_AUTO_TEST_CASE(RefAsyncScheduledStridedSlicedMultiThreadedEndToEndTest) +{ + using namespace armnn::experimental; + AsyncScheduledStridedSlicedMultiThreadedEndToEndTest(defaultBackends); +} #endif BOOST_AUTO_TEST_SUITE_END() -- cgit v1.2.1