diff options
Diffstat (limited to 'applications/message_handler/message_handler.hpp')
-rw-r--r-- | applications/message_handler/message_handler.hpp | 86 |
1 files changed, 82 insertions, 4 deletions
diff --git a/applications/message_handler/message_handler.hpp b/applications/message_handler/message_handler.hpp index fa79205..dd05059 100644 --- a/applications/message_handler/message_handler.hpp +++ b/applications/message_handler/message_handler.hpp @@ -24,6 +24,7 @@ #include "semphr.h" #include "message_queue.hpp" +#include <ethosu_core_interface.h> #if defined(ETHOSU) #include <ethosu_driver.h> #endif @@ -31,20 +32,97 @@ #include <inference_process.hpp> #include <mailbox.hpp> +#include <algorithm> #include <cstddef> #include <cstdio> +#include <list> #include <vector> namespace MessageHandler { +template <typename T, size_t capacity = 10> +class Queue { +public: + using Predicate = std::function<bool(const T &data)>; + + Queue() { + mutex = xSemaphoreCreateMutex(); + size = xSemaphoreCreateCounting(capacity, 0u); + + if (mutex == nullptr || size == nullptr) { + printf("Error: failed to allocate memory for inference queue\n"); + } + } + + ~Queue() { + vSemaphoreDelete(mutex); + vSemaphoreDelete(size); + } + + bool push(const T &data) { + xSemaphoreTake(mutex, portMAX_DELAY); + if (list.size() >= capacity) { + xSemaphoreGive(mutex); + return false; + } + + list.push_back(data); + xSemaphoreGive(mutex); + + // increase number of available inferences to pop + xSemaphoreGive(size); + return true; + } + + void pop(T &data) { + // decrease the number of available inferences to pop + xSemaphoreTake(size, portMAX_DELAY); + + xSemaphoreTake(mutex, portMAX_DELAY); + data = list.front(); + list.pop_front(); + xSemaphoreGive(mutex); + } + + bool erase(Predicate pred) { + // let's optimistically assume we are removing an inference, so decrease pop + if (pdFALSE == xSemaphoreTake(size, 0)) { + // if there are no inferences return immediately + return false; + } + + xSemaphoreTake(mutex, portMAX_DELAY); + auto found = std::find_if(list.begin(), list.end(), pred); + bool erased = found != list.end(); + if (erased) { + list.erase(found); + } + xSemaphoreGive(mutex); + + if (!erased) { + // no inference erased, so let's put the size count back + xSemaphoreGive(size); + } + + return erased; + } + +private: + std::list<T> list; + + SemaphoreHandle_t mutex; + SemaphoreHandle_t size; +}; + class IncomingMessageHandler { public: IncomingMessageHandler(EthosU::ethosu_core_queue &inputMessageQueue, EthosU::ethosu_core_queue &outputMessageQueue, Mailbox::Mailbox &mailbox, - QueueHandle_t inferenceInputQueue, + std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue, QueueHandle_t inferenceOutputQueue, SemaphoreHandle_t messageNotify); + void run(); private: @@ -66,7 +144,7 @@ private: MessageQueue::QueueImpl outputMessageQueue; Mailbox::Mailbox &mailbox; InferenceProcess::InferenceParser parser; - QueueHandle_t inferenceInputQueue; + std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue; QueueHandle_t inferenceOutputQueue; SemaphoreHandle_t messageNotify; EthosU::ethosu_core_msg_capabilities_rsp capabilities; @@ -76,7 +154,7 @@ class InferenceHandler { public: InferenceHandler(uint8_t *tensorArena, size_t arenaSize, - QueueHandle_t inferenceInputQueue, + std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue, QueueHandle_t inferenceOutputQueue, SemaphoreHandle_t messageNotify); @@ -90,7 +168,7 @@ private: friend void ::ethosu_inference_begin(struct ethosu_driver *drv, void *userArg); friend void ::ethosu_inference_end(struct ethosu_driver *drv, void *userArg); #endif - QueueHandle_t inferenceInputQueue; + std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue; QueueHandle_t inferenceOutputQueue; SemaphoreHandle_t messageNotify; InferenceProcess::InferenceProcess inference; |