aboutsummaryrefslogtreecommitdiff
path: root/applications/message_handler/lib/include/message_handler.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'applications/message_handler/lib/include/message_handler.hpp')
-rw-r--r--applications/message_handler/lib/include/message_handler.hpp187
1 files changed, 187 insertions, 0 deletions
diff --git a/applications/message_handler/lib/include/message_handler.hpp b/applications/message_handler/lib/include/message_handler.hpp
new file mode 100644
index 0000000..3c227be
--- /dev/null
+++ b/applications/message_handler/lib/include/message_handler.hpp
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2020-2022 Arm Limited.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the License); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MESSAGE_HANDLER_H
+#define MESSAGE_HANDLER_H
+
+#include "FreeRTOS.h"
+#include "queue.h"
+#include "semphr.h"
+
+#include "message_queue.hpp"
+#include "networks.hpp"
+#include <ethosu_core_interface.h>
+#if defined(ETHOSU)
+#include <ethosu_driver.h>
+#endif
+#include <inference_parser.hpp>
+#include <inference_process.hpp>
+#include <mailbox.hpp>
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdio>
+#include <inttypes.h>
+#include <list>
+#include <vector>
+
+namespace MessageHandler {
+
+template <typename T, size_t capacity = 10>
+class Queue {
+public:
+ using Predicate = std::function<bool(const T &data)>;
+
+ Queue() {
+ mutex = xSemaphoreCreateMutex();
+ size = xSemaphoreCreateCounting(capacity, 0u);
+
+ if (mutex == nullptr || size == nullptr) {
+ printf("Error: failed to allocate memory for inference queue\n");
+ }
+ }
+
+ ~Queue() {
+ vSemaphoreDelete(mutex);
+ vSemaphoreDelete(size);
+ }
+
+ bool push(const T &data) {
+ xSemaphoreTake(mutex, portMAX_DELAY);
+ if (list.size() >= capacity) {
+ xSemaphoreGive(mutex);
+ return false;
+ }
+
+ list.push_back(data);
+ xSemaphoreGive(mutex);
+
+ // increase number of available inferences to pop
+ xSemaphoreGive(size);
+ return true;
+ }
+
+ void pop(T &data) {
+ // decrease the number of available inferences to pop
+ xSemaphoreTake(size, portMAX_DELAY);
+
+ xSemaphoreTake(mutex, portMAX_DELAY);
+ data = list.front();
+ list.pop_front();
+ xSemaphoreGive(mutex);
+ }
+
+ bool erase(Predicate pred) {
+ // let's optimistically assume we are removing an inference, so decrease pop
+ if (pdFALSE == xSemaphoreTake(size, 0)) {
+ // if there are no inferences return immediately
+ return false;
+ }
+
+ xSemaphoreTake(mutex, portMAX_DELAY);
+ auto found = std::find_if(list.begin(), list.end(), pred);
+ bool erased = found != list.end();
+ if (erased) {
+ list.erase(found);
+ }
+ xSemaphoreGive(mutex);
+
+ if (!erased) {
+ // no inference erased, so let's put the size count back
+ xSemaphoreGive(size);
+ }
+
+ return erased;
+ }
+
+private:
+ std::list<T> list;
+
+ SemaphoreHandle_t mutex;
+ SemaphoreHandle_t size;
+};
+
+class IncomingMessageHandler {
+public:
+ IncomingMessageHandler(EthosU::ethosu_core_queue &inputMessageQueue,
+ EthosU::ethosu_core_queue &outputMessageQueue,
+ Mailbox::Mailbox &mailbox,
+ std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue,
+ QueueHandle_t inferenceOutputQueue,
+ SemaphoreHandle_t messageNotify,
+ std::shared_ptr<Networks> networks);
+
+ void run();
+
+private:
+ bool handleMessage();
+ bool handleInferenceOutput();
+ static void handleIrq(void *userArg);
+
+ void sendPong();
+ void sendErrorAndResetQueue(EthosU::ethosu_core_msg_err_type type, const char *message);
+ void sendVersionRsp();
+ void sendCapabilitiesRsp(uint64_t userArg);
+ void sendNetworkInfoRsp(uint64_t userArg, EthosU::ethosu_core_network_buffer &network);
+ void sendInferenceRsp(EthosU::ethosu_core_inference_rsp &inference);
+ void sendFailedInferenceRsp(uint64_t userArg, uint32_t status);
+ void sendCancelInferenceRsp(uint64_t userArg, uint32_t status);
+ void readCapabilties(EthosU::ethosu_core_msg_capabilities_rsp &rsp);
+
+ MessageQueue::QueueImpl inputMessageQueue;
+ MessageQueue::QueueImpl outputMessageQueue;
+ Mailbox::Mailbox &mailbox;
+ InferenceProcess::InferenceParser parser;
+ std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue;
+ QueueHandle_t inferenceOutputQueue;
+ SemaphoreHandle_t messageNotify;
+ EthosU::ethosu_core_msg_capabilities_rsp capabilities;
+ std::shared_ptr<Networks> networks;
+};
+
+class InferenceHandler {
+public:
+ InferenceHandler(uint8_t *tensorArena,
+ size_t arenaSize,
+ std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue,
+ QueueHandle_t inferenceOutputQueue,
+ SemaphoreHandle_t messageNotify,
+ std::shared_ptr<Networks> networks);
+
+ void run();
+
+private:
+ void runInference(EthosU::ethosu_core_inference_req &req, EthosU::ethosu_core_inference_rsp &rsp);
+ bool getInferenceJob(const EthosU::ethosu_core_inference_req &req, InferenceProcess::InferenceJob &job);
+
+#if defined(ETHOSU)
+ friend void ::ethosu_inference_begin(struct ethosu_driver *drv, void *userArg);
+ friend void ::ethosu_inference_end(struct ethosu_driver *drv, void *userArg);
+#endif
+ std::shared_ptr<Queue<EthosU::ethosu_core_inference_req>> inferenceInputQueue;
+ QueueHandle_t inferenceOutputQueue;
+ SemaphoreHandle_t messageNotify;
+ InferenceProcess::InferenceProcess inference;
+ EthosU::ethosu_core_inference_req *currentReq;
+ EthosU::ethosu_core_inference_rsp *currentRsp;
+ std::shared_ptr<Networks> networks;
+};
+
+} // namespace MessageHandler
+
+#endif