From 01d0281404183c84d26e863502cac8d83044c0bf Mon Sep 17 00:00:00 2001 From: Jim Flynn Date: Wed, 29 Apr 2020 21:12:13 +0100 Subject: IVGCVSW-4595 Change FileOnlyProfilingConnection to all packet processor model Change-Id: Ieccb26190d80e570ddef8d7c22e824eda1b92d7f Signed-off-by: Jim Flynn --- src/profiling/FileOnlyProfilingConnection.cpp | 240 +++++++++----------------- 1 file changed, 85 insertions(+), 155 deletions(-) (limited to 'src/profiling/FileOnlyProfilingConnection.cpp') diff --git a/src/profiling/FileOnlyProfilingConnection.cpp b/src/profiling/FileOnlyProfilingConnection.cpp index 1d4e23b7aa..1e26aaa472 100644 --- a/src/profiling/FileOnlyProfilingConnection.cpp +++ b/src/profiling/FileOnlyProfilingConnection.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -20,160 +21,107 @@ namespace armnn namespace profiling { -FileOnlyProfilingConnection::~FileOnlyProfilingConnection() -{ - Close(); -} - -bool FileOnlyProfilingConnection::IsOpen() const +std::vector StreamMetaDataProcessor::GetHeadersAccepted() { - // This type of connection is always open. - return true; + std::vector headers; + headers.push_back(m_MetaDataPacketHeader); + return headers; } -void FileOnlyProfilingConnection::Close() +void StreamMetaDataProcessor::HandlePacket(const Packet& packet) { - // Dump any unread packets out of the queue. - size_t initialSize = m_PacketQueue.size(); - for (size_t i = 0; i < initialSize; ++i) + if (packet.GetHeader() != m_MetaDataPacketHeader) { - m_PacketQueue.pop(); + throw armnnProfiling::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets"); } - // dispose of the processing thread - m_KeepRunning.store(false); - if (m_LocalHandlersThread.joinable()) + // determine the endianness of the protocol + TargetEndianness endianness; + if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == armnnProfiling::PIPE_MAGIC) { - // make sure the thread wakes up and sees it has to stop - m_ConditionPacketReadable.notify_one(); - m_LocalHandlersThread.join(); + endianness = TargetEndianness::BeWire; } -} - -bool FileOnlyProfilingConnection::WaitForStreamMeta(const unsigned char* buffer, uint32_t length) -{ - IgnoreUnused(length); - - // The first word, stream_metadata_identifer, should always be 0. - if (ToUint32(buffer, TargetEndianness::BeWire) != 0) + else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == armnnProfiling::PIPE_MAGIC) { - Fail("Protocol error. The stream_metadata_identifer was not 0."); + endianness = TargetEndianness::LeWire; } - - // Before we interpret the length we need to read the pipe_magic word to determine endianness. - if (ToUint32(buffer + 8, TargetEndianness::BeWire) == armnnProfiling::PIPE_MAGIC) + else { - m_Endianness = TargetEndianness::BeWire; + throw armnnProfiling::ProfilingException("Protocol read error. Unable to read PIPE_MAGIC value."); } - else if (ToUint32(buffer + 8, TargetEndianness::LeWire) == armnnProfiling::PIPE_MAGIC) + m_FileOnlyProfilingConnection->SetEndianess(endianness); + // send back the acknowledgement + std::unique_ptr uniqueNullPtr = nullptr; + Packet returnPacket(0x10000, 0, uniqueNullPtr); + m_FileOnlyProfilingConnection->ReturnPacket(returnPacket); +} + +uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness) +{ + // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the + // specified endianness. + if (endianness == TargetEndianness::BeWire) { - m_Endianness = TargetEndianness::LeWire; + return static_cast(data[0]) << 24 | static_cast(data[1]) << 16 | + static_cast(data[2]) << 8 | static_cast(data[3]); } else { - Fail("Protocol read error. Unable to read PIPE_MAGIC value."); + return static_cast(data[3]) << 24 | static_cast(data[2]) << 16 | + static_cast(data[1]) << 8 | static_cast(data[0]); } - return true; } -void FileOnlyProfilingConnection::SendConnectionAck() +FileOnlyProfilingConnection::~FileOnlyProfilingConnection() { - if (!m_QuietOp) + try { - std::cout << "Sending connection acknowledgement." << std::endl; + Close(); } - std::unique_ptr uniqueNullPtr = nullptr; + catch (...) { - std::lock_guard lck(m_PacketAvailableMutex); - m_PacketQueue.push(Packet(0x10000, 0, uniqueNullPtr)); + // do nothing } - m_ConditionPacketAvailable.notify_one(); } -bool FileOnlyProfilingConnection::SendCounterSelectionPacket() +bool FileOnlyProfilingConnection::IsOpen() const { - uint32_t uint16_t_size = sizeof(uint16_t); - uint32_t uint32_t_size = sizeof(uint32_t); - - uint32_t offset = 0; - uint32_t bodySize = uint32_t_size + boost::numeric_cast(m_IdList.size()) * uint16_t_size; - - auto uniqueData = std::make_unique(bodySize); - unsigned char* data = reinterpret_cast(uniqueData.get()); - - // Copy capturePeriod - WriteUint32(data, offset, m_Options.m_CapturePeriod); + // This type of connection is always open. + return true; +} - // Copy m_IdList - offset += uint32_t_size; - for (const uint16_t& id : m_IdList) +void FileOnlyProfilingConnection::Close() +{ + // Dump any unread packets out of the queue. + size_t initialSize = m_PacketQueue.size(); + for (size_t i = 0; i < initialSize; ++i) { - WriteUint16(data, offset, id); - offset += uint16_t_size; + m_PacketQueue.pop(); } - + // dispose of the processing thread + m_KeepRunning.store(false); + if (m_LocalHandlersThread.joinable()) { - std::lock_guard lck(m_PacketAvailableMutex); - m_PacketQueue.push(Packet(0x40000, bodySize, uniqueData)); + // make sure the thread wakes up and sees it has to stop + m_ConditionPacketReadable.notify_one(); + m_LocalHandlersThread.join(); } - m_ConditionPacketAvailable.notify_one(); - - return true; } bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length) { ARMNN_ASSERT(buffer); Packet packet = ReceivePacket(buffer, length); + ForwardPacketToHandlers(packet); + return true; +} - // Read Header and determine case - uint32_t outgoingHeaderAsWords[2]; - PackageActivity packageActivity = GetPackageActivity(packet, outgoingHeaderAsWords); - - switch (packageActivity) +void FileOnlyProfilingConnection::ReturnPacket(Packet& packet) +{ { - case PackageActivity::StreamMetaData: - { - if (!WaitForStreamMeta(buffer, length)) - { - return EXIT_FAILURE; - } - - SendConnectionAck(); - break; - } - case PackageActivity::CounterDirectory: - { - std::unique_ptr uniqueCounterData = std::make_unique(length - 8); - - std::memcpy(uniqueCounterData.get(), buffer + 8, length - 8); - - Packet directoryPacket(outgoingHeaderAsWords[0], length - 8, uniqueCounterData); - - armnn::profiling::PacketVersionResolver packetVersionResolver; - DirectoryCaptureCommandHandler directoryCaptureCommandHandler( - 0, 2, packetVersionResolver.ResolvePacketVersion(0, 2).GetEncodedValue()); - directoryCaptureCommandHandler.operator()(directoryPacket); - const ICounterDirectory& counterDirectory = directoryCaptureCommandHandler.GetCounterDirectory(); - for (auto& category : counterDirectory.GetCategories()) - { - // Remember we need to translate the Uid's from our CounterDirectory instance to the parent one. - std::vector translatedCounters; - for (auto const& copyUid : category->m_Counters) - { - translatedCounters.emplace_back(directoryCaptureCommandHandler.TranslateUIDCopyToOriginal(copyUid)); - } - m_IdList.insert(std::end(m_IdList), std::begin(translatedCounters), std::end(translatedCounters)); - } - SendCounterSelectionPacket(); - break; - } - default: - { - break; - } + std::lock_guard lck(m_PacketAvailableMutex); + m_PacketQueue.push(std::move(packet)); } - ForwardPacketToHandlers(packet); - return true; + m_ConditionPacketAvailable.notify_one(); } Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout) @@ -182,11 +130,12 @@ Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout) // Here we are using m_PacketQueue.empty() as a predicate variable // The conditional variable will wait until packetQueue is not empty or until a timeout - if(!m_ConditionPacketAvailable.wait_for(lck, - std::chrono::milliseconds(timeout), - [&]{return !m_PacketQueue.empty();})) + if (!m_ConditionPacketAvailable.wait_for(lck, + std::chrono::milliseconds(timeout), + [&]{return !m_PacketQueue.empty();})) { - throw armnn::TimeoutException("Thread has timed out as per requested time limit"); + Packet empty; + return empty; } Packet returnedPacket = std::move(m_PacketQueue.front()); @@ -194,40 +143,6 @@ Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout) return returnedPacket; } -PackageActivity FileOnlyProfilingConnection::GetPackageActivity(const Packet& packet, uint32_t headerAsWords[2]) -{ - headerAsWords[0] = packet.GetHeader(); - headerAsWords[1] = packet.GetLength(); - if (headerAsWords[0] == 0x20000) // Packet family = 0 Packet Id = 2 - { - return PackageActivity::CounterDirectory; - } - else if (headerAsWords[0] == 0) // Packet family = 0 Packet Id = 0 - { - return PackageActivity::StreamMetaData; - } - else - { - return PackageActivity::Unknown; - } -} - -uint32_t FileOnlyProfilingConnection::ToUint32(const unsigned char* data, TargetEndianness endianness) -{ - // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the - // specified endianness. - if (endianness == TargetEndianness::BeWire) - { - return static_cast(data[0]) << 24 | static_cast(data[1]) << 16 | - static_cast(data[2]) << 8 | static_cast(data[3]); - } - else - { - return static_cast(data[3]) << 24 | static_cast(data[2]) << 16 | - static_cast(data[1]) << 8 | static_cast(data[0]); - } -} - void FileOnlyProfilingConnection::Fail(const std::string& errorMessage) { Close(); @@ -290,13 +205,13 @@ void FileOnlyProfilingConnection::ForwardPacketToHandlers(Packet& packet) { return; } - if (m_KeepRunning.load() == false) + if (!m_KeepRunning.load()) { return; } { std::unique_lock readableListLock(m_ReadableMutex); - if (m_KeepRunning.load() == false) + if (!m_KeepRunning.load()) { return; } @@ -367,9 +282,24 @@ void FileOnlyProfilingConnection::DispatchPacketToHandlers(const Packet& packet) auto iter = m_IndexedHandlers.find(packet.GetHeader()); if (iter != m_IndexedHandlers.end()) { - for (auto &delegate : iter->second) + for (auto& delegate : iter->second) { - delegate->HandlePacket(packet); + try + { + delegate->HandlePacket(packet); + } + catch (const armnnProfiling::ProfilingException& ex) + { + Fail(ex.what()); + } + catch (const std::exception& ex) + { + Fail(ex.what()); + } + catch (...) + { + Fail("handler failed"); + } } } } -- cgit v1.2.1