// // Copyright © 2019 Arm Ltd. All rights reserved. // SPDX-License-Identifier: MIT // #include "FileOnlyProfilingConnection.hpp" #include "PacketVersionResolver.hpp" #include #include #include #include #include #include #include namespace armnn { namespace profiling { std::vector StreamMetaDataProcessor::GetHeadersAccepted() { std::vector headers; headers.push_back(m_MetaDataPacketHeader); return headers; } void StreamMetaDataProcessor::HandlePacket(const Packet& packet) { if (packet.GetHeader() != m_MetaDataPacketHeader) { throw armnnProfiling::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets"); } // determine the endianness of the protocol TargetEndianness endianness; if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == armnnProfiling::PIPE_MAGIC) { endianness = TargetEndianness::BeWire; } else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == armnnProfiling::PIPE_MAGIC) { endianness = TargetEndianness::LeWire; } else { throw armnnProfiling::ProfilingException("Protocol read error. Unable to read PIPE_MAGIC value."); } m_FileOnlyProfilingConnection->SetEndianess(endianness); // send back the acknowledgement std::unique_ptr uniqueNullPtr = nullptr; Packet returnPacket(0x10000, 0, uniqueNullPtr); m_FileOnlyProfilingConnection->ReturnPacket(returnPacket); } uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness) { // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the // specified endianness. if (endianness == TargetEndianness::BeWire) { return static_cast(data[0]) << 24 | static_cast(data[1]) << 16 | static_cast(data[2]) << 8 | static_cast(data[3]); } else { return static_cast(data[3]) << 24 | static_cast(data[2]) << 16 | static_cast(data[1]) << 8 | static_cast(data[0]); } } FileOnlyProfilingConnection::~FileOnlyProfilingConnection() { try { Close(); } catch (...) { // do nothing } } bool FileOnlyProfilingConnection::IsOpen() const { // This type of connection is always open. return true; } void FileOnlyProfilingConnection::Close() { // Dump any unread packets out of the queue. size_t initialSize = m_PacketQueue.size(); for (size_t i = 0; i < initialSize; ++i) { m_PacketQueue.pop(); } // dispose of the processing thread m_KeepRunning.store(false); if (m_LocalHandlersThread.joinable()) { // make sure the thread wakes up and sees it has to stop m_ConditionPacketReadable.notify_one(); m_LocalHandlersThread.join(); } } bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length) { ARMNN_ASSERT(buffer); Packet packet = ReceivePacket(buffer, length); ForwardPacketToHandlers(packet); return true; } void FileOnlyProfilingConnection::ReturnPacket(Packet& packet) { { std::lock_guard lck(m_PacketAvailableMutex); m_PacketQueue.push(std::move(packet)); } m_ConditionPacketAvailable.notify_one(); } Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout) { std::unique_lock lck(m_PacketAvailableMutex); // Here we are using m_PacketQueue.empty() as a predicate variable // The conditional variable will wait until packetQueue is not empty or until a timeout if (!m_ConditionPacketAvailable.wait_for(lck, std::chrono::milliseconds(timeout), [&]{return !m_PacketQueue.empty();})) { Packet empty; return empty; } Packet returnedPacket = std::move(m_PacketQueue.front()); m_PacketQueue.pop(); return returnedPacket; } void FileOnlyProfilingConnection::Fail(const std::string& errorMessage) { Close(); throw RuntimeException(errorMessage); } /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start /// a processing thread that will ensure that processing of packets will happen on a separate /// thread from the profiling services send thread and will therefore protect against the /// profiling message buffer becoming exhausted because packet handling slows the dispatch. void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler) { m_PacketHandlers.push_back(std::move(localPacketHandler)); ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back(); localCopy->SetConnection(this); if (localCopy->GetHeadersAccepted().empty()) { //this is a universal handler m_UniversalHandlers.push_back(localCopy); } else { for (uint32_t header : localCopy->GetHeadersAccepted()) { auto iter = m_IndexedHandlers.find(header); if (iter == m_IndexedHandlers.end()) { std::vector handlers; handlers.push_back(localCopy); m_IndexedHandlers.emplace(std::make_pair(header, handlers)); } else { iter->second.push_back(localCopy); } } } } void FileOnlyProfilingConnection::StartProcessingThread() { // check if the thread has already started if (m_IsRunning.load()) { return; } // make sure if there was one running before it is joined if (m_LocalHandlersThread.joinable()) { m_LocalHandlersThread.join(); } m_IsRunning.store(true); m_KeepRunning.store(true); m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this); } void FileOnlyProfilingConnection::ForwardPacketToHandlers(Packet& packet) { if (m_PacketHandlers.empty()) { return; } if (!m_KeepRunning.load()) { return; } { std::unique_lock readableListLock(m_ReadableMutex); if (!m_KeepRunning.load()) { return; } m_ReadableList.push(std::move(packet)); } m_ConditionPacketReadable.notify_one(); } void FileOnlyProfilingConnection::ServiceLocalHandlers() { do { Packet returnedPacket; bool readPacket = false; { // only lock while we are taking the packet off the incoming list std::unique_lock lck(m_ReadableMutex); if (m_Timeout < 0) { m_ConditionPacketReadable.wait(lck, [&] { return !m_ReadableList.empty(); }); } else { m_ConditionPacketReadable.wait_for(lck, std::chrono::milliseconds(std::max(m_Timeout, 1000)), [&] { return !m_ReadableList.empty(); }); } if (m_KeepRunning.load()) { if (!m_ReadableList.empty()) { returnedPacket = std::move(m_ReadableList.front()); m_ReadableList.pop(); readPacket = true; } } else { ClearReadableList(); } } if (m_KeepRunning.load() && readPacket) { DispatchPacketToHandlers(returnedPacket); } } while (m_KeepRunning.load()); // make sure the readable list is cleared ClearReadableList(); m_IsRunning.store(false); } void FileOnlyProfilingConnection::ClearReadableList() { // make sure the incoming packet queue gets emptied size_t initialSize = m_ReadableList.size(); for (size_t i = 0; i < initialSize; ++i) { m_ReadableList.pop(); } } void FileOnlyProfilingConnection::DispatchPacketToHandlers(const Packet& packet) { for (auto& delegate : m_UniversalHandlers) { delegate->HandlePacket(packet); } auto iter = m_IndexedHandlers.find(packet.GetHeader()); if (iter != m_IndexedHandlers.end()) { for (auto& delegate : iter->second) { try { delegate->HandlePacket(packet); } catch (const armnnProfiling::ProfilingException& ex) { Fail(ex.what()); } catch (const std::exception& ex) { Fail(ex.what()); } catch (...) { Fail("handler failed"); } } } } } // namespace profiling } // namespace armnn