ArmNN
 20.08
FileOnlyProfilingConnection.cpp
Go to the documentation of this file.
1 //
2 // Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
7 #include "PacketVersionResolver.hpp"
8 
9 #include <armnn/Exceptions.hpp>
10 #include <common/include/Constants.hpp>
11 #include <common/include/ProfilingException.hpp>
12 
13 #include <algorithm>
14 #include <boost/numeric/conversion/cast.hpp>
15 #include <iostream>
16 #include <thread>
17 
18 namespace armnn
19 {
20 
21 namespace profiling
22 {
23 
25 {
26  std::vector<uint32_t> headers;
27  headers.push_back(m_MetaDataPacketHeader);
28  return headers;
29 }
30 
31 void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
32 {
33  if (packet.GetHeader() != m_MetaDataPacketHeader)
34  {
35  throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
36  }
37  // determine the endianness of the protocol
38  TargetEndianness endianness;
39  if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
40  {
41  endianness = TargetEndianness::BeWire;
42  }
43  else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
44  {
45  endianness = TargetEndianness::LeWire;
46  }
47  else
48  {
49  throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
50  }
51  m_FileOnlyProfilingConnection->SetEndianess(endianness);
52  // send back the acknowledgement
53  std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
54  arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
55  m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
56 }
57 
58 uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
59 {
60  // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
61  // specified endianness.
62  if (endianness == TargetEndianness::BeWire)
63  {
64  return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
65  static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
66  }
67  else
68  {
69  return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
70  static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
71  }
72 }
73 
75 {
76  try
77  {
78  Close();
79  }
80  catch (...)
81  {
82  // do nothing
83  }
84 }
85 
87 {
88  // This type of connection is always open.
89  return true;
90 }
91 
93 {
94  // Dump any unread packets out of the queue.
95  size_t initialSize = m_PacketQueue.size();
96  for (size_t i = 0; i < initialSize; ++i)
97  {
98  m_PacketQueue.pop();
99  }
100  // dispose of the processing thread
101  m_KeepRunning.store(false);
102  if (m_LocalHandlersThread.joinable())
103  {
104  // make sure the thread wakes up and sees it has to stop
105  m_ConditionPacketReadable.notify_one();
106  m_LocalHandlersThread.join();
107  }
108 }
109 
110 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
111 {
112  ARMNN_ASSERT(buffer);
113  arm::pipe::Packet packet = ReceivePacket(buffer, length);
114  ForwardPacketToHandlers(packet);
115  return true;
116 }
117 
118 void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
119 {
120  {
121  std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
122  m_PacketQueue.push(std::move(packet));
123  }
124  m_ConditionPacketAvailable.notify_one();
125 }
126 
127 arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
128 {
129  std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
130 
131  // Here we are using m_PacketQueue.empty() as a predicate variable
132  // The conditional variable will wait until packetQueue is not empty or until a timeout
133  if (!m_ConditionPacketAvailable.wait_for(lck,
134  std::chrono::milliseconds(timeout),
135  [&]{return !m_PacketQueue.empty();}))
136  {
137  arm::pipe::Packet empty;
138  return empty;
139  }
140 
141  arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
142  m_PacketQueue.pop();
143  return returnedPacket;
144 }
145 
146 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
147 {
148  Close();
149  throw RuntimeException(errorMessage);
150 }
151 
152 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
153 /// a processing thread that will ensure that processing of packets will happen on a separate
154 /// thread from the profiling services send thread and will therefore protect against the
155 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
156 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
157 {
158  m_PacketHandlers.push_back(std::move(localPacketHandler));
159  ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
160  localCopy->SetConnection(this);
161  if (localCopy->GetHeadersAccepted().empty())
162  {
163  //this is a universal handler
164  m_UniversalHandlers.push_back(localCopy);
165  }
166  else
167  {
168  for (uint32_t header : localCopy->GetHeadersAccepted())
169  {
170  auto iter = m_IndexedHandlers.find(header);
171  if (iter == m_IndexedHandlers.end())
172  {
173  std::vector<ILocalPacketHandlerSharedPtr> handlers;
174  handlers.push_back(localCopy);
175  m_IndexedHandlers.emplace(std::make_pair(header, handlers));
176  }
177  else
178  {
179  iter->second.push_back(localCopy);
180  }
181  }
182  }
183 }
184 
185 void FileOnlyProfilingConnection::StartProcessingThread()
186 {
187  // check if the thread has already started
188  if (m_IsRunning.load())
189  {
190  return;
191  }
192  // make sure if there was one running before it is joined
193  if (m_LocalHandlersThread.joinable())
194  {
195  m_LocalHandlersThread.join();
196  }
197  m_IsRunning.store(true);
198  m_KeepRunning.store(true);
199  m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
200 }
201 
202 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
203 {
204  if (m_PacketHandlers.empty())
205  {
206  return;
207  }
208  if (!m_KeepRunning.load())
209  {
210  return;
211  }
212  {
213  std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
214  if (!m_KeepRunning.load())
215  {
216  return;
217  }
218  m_ReadableList.push(std::move(packet));
219  }
220  m_ConditionPacketReadable.notify_one();
221 }
222 
223 void FileOnlyProfilingConnection::ServiceLocalHandlers()
224 {
225  do
226  {
227  arm::pipe::Packet returnedPacket;
228  bool readPacket = false;
229  { // only lock while we are taking the packet off the incoming list
230  std::unique_lock<std::mutex> lck(m_ReadableMutex);
231  if (m_Timeout < 0)
232  {
233  m_ConditionPacketReadable.wait(lck,
234  [&] { return !m_ReadableList.empty(); });
235  }
236  else
237  {
238  m_ConditionPacketReadable.wait_for(lck,
239  std::chrono::milliseconds(std::max(m_Timeout, 1000)),
240  [&] { return !m_ReadableList.empty(); });
241  }
242  if (m_KeepRunning.load())
243  {
244  if (!m_ReadableList.empty())
245  {
246  returnedPacket = std::move(m_ReadableList.front());
247  m_ReadableList.pop();
248  readPacket = true;
249  }
250  }
251  else
252  {
253  ClearReadableList();
254  }
255  }
256  if (m_KeepRunning.load() && readPacket)
257  {
258  DispatchPacketToHandlers(returnedPacket);
259  }
260  } while (m_KeepRunning.load());
261  // make sure the readable list is cleared
262  ClearReadableList();
263  m_IsRunning.store(false);
264 }
265 
266 void FileOnlyProfilingConnection::ClearReadableList()
267 {
268  // make sure the incoming packet queue gets emptied
269  size_t initialSize = m_ReadableList.size();
270  for (size_t i = 0; i < initialSize; ++i)
271  {
272  m_ReadableList.pop();
273  }
274 }
275 
276 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
277 {
278  for (auto& delegate : m_UniversalHandlers)
279  {
280  delegate->HandlePacket(packet);
281  }
282  auto iter = m_IndexedHandlers.find(packet.GetHeader());
283  if (iter != m_IndexedHandlers.end())
284  {
285  for (auto& delegate : iter->second)
286  {
287  try
288  {
289  delegate->HandlePacket(packet);
290  }
291  catch (const arm::pipe::ProfilingException& ex)
292  {
293  Fail(ex.what());
294  }
295  catch (const std::exception& ex)
296  {
297  Fail(ex.what());
298  }
299  catch (...)
300  {
301  Fail("handler failed");
302  }
303  }
304  }
305 }
306 
307 } // namespace profiling
308 
309 } // namespace armnn
std::shared_ptr< ILocalPacketHandler > ILocalPacketHandlerSharedPtr
Copyright (c) 2020 ARM Limited.
arm::pipe::Packet ReceivePacket(const unsigned char *buffer, uint32_t length)
void SetEndianess(const TargetEndianness &endianness) override
#define ARMNN_ASSERT(COND)
Definition: Assert.hpp:14
std::vector< uint32_t > GetHeadersAccepted() override
void ReturnPacket(arm::pipe::Packet &packet) override
void HandlePacket(const arm::pipe::Packet &packet) override
process the packet
bool WritePacket(const unsigned char *buffer, uint32_t length) override
arm::pipe::Packet ReadPacket(uint32_t timeout) override