ArmNN
 20.05
FileOnlyProfilingConnection.cpp
Go to the documentation of this file.
1 //
2 // Copyright © 2019 Arm Ltd. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
8 
9 #include <armnn/Exceptions.hpp>
10 #include <common/include/Constants.hpp>
11 
12 #include <algorithm>
13 #include <boost/numeric/conversion/cast.hpp>
14 #include <iostream>
15 #include <thread>
16 
17 namespace armnn
18 {
19 
20 namespace profiling
21 {
22 
24 {
25  Close();
26 }
27 
29 {
30  // This type of connection is always open.
31  return true;
32 }
33 
35 {
36  // Dump any unread packets out of the queue.
37  size_t initialSize = m_PacketQueue.size();
38  for (size_t i = 0; i < initialSize; ++i)
39  {
40  m_PacketQueue.pop();
41  }
42  // dispose of the processing thread
43  m_KeepRunning.store(false);
44  if (m_LocalHandlersThread.joinable())
45  {
46  // make sure the thread wakes up and sees it has to stop
47  m_ConditionPacketReadable.notify_one();
48  m_LocalHandlersThread.join();
49  }
50 }
51 
52 bool FileOnlyProfilingConnection::WaitForStreamMeta(const unsigned char* buffer, uint32_t length)
53 {
54  IgnoreUnused(length);
55 
56  // The first word, stream_metadata_identifer, should always be 0.
57  if (ToUint32(buffer, TargetEndianness::BeWire) != 0)
58  {
59  Fail("Protocol error. The stream_metadata_identifer was not 0.");
60  }
61 
62  // Before we interpret the length we need to read the pipe_magic word to determine endianness.
63  if (ToUint32(buffer + 8, TargetEndianness::BeWire) == armnnProfiling::PIPE_MAGIC)
64  {
65  m_Endianness = TargetEndianness::BeWire;
66  }
67  else if (ToUint32(buffer + 8, TargetEndianness::LeWire) == armnnProfiling::PIPE_MAGIC)
68  {
69  m_Endianness = TargetEndianness::LeWire;
70  }
71  else
72  {
73  Fail("Protocol read error. Unable to read PIPE_MAGIC value.");
74  }
75  return true;
76 }
77 
78 void FileOnlyProfilingConnection::SendConnectionAck()
79 {
80  if (!m_QuietOp)
81  {
82  std::cout << "Sending connection acknowledgement." << std::endl;
83  }
84  std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
85  {
86  std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
87  m_PacketQueue.push(Packet(0x10000, 0, uniqueNullPtr));
88  }
89  m_ConditionPacketAvailable.notify_one();
90 }
91 
92 bool FileOnlyProfilingConnection::SendCounterSelectionPacket()
93 {
94  uint32_t uint16_t_size = sizeof(uint16_t);
95  uint32_t uint32_t_size = sizeof(uint32_t);
96 
97  uint32_t offset = 0;
98  uint32_t bodySize = uint32_t_size + boost::numeric_cast<uint32_t>(m_IdList.size()) * uint16_t_size;
99 
100  auto uniqueData = std::make_unique<unsigned char[]>(bodySize);
101  unsigned char* data = reinterpret_cast<unsigned char*>(uniqueData.get());
102 
103  // Copy capturePeriod
104  WriteUint32(data, offset, m_Options.m_CapturePeriod);
105 
106  // Copy m_IdList
107  offset += uint32_t_size;
108  for (const uint16_t& id : m_IdList)
109  {
110  WriteUint16(data, offset, id);
111  offset += uint16_t_size;
112  }
113 
114  {
115  std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
116  m_PacketQueue.push(Packet(0x40000, bodySize, uniqueData));
117  }
118  m_ConditionPacketAvailable.notify_one();
119 
120  return true;
121 }
122 
123 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
124 {
125  ARMNN_ASSERT(buffer);
126  Packet packet = ReceivePacket(buffer, length);
127 
128  // Read Header and determine case
129  uint32_t outgoingHeaderAsWords[2];
130  PackageActivity packageActivity = GetPackageActivity(packet, outgoingHeaderAsWords);
131 
132  switch (packageActivity)
133  {
135  {
136  if (!WaitForStreamMeta(buffer, length))
137  {
138  return EXIT_FAILURE;
139  }
140 
141  SendConnectionAck();
142  break;
143  }
145  {
146  std::unique_ptr<unsigned char[]> uniqueCounterData = std::make_unique<unsigned char[]>(length - 8);
147 
148  std::memcpy(uniqueCounterData.get(), buffer + 8, length - 8);
149 
150  Packet directoryPacket(outgoingHeaderAsWords[0], length - 8, uniqueCounterData);
151 
152  armnn::profiling::PacketVersionResolver packetVersionResolver;
153  DirectoryCaptureCommandHandler directoryCaptureCommandHandler(
154  0, 2, packetVersionResolver.ResolvePacketVersion(0, 2).GetEncodedValue());
155  directoryCaptureCommandHandler.operator()(directoryPacket);
156  const ICounterDirectory& counterDirectory = directoryCaptureCommandHandler.GetCounterDirectory();
157  for (auto& category : counterDirectory.GetCategories())
158  {
159  // Remember we need to translate the Uid's from our CounterDirectory instance to the parent one.
160  std::vector<uint16_t> translatedCounters;
161  for (auto const& copyUid : category->m_Counters)
162  {
163  translatedCounters.emplace_back(directoryCaptureCommandHandler.TranslateUIDCopyToOriginal(copyUid));
164  }
165  m_IdList.insert(std::end(m_IdList), std::begin(translatedCounters), std::end(translatedCounters));
166  }
167  SendCounterSelectionPacket();
168  break;
169  }
170  default:
171  {
172  break;
173  }
174  }
175  ForwardPacketToHandlers(packet);
176  return true;
177 }
178 
180 {
181  std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
182 
183  // Here we are using m_PacketQueue.empty() as a predicate variable
184  // The conditional variable will wait until packetQueue is not empty or until a timeout
185  if(!m_ConditionPacketAvailable.wait_for(lck,
186  std::chrono::milliseconds(timeout),
187  [&]{return !m_PacketQueue.empty();}))
188  {
189  throw armnn::TimeoutException("Thread has timed out as per requested time limit");
190  }
191 
192  Packet returnedPacket = std::move(m_PacketQueue.front());
193  m_PacketQueue.pop();
194  return returnedPacket;
195 }
196 
197 PackageActivity FileOnlyProfilingConnection::GetPackageActivity(const Packet& packet, uint32_t headerAsWords[2])
198 {
199  headerAsWords[0] = packet.GetHeader();
200  headerAsWords[1] = packet.GetLength();
201  if (headerAsWords[0] == 0x20000) // Packet family = 0 Packet Id = 2
202  {
204  }
205  else if (headerAsWords[0] == 0) // Packet family = 0 Packet Id = 0
206  {
208  }
209  else
210  {
212  }
213 }
214 
215 uint32_t FileOnlyProfilingConnection::ToUint32(const unsigned char* data, TargetEndianness endianness)
216 {
217  // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
218  // specified endianness.
219  if (endianness == TargetEndianness::BeWire)
220  {
221  return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
222  static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
223  }
224  else
225  {
226  return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
227  static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
228  }
229 }
230 
231 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
232 {
233  Close();
234  throw RuntimeException(errorMessage);
235 }
236 
237 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
238 /// a processing thread that will ensure that processing of packets will happen on a separate
239 /// thread from the profiling services send thread and will therefore protect against the
240 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
241 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
242 {
243  m_PacketHandlers.push_back(std::move(localPacketHandler));
244  ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
245  localCopy->SetConnection(this);
246  if (localCopy->GetHeadersAccepted().empty())
247  {
248  //this is a universal handler
249  m_UniversalHandlers.push_back(localCopy);
250  }
251  else
252  {
253  for (uint32_t header : localCopy->GetHeadersAccepted())
254  {
255  auto iter = m_IndexedHandlers.find(header);
256  if (iter == m_IndexedHandlers.end())
257  {
258  std::vector<ILocalPacketHandlerSharedPtr> handlers;
259  handlers.push_back(localCopy);
260  m_IndexedHandlers.emplace(std::make_pair(header, handlers));
261  }
262  else
263  {
264  iter->second.push_back(localCopy);
265  }
266  }
267  }
268 }
269 
270 void FileOnlyProfilingConnection::StartProcessingThread()
271 {
272  // check if the thread has already started
273  if (m_IsRunning.load())
274  {
275  return;
276  }
277  // make sure if there was one running before it is joined
278  if (m_LocalHandlersThread.joinable())
279  {
280  m_LocalHandlersThread.join();
281  }
282  m_IsRunning.store(true);
283  m_KeepRunning.store(true);
284  m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
285 }
286 
287 void FileOnlyProfilingConnection::ForwardPacketToHandlers(Packet& packet)
288 {
289  if (m_PacketHandlers.empty())
290  {
291  return;
292  }
293  if (m_KeepRunning.load() == false)
294  {
295  return;
296  }
297  {
298  std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
299  if (m_KeepRunning.load() == false)
300  {
301  return;
302  }
303  m_ReadableList.push(std::move(packet));
304  }
305  m_ConditionPacketReadable.notify_one();
306 }
307 
308 void FileOnlyProfilingConnection::ServiceLocalHandlers()
309 {
310  do
311  {
312  Packet returnedPacket;
313  bool readPacket = false;
314  { // only lock while we are taking the packet off the incoming list
315  std::unique_lock<std::mutex> lck(m_ReadableMutex);
316  if (m_Timeout < 0)
317  {
318  m_ConditionPacketReadable.wait(lck,
319  [&] { return !m_ReadableList.empty(); });
320  }
321  else
322  {
323  m_ConditionPacketReadable.wait_for(lck,
324  std::chrono::milliseconds(std::max(m_Timeout, 1000)),
325  [&] { return !m_ReadableList.empty(); });
326  }
327  if (m_KeepRunning.load())
328  {
329  if (!m_ReadableList.empty())
330  {
331  returnedPacket = std::move(m_ReadableList.front());
332  m_ReadableList.pop();
333  readPacket = true;
334  }
335  }
336  else
337  {
338  ClearReadableList();
339  }
340  }
341  if (m_KeepRunning.load() && readPacket)
342  {
343  DispatchPacketToHandlers(returnedPacket);
344  }
345  } while (m_KeepRunning.load());
346  // make sure the readable list is cleared
347  ClearReadableList();
348  m_IsRunning.store(false);
349 }
350 
351 void FileOnlyProfilingConnection::ClearReadableList()
352 {
353  // make sure the incoming packet queue gets emptied
354  size_t initialSize = m_ReadableList.size();
355  for (size_t i = 0; i < initialSize; ++i)
356  {
357  m_ReadableList.pop();
358  }
359 }
360 
361 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const Packet& packet)
362 {
363  for (auto& delegate : m_UniversalHandlers)
364  {
365  delegate->HandlePacket(packet);
366  }
367  auto iter = m_IndexedHandlers.find(packet.GetHeader());
368  if (iter != m_IndexedHandlers.end())
369  {
370  for (auto &delegate : iter->second)
371  {
372  delegate->HandlePacket(packet);
373  }
374  }
375 }
376 
377 } // namespace profiling
378 
379 } // namespace armnn
std::shared_ptr< ILocalPacketHandler > ILocalPacketHandlerSharedPtr
void WriteUint16(const IPacketBufferPtr &packetBuffer, unsigned int offset, uint16_t value)
void WriteUint32(const IPacketBufferPtr &packetBuffer, unsigned int offset, uint32_t value)
Version ResolvePacketVersion(uint32_t familyId, uint32_t packetId) const
Copyright (c) 2020 ARM Limited.
void IgnoreUnused(Ts &&...)
virtual const Categories & GetCategories() const =0
#define ARMNN_ASSERT(COND)
Definition: Assert.hpp:14
std::enable_if_t< std::is_unsigned< Source >::value &&std::is_unsigned< Dest >::value, Dest > numeric_cast(Source source)
Definition: NumericCast.hpp:33
Packet ReceivePacket(const unsigned char *buffer, uint32_t length)
bool WritePacket(const unsigned char *buffer, uint32_t length) override