ArmNN  NotReleased
SendThread.cpp
Go to the documentation of this file.
1 //
2 // Copyright © 2020 Arm Ltd. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
6 #include "SendThread.hpp"
7 #include "EncodeVersion.hpp"
8 #include "ProfilingUtils.hpp"
9 
10 #include <armnn/Exceptions.hpp>
11 #include <armnn/Conversion.hpp>
12 #include <Processes.hpp>
13 
14 #include <boost/format.hpp>
15 #include <boost/numeric/conversion/cast.hpp>
16 #include <boost/core/ignore_unused.hpp>
17 
18 #include <cstring>
19 
20 namespace armnn
21 {
22 
23 namespace profiling
24 {
25 
26 using boost::numeric_cast;
27 
29  armnn::profiling::IBufferManager& buffer, armnn::profiling::ISendCounterPacket& sendCounterPacket, int timeout)
30  : m_StateMachine(profilingStateMachine)
31  , m_BufferManager(buffer)
32  , m_SendCounterPacket(sendCounterPacket)
33  , m_Timeout(timeout)
34  , m_IsRunning(false)
35  , m_KeepRunning(false)
36  , m_SendThreadException(nullptr)
37 {
38  m_BufferManager.SetConsumer(this);
39 }
40 
42 {
43  // We need to wait for the send thread to release its mutex
44  {
45  std::lock_guard<std::mutex> lck(m_WaitMutex);
46  m_ReadyToRead = true;
47  }
48  // Signal the send thread that there's something to read in the buffer
49  m_WaitCondition.notify_one();
50 }
51 
52 void SendThread::Start(IProfilingConnection& profilingConnection)
53 {
54  // Check if the send thread is already running
55  if (m_IsRunning.load())
56  {
57  // The send thread is already running
58  return;
59  }
60 
61  if (m_SendThread.joinable())
62  {
63  m_SendThread.join();
64  }
65 
66  // Mark the send thread as running
67  m_IsRunning.store(true);
68 
69  // Keep the send procedure going until the send thread is signalled to stop
70  m_KeepRunning.store(true);
71 
72  // Make sure the send thread will not flush the buffer until signaled to do so
73  // no need for a mutex as the send thread can not be running at this point
74  m_ReadyToRead = false;
75 
76  m_PacketSent = false;
77 
78  // Start the send thread
79  m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
80 }
81 
82 void SendThread::Stop(bool rethrowSendThreadExceptions)
83 {
84  // Signal the send thread to stop
85  m_KeepRunning.store(false);
86 
87  // Check that the send thread is running
88  if (m_SendThread.joinable())
89  {
90  // Kick the send thread out of the wait condition
92  // Wait for the send thread to complete operations
93  m_SendThread.join();
94  }
95 
96  // Check if the send thread exception has to be rethrown
97  if (!rethrowSendThreadExceptions)
98  {
99  // No need to rethrow the send thread exception, return immediately
100  return;
101  }
102 
103  // Check if there's an exception to rethrow
104  if (m_SendThreadException)
105  {
106  // Rethrow the send thread exception
107  std::rethrow_exception(m_SendThreadException);
108 
109  // Nullify the exception as it has been rethrown
110  m_SendThreadException = nullptr;
111  }
112 }
113 
114 void SendThread::Send(IProfilingConnection& profilingConnection)
115 {
116  // Run once and keep the sending procedure looping until the thread is signalled to stop
117  do
118  {
119  // Check the current state of the profiling service
120  ProfilingState currentState = m_StateMachine.GetCurrentState();
121  switch (currentState)
122  {
125 
126  // The send thread cannot be running when the profiling service is uninitialized or not connected,
127  // stop the thread immediately
128  m_KeepRunning.store(false);
129  m_IsRunning.store(false);
130 
131  // An exception should be thrown here, save it to be rethrown later from the main thread so that
132  // it can be caught by the consumer
133  m_SendThreadException =
134  std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
135  "profiling service not yet initialized or connected"));
136 
137  return;
139 
140  // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
141  // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
142  // updated by the command handler
143 
144  // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
145  m_SendCounterPacket.SendStreamMetaDataPacket();
146 
147  // Flush the buffer manually to send the packet
148  FlushBuffer(profilingConnection);
149 
150  // Wait for a connection ack from the remote server. We should expect a response within timeout value.
151  // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
152  // StreamMetadata again.
153 
154  // Wait condition lock scope - Begin
155  {
156  std::unique_lock<std::mutex> lock(m_WaitMutex);
157 
158  bool timeout = m_WaitCondition.wait_for(lock,
159  std::chrono::milliseconds(std::max(m_Timeout, 1000)),
160  [&]{ return m_ReadyToRead; });
161  // If we get notified we need to flush the buffer again
162  if(timeout)
163  {
164  // Otherwise if we just timed out don't flush the buffer
165  continue;
166  }
167  //reset condition variable predicate for next use
168  m_ReadyToRead = false;
169  }
170  // Wait condition lock scope - End
171  break;
173  default:
174  // Wait condition lock scope - Begin
175  {
176  std::unique_lock<std::mutex> lock(m_WaitMutex);
177 
178  // Normal working state for the send thread
179  // Check if the send thread is required to enforce a timeout wait policy
180  if (m_Timeout < 0)
181  {
182  // Wait indefinitely until notified that something to read has become available in the buffer
183  m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
184  }
185  else
186  {
187  // Wait until the thread is notified of something to read from the buffer,
188  // or check anyway after the specified number of milliseconds
189  m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
190  }
191 
192  //reset condition variable predicate for next use
193  m_ReadyToRead = false;
194  }
195  // Wait condition lock scope - End
196  break;
197  }
198 
199  // Send all the available packets in the buffer
200  FlushBuffer(profilingConnection);
201  } while (m_KeepRunning.load());
202 
203  // Ensure that all readable data got written to the profiling connection before the thread is stopped
204  // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
205  FlushBuffer(profilingConnection, false);
206 
207  // Mark the send thread as not running
208  m_IsRunning.store(false);
209 }
210 
211 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
212 {
213  // Get the first available readable buffer
214  IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
215 
216  // Initialize the flag that indicates whether at least a packet has been sent
217  bool packetsSent = false;
218 
219  while (packetBuffer != nullptr)
220  {
221  // Get the data to send from the buffer
222  const unsigned char* readBuffer = packetBuffer->GetReadableData();
223  unsigned int readBufferSize = packetBuffer->GetSize();
224 
225  if (readBuffer == nullptr || readBufferSize == 0)
226  {
227  // Nothing to send, get the next available readable buffer and continue
228  m_BufferManager.MarkRead(packetBuffer);
229  packetBuffer = m_BufferManager.GetReadableBuffer();
230 
231  continue;
232  }
233 
234  // Check that the profiling connection is open, silently drop the data and continue if it's closed
235  if (profilingConnection.IsOpen())
236  {
237  // Write a packet to the profiling connection. Silently ignore any write error and continue
238  profilingConnection.WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
239 
240  // Set the flag that indicates whether at least a packet has been sent
241  packetsSent = true;
242  }
243 
244  // Mark the packet buffer as read
245  m_BufferManager.MarkRead(packetBuffer);
246 
247  // Get the next available readable buffer
248  packetBuffer = m_BufferManager.GetReadableBuffer();
249  }
250  // Check whether at least a packet has been sent
251  if (packetsSent && notifyWatchers)
252  {
253  // Wait for the parent thread to release its mutex if necessary
254  {
255  std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
256  m_PacketSent = true;
257  }
258  // Notify to any watcher that something has been sent
259  m_PacketSentWaitCondition.notify_one();
260  }
261 }
262 
263 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
264 {
265  std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
266  // Blocks until notified that at least a packet has been sent or until timeout expires.
267  bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
268  std::chrono::milliseconds(timeout),
269  [&] { return m_PacketSent; });
270 
271  m_PacketSent = false;
272 
273  return timedOut;
274 }
275 
276 } // namespace profiling
277 
278 } // namespace armnn
void Start(IProfilingConnection &profilingConnection) override
Start the thread.
Definition: SendThread.cpp:52
bool WaitForPacketSent(uint32_t timeout)
Definition: SendThread.cpp:263
std::unique_ptr< IPacketBuffer > IPacketBufferPtr
virtual void MarkRead(IPacketBufferPtr &packetBuffer)=0
DataLayout::NHWC false
void Stop(bool rethrowSendThreadExceptions=true) override
Stop the thread.
Definition: SendThread.cpp:82
virtual void SendStreamMetaDataPacket()=0
Create and write a StreamMetaDataPacket in the buffer.
virtual bool WritePacket(const unsigned char *buffer, uint32_t length)=0
void SetReadyToRead() override
Set a "ready to read" flag in the buffer to notify the reading thread to start reading it...
Definition: SendThread.cpp:41
SendThread(ProfilingStateMachine &profilingStateMachine, IBufferManager &buffer, ISendCounterPacket &sendCounterPacket, int timeout=1000)
Definition: SendThread.cpp:28
virtual IPacketBufferPtr GetReadableBuffer()=0
virtual void SetConsumer(IConsumer *consumer)=0