aboutsummaryrefslogtreecommitdiff
path: root/profiling/client/src/FileOnlyProfilingConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'profiling/client/src/FileOnlyProfilingConnection.cpp')
-rw-r--r--profiling/client/src/FileOnlyProfilingConnection.cpp307
1 files changed, 307 insertions, 0 deletions
diff --git a/profiling/client/src/FileOnlyProfilingConnection.cpp b/profiling/client/src/FileOnlyProfilingConnection.cpp
new file mode 100644
index 0000000000..bee2c62de0
--- /dev/null
+++ b/profiling/client/src/FileOnlyProfilingConnection.cpp
@@ -0,0 +1,307 @@
+//
+// Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
+// SPDX-License-Identifier: MIT
+//
+
+#include "FileOnlyProfilingConnection.hpp"
+
+#include <common/include/Constants.hpp>
+#include <common/include/ProfilingException.hpp>
+#include <common/include/PacketVersionResolver.hpp>
+
+#include <algorithm>
+#include <iostream>
+#include <thread>
+
+namespace arm
+{
+
+namespace pipe
+{
+
+std::vector<uint32_t> StreamMetaDataProcessor::GetHeadersAccepted()
+{
+ std::vector<uint32_t> headers;
+ headers.push_back(m_MetaDataPacketHeader);
+ return headers;
+}
+
+void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
+{
+ if (packet.GetHeader() != m_MetaDataPacketHeader)
+ {
+ throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
+ }
+ // determine the endianness of the protocol
+ TargetEndianness endianness;
+ if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
+ {
+ endianness = TargetEndianness::BeWire;
+ }
+ else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
+ {
+ endianness = TargetEndianness::LeWire;
+ }
+ else
+ {
+ throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
+ }
+ m_FileOnlyProfilingConnection->SetEndianess(endianness);
+ // send back the acknowledgement
+ std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
+ arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
+ m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
+}
+
+uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
+{
+ // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
+ // specified endianness.
+ if (endianness == TargetEndianness::BeWire)
+ {
+ return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
+ static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
+ }
+ else
+ {
+ return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
+ static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
+ }
+}
+
+FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
+{
+ try
+ {
+ Close();
+ }
+ catch (...)
+ {
+ // do nothing
+ }
+}
+
+bool FileOnlyProfilingConnection::IsOpen() const
+{
+ // This type of connection is always open.
+ return true;
+}
+
+void FileOnlyProfilingConnection::Close()
+{
+ // Dump any unread packets out of the queue.
+ size_t initialSize = m_PacketQueue.size();
+ for (size_t i = 0; i < initialSize; ++i)
+ {
+ m_PacketQueue.pop();
+ }
+ // dispose of the processing thread
+ m_KeepRunning.store(false);
+ if (m_LocalHandlersThread.joinable())
+ {
+ // make sure the thread wakes up and sees it has to stop
+ m_ConditionPacketReadable.notify_one();
+ m_LocalHandlersThread.join();
+ }
+}
+
+bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
+{
+ ARM_PIPE_ASSERT(buffer);
+ arm::pipe::Packet packet = ReceivePacket(buffer, length);
+ ForwardPacketToHandlers(packet);
+ return true;
+}
+
+void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
+{
+ {
+ std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
+ m_PacketQueue.push(std::move(packet));
+ }
+ m_ConditionPacketAvailable.notify_one();
+}
+
+arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
+{
+ std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
+
+ // Here we are using m_PacketQueue.empty() as a predicate variable
+ // The conditional variable will wait until packetQueue is not empty or until a timeout
+ if (!m_ConditionPacketAvailable.wait_for(lck,
+ std::chrono::milliseconds(timeout),
+ [&]{return !m_PacketQueue.empty();}))
+ {
+ arm::pipe::Packet empty;
+ return empty;
+ }
+
+ arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
+ m_PacketQueue.pop();
+ return returnedPacket;
+}
+
+void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
+{
+ Close();
+ throw arm::pipe::ProfilingException(errorMessage);
+}
+
+/// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
+/// a processing thread that will ensure that processing of packets will happen on a separate
+/// thread from the profiling services send thread and will therefore protect against the
+/// profiling message buffer becoming exhausted because packet handling slows the dispatch.
+void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
+{
+ m_PacketHandlers.push_back(std::move(localPacketHandler));
+ ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
+ localCopy->SetConnection(this);
+ if (localCopy->GetHeadersAccepted().empty())
+ {
+ //this is a universal handler
+ m_UniversalHandlers.push_back(localCopy);
+ }
+ else
+ {
+ for (uint32_t header : localCopy->GetHeadersAccepted())
+ {
+ auto iter = m_IndexedHandlers.find(header);
+ if (iter == m_IndexedHandlers.end())
+ {
+ std::vector<ILocalPacketHandlerSharedPtr> handlers;
+ handlers.push_back(localCopy);
+ m_IndexedHandlers.emplace(std::make_pair(header, handlers));
+ }
+ else
+ {
+ iter->second.push_back(localCopy);
+ }
+ }
+ }
+}
+
+void FileOnlyProfilingConnection::StartProcessingThread()
+{
+ // check if the thread has already started
+ if (m_IsRunning.load())
+ {
+ return;
+ }
+ // make sure if there was one running before it is joined
+ if (m_LocalHandlersThread.joinable())
+ {
+ m_LocalHandlersThread.join();
+ }
+ m_IsRunning.store(true);
+ m_KeepRunning.store(true);
+ m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
+}
+
+void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
+{
+ if (m_PacketHandlers.empty())
+ {
+ return;
+ }
+ if (!m_KeepRunning.load())
+ {
+ return;
+ }
+ {
+ std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
+ if (!m_KeepRunning.load())
+ {
+ return;
+ }
+ m_ReadableList.push(std::move(packet));
+ }
+ m_ConditionPacketReadable.notify_one();
+}
+
+void FileOnlyProfilingConnection::ServiceLocalHandlers()
+{
+ do
+ {
+ arm::pipe::Packet returnedPacket;
+ bool readPacket = false;
+ { // only lock while we are taking the packet off the incoming list
+ std::unique_lock<std::mutex> lck(m_ReadableMutex);
+ if (m_Timeout < 0)
+ {
+ m_ConditionPacketReadable.wait(lck,
+ [&] { return !m_ReadableList.empty(); });
+ }
+ else
+ {
+ m_ConditionPacketReadable.wait_for(lck,
+ std::chrono::milliseconds(std::max(m_Timeout, 1000)),
+ [&] { return !m_ReadableList.empty(); });
+ }
+ if (m_KeepRunning.load())
+ {
+ if (!m_ReadableList.empty())
+ {
+ returnedPacket = std::move(m_ReadableList.front());
+ m_ReadableList.pop();
+ readPacket = true;
+ }
+ }
+ else
+ {
+ ClearReadableList();
+ }
+ }
+ if (m_KeepRunning.load() && readPacket)
+ {
+ DispatchPacketToHandlers(returnedPacket);
+ }
+ } while (m_KeepRunning.load());
+ // make sure the readable list is cleared
+ ClearReadableList();
+ m_IsRunning.store(false);
+}
+
+void FileOnlyProfilingConnection::ClearReadableList()
+{
+ // make sure the incoming packet queue gets emptied
+ size_t initialSize = m_ReadableList.size();
+ for (size_t i = 0; i < initialSize; ++i)
+ {
+ m_ReadableList.pop();
+ }
+}
+
+void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
+{
+ for (auto& delegate : m_UniversalHandlers)
+ {
+ delegate->HandlePacket(packet);
+ }
+ auto iter = m_IndexedHandlers.find(packet.GetHeader());
+ if (iter != m_IndexedHandlers.end())
+ {
+ for (auto& delegate : iter->second)
+ {
+ try
+ {
+ delegate->HandlePacket(packet);
+ }
+ catch (const arm::pipe::ProfilingException& ex)
+ {
+ Fail(ex.what());
+ }
+ catch (const std::exception& ex)
+ {
+ Fail(ex.what());
+ }
+ catch (...)
+ {
+ Fail("handler failed");
+ }
+ }
+ }
+}
+
+} // namespace pipe
+
+} // namespace arm