ArmNN
 21.02
FileOnlyProfilingConnection.cpp
Go to the documentation of this file.
1 //
2 // Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
7 #include "PacketVersionResolver.hpp"
8 
9 #include <armnn/Exceptions.hpp>
10 #include <common/include/Constants.hpp>
11 #include <common/include/ProfilingException.hpp>
12 
13 #include <algorithm>
14 #include <iostream>
15 #include <thread>
16 
17 namespace armnn
18 {
19 
20 namespace profiling
21 {
22 
24 {
25  std::vector<uint32_t> headers;
26  headers.push_back(m_MetaDataPacketHeader);
27  return headers;
28 }
29 
30 void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
31 {
32  if (packet.GetHeader() != m_MetaDataPacketHeader)
33  {
34  throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
35  }
36  // determine the endianness of the protocol
37  TargetEndianness endianness;
38  if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
39  {
40  endianness = TargetEndianness::BeWire;
41  }
42  else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
43  {
44  endianness = TargetEndianness::LeWire;
45  }
46  else
47  {
48  throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
49  }
50  m_FileOnlyProfilingConnection->SetEndianess(endianness);
51  // send back the acknowledgement
52  std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
53  arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
54  m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
55 }
56 
57 uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
58 {
59  // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
60  // specified endianness.
61  if (endianness == TargetEndianness::BeWire)
62  {
63  return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
64  static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
65  }
66  else
67  {
68  return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
69  static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
70  }
71 }
72 
// NOTE(review): the line that names this function (listing line 73) is missing from this
// extract, so only the body is visible. The body makes a best-effort call to Close() and
// deliberately discards anything it throws - presumably this is the class destructor, which
// must not let an exception escape; confirm against the original file.
74 {
75  try
76  {
77  Close();
78  }
79  catch (...)
80  {
81  // do nothing
82  }
83 }
84 
// NOTE(review): the line that names this function (listing line 85) is missing from this
// extract, so only the body is visible. The body unconditionally returns true - presumably an
// "is the connection open" query; confirm the name and signature against the original file.
86 {
87  // This type of connection is always open.
88  return true;
89 }
90 
// NOTE(review): the line that names this function (listing line 91) is missing from this
// extract, so only the body is visible - presumably this is Close(); confirm against the
// original file.
//
// The body discards every packet currently in m_PacketQueue, clears m_KeepRunning so the
// handler thread's loop ends, and, if m_LocalHandlersThread is joinable, notifies
// m_ConditionPacketReadable and joins that thread.
//
// NOTE(review): m_PacketQueue is popped here without holding m_PacketAvailableMutex, whereas
// ReturnPacket() and ReadPacket() lock it - check whether this can run concurrently with them.
// NOTE(review): if this is Close(), it is also reached through Fail() from
// DispatchPacketToHandlers(), which runs on m_LocalHandlersThread; join() would then be called
// by the thread on itself - verify.
92 {
93  // Dump any unread packets out of the queue.
94  size_t initialSize = m_PacketQueue.size();
95  for (size_t i = 0; i < initialSize; ++i)
96  {
97  m_PacketQueue.pop();
98  }
99  // dispose of the processing thread
100  m_KeepRunning.store(false);
101  if (m_LocalHandlersThread.joinable())
102  {
103  // make sure the thread wakes up and sees it has to stop
104  m_ConditionPacketReadable.notify_one();
105  m_LocalHandlersThread.join();
106  }
107 }
108 
109 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
110 {
111  ARMNN_ASSERT(buffer);
112  arm::pipe::Packet packet = ReceivePacket(buffer, length);
113  ForwardPacketToHandlers(packet);
114  return true;
115 }
116 
117 void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
118 {
119  {
120  std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
121  m_PacketQueue.push(std::move(packet));
122  }
123  m_ConditionPacketAvailable.notify_one();
124 }
125 
126 arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
127 {
128  std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
129 
130  // Here we are using m_PacketQueue.empty() as a predicate variable
131  // The conditional variable will wait until packetQueue is not empty or until a timeout
132  if (!m_ConditionPacketAvailable.wait_for(lck,
133  std::chrono::milliseconds(timeout),
134  [&]{return !m_PacketQueue.empty();}))
135  {
136  arm::pipe::Packet empty;
137  return empty;
138  }
139 
140  arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
141  m_PacketQueue.pop();
142  return returnedPacket;
143 }
144 
145 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
146 {
147  Close();
148  throw RuntimeException(errorMessage);
149 }
150 
151 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
152 /// a processing thread that will ensure that processing of packets will happen on a separate
153 /// thread from the profiling services send thread and will therefore protect against the
154 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
155 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
156 {
157  m_PacketHandlers.push_back(std::move(localPacketHandler));
158  ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
159  localCopy->SetConnection(this);
160  if (localCopy->GetHeadersAccepted().empty())
161  {
162  //this is a universal handler
163  m_UniversalHandlers.push_back(localCopy);
164  }
165  else
166  {
167  for (uint32_t header : localCopy->GetHeadersAccepted())
168  {
169  auto iter = m_IndexedHandlers.find(header);
170  if (iter == m_IndexedHandlers.end())
171  {
172  std::vector<ILocalPacketHandlerSharedPtr> handlers;
173  handlers.push_back(localCopy);
174  m_IndexedHandlers.emplace(std::make_pair(header, handlers));
175  }
176  else
177  {
178  iter->second.push_back(localCopy);
179  }
180  }
181  }
182 }
183 
184 void FileOnlyProfilingConnection::StartProcessingThread()
185 {
186  // check if the thread has already started
187  if (m_IsRunning.load())
188  {
189  return;
190  }
191  // make sure if there was one running before it is joined
192  if (m_LocalHandlersThread.joinable())
193  {
194  m_LocalHandlersThread.join();
195  }
196  m_IsRunning.store(true);
197  m_KeepRunning.store(true);
198  m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
199 }
200 
201 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
202 {
203  if (m_PacketHandlers.empty())
204  {
205  return;
206  }
207  if (!m_KeepRunning.load())
208  {
209  return;
210  }
211  {
212  std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
213  if (!m_KeepRunning.load())
214  {
215  return;
216  }
217  m_ReadableList.push(std::move(packet));
218  }
219  m_ConditionPacketReadable.notify_one();
220 }
221 
222 void FileOnlyProfilingConnection::ServiceLocalHandlers()
223 {
224  do
225  {
226  arm::pipe::Packet returnedPacket;
227  bool readPacket = false;
228  { // only lock while we are taking the packet off the incoming list
229  std::unique_lock<std::mutex> lck(m_ReadableMutex);
230  if (m_Timeout < 0)
231  {
232  m_ConditionPacketReadable.wait(lck,
233  [&] { return !m_ReadableList.empty(); });
234  }
235  else
236  {
237  m_ConditionPacketReadable.wait_for(lck,
238  std::chrono::milliseconds(std::max(m_Timeout, 1000)),
239  [&] { return !m_ReadableList.empty(); });
240  }
241  if (m_KeepRunning.load())
242  {
243  if (!m_ReadableList.empty())
244  {
245  returnedPacket = std::move(m_ReadableList.front());
246  m_ReadableList.pop();
247  readPacket = true;
248  }
249  }
250  else
251  {
252  ClearReadableList();
253  }
254  }
255  if (m_KeepRunning.load() && readPacket)
256  {
257  DispatchPacketToHandlers(returnedPacket);
258  }
259  } while (m_KeepRunning.load());
260  // make sure the readable list is cleared
261  ClearReadableList();
262  m_IsRunning.store(false);
263 }
264 
265 void FileOnlyProfilingConnection::ClearReadableList()
266 {
267  // make sure the incoming packet queue gets emptied
268  size_t initialSize = m_ReadableList.size();
269  for (size_t i = 0; i < initialSize; ++i)
270  {
271  m_ReadableList.pop();
272  }
273 }
274 
275 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
276 {
277  for (auto& delegate : m_UniversalHandlers)
278  {
279  delegate->HandlePacket(packet);
280  }
281  auto iter = m_IndexedHandlers.find(packet.GetHeader());
282  if (iter != m_IndexedHandlers.end())
283  {
284  for (auto& delegate : iter->second)
285  {
286  try
287  {
288  delegate->HandlePacket(packet);
289  }
290  catch (const arm::pipe::ProfilingException& ex)
291  {
292  Fail(ex.what());
293  }
294  catch (const std::exception& ex)
295  {
296  Fail(ex.what());
297  }
298  catch (...)
299  {
300  Fail("handler failed");
301  }
302  }
303  }
304 }
305 
306 } // namespace profiling
307 
308 } // namespace armnn
std::shared_ptr< ILocalPacketHandler > ILocalPacketHandlerSharedPtr
Copyright (c) 2021 ARM Limited and Contributors.
arm::pipe::Packet ReceivePacket(const unsigned char *buffer, uint32_t length)
void SetEndianess(const TargetEndianness &endianness) override
#define ARMNN_ASSERT(COND)
Definition: Assert.hpp:14
std::vector< uint32_t > GetHeadersAccepted() override
void ReturnPacket(arm::pipe::Packet &packet) override
void HandlePacket(const arm::pipe::Packet &packet) override
process the packet
bool WritePacket(const unsigned char *buffer, uint32_t length) override
arm::pipe::Packet ReadPacket(uint32_t timeout) override