author    | Lior Dekel <lior.dekel@arm.com> | 2021-08-02 12:03:55 +0300
committer | Lior Dekel <lior.dekel@arm.com> | 2021-08-11 15:09:13 +0300
commit    | 489e40be5d744966d5e5b704ed7c164d1aa234ae (patch)
tree      | 95a57c1ce1ad66aa4cc7e7ed71e75e3bcec18fab /applications/threadx_demo/main.cpp
parent    | e5fcbd33e318c822478196c6cca4a6f5ebbdcc0f (diff)
download  | ethos-u-core-platform-489e40be5d744966d5e5b704ed7c164d1aa234ae.tar.gz
Adding inference application for ThreadX
Change-Id: Icadb4ceb50997a31bafa2356bfa434568c056593
Diffstat (limited to 'applications/threadx_demo/main.cpp')
-rw-r--r-- | applications/threadx_demo/main.cpp | 435
1 file changed, 435 insertions, 0 deletions
diff --git a/applications/threadx_demo/main.cpp b/applications/threadx_demo/main.cpp
new file mode 100644
index 0000000..94430ee
--- /dev/null
+++ b/applications/threadx_demo/main.cpp
@@ -0,0 +1,435 @@

/*
 * Copyright (c) 2019-2021 Arm Limited. All rights reserved.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the License); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an AS IS BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/****************************************************************************
 * Includes
 ****************************************************************************/
#include "tx_api.h"

#include <inttypes.h>
#include <stdio.h>
#include <vector>

#include "inference_process.hpp"

// Model data (defined and changeable by modifying the compile definitions in CMakeLists.txt)
#include "input.h"
#include "model.h"
#include "output.h"

using namespace std;
using namespace InferenceProcess;
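// The three generated headers above are expected to provide the symbols that
// this file uses later (modelName, networkModelData, inputData,
// expectedOutputData). A minimal sketch of their shape, for illustration only;
// the real arrays are generated at build time with concrete sizes, since
// sizeof() is applied to them below:
//
//   const char *modelName = "mobilenet";      // model.h (name is an example)
//   uint8_t networkModelData[] = { ... };     // model.h: the network binary
//   uint8_t inputData[] = { ... };            // input.h: one input tensor
//   uint8_t expectedOutputData[] = { ... };   // output.h: reference output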
/****************************************************************************
 * Defines
 ****************************************************************************/
// Number of threads that process inferences. Each thread reserves a driver and
// runs inferences (normally one per NPU, but this is not a requirement).
#define NUM_INFERENCE_THREADS 1
// Number of threads that create jobs and receive responses
#define NUM_JOB_THREADS 2
// Number of jobs to create per job thread
#define NUM_JOBS_PER_THREAD 1

#define PROCESS_THREAD_STACK_SIZE (16 * 1024)
#define SENDER_THREAD_STACK_SIZE (2 * 1024)
#define PROCESS_THREAD_CONTEXT_SIZE (sizeof(TX_THREAD))
#define SENDER_THREAD_CONTEXT_SIZE (sizeof(TX_THREAD))

// Tensor arena size
#ifdef TENSOR_ARENA_SIZE // If defined in model.h
#define TENSOR_ARENA_SIZE_PER_INFERENCE TENSOR_ARENA_SIZE
#else // If not defined, use maximum available - 2M
#define TENSOR_ARENA_SIZE 2000000
#define TENSOR_ARENA_SIZE_PER_INFERENCE (TENSOR_ARENA_SIZE / NUM_INFERENCE_THREADS)
#endif

#define PROCESS_QUEUE_SIZE (NUM_JOBS_PER_THREAD * NUM_JOB_THREADS * sizeof(xInferenceJob *))
#define SENDER_QUEUE_SIZE (NUM_JOBS_PER_THREAD * sizeof(xInferenceJob *))

/* BYTE_POOL_SIZE_OVERHEAD is used to increase the memory byte pool size, as the number of
   allocatable bytes in a memory byte pool is slightly less than what was specified during creation */
#define BYTE_POOL_SIZE_OVERHEAD (512)
#define BYTE_POOL_SIZE \
    (((PROCESS_THREAD_CONTEXT_SIZE + PROCESS_THREAD_STACK_SIZE) * NUM_INFERENCE_THREADS) + \
     (SENDER_THREAD_CONTEXT_SIZE + SENDER_THREAD_STACK_SIZE + SENDER_QUEUE_SIZE) * NUM_JOB_THREADS + \
     PROCESS_QUEUE_SIZE + BYTE_POOL_SIZE_OVERHEAD)
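// A worked example of the pool size above, assuming the default configuration
// (1 inference thread, 2 job threads, 1 job per thread) and 4-byte pointers on
// a 32-bit Cortex-M target. sizeof(TX_THREAD) is written as T below, since it
// varies with the ThreadX configuration:
//
//   SENDER_QUEUE_SIZE  = 1 * 4     = 4 bytes
//   PROCESS_QUEUE_SIZE = 1 * 2 * 4 = 8 bytes
//   BYTE_POOL_SIZE     = (T + 16384) * 1      // process thread context + stack
//                      + (T + 2048 + 4) * 2   // sender contexts, stacks, queues
//                      + 8 + 512              // process queue + overhead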
/****************************************************************************
 * Structures
 ****************************************************************************/
struct ProcessThreadParams {
    ProcessThreadParams() : queueHandle(nullptr), tensorArena(nullptr), arenaSize(0) {}
    ProcessThreadParams(TX_QUEUE *_queue, uint8_t *_tensorArena, size_t _arenaSize) :
        queueHandle(_queue), tensorArena(_tensorArena), arenaSize(_arenaSize) {}

    TX_QUEUE *queueHandle;
    uint8_t *tensorArena;
    size_t arenaSize;
};

// Wrapper around InferenceProcess::InferenceJob. Adds a response queue and a
// status flag for multi-threaded use under ThreadX.
struct xInferenceJob : public InferenceJob {
    TX_QUEUE *responseQueue;
    bool status;

    xInferenceJob() : InferenceJob(), responseQueue(nullptr), status(false) {}
    xInferenceJob(const string &_name,
                  const DataPtr &_networkModel,
                  const vector<DataPtr> &_input,
                  const vector<DataPtr> &_output,
                  const vector<DataPtr> &_expectedOutput,
                  const size_t _numBytesToPrint,
                  const vector<uint8_t> &_pmuEventConfig,
                  const uint32_t _pmuCycleCounterEnable,
                  TX_QUEUE *_queue) :
        InferenceJob(_name,
                     _networkModel,
                     _input,
                     _output,
                     _expectedOutput,
                     _numBytesToPrint,
                     _pmuEventConfig,
                     _pmuCycleCounterEnable),
        responseQueue(_queue), status(false) {}
};

/****************************************************************************
 * Global and static variables
 ****************************************************************************/
namespace {
// Number of completed jobs in total; needed to exit the application correctly
// when NUM_JOB_THREADS > 1
int totalCompletedJobs = 0;

// Tensor arena static initialisation
const size_t arenaSize = TENSOR_ARENA_SIZE_PER_INFERENCE;

TX_QUEUE inferenceProcessQueue;

ProcessThreadParams threadParams[NUM_INFERENCE_THREADS];

TX_BYTE_POOL bytePool;
ULONG memoryArea[BYTE_POOL_SIZE / sizeof(ULONG)];
} // namespace

__attribute__((section(".bss.tensor_arena"), aligned(16)))
uint8_t inferenceProcessTensorArena[NUM_INFERENCE_THREADS][arenaSize];

/****************************************************************************
 * Mutex & Semaphore
 * Overrides the weak-linked symbols in ethosu_driver.c to implement thread handling
 ****************************************************************************/
extern "C" {
void *ethosu_mutex_create(void) {
    TX_MUTEX *mutex = new TX_MUTEX;
    UINT status     = tx_mutex_create(mutex, "mutex 0", TX_NO_INHERIT);
    if (status != TX_SUCCESS) {
        printf("mutex create failed, error - %u\n", status);
    }
    return (void *)mutex;
}

void ethosu_mutex_lock(void *mutex) {
    UINT status = tx_mutex_get(reinterpret_cast<TX_MUTEX *>(mutex), TX_WAIT_FOREVER);
    if (status != TX_SUCCESS) {
        printf("mutex get failed, error - %u\n", status);
    }
}

void ethosu_mutex_unlock(void *mutex) {
    UINT status = tx_mutex_put(reinterpret_cast<TX_MUTEX *>(mutex));
    if (status != TX_SUCCESS) {
        printf("mutex put failed, error - %u\n", status);
    }
}

void *ethosu_semaphore_create(void) {
    TX_SEMAPHORE *semaphore = new TX_SEMAPHORE;
    UINT status             = tx_semaphore_create(semaphore, "semaphore", 1);
    if (status != TX_SUCCESS) {
        printf("Semaphore create failed, error - %u\n", status);
    }
    return (void *)semaphore;
}

void ethosu_semaphore_take(void *sem) {
    UINT status = tx_semaphore_get(reinterpret_cast<TX_SEMAPHORE *>(sem), TX_WAIT_FOREVER);
    if (status != TX_SUCCESS) {
        printf("Semaphore get/take failed, error - %u\n", status);
    }
}

void ethosu_semaphore_give(void *sem) {
    UINT status = tx_semaphore_put(reinterpret_cast<TX_SEMAPHORE *>(sem));
    if (status != TX_SUCCESS) {
        printf("Semaphore put/give failed, error - %u\n", status);
    }
}
}
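// Illustrative only: a sketch of how the Ethos-U driver is expected to use the
// hooks above. The exact call sites live in ethosu_driver.c; this sequence is
// an assumption for illustration, not the driver's verbatim code:
//
//   void *mutex = ethosu_mutex_create();   // once, at driver init
//   ethosu_mutex_lock(mutex);              // serialise access to one NPU
//   /* ...program and start the NPU... */
//   ethosu_semaphore_take(sem);            // block until the interrupt handler
//                                          // calls ethosu_semaphore_give()
//   ethosu_mutex_unlock(mutex);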
/****************************************************************************
 * Functions
 ****************************************************************************/
// inferenceProcessThread - Run jobs from the process queue with the available driver
void inferenceProcessThread(ULONG pvParameters) {
    ProcessThreadParams params = *reinterpret_cast<ProcessThreadParams *>(pvParameters);
    UINT tx_status             = TX_QUEUE_ERROR;

    class InferenceProcess inferenceProcess(params.tensorArena, params.arenaSize);

    for (;;) {
        xInferenceJob *xJob;

        // Get the job details from the process queue
        tx_status = tx_queue_receive(params.queueHandle, &xJob, TX_WAIT_FOREVER);
        if (tx_status != TX_SUCCESS) {
            printf("process failed to receive from Queue, error - %u\n", tx_status);
            exit(1);
        }

        // Run the job
        bool status  = inferenceProcess.runJob(*xJob);
        xJob->status = status;

        // Send a response for the job on the job's response queue
        tx_status = tx_queue_send(xJob->responseQueue, &xJob, TX_WAIT_FOREVER);
        if (tx_status != TX_SUCCESS) {
            printf("process inferenceProcessThread failed to send to Queue, error - %u\n", tx_status);
            exit(1);
        }
    }

    // Not reached; the loop above never exits
    tx_status = tx_thread_terminate(nullptr);
    if (tx_status != TX_SUCCESS) {
        printf("process inferenceProcessThread failed to terminate thread, error - %u\n", tx_status);
        exit(1);
    }
}

// inferenceSenderThread - Creates NUM_JOBS_PER_THREAD jobs, queues them, and
// then listens for their completion status
void inferenceSenderThread(ULONG pvParameters) {
    int ret = 0;
    TX_QUEUE senderQueue;
    UINT status                          = TX_QUEUE_ERROR;
    TX_QUEUE *inferenceProcessQueueLocal = reinterpret_cast<TX_QUEUE *>(pvParameters);
    xInferenceJob jobs[NUM_JOBS_PER_THREAD];
    CHAR *senderQueuePtr = nullptr;

    /* Allocate memory for this inference sender thread's response queue */
    status = tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&senderQueuePtr), SENDER_QUEUE_SIZE, TX_NO_WAIT);
    if (status != TX_SUCCESS) {
        printf("Sender thread failed to allocate bytes for Queue, error - %u\n", status);
        exit(1);
    }

    /* Create the response queue for this inference sender thread */
    status = tx_queue_create(
        &senderQueue, "senderQueue", sizeof(xInferenceJob *) / sizeof(uint32_t), senderQueuePtr, SENDER_QUEUE_SIZE);
    if (status != TX_SUCCESS) {
        printf("Sender thread failed to create Queue, error - %u\n", status);
        exit(1);
    }

    /* Create the jobs and queue them on the inference process queue */
    for (int n = 0; n < NUM_JOBS_PER_THREAD; n++) {
        // Create the job
        xInferenceJob *job = &jobs[n];
        job->name          = string(modelName);
        job->networkModel  = DataPtr(networkModelData, sizeof(networkModelData));
        job->input.push_back(DataPtr(inputData, sizeof(inputData)));
        job->expectedOutput.push_back(DataPtr(expectedOutputData, sizeof(expectedOutputData)));
        job->responseQueue = &senderQueue;

        // Queue the job
        status = tx_queue_send(inferenceProcessQueueLocal, &job, TX_WAIT_FOREVER);
        if (status != TX_SUCCESS) {
            printf("Sender thread failed to send to Queue, error - %u\n", status);
            exit(1);
        }
    }

    /* Listen for the completion status on the response queue */
    do {
        xInferenceJob *pSendJob;

        status = tx_queue_receive(&senderQueue, &pSendJob, TX_WAIT_FOREVER);
        if (status != TX_SUCCESS) {
            printf("Sender thread failed to receive from Queue, error - %u\n", status);
            exit(1);
        }

        totalCompletedJobs++;
        ret = pSendJob->status;
        if (pSendJob->status != 0) {
            break;
        }
    } while (totalCompletedJobs < NUM_JOBS_PER_THREAD * NUM_JOB_THREADS);

    /* Delete the response queue */
    status = tx_queue_delete(&senderQueue);
    if (status != TX_SUCCESS) {
        printf("Sender thread failed to delete Queue, error - %u\n", status);
        exit(1);
    }

    exit(ret);
}
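// Note on the queue message size used above, assuming a 32-bit target where
// pointers are 4 bytes wide: ThreadX expresses queue message sizes in 32-bit
// words, so sizeof(xInferenceJob *) / sizeof(uint32_t) == 1, i.e. each message
// is a single-word pointer to a job that remains owned by the sender thread.
// Only the pointer is copied into the queue:
//
//   xInferenceJob *job = &jobs[n];
//   tx_queue_send(queue, &job, TX_WAIT_FOREVER); // copies the 4-byte pointer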
/****************************************************************************
 * Application
 ****************************************************************************/
int main() {
    /* Enter the ThreadX kernel (does not return; ThreadX calls tx_application_define() during initialisation). */
    tx_kernel_enter();
    return 0;
}

void tx_application_define(void *first_unused_memory) {
    UINT status;
    CHAR *senderThreadStackPtr[NUM_JOB_THREADS]        = {nullptr};
    CHAR *processThreadStackPtr[NUM_INFERENCE_THREADS] = {nullptr};
    CHAR *processQueuePtr                              = nullptr;
    CHAR *senderThreadPtr[NUM_JOB_THREADS]             = {nullptr};
    CHAR *processThreadPtr[NUM_INFERENCE_THREADS]      = {nullptr};

    /* Create a byte memory pool from which to allocate the thread stacks and queues. */
    status = tx_byte_pool_create(&bytePool, "byte pool", memoryArea, BYTE_POOL_SIZE);
    if (status != TX_SUCCESS) {
        printf("Main failed to allocate pool of bytes, error - %u\n", status);
        exit(1);
    }

    /* Allocate memory for the inference process queue */
    status = tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&processQueuePtr), PROCESS_QUEUE_SIZE, TX_NO_WAIT);
    if (status != TX_SUCCESS) {
        printf("Main failed to allocate bytes for process queue, error - %u\n", status);
        exit(1);
    }

    status = tx_queue_create(&inferenceProcessQueue,
                             "inferenceProcessQueue",
                             sizeof(xInferenceJob *) / sizeof(uint32_t),
                             processQueuePtr,
                             PROCESS_QUEUE_SIZE);
    if (status != TX_SUCCESS) {
        printf("Main failed to create Queue, error - %u\n", status);
        exit(1);
    }

    /* Create the inferenceSender threads that create and queue the jobs */
    for (int n = 0; n < NUM_JOB_THREADS; n++) {

        /* Allocate the thread context for the inference sender thread. */
        status =
            tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&senderThreadPtr[n]), sizeof(TX_THREAD), TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for sender thread, error - %u\n", status);
            exit(1);
        }

        /* Allocate the stack for the inference sender thread. */
        status = tx_byte_allocate(
            &bytePool, reinterpret_cast<VOID **>(&senderThreadStackPtr[n]), SENDER_THREAD_STACK_SIZE, TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for sender thread stack, error - %u\n", status);
            exit(1);
        }

        /* Create the inference sender thread. */
        status = tx_thread_create(reinterpret_cast<TX_THREAD *>(senderThreadPtr[n]),
                                  "senderThread",
                                  inferenceSenderThread,
                                  reinterpret_cast<ULONG>(&inferenceProcessQueue),
                                  senderThreadStackPtr[n],
                                  SENDER_THREAD_STACK_SIZE,
                                  1,
                                  1,
                                  TX_NO_TIME_SLICE,
                                  TX_AUTO_START);
        if (status != TX_SUCCESS) {
            printf("Main failed to create Thread, error - %u\n", status);
            exit(1);
        }
    }
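    // Scheduling note (an observation about the parameters above, not part of
    // the original commit message): both thread types are created at priority 1
    // with preemption threshold 1 and TX_NO_TIME_SLICE, so equal-priority
    // threads are not time-sliced. Each thread runs until it blocks on a queue
    // (tx_queue_receive/tx_queue_send with TX_WAIT_FOREVER), which is what
    // hands execution back and forth between sender and process threads.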
    /* Create the inferenceProcess threads that process the queued jobs */
    for (int n = 0; n < NUM_INFERENCE_THREADS; n++) {

        /* Allocate the thread context for the inference process thread. */
        status =
            tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&processThreadPtr[n]), sizeof(TX_THREAD), TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for process thread, error - %u\n", status);
            exit(1);
        }

        /* Allocate the stack for the inference process thread. */
        status = tx_byte_allocate(
            &bytePool, reinterpret_cast<VOID **>(&processThreadStackPtr[n]), PROCESS_THREAD_STACK_SIZE, TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for process stack, error - %u\n", status);
            exit(1);
        }

        threadParams[n] = ProcessThreadParams(&inferenceProcessQueue, inferenceProcessTensorArena[n], arenaSize);

        /* Create the inference process thread. */
        status = tx_thread_create(reinterpret_cast<TX_THREAD *>(processThreadPtr[n]),
                                  "processThread",
                                  inferenceProcessThread,
                                  reinterpret_cast<ULONG>(&threadParams[n]),
                                  processThreadStackPtr[n],
                                  PROCESS_THREAD_STACK_SIZE,
                                  1,
                                  1,
                                  TX_NO_TIME_SLICE,
                                  TX_AUTO_START);
        if (status != TX_SUCCESS) {
            printf("Main failed to create thread, error - %u\n", status);
            exit(1);
        }
    }

    printf("ThreadX application initialisation - Done\n");
}