aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Davis <keith.davis@arm.com>2021-04-22 10:10:34 +0100
committerNarumol Prangnawarat <narumol.prangnawarat@arm.com>2021-05-07 18:12:22 +0100
commitabee0016b5b5c4129bc8d30f440449d835a6404a (patch)
treee8dc790452af46603bd3eec3e670b9b5afc9dc9d
parentdca769b9673a1e197258f7b35637b2a17f1a9e8b (diff)
downloadarmnn-abee0016b5b5c4129bc8d30f440449d835a6404a.tar.gz
IVGCVSW-5813 Add Async Queue to IRuntime
Signed-off-by: Keith Davis <keith.davis@arm.com> Change-Id: Icc0d131c8ee2e9748e2f14762a75962b39c10f9d
-rw-r--r--CMakeLists.txt3
-rw-r--r--include/armnn/IAsyncExecutionCallback.hpp43
-rw-r--r--include/armnn/IRuntime.hpp23
-rw-r--r--include/armnn/Types.hpp22
-rw-r--r--src/armnn/AsyncExecutionCallback.cpp57
-rw-r--r--src/armnn/AsyncExecutionCallback.hpp48
-rw-r--r--src/armnn/LoadedNetwork.cpp160
-rw-r--r--src/armnn/LoadedNetwork.hpp75
-rw-r--r--src/armnn/Runtime.cpp48
-rw-r--r--src/armnn/Runtime.hpp14
-rw-r--r--src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp159
-rw-r--r--src/backends/reference/test/RefEndToEndTests.cpp11
12 files changed, 616 insertions, 47 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 86dad31a21..51f08bade9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -136,6 +136,7 @@ list(APPEND armnn_sources
include/armnn/DescriptorsFwd.hpp
include/armnn/Exceptions.hpp
include/armnn/backends/ILayerSupport.hpp
+ include/armnn/IAsyncExecutionCallback.hpp
include/armnn/ILayerVisitor.hpp
include/armnn/INetwork.hpp
include/armnn/IProfiler.hpp
@@ -302,6 +303,8 @@ list(APPEND armnn_sources
src/armnn/layers/TransposeLayer.cpp
src/armnn/layers/UnmapLayer.cpp
src/armnn/layers/UnmapLayer.hpp
+ src/armnn/AsyncExecutionCallback.cpp
+ src/armnn/AsyncExecutionCallback.hpp
src/armnn/BackendRegistry.cpp
src/armnn/BackendSettings.hpp
src/armnn/BackendHelper.cpp
diff --git a/include/armnn/IAsyncExecutionCallback.hpp b/include/armnn/IAsyncExecutionCallback.hpp
new file mode 100644
index 0000000000..045ec4581f
--- /dev/null
+++ b/include/armnn/IAsyncExecutionCallback.hpp
@@ -0,0 +1,43 @@
+//
+// Copyright © 2021 Arm Ltd and Contributors. All rights reserved.
+// SPDX-License-Identifier: MIT
+//
+
+#pragma once
+
+#include "Types.hpp"
+
+namespace armnn
+{
+
+namespace experimental
+{
+
+class IAsyncExecutionCallback;
+using IAsyncExecutionCallbackPtr = std::shared_ptr<IAsyncExecutionCallback>;
+
+class IAsyncExecutionCallback
+{
+public:
+ virtual ~IAsyncExecutionCallback() {};
+
+ // Notify the AsyncExecutionCallback object of the armnn execution status
+ virtual void Notify(armnn::Status status, InferenceTimingPair timeTaken) = 0;
+
+ // Block the calling thread until the AsyncExecutionCallback object allows it to proceed
+ virtual void Wait() const = 0;
+
+ // Retrieve the ArmNN Status from the AsyncExecutionCallback that has been notified
+ virtual armnn::Status GetStatus() const = 0;
+
+ // Retrieve the start time before executing the inference
+ virtual HighResolutionClock GetStartTime() const = 0;
+
+ // Retrieve the time after executing the inference
+ virtual HighResolutionClock GetEndTime() const = 0;
+
+};
+
+} // experimental
+
+} // namespace armnn
diff --git a/include/armnn/IRuntime.hpp b/include/armnn/IRuntime.hpp
index 55c57974dc..f296a5f564 100644
--- a/include/armnn/IRuntime.hpp
+++ b/include/armnn/IRuntime.hpp
@@ -8,6 +8,7 @@
#include "INetwork.hpp"
#include "IProfiler.hpp"
#include "IWorkingMemHandle.hpp"
+#include "IAsyncExecutionCallback.hpp"
#include "Tensor.hpp"
#include "Types.hpp"
#include "TypesUtils.hpp"
@@ -31,20 +32,24 @@ struct INetworkProperties
ARMNN_DEPRECATED_MSG("Please use INetworkProperties constructor with MemorySource argument")
INetworkProperties(bool importEnabled = false,
bool exportEnabled = false,
- bool asyncEnabled = false)
+ bool asyncEnabled = false,
+ size_t numThreads = 0)
: m_ImportEnabled(importEnabled)
, m_ExportEnabled(exportEnabled)
, m_AsyncEnabled(asyncEnabled)
+ , m_NumThreads(numThreads)
, m_InputSource(MemorySource::Undefined)
, m_OutputSource(MemorySource::Undefined)
{}
INetworkProperties(bool asyncEnabled,
MemorySource m_InputSource,
- MemorySource m_OutputSource)
+ MemorySource m_OutputSource,
+ size_t numThreads = 0)
: m_ImportEnabled(m_InputSource != MemorySource::Undefined)
, m_ExportEnabled(m_OutputSource != MemorySource::Undefined)
, m_AsyncEnabled(asyncEnabled)
+ , m_NumThreads(numThreads)
, m_InputSource(m_InputSource)
, m_OutputSource(m_OutputSource)
{}
@@ -54,7 +59,9 @@ struct INetworkProperties
/// Deprecated and will be removed in future release.
const bool m_ExportEnabled;
- const bool m_AsyncEnabled;
+ const bool m_AsyncEnabled;
+ const size_t m_NumThreads;
+
const MemorySource m_InputSource;
const MemorySource m_OutputSource;
@@ -184,6 +191,16 @@ public:
const InputTensors& inputTensors,
const OutputTensors& outputTensors);
+ /// This is an experimental function
+ /// Schedule a thread safe execution by taking the input tensors and an execution priority for Quality of Service.
+ /// The output tensors will then be filled and the callback object will notify that the execution has either
+ /// succeeded or failed.
+ void Schedule(NetworkId networkId,
+ const InputTensors& inputTensors,
+ const OutputTensors& outputTensors,
+ const QosExecPriority priority,
+ std::shared_ptr<IAsyncExecutionCallback> callback);
+
/// Unloads a network from the IRuntime.
/// At the moment this only removes the network from the m_Impl->m_Network.
/// This might need more work in the future to be AndroidNN compliant.
diff --git a/include/armnn/Types.hpp b/include/armnn/Types.hpp
index bc41003c57..9e46d08501 100644
--- a/include/armnn/Types.hpp
+++ b/include/armnn/Types.hpp
@@ -8,6 +8,7 @@
#include <functional>
#include <memory>
#include <stdint.h>
+#include <chrono>
#include "BackendId.hpp"
#include "Exceptions.hpp"
#include "Deprecated.hpp"
@@ -20,6 +21,9 @@ constexpr unsigned int MaxNumOfTensorDimensions = 5U;
/// The lowest performance data capture interval we support is 10 miliseconds.
constexpr unsigned int LOWEST_CAPTURE_PERIOD = 10000u;
+/// Variable to control expire rate of priority queue
+constexpr unsigned int EXPIRE_RATE = 3U;
+
/// @enum Status enumeration
/// @var Status::Successful
/// @var Status::Failure
@@ -31,14 +35,14 @@ enum class Status
enum class DataType
{
- Float16 = 0,
- Float32 = 1,
+ Float16 = 0,
+ Float32 = 1,
QAsymmU8 = 2,
Signed32 = 3,
- Boolean = 4,
+ Boolean = 4,
QSymmS16 = 5,
QuantizedSymm8PerAxis ARMNN_DEPRECATED_ENUM_MSG("Per Axis property inferred by number of scales in TensorInfo") = 6,
- QSymmS8 = 7,
+ QSymmS8 = 7,
QAsymmS8 = 8,
BFloat16 = 9,
Signed64 = 10,
@@ -53,6 +57,13 @@ enum class DataLayout
NHWC = 2
};
+enum class QosExecPriority
+{
+ Low = 0,
+ Medium = 1,
+ High = 2
+};
+
enum class ActivationFunction
{
Sigmoid = 0,
@@ -304,6 +315,9 @@ class ITensorHandle;
/// @param tensorHandle - TensorHandle for the input tensor to the Debug layer
using DebugCallbackFunction = std::function<void(LayerGuid guid, unsigned int slotIndex, ITensorHandle* tensorHandle)>;
+/// Define a timer and associated inference ID for recording execution times
+using HighResolutionClock = std::chrono::high_resolution_clock::time_point;
+using InferenceTimingPair = std::pair<HighResolutionClock, HighResolutionClock>;
namespace profiling
{
diff --git a/src/armnn/AsyncExecutionCallback.cpp b/src/armnn/AsyncExecutionCallback.cpp
new file mode 100644
index 0000000000..c44808918d
--- /dev/null
+++ b/src/armnn/AsyncExecutionCallback.cpp
@@ -0,0 +1,57 @@
+//
+// Copyright © 2021 Arm Ltd and Contributors. All rights reserved.
+// SPDX-License-Identifier: MIT
+//
+
+#include <AsyncExecutionCallback.hpp>
+
+namespace armnn
+{
+
+namespace experimental
+{
+
+void AsyncExecutionCallback::Notify(armnn::Status status, InferenceTimingPair timeTaken)
+{
+ {
+ std::lock_guard<std::mutex> hold(m_Mutex);
+ if (m_Notified)
+ {
+ return;
+ }
+ // store results and mark as notified
+ m_Status = status;
+ m_StartTime = timeTaken.first;
+ m_EndTime = timeTaken.second;
+ m_Notified = true;
+ }
+ m_Condition.notify_all();
+}
+
+void AsyncExecutionCallback::Wait() const
+{
+ std::unique_lock<std::mutex> lock(m_Mutex);
+ m_Condition.wait(lock, [this] { return m_Notified; });
+}
+
+armnn::Status AsyncExecutionCallback::GetStatus() const
+{
+ Wait();
+ return m_Status;
+}
+
+HighResolutionClock AsyncExecutionCallback::GetStartTime() const
+{
+ Wait();
+ return m_StartTime;
+}
+
+HighResolutionClock AsyncExecutionCallback::GetEndTime() const
+{
+ Wait();
+ return m_EndTime;
+}
+
+} // namespace experimental
+
+} // namespace armnn \ No newline at end of file
diff --git a/src/armnn/AsyncExecutionCallback.hpp b/src/armnn/AsyncExecutionCallback.hpp
new file mode 100644
index 0000000000..c17b839748
--- /dev/null
+++ b/src/armnn/AsyncExecutionCallback.hpp
@@ -0,0 +1,48 @@
+//
+// Copyright © 2021 Arm Ltd and Contributors. All rights reserved.
+// SPDX-License-Identifier: MIT
+//
+
+#pragma once
+
+#include <armnn/IAsyncExecutionCallback.hpp>
+#include <armnn/Types.hpp>
+#include <condition_variable>
+
+#include <mutex>
+#include <thread>
+
+namespace armnn
+{
+
+namespace experimental
+{
+
+class AsyncExecutionCallback final : public IAsyncExecutionCallback
+{
+public:
+ AsyncExecutionCallback()
+ {}
+ ~AsyncExecutionCallback()
+ {}
+
+ void Notify(armnn::Status status, InferenceTimingPair timeTaken);
+ void Wait() const;
+
+ armnn::Status GetStatus() const;
+ HighResolutionClock GetStartTime() const;
+ HighResolutionClock GetEndTime() const;
+
+private:
+ mutable std::mutex m_Mutex;
+ mutable std::condition_variable m_Condition;
+
+ HighResolutionClock m_StartTime;
+ HighResolutionClock m_EndTime;
+ armnn::Status m_Status = Status::Failure;
+ bool m_Notified = false;
+};
+
+} // namespace experimental
+
+} // namespace armnn \ No newline at end of file
diff --git a/src/armnn/LoadedNetwork.cpp b/src/armnn/LoadedNetwork.cpp
index 46eb9883fb..67de00f0f3 100644
--- a/src/armnn/LoadedNetwork.cpp
+++ b/src/armnn/LoadedNetwork.cpp
@@ -24,6 +24,7 @@
#include <LabelsAndEventClasses.hpp>
#include <fmt/format.h>
+#include <armnn/utility/Timer.hpp>
namespace armnn
{
@@ -84,7 +85,8 @@ void AddWorkloadStructure(std::unique_ptr<TimelineUtilityMethods>& timelineUtils
std::unique_ptr<LoadedNetwork> LoadedNetwork::MakeLoadedNetwork(std::unique_ptr<IOptimizedNetwork> net,
std::string& errorMessage,
const INetworkProperties& networkProperties,
- profiling::ProfilingService& profilingService)
+ profiling::ProfilingService& profilingService,
+ const NetworkId networkIdOut)
{
std::unique_ptr<LoadedNetwork> loadedNetwork;
@@ -98,7 +100,7 @@ std::unique_ptr<LoadedNetwork> LoadedNetwork::MakeLoadedNetwork(std::unique_ptr<
try
{
- loadedNetwork.reset(new LoadedNetwork(std::move(net), networkProperties, profilingService));
+ loadedNetwork.reset(new LoadedNetwork(std::move(net), networkProperties, profilingService, networkIdOut));
}
catch (const armnn::RuntimeException& error)
{
@@ -118,9 +120,11 @@ std::unique_ptr<LoadedNetwork> LoadedNetwork::MakeLoadedNetwork(std::unique_ptr<
LoadedNetwork::LoadedNetwork(std::unique_ptr<IOptimizedNetwork> net,
const INetworkProperties& networkProperties,
- profiling::ProfilingService& profilingService) :
+ profiling::ProfilingService& profilingService,
+ const NetworkId networkId) :
m_OptimizedNetwork(std::move(net)),
m_NetworkProperties(networkProperties),
+ m_NetworkId(networkId),
m_TensorHandleFactoryRegistry(),
m_ProfilingService(profilingService)
{
@@ -161,6 +165,14 @@ LoadedNetwork::LoadedNetwork(std::unique_ptr<IOptimizedNetwork> net,
}
}
}
+
+ // Create the thread pool which will have working memory handles assigned to each thread
+ // Should occur after factories are registered so thet the WorkingMemHandles can be created
+ if (m_NetworkProperties.m_NumThreads > 0 && networkProperties.m_AsyncEnabled)
+ {
+ CreateThreadPool(m_NetworkProperties.m_NumThreads);
+ }
+
if (!networkProperties.m_AsyncEnabled)
{
for (auto &&layer : order)
@@ -846,6 +858,147 @@ bool LoadedNetwork::Execute(std::unique_ptr<TimelineUtilityMethods>& timelineUti
return success;
}
+void LoadedNetwork::CreateThreadPool(std::size_t numThreads)
+{
+
+ for (auto i = 0u; i < numThreads; ++i)
+ {
+ std::unique_ptr<IWorkingMemHandle> workingMemHandle = CreateWorkingMemHandle(m_NetworkId);
+ m_Threads.emplace_back(
+ std::make_unique<std::thread>(
+ &LoadedNetwork::ProcessExecPriorities,
+ this,
+ std::move(workingMemHandle)
+ )
+ );
+ }
+}
+
+void LoadedNetwork::TerminateThreadPool() noexcept
+{
+ {
+ std::unique_lock<std::mutex> threadPoolLock(m_ThreadPoolMutex);
+ m_TerminatePool = true;
+ }
+
+ m_ThreadPoolEvent.notify_all();
+
+ for (auto &thread : m_Threads)
+ {
+ thread->join();
+ }
+}
+
+void LoadedNetwork::Schedule(const InputTensors& inputTensors,
+ const OutputTensors& outputTensors,
+ const QosExecPriority priority,
+ std::shared_ptr<IAsyncExecutionCallback> cb)
+{
+ // Group execution parameters so that they can be easily added to the queue
+ ExecutionTuple groupExecParams = std::make_tuple(inputTensors, outputTensors, cb);
+ std::shared_ptr<ExecutionTuple> operation = make_shared<ExecutionTuple>(groupExecParams);
+
+ // Add a message to the queue and notify the request thread
+ std::unique_lock<std::mutex> lock(m_ThreadPoolMutex);
+ switch (priority) {
+ case QosExecPriority::High:
+ m_HighPriorityQueue.push(operation);
+ break;
+ case QosExecPriority::Low:
+ m_LowPriorityQueue.push(operation);
+ break;
+ case QosExecPriority::Medium:
+ default:
+ m_MediumPriorityQueue.push(operation);
+ }
+ m_ThreadPoolEvent.notify_one();
+}
+
+void LoadedNetwork::ProcessExecPriorities(std::unique_ptr<IWorkingMemHandle> workingMemHandle)
+{
+ int expireRate = EXPIRE_RATE;
+ int highPriorityCount = 0;
+ int mediumPriorityCount = 0;
+
+ IWorkingMemHandle& workingMemHandleRef = *workingMemHandle.get();
+
+ while (true)
+ {
+ std::shared_ptr<ExecutionTuple> currentExecInProgress(nullptr);
+ {
+ // Wait for a message to be added to the queue
+ // This is in a separate scope to minimise the lifetime of the lock
+ std::unique_lock<std::mutex> lock(m_ThreadPoolMutex);
+
+ m_ThreadPoolEvent.wait(lock,
+ [=] {
+ return m_TerminatePool || !m_HighPriorityQueue.empty() ||
+ !m_MediumPriorityQueue.empty() || !m_LowPriorityQueue.empty();
+ });
+
+ if (m_TerminatePool && m_HighPriorityQueue.empty() && m_MediumPriorityQueue.empty() &&
+ m_LowPriorityQueue.empty())
+ {
+ break;
+ }
+
+ // Get the message to process from the front of each queue based on priority from high to low
+ // Get high priority first if it does not exceed the expire rate
+ if (!m_HighPriorityQueue.empty() && highPriorityCount < expireRate)
+ {
+ currentExecInProgress = m_HighPriorityQueue.front();
+ m_HighPriorityQueue.pop();
+ highPriorityCount += 1;
+ }
+ // If high priority queue is empty or the count exceeds the expire rate, get medium priority message
+ else if (!m_MediumPriorityQueue.empty() && mediumPriorityCount < expireRate)
+ {
+ currentExecInProgress = m_MediumPriorityQueue.front();
+ m_MediumPriorityQueue.pop();
+ mediumPriorityCount += 1;
+ // Reset high priority count
+ highPriorityCount = 0;
+ }
+ // If medium priority queue is empty or the count exceeds the expire rate, get low priority message
+ else if (!m_LowPriorityQueue.empty())
+ {
+ currentExecInProgress = m_LowPriorityQueue.front();
+ m_LowPriorityQueue.pop();
+ // Reset high and medium priority count
+ highPriorityCount = 0;
+ mediumPriorityCount = 0;
+ }
+ else
+ {
+ // Reset high and medium priority count
+ highPriorityCount = 0;
+ mediumPriorityCount = 0;
+ continue;
+ }
+ }
+
+ // invoke the asynchronous execution method
+ auto inputTensors = std::get<0>(*currentExecInProgress);
+ auto outputTensors = std::get<1>(*currentExecInProgress);
+ auto cb = std::get<2>(*currentExecInProgress);
+
+ // Get time at start of inference
+ HighResolutionClock startTime = armnn::GetTimeNow();
+
+ try // executing the inference
+ {
+ // Execute and populate the time at end of inference in the callback
+ Execute(inputTensors, outputTensors, workingMemHandleRef) == Status::Success ?
+ cb->Notify(Status::Success, std::make_pair(startTime, armnn::GetTimeNow())) :
+ cb->Notify(Status::Failure, std::make_pair(startTime, armnn::GetTimeNow()));
+ }
+ catch (const RuntimeException& error)
+ {
+ cb->Notify(Status::Failure, std::make_pair(startTime, armnn::GetTimeNow()));
+ }
+ }
+}
+
void LoadedNetwork::EnqueueInput(const BindableLayer& layer,
const ConstTensor& inputTensor,
WorkingMemHandle& context)
@@ -1096,6 +1249,7 @@ Status LoadedNetwork::Execute(const InputTensors& inputTensors,
EnqueueOutput(*outputLayer, GetOutputTensor(outputLayer->GetBindingId(), outputTensors), workingMemHandle);
}
}
+
return executionSucceeded ? Status::Success : Status::Failure;
}
diff --git a/src/armnn/LoadedNetwork.hpp b/src/armnn/LoadedNetwork.hpp
index 51092c744e..b5474db294 100644
--- a/src/armnn/LoadedNetwork.hpp
+++ b/src/armnn/LoadedNetwork.hpp
@@ -19,13 +19,14 @@
#include <TimelineUtilityMethods.hpp>
#include <mutex>
+#include <condition_variable>
#include <unordered_map>
namespace cl
{
- class Context;
- class CommandQueue;
- class Device;
+class Context;
+class CommandQueue;
+class Device;
}
namespace armnn
@@ -34,8 +35,19 @@ namespace armnn
class LoadedNetwork
{
public:
- using WorkloadQueue = std::vector< std::unique_ptr<IWorkload> >;
- ~LoadedNetwork(){ FreeWorkingMemory(); }
+ using WorkloadQueue = std::vector<std::unique_ptr<IWorkload>>;
+
+ using ExecutionTuple = std::tuple<InputTensors,
+ OutputTensors,
+ std::shared_ptr<IAsyncExecutionCallback>>;
+
+ using ExecutionQueue = std::queue<std::shared_ptr<ExecutionTuple>>;
+
+ ~LoadedNetwork()
+ {
+ FreeWorkingMemory();
+ TerminateThreadPool();
+ }
/// Create a new unique WorkingMemHandle object. Create multiple handles if you wish to have
/// overlapped Execution by calling this function from different threads.
@@ -44,16 +56,25 @@ public:
TensorInfo GetInputTensorInfo(LayerBindingId layerId) const;
TensorInfo GetOutputTensorInfo(LayerBindingId layerId) const;
+ /// Single thread execution of the loaded network
Status EnqueueWorkload(const InputTensors& inputTensors, const OutputTensors& outputTensors);
+ /// Thread safe execution of the loaded network
Status Execute(const InputTensors& inputTensors,
const OutputTensors& outputTensors,
IWorkingMemHandle& workingMemHandle);
+ /// Schedule an asynchronous execution on the loaded network
+ void Schedule(const InputTensors& inputTensors,
+ const OutputTensors& outputTensors,
+ const QosExecPriority priority,
+ std::shared_ptr<IAsyncExecutionCallback> cb);
+
static std::unique_ptr<LoadedNetwork> MakeLoadedNetwork(std::unique_ptr<IOptimizedNetwork> net,
- std::string & errorMessage,
+ std::string& errorMessage,
const INetworkProperties& networkProperties,
- profiling::ProfilingService& profilingService);
+ profiling::ProfilingService& profilingService,
+ const NetworkId networkIdOut);
// NOTE we return by reference as the purpose of this method is only to provide
// access to the private m_Profiler and in theory we should not need to increment
@@ -87,7 +108,8 @@ private:
LoadedNetwork(std::unique_ptr<IOptimizedNetwork> net,
const INetworkProperties& networkProperties,
- profiling::ProfilingService& profilingService);
+ profiling::ProfilingService& profilingService,
+ const NetworkId networkIdOut);
void EnqueueInput(const BindableLayer& layer, ITensorHandle* tensorHandle, const TensorInfo& tensorInfo);
@@ -97,9 +119,15 @@ private:
void EnqueueOutput(const BindableLayer& layer, const Tensor& outputTensor, WorkingMemHandle& handle);
+ void ProcessExecPriorities(std::unique_ptr<IWorkingMemHandle> workingMemHandle);
+
bool Execute(std::unique_ptr<profiling::TimelineUtilityMethods>& timelineUtils,
profiling::ProfilingGuid inferenceGuid);
+ void CreateThreadPool(std::size_t numThreads);
+
+ void TerminateThreadPool() noexcept;
+
const IWorkloadFactory& GetWorkloadFactory(const Layer& layer) const;
using BackendPtrMap = std::unordered_map<BackendId, IBackendInternalUniquePtr>;
@@ -108,19 +136,38 @@ private:
WorkloadFactoryMap m_WorkloadFactories;
std::unique_ptr<IOptimizedNetwork> m_OptimizedNetwork;
- WorkloadQueue m_InputQueue;
- WorkloadQueue m_WorkloadQueue;
- WorkloadQueue m_OutputQueue;
- std::shared_ptr<IProfiler> m_Profiler;
+ std::shared_ptr<IProfiler> m_Profiler;
+
+ WorkloadQueue m_InputQueue;
+ WorkloadQueue m_WorkloadQueue;
+ WorkloadQueue m_OutputQueue;
mutable std::mutex m_WorkingMemMutex;
- bool m_IsWorkingMemAllocated=false;
+ bool m_IsWorkingMemAllocated = false;
+
+ std::vector<std::unique_ptr<std::thread>> m_Threads;
+ std::stack<IWorkingMemHandle> m_WorkingMemHandles;
+
+ ExecutionQueue m_HighPriorityQueue;
+ ExecutionQueue m_MediumPriorityQueue;
+ ExecutionQueue m_LowPriorityQueue;
+
+ // Condition Variables require mutex which will guard the shared state.
+ // Has an event happened? Stop signal for example
+ std::condition_variable m_ThreadPoolEvent;
+ std::mutex m_ThreadPoolMutex;
+
+ // The shared state for conditional variable
+ bool m_TerminatePool = false;
+
INetworkProperties m_NetworkProperties;
+ const NetworkId m_NetworkId;
+
TensorHandleFactoryRegistry m_TensorHandleFactoryRegistry;
- profiling::ProfilingService& m_ProfilingService;
+ profiling::ProfilingService& m_ProfilingService;
};
}
diff --git a/src/armnn/Runtime.cpp b/src/armnn/Runtime.cpp
index 1dd86a61ce..e04cf5ddaf 100644
--- a/src/armnn/Runtime.cpp
+++ b/src/armnn/Runtime.cpp
@@ -88,6 +88,15 @@ Status IRuntime::Execute(IWorkingMemHandle& workingMemHandle,
return pRuntimeImpl->Execute(workingMemHandle, inputTensors, outputTensors);
}
+void IRuntime::Schedule(NetworkId networkId,
+ const InputTensors& inputTensors,
+ const OutputTensors& outputTensors,
+ const QosExecPriority priority,
+ std::shared_ptr<IAsyncExecutionCallback> cb)
+{
+ pRuntimeImpl->Schedule(networkId, inputTensors, outputTensors, priority, cb);
+}
+
Status IRuntime::UnloadNetwork(NetworkId networkId)
{
return pRuntimeImpl->UnloadNetwork(networkId);
@@ -150,7 +159,8 @@ Status RuntimeImpl::LoadNetwork(NetworkId& networkIdOut,
std::unique_ptr<IOptimizedNetwork>(rawNetwork),
errorMessage,
networkProperties,
- m_ProfilingService);
+ m_ProfilingService,
+ networkIdOut);
if (!loadedNetwork)
{
@@ -439,24 +449,42 @@ Status RuntimeImpl::Execute(IWorkingMemHandle& iWorkingMemHandle,
}
if (!loadedNetwork->IsAsyncEnabled())
{
- ARMNN_LOG(error) << "Network " << networkId << " is not async enabled.\n";
+ ARMNN_LOG(error) << "Attempting execute " << networkId << " when it is not async enabled.\n";
return Status::Failure;
}
ProfilerManager::GetInstance().RegisterProfiler(loadedNetwork->GetProfiler().get());
ARMNN_SCOPED_PROFILING_EVENT(Compute::Undefined, "Execute");
- static thread_local NetworkId lastId = networkId;
- if (lastId != networkId)
+ return loadedNetwork->Execute(inputTensors, outputTensors, iWorkingMemHandle);
+}
+
+void RuntimeImpl::Schedule(NetworkId networkId,
+ const InputTensors& inputTensors,
+ const OutputTensors& outputTensors,
+ const QosExecPriority priority,
+ std::shared_ptr<IAsyncExecutionCallback> callback)
+{
+ LoadedNetwork* loadedNetwork = GetLoadedNetworkPtr(networkId);
+
+ if (!loadedNetwork)
{
- LoadedNetworkFuncSafe(lastId, [](LoadedNetwork* network)
- {
- network->FreeWorkingMemory();
- });
+ throw armnn::Exception(
+ "Network with ID of " + std::to_string(networkId) + " does not exist \n"
+ );
+ }
+ if (!loadedNetwork->IsAsyncEnabled())
+ {
+ throw armnn::Exception(
+ "Attempting to schedule Network " + std::to_string(networkId) + " when it is not async enabled \n"
+ );
}
- lastId=networkId;
- return loadedNetwork->Execute(inputTensors, outputTensors, iWorkingMemHandle);
+ ProfilerManager::GetInstance().RegisterProfiler(loadedNetwork->GetProfiler().get());
+
+ ARMNN_SCOPED_PROFILING_EVENT(Compute::Undefined, "Schedule");
+
+ loadedNetwork->Schedule(inputTensors, outputTensors, priority, callback);
}
/// Create a new unique WorkingMemHandle object. Create multiple handles if you wish to have
diff --git a/src/armnn/Runtime.hpp b/src/armnn/Runtime.hpp
index da5445383f..55a4accf67 100644
--- a/src/armnn/Runtime.hpp
+++ b/src/armnn/Runtime.hpp
@@ -60,6 +60,20 @@ public:
const InputTensors& inputTensors,
const OutputTensors& outputTensors);
+ /// This is an experimental function.
+ /// Schedule a thread safe execution by taking the input tensors and an execution priority for Quality of Service.
+ /// The output tensors will then be filled and the callback object will notify that the execution has either
+ /// succeeded or failed.
+ void Schedule(NetworkId networkId,
+ const InputTensors& inputTensors,
+ const OutputTensors& outputTensors,
+ const QosExecPriority priority,
+ std::shared_ptr<IAsyncExecutionCallback> callback);
+
+ /// This is an experimental function.
+ /// Evaluates a network using input in inputTensors and outputs filled into outputTensors.
+ /// This function performs a thread safe execution of the network. Returns once execution is complete.
+ /// Will block until this and any other thread using the same workingMem object completes.
Status Execute(IWorkingMemHandle& workingMemHandle,
const InputTensors& inputTensors,
const OutputTensors& outputTensors);
diff --git a/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp b/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp
index b20ff4f142..87cade7dca 100644
--- a/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp
+++ b/src/backends/backendsCommon/test/StridedSliceAsyncEndToEndTest.hpp
@@ -9,7 +9,9 @@
#include <armnn/IWorkingMemHandle.hpp>
#include <armnn/INetwork.hpp>
+#include <armnn/IAsyncExecutionCallback.hpp>
+#include <AsyncExecutionCallback.hpp>
#include <backendsCommon/test/CommonTestUtils.hpp>
#include <boost/test/unit_test.hpp>
@@ -114,27 +116,29 @@ void AsyncThreadedEndToEndTestImpl(INetworkPtr network,
}
-
-
template<DataType ArmnnIType, DataType ArmnnOType,
- typename TInput = ResolveType <ArmnnIType>, typename TOutput = ResolveType <ArmnnOType>>
+ typename TInput = ResolveType<ArmnnIType>, typename TOutput = ResolveType<ArmnnOType>>
void AsyncEndToEndTestImpl(INetworkPtr network,
const std::map<int, std::vector<TInput>>& inputTensorData,
const std::map<int, std::vector<TOutput>>& expectedOutputData,
std::vector<BackendId> backends,
- float tolerance = 0.000001f)
+ float tolerance = 0.000001f,
+ size_t numThreads = 0)
{
// Create Runtime in which test will run
IRuntime::CreationOptions options;
- IRuntimePtr runtime(IRuntime::Create(options));
+ IRuntimePtr runtime(IRuntime::Create(options));
// Optimize the Network
IOptimizedNetworkPtr optNet = Optimize(*network, backends, runtime->GetDeviceSpec());
// Creates AsyncNetwork
NetworkId networkId = 0;
+
std::string errorMessage;
- const INetworkProperties networkProperties(true, MemorySource::Undefined, MemorySource::Undefined);
+
+ const INetworkProperties networkProperties(true, MemorySource::Undefined, MemorySource::Undefined, numThreads);
+
runtime->LoadNetwork(networkId, std::move(optNet), errorMessage, networkProperties);
InputTensors inputTensors;
@@ -157,17 +161,50 @@ void AsyncEndToEndTestImpl(INetworkPtr network,
outputStorage.at(it.first).data())});
}
- // Create WorkingMemHandle for this async network
- std::unique_ptr<IWorkingMemHandle> workingMemHandle = runtime->CreateWorkingMemHandle(networkId);
- IWorkingMemHandle& workingMemHandleRef = *workingMemHandle.get();
+ if (numThreads == 0)
+ {
+ // Create WorkingMemHandle for this async network
+ std::unique_ptr<IWorkingMemHandle> workingMemHandle = runtime->CreateWorkingMemHandle(networkId);
+ IWorkingMemHandle& workingMemHandleRef = *workingMemHandle.get();
- // Run the async network
- runtime->Execute(workingMemHandleRef, inputTensors, outputTensors);
+ // Run the async network
+ runtime->Execute(workingMemHandleRef, inputTensors, outputTensors);
+ }
+ else
+ {
+ std::vector<IAsyncExecutionCallbackPtr> callbacks;
+
+ // Create 1000 callbacks that will be checked post scheduling
+ for (size_t i = 0; i < 1000; ++i)
+ {
+ callbacks.emplace_back(std::make_shared<AsyncExecutionCallback>());
+ }
+
+ // For the asyncronous execution, we are adding a pool of working memory handles (1 per thread) in the
+ // LoadedNetwork with a each scheduled inference having a spefic priority
+ for (IAsyncExecutionCallbackPtr cb : callbacks)
+ {
+ runtime->Schedule(networkId,
+ inputTensors,
+ outputTensors,
+ static_cast<QosExecPriority>(rand()%3),
+ cb);
+ }
+
+ // Wait until the execution signals a notify
+ for (IAsyncExecutionCallbackPtr cb : callbacks)
+ {
+ cb->Wait();
+
+ // Checks the results.
+ BOOST_CHECK(cb->GetStatus() == Status::Success);
+ }
+ }
- // Checks the results.
for (auto&& it : expectedOutputData)
{
std::vector<TOutput> out = outputStorage.at(it.first);
+
for (unsigned int i = 0; i < out.size(); ++i)
{
BOOST_CHECK(Compare<ArmnnOType>(it.second[i], out[i], tolerance) == true);
@@ -263,7 +300,103 @@ void StridedSlicedEndToEndTest(const std::vector<BackendId>& backends)
std::map<int, std::vector<T>> inputTensorData = {{0, inputData}};
std::map<int, std::vector<T>> expectedOutputData = {{0, outputExpected}};
- AsyncEndToEndTestImpl<ArmnnType, ArmnnType>(move(net), inputTensorData, expectedOutputData, backends, 1);
+ AsyncEndToEndTestImpl<ArmnnType, ArmnnType>(move(net), inputTensorData, expectedOutputData, backends, 0.000001f);
+}
+
+template<armnn::DataType ArmnnType>
+void AsyncScheduledStridedSlicedEndToEndTest(const std::vector<BackendId>& backends)
+{
+ using namespace armnn;
+ using T = ResolveType<ArmnnType>;
+
+ const TensorShape& inputShape = {3, 2, 3, 1};
+ const TensorShape& outputShape = {1, 2, 3, 1};
+ const std::vector<int>& beginData = {1, 0, 0, 0};
+ const std::vector<int>& endData = {2, 2, 3, 1};
+ const std::vector<int>& stridesData = {1, 1, 1, 1};
+ int beginMask = 0;
+ int endMask = 0;
+ int shrinkAxisMask = 0;
+ int ellipsisMask = 0;
+ int newAxisMask = 0;
+
+ // Builds up the structure of the network
+ INetworkPtr net = CreateStridedSliceNetwork<ArmnnType>(inputShape,
+ outputShape,
+ beginData,
+ endData,
+ stridesData,
+ beginMask,
+ endMask,
+ shrinkAxisMask,
+ ellipsisMask,
+ newAxisMask);
+
+ // Creates structures for input & output.
+ std::vector<T> inputData{
+ 1.0f, 1.0f, 1.0f, 2.0f, 2.0f, 2.0f,
+
+ 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f,
+
+ 5.0f, 5.0f, 5.0f, 6.0f, 6.0f, 6.0f
+ };
+
+ std::vector<T> outputExpected{
+ 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f
+ };
+
+ std::map<int, std::vector<T>> inputTensorData = {{0, inputData}};
+ std::map<int, std::vector<T>> expectedOutputData = {{0, outputExpected}};
+
+ AsyncEndToEndTestImpl<ArmnnType, ArmnnType>(move(net), inputTensorData, expectedOutputData, backends, 0.000001f, 1);
+}
+
+template<armnn::DataType ArmnnType>
+void AsyncScheduledStridedSlicedMultiThreadedEndToEndTest(const std::vector<BackendId>& backends)
+{
+ using namespace armnn;
+ using T = ResolveType<ArmnnType>;
+
+ const TensorShape& inputShape = {3, 2, 3, 1};
+ const TensorShape& outputShape = {1, 2, 3, 1};
+ const std::vector<int>& beginData = {1, 0, 0, 0};
+ const std::vector<int>& endData = {2, 2, 3, 1};
+ const std::vector<int>& stridesData = {1, 1, 1, 1};
+ int beginMask = 0;
+ int endMask = 0;
+ int shrinkAxisMask = 0;
+ int ellipsisMask = 0;
+ int newAxisMask = 0;
+
+ // Builds up the structure of the network
+ INetworkPtr net = CreateStridedSliceNetwork<ArmnnType>(inputShape,
+ outputShape,
+ beginData,
+ endData,
+ stridesData,
+ beginMask,
+ endMask,
+ shrinkAxisMask,
+ ellipsisMask,
+ newAxisMask);
+
+ // Creates structures for input & output.
+ std::vector<T> inputData{
+ 1.0f, 1.0f, 1.0f, 2.0f, 2.0f, 2.0f,
+
+ 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f,
+
+ 5.0f, 5.0f, 5.0f, 6.0f, 6.0f, 6.0f
+ };
+
+ std::vector<T> outputExpected{
+ 3.0f, 3.0f, 3.0f, 4.0f, 4.0f, 4.0f
+ };
+
+ std::map<int, std::vector<T>> inputTensorData = {{0, inputData}};
+ std::map<int, std::vector<T>> expectedOutputData = {{0, outputExpected}};
+
+ AsyncEndToEndTestImpl<ArmnnType, ArmnnType>(move(net), inputTensorData, expectedOutputData, backends, 0.000001f, 3);
}
template<armnn::DataType ArmnnType>
diff --git a/src/backends/reference/test/RefEndToEndTests.cpp b/src/backends/reference/test/RefEndToEndTests.cpp
index 0839c1c7af..1b96db9678 100644
--- a/src/backends/reference/test/RefEndToEndTests.cpp
+++ b/src/backends/reference/test/RefEndToEndTests.cpp
@@ -1346,6 +1346,17 @@ BOOST_AUTO_TEST_CASE(RefAsyncFP32StridedSlicedMultiThreadedEndToEndTest)
{
armnn::experimental::StridedSlicedMultiThreadedEndToEndTest<armnn::DataType::Float32>(defaultBackends);
}
+
+BOOST_AUTO_TEST_CASE(RefAsyncScheduledFP32StridedSlicedEndToEndTest)
+{
+ armnn::experimental::AsyncScheduledStridedSlicedEndToEndTest<armnn::DataType::Float32>(defaultBackends);
+}
+
+BOOST_AUTO_TEST_CASE(RefAsyncScheduledStridedSlicedMultiThreadedEndToEndTest)
+{
+ using namespace armnn::experimental;
+ AsyncScheduledStridedSlicedMultiThreadedEndToEndTest<armnn::DataType::Float32>(defaultBackends);
+}
#endif
BOOST_AUTO_TEST_SUITE_END()