From 3e9bc19ad523361e6b18057849e30c0c48183915 Mon Sep 17 00:00:00 2001 From: Jim Flynn Date: Wed, 23 Mar 2022 23:01:26 +0000 Subject: IVGCVSW-6706 Create the libpipeClient library Change-Id: I2368aade38ad3808fab55d8a86cd659d4e95d91e Signed-off-by: Jim Flynn --- .../client/src/FileOnlyProfilingConnection.cpp | 307 +++++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 profiling/client/src/FileOnlyProfilingConnection.cpp (limited to 'profiling/client/src/FileOnlyProfilingConnection.cpp') diff --git a/profiling/client/src/FileOnlyProfilingConnection.cpp b/profiling/client/src/FileOnlyProfilingConnection.cpp new file mode 100644 index 0000000000..bee2c62de0 --- /dev/null +++ b/profiling/client/src/FileOnlyProfilingConnection.cpp @@ -0,0 +1,307 @@ +// +// Copyright © 2019 Arm Ltd and Contributors. All rights reserved. +// SPDX-License-Identifier: MIT +// + +#include "FileOnlyProfilingConnection.hpp" + +#include +#include +#include + +#include +#include +#include + +namespace arm +{ + +namespace pipe +{ + +std::vector StreamMetaDataProcessor::GetHeadersAccepted() +{ + std::vector headers; + headers.push_back(m_MetaDataPacketHeader); + return headers; +} + +void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet) +{ + if (packet.GetHeader() != m_MetaDataPacketHeader) + { + throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets"); + } + // determine the endianness of the protocol + TargetEndianness endianness; + if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC) + { + endianness = TargetEndianness::BeWire; + } + else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC) + { + endianness = TargetEndianness::LeWire; + } + else + { + throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value."); + } + m_FileOnlyProfilingConnection->SetEndianess(endianness); + // send back the acknowledgement + std::unique_ptr uniqueNullPtr = nullptr; + arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr); + m_FileOnlyProfilingConnection->ReturnPacket(returnPacket); +} + +uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness) +{ + // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the + // specified endianness. + if (endianness == TargetEndianness::BeWire) + { + return static_cast(data[0]) << 24 | static_cast(data[1]) << 16 | + static_cast(data[2]) << 8 | static_cast(data[3]); + } + else + { + return static_cast(data[3]) << 24 | static_cast(data[2]) << 16 | + static_cast(data[1]) << 8 | static_cast(data[0]); + } +} + +FileOnlyProfilingConnection::~FileOnlyProfilingConnection() +{ + try + { + Close(); + } + catch (...) + { + // do nothing + } +} + +bool FileOnlyProfilingConnection::IsOpen() const +{ + // This type of connection is always open. + return true; +} + +void FileOnlyProfilingConnection::Close() +{ + // Dump any unread packets out of the queue. + size_t initialSize = m_PacketQueue.size(); + for (size_t i = 0; i < initialSize; ++i) + { + m_PacketQueue.pop(); + } + // dispose of the processing thread + m_KeepRunning.store(false); + if (m_LocalHandlersThread.joinable()) + { + // make sure the thread wakes up and sees it has to stop + m_ConditionPacketReadable.notify_one(); + m_LocalHandlersThread.join(); + } +} + +bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length) +{ + ARM_PIPE_ASSERT(buffer); + arm::pipe::Packet packet = ReceivePacket(buffer, length); + ForwardPacketToHandlers(packet); + return true; +} + +void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet) +{ + { + std::lock_guard lck(m_PacketAvailableMutex); + m_PacketQueue.push(std::move(packet)); + } + m_ConditionPacketAvailable.notify_one(); +} + +arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout) +{ + std::unique_lock lck(m_PacketAvailableMutex); + + // Here we are using m_PacketQueue.empty() as a predicate variable + // The conditional variable will wait until packetQueue is not empty or until a timeout + if (!m_ConditionPacketAvailable.wait_for(lck, + std::chrono::milliseconds(timeout), + [&]{return !m_PacketQueue.empty();})) + { + arm::pipe::Packet empty; + return empty; + } + + arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front()); + m_PacketQueue.pop(); + return returnedPacket; +} + +void FileOnlyProfilingConnection::Fail(const std::string& errorMessage) +{ + Close(); + throw arm::pipe::ProfilingException(errorMessage); +} + +/// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start +/// a processing thread that will ensure that processing of packets will happen on a separate +/// thread from the profiling services send thread and will therefore protect against the +/// profiling message buffer becoming exhausted because packet handling slows the dispatch. +void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler) +{ + m_PacketHandlers.push_back(std::move(localPacketHandler)); + ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back(); + localCopy->SetConnection(this); + if (localCopy->GetHeadersAccepted().empty()) + { + //this is a universal handler + m_UniversalHandlers.push_back(localCopy); + } + else + { + for (uint32_t header : localCopy->GetHeadersAccepted()) + { + auto iter = m_IndexedHandlers.find(header); + if (iter == m_IndexedHandlers.end()) + { + std::vector handlers; + handlers.push_back(localCopy); + m_IndexedHandlers.emplace(std::make_pair(header, handlers)); + } + else + { + iter->second.push_back(localCopy); + } + } + } +} + +void FileOnlyProfilingConnection::StartProcessingThread() +{ + // check if the thread has already started + if (m_IsRunning.load()) + { + return; + } + // make sure if there was one running before it is joined + if (m_LocalHandlersThread.joinable()) + { + m_LocalHandlersThread.join(); + } + m_IsRunning.store(true); + m_KeepRunning.store(true); + m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this); +} + +void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet) +{ + if (m_PacketHandlers.empty()) + { + return; + } + if (!m_KeepRunning.load()) + { + return; + } + { + std::unique_lock readableListLock(m_ReadableMutex); + if (!m_KeepRunning.load()) + { + return; + } + m_ReadableList.push(std::move(packet)); + } + m_ConditionPacketReadable.notify_one(); +} + +void FileOnlyProfilingConnection::ServiceLocalHandlers() +{ + do + { + arm::pipe::Packet returnedPacket; + bool readPacket = false; + { // only lock while we are taking the packet off the incoming list + std::unique_lock lck(m_ReadableMutex); + if (m_Timeout < 0) + { + m_ConditionPacketReadable.wait(lck, + [&] { return !m_ReadableList.empty(); }); + } + else + { + m_ConditionPacketReadable.wait_for(lck, + std::chrono::milliseconds(std::max(m_Timeout, 1000)), + [&] { return !m_ReadableList.empty(); }); + } + if (m_KeepRunning.load()) + { + if (!m_ReadableList.empty()) + { + returnedPacket = std::move(m_ReadableList.front()); + m_ReadableList.pop(); + readPacket = true; + } + } + else + { + ClearReadableList(); + } + } + if (m_KeepRunning.load() && readPacket) + { + DispatchPacketToHandlers(returnedPacket); + } + } while (m_KeepRunning.load()); + // make sure the readable list is cleared + ClearReadableList(); + m_IsRunning.store(false); +} + +void FileOnlyProfilingConnection::ClearReadableList() +{ + // make sure the incoming packet queue gets emptied + size_t initialSize = m_ReadableList.size(); + for (size_t i = 0; i < initialSize; ++i) + { + m_ReadableList.pop(); + } +} + +void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet) +{ + for (auto& delegate : m_UniversalHandlers) + { + delegate->HandlePacket(packet); + } + auto iter = m_IndexedHandlers.find(packet.GetHeader()); + if (iter != m_IndexedHandlers.end()) + { + for (auto& delegate : iter->second) + { + try + { + delegate->HandlePacket(packet); + } + catch (const arm::pipe::ProfilingException& ex) + { + Fail(ex.what()); + } + catch (const std::exception& ex) + { + Fail(ex.what()); + } + catch (...) + { + Fail("handler failed"); + } + } + } +} + +} // namespace pipe + +} // namespace arm -- cgit v1.2.1