7 #include "PacketVersionResolver.hpp" 10 #include <common/include/Constants.hpp> 11 #include <common/include/ProfilingException.hpp> 14 #include <boost/numeric/conversion/cast.hpp> 26 std::vector<uint32_t> headers;
27 headers.push_back(m_MetaDataPacketHeader);
33 if (packet.GetHeader() != m_MetaDataPacketHeader)
35 throw arm::pipe::ProfilingException(
"StreamMetaDataProcessor can only handle Stream Meta Data Packets");
49 throw arm::pipe::ProfilingException(
"Protocol read error. Unable to read the PIPE_MAGIC value.");
53 std::unique_ptr<unsigned char[]> uniqueNullPtr =
nullptr;
54 arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
55 m_FileOnlyProfilingConnection->
ReturnPacket(returnPacket);
58 uint32_t StreamMetaDataProcessor::ToUint32(
const unsigned char* data,
TargetEndianness endianness)
64 return static_cast<uint32_t
>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
65 static_cast<uint32_t
>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
69 return static_cast<uint32_t
>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
70 static_cast<uint32_t
>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
95 size_t initialSize = m_PacketQueue.size();
96 for (
size_t i = 0; i < initialSize; ++i)
101 m_KeepRunning.store(
false);
102 if (m_LocalHandlersThread.joinable())
105 m_ConditionPacketReadable.notify_one();
106 m_LocalHandlersThread.join();
114 ForwardPacketToHandlers(packet);
121 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
122 m_PacketQueue.push(std::move(packet));
124 m_ConditionPacketAvailable.notify_one();
129 std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
133 if (!m_ConditionPacketAvailable.wait_for(lck,
134 std::chrono::milliseconds(timeout),
135 [&]{return !m_PacketQueue.empty();}))
137 arm::pipe::Packet empty;
141 arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
143 return returnedPacket;
146 void FileOnlyProfilingConnection::Fail(
const std::string& errorMessage)
158 m_PacketHandlers.push_back(std::move(localPacketHandler));
160 localCopy->SetConnection(
this);
161 if (localCopy->GetHeadersAccepted().empty())
164 m_UniversalHandlers.push_back(localCopy);
168 for (uint32_t header : localCopy->GetHeadersAccepted())
170 auto iter = m_IndexedHandlers.find(header);
171 if (iter == m_IndexedHandlers.end())
173 std::vector<ILocalPacketHandlerSharedPtr> handlers;
174 handlers.push_back(localCopy);
175 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
179 iter->second.push_back(localCopy);
185 void FileOnlyProfilingConnection::StartProcessingThread()
188 if (m_IsRunning.load())
193 if (m_LocalHandlersThread.joinable())
195 m_LocalHandlersThread.join();
197 m_IsRunning.store(
true);
198 m_KeepRunning.store(
true);
199 m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers,
this);
202 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
204 if (m_PacketHandlers.empty())
208 if (!m_KeepRunning.load())
213 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
214 if (!m_KeepRunning.load())
218 m_ReadableList.push(std::move(packet));
220 m_ConditionPacketReadable.notify_one();
223 void FileOnlyProfilingConnection::ServiceLocalHandlers()
227 arm::pipe::Packet returnedPacket;
228 bool readPacket =
false;
230 std::unique_lock<std::mutex> lck(m_ReadableMutex);
233 m_ConditionPacketReadable.wait(lck,
234 [&] {
return !m_ReadableList.empty(); });
238 m_ConditionPacketReadable.wait_for(lck,
239 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
240 [&] {
return !m_ReadableList.empty(); });
242 if (m_KeepRunning.load())
244 if (!m_ReadableList.empty())
246 returnedPacket = std::move(m_ReadableList.front());
247 m_ReadableList.pop();
256 if (m_KeepRunning.load() && readPacket)
258 DispatchPacketToHandlers(returnedPacket);
260 }
while (m_KeepRunning.load());
263 m_IsRunning.store(
false);
266 void FileOnlyProfilingConnection::ClearReadableList()
269 size_t initialSize = m_ReadableList.size();
270 for (
size_t i = 0; i < initialSize; ++i)
272 m_ReadableList.pop();
276 void FileOnlyProfilingConnection::DispatchPacketToHandlers(
const arm::pipe::Packet& packet)
278 for (
auto& delegate : m_UniversalHandlers)
280 delegate->HandlePacket(packet);
282 auto iter = m_IndexedHandlers.find(packet.GetHeader());
283 if (iter != m_IndexedHandlers.end())
285 for (
auto& delegate : iter->second)
289 delegate->HandlePacket(packet);
291 catch (
const arm::pipe::ProfilingException& ex)
295 catch (
const std::exception& ex)
301 Fail(
"handler failed");
std::shared_ptr< ILocalPacketHandler > ILocalPacketHandlerSharedPtr
~FileOnlyProfilingConnection() override
bool IsOpen() const override
Copyright (c) 2020 ARM Limited.
arm::pipe::Packet ReceivePacket(const unsigned char *buffer, uint32_t length)
void SetEndianess(const TargetEndianness &endianness) override
#define ARMNN_ASSERT(COND)
void ReturnPacket(arm::pipe::Packet &packet) override
bool WritePacket(const unsigned char *buffer, uint32_t length) override
arm::pipe::Packet ReadPacket(uint32_t timeout) override