ArmNN
 21.05
SendThread.cpp
Go to the documentation of this file.
1 //
2 // Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
6 #include "SendThread.hpp"
7 #include "ProfilingUtils.hpp"
8 
9 #include <armnn/Exceptions.hpp>
10 #include <armnn/Conversion.hpp>
12 
13 #include <Processes.hpp>
14 
15 #include <cstring>
16 
17 namespace armnn
18 {
19 
20 namespace profiling
21 {
22 
25  armnn::profiling::ISendCounterPacket& sendCounterPacket,
26  int timeout)
27  : m_StateMachine(profilingStateMachine)
28  , m_BufferManager(buffer)
29  , m_SendCounterPacket(sendCounterPacket)
30  , m_Timeout(timeout)
31  , m_IsRunning(false)
32  , m_KeepRunning(false)
33  , m_SendThreadException(nullptr)
34 {
35  m_BufferManager.SetConsumer(this);
36 }
37 
39 {
40  // We need to wait for the send thread to release its mutex
41  {
42  std::lock_guard<std::mutex> lck(m_WaitMutex);
43  m_ReadyToRead = true;
44  }
45  // Signal the send thread that there's something to read in the buffer
46  m_WaitCondition.notify_one();
47 }
48 
49 void SendThread::Start(IProfilingConnection& profilingConnection)
50 {
51  // Check if the send thread is already running
52  if (m_IsRunning.load())
53  {
54  // The send thread is already running
55  return;
56  }
57 
58  if (m_SendThread.joinable())
59  {
60  m_SendThread.join();
61  }
62 
63  // Mark the send thread as running
64  m_IsRunning.store(true);
65 
66  // Keep the send procedure going until the send thread is signalled to stop
67  m_KeepRunning.store(true);
68 
69  // Make sure the send thread will not flush the buffer until signaled to do so
70  // no need for a mutex as the send thread can not be running at this point
71  m_ReadyToRead = false;
72 
73  m_PacketSent = false;
74 
75  // Start the send thread
76  m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
77 }
78 
79 void SendThread::Stop(bool rethrowSendThreadExceptions)
80 {
81  // Signal the send thread to stop
82  m_KeepRunning.store(false);
83 
84  // Check that the send thread is running
85  if (m_SendThread.joinable())
86  {
87  // Kick the send thread out of the wait condition
89  // Wait for the send thread to complete operations
90  m_SendThread.join();
91  }
92 
93  // Check if the send thread exception has to be rethrown
94  if (!rethrowSendThreadExceptions)
95  {
96  // No need to rethrow the send thread exception, return immediately
97  return;
98  }
99 
100  // Check if there's an exception to rethrow
101  if (m_SendThreadException)
102  {
103  // Rethrow the send thread exception
104  std::rethrow_exception(m_SendThreadException);
105 
106  // Nullify the exception as it has been rethrown
107  m_SendThreadException = nullptr;
108  }
109 }
110 
111 void SendThread::Send(IProfilingConnection& profilingConnection)
112 {
113  // Run once and keep the sending procedure looping until the thread is signalled to stop
114  do
115  {
116  // Check the current state of the profiling service
117  ProfilingState currentState = m_StateMachine.GetCurrentState();
118  switch (currentState)
119  {
122 
123  // The send thread cannot be running when the profiling service is uninitialized or not connected,
124  // stop the thread immediately
125  m_KeepRunning.store(false);
126  m_IsRunning.store(false);
127 
128  // An exception should be thrown here, save it to be rethrown later from the main thread so that
129  // it can be caught by the consumer
130  m_SendThreadException =
131  std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
132  "profiling service not yet initialized or connected"));
133 
134  return;
136 
137  // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
138  // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
139  // updated by the command handler
140 
141  // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
142  m_SendCounterPacket.SendStreamMetaDataPacket();
143 
144  // Flush the buffer manually to send the packet
145  FlushBuffer(profilingConnection);
146 
147  // Wait for a connection ack from the remote server. We should expect a response within timeout value.
148  // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
149  // StreamMetadata again.
150 
151  // Wait condition lock scope - Begin
152  {
153  std::unique_lock<std::mutex> lock(m_WaitMutex);
154 
155  bool timeout = m_WaitCondition.wait_for(lock,
156  std::chrono::milliseconds(std::max(m_Timeout, 1000)),
157  [&]{ return m_ReadyToRead; });
158  // If we get notified we need to flush the buffer again
159  if(timeout)
160  {
161  // Otherwise if we just timed out don't flush the buffer
162  continue;
163  }
164  //reset condition variable predicate for next use
165  m_ReadyToRead = false;
166  }
167  // Wait condition lock scope - End
168  break;
170  default:
171  // Wait condition lock scope - Begin
172  {
173  std::unique_lock<std::mutex> lock(m_WaitMutex);
174 
175  // Normal working state for the send thread
176  // Check if the send thread is required to enforce a timeout wait policy
177  if (m_Timeout < 0)
178  {
179  // Wait indefinitely until notified that something to read has become available in the buffer
180  m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
181  }
182  else
183  {
184  // Wait until the thread is notified of something to read from the buffer,
185  // or check anyway after the specified number of milliseconds
186  m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
187  }
188 
189  //reset condition variable predicate for next use
190  m_ReadyToRead = false;
191  }
192  // Wait condition lock scope - End
193  break;
194  }
195 
196  // Send all the available packets in the buffer
197  FlushBuffer(profilingConnection);
198  } while (m_KeepRunning.load());
199 
200  // Ensure that all readable data got written to the profiling connection before the thread is stopped
201  // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
202  FlushBuffer(profilingConnection, false);
203 
204  // Mark the send thread as not running
205  m_IsRunning.store(false);
206 }
207 
208 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
209 {
210  // Get the first available readable buffer
211  IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
212 
213  // Initialize the flag that indicates whether at least a packet has been sent
214  bool packetsSent = false;
215 
216  while (packetBuffer != nullptr)
217  {
218  // Get the data to send from the buffer
219  const unsigned char* readBuffer = packetBuffer->GetReadableData();
220  unsigned int readBufferSize = packetBuffer->GetSize();
221 
222  if (readBuffer == nullptr || readBufferSize == 0)
223  {
224  // Nothing to send, get the next available readable buffer and continue
225  m_BufferManager.MarkRead(packetBuffer);
226  packetBuffer = m_BufferManager.GetReadableBuffer();
227 
228  continue;
229  }
230 
231  // Check that the profiling connection is open, silently drop the data and continue if it's closed
232  if (profilingConnection.IsOpen())
233  {
234  // Write a packet to the profiling connection. Silently ignore any write error and continue
235  profilingConnection.WritePacket(readBuffer, armnn::numeric_cast<uint32_t>(readBufferSize));
236 
237  // Set the flag that indicates whether at least a packet has been sent
238  packetsSent = true;
239  }
240 
241  // Mark the packet buffer as read
242  m_BufferManager.MarkRead(packetBuffer);
243 
244  // Get the next available readable buffer
245  packetBuffer = m_BufferManager.GetReadableBuffer();
246  }
247  // Check whether at least a packet has been sent
248  if (packetsSent && notifyWatchers)
249  {
250  // Wait for the parent thread to release its mutex if necessary
251  {
252  std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
253  m_PacketSent = true;
254  }
255  // Notify to any watcher that something has been sent
256  m_PacketSentWaitCondition.notify_one();
257  }
258 }
259 
260 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
261 {
262  std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
263  // Blocks until notified that at least a packet has been sent or until timeout expires.
264  bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
265  std::chrono::milliseconds(timeout),
266  [&] { return m_PacketSent; });
267 
268  m_PacketSent = false;
269 
270  return timedOut;
271 }
272 
273 } // namespace profiling
274 
275 } // namespace armnn
virtual IPacketBufferPtr GetReadableBuffer()=0
DataLayout::NCHW false
Copyright (c) 2021 ARM Limited and Contributors.
virtual void MarkRead(IPacketBufferPtr &packetBuffer)=0
bool WaitForPacketSent(uint32_t timeout)
Definition: SendThread.cpp:260
SendThread(ProfilingStateMachine &profilingStateMachine, IBufferManager &buffer, ISendCounterPacket &sendCounterPacket, int timeout=1000)
Definition: SendThread.cpp:23
void SetReadyToRead() override
Set a "ready to read" flag in the buffer to notify the reading thread to start reading it...
Definition: SendThread.cpp:38
virtual void SendStreamMetaDataPacket()=0
Create and write a StreamMetaDataPacket in the buffer.
virtual void SetConsumer(IConsumer *consumer)=0
void Stop(bool rethrowSendThreadExceptions=true) override
Stop the thread.
Definition: SendThread.cpp:79
std::unique_ptr< IPacketBuffer > IPacketBufferPtr
void Start(IProfilingConnection &profilingConnection) override
Start the thread.
Definition: SendThread.cpp:49
virtual bool WritePacket(const unsigned char *buffer, uint32_t length)=0