ArmNN
 20.05
SendThread.cpp
Go to the documentation of this file.
1 //
2 // Copyright © 2020 Arm Ltd. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
6 #include "SendThread.hpp"
7 #include "EncodeVersion.hpp"
8 #include "ProfilingUtils.hpp"
9 
10 #include <armnn/Exceptions.hpp>
11 #include <armnn/Conversion.hpp>
12 #include <Processes.hpp>
13 
14 #include <boost/format.hpp>
15 #include <boost/numeric/conversion/cast.hpp>
16 
17 #include <cstring>
18 
19 namespace armnn
20 {
21 
22 namespace profiling
23 {
24 
26 
28  armnn::profiling::IBufferManager& buffer, armnn::profiling::ISendCounterPacket& sendCounterPacket, int timeout)
29  : m_StateMachine(profilingStateMachine)
30  , m_BufferManager(buffer)
31  , m_SendCounterPacket(sendCounterPacket)
32  , m_Timeout(timeout)
33  , m_IsRunning(false)
34  , m_KeepRunning(false)
35  , m_SendThreadException(nullptr)
36 {
37  m_BufferManager.SetConsumer(this);
38 }
39 
41 {
42  // We need to wait for the send thread to release its mutex
43  {
44  std::lock_guard<std::mutex> lck(m_WaitMutex);
45  m_ReadyToRead = true;
46  }
47  // Signal the send thread that there's something to read in the buffer
48  m_WaitCondition.notify_one();
49 }
50 
51 void SendThread::Start(IProfilingConnection& profilingConnection)
52 {
53  // Check if the send thread is already running
54  if (m_IsRunning.load())
55  {
56  // The send thread is already running
57  return;
58  }
59 
60  if (m_SendThread.joinable())
61  {
62  m_SendThread.join();
63  }
64 
65  // Mark the send thread as running
66  m_IsRunning.store(true);
67 
68  // Keep the send procedure going until the send thread is signalled to stop
69  m_KeepRunning.store(true);
70 
71  // Make sure the send thread will not flush the buffer until signaled to do so
72  // no need for a mutex as the send thread can not be running at this point
73  m_ReadyToRead = false;
74 
75  m_PacketSent = false;
76 
77  // Start the send thread
78  m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
79 }
80 
81 void SendThread::Stop(bool rethrowSendThreadExceptions)
82 {
83  // Signal the send thread to stop
84  m_KeepRunning.store(false);
85 
86  // Check that the send thread is running
87  if (m_SendThread.joinable())
88  {
89  // Kick the send thread out of the wait condition
91  // Wait for the send thread to complete operations
92  m_SendThread.join();
93  }
94 
95  // Check if the send thread exception has to be rethrown
96  if (!rethrowSendThreadExceptions)
97  {
98  // No need to rethrow the send thread exception, return immediately
99  return;
100  }
101 
102  // Check if there's an exception to rethrow
103  if (m_SendThreadException)
104  {
105  // Rethrow the send thread exception
106  std::rethrow_exception(m_SendThreadException);
107 
108  // Nullify the exception as it has been rethrown
109  m_SendThreadException = nullptr;
110  }
111 }
112 
113 void SendThread::Send(IProfilingConnection& profilingConnection)
114 {
115  // Run once and keep the sending procedure looping until the thread is signalled to stop
116  do
117  {
118  // Check the current state of the profiling service
119  ProfilingState currentState = m_StateMachine.GetCurrentState();
120  switch (currentState)
121  {
124 
125  // The send thread cannot be running when the profiling service is uninitialized or not connected,
126  // stop the thread immediately
127  m_KeepRunning.store(false);
128  m_IsRunning.store(false);
129 
130  // An exception should be thrown here, save it to be rethrown later from the main thread so that
131  // it can be caught by the consumer
132  m_SendThreadException =
133  std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
134  "profiling service not yet initialized or connected"));
135 
136  return;
138 
139  // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
140  // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
141  // updated by the command handler
142 
143  // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
144  m_SendCounterPacket.SendStreamMetaDataPacket();
145 
146  // Flush the buffer manually to send the packet
147  FlushBuffer(profilingConnection);
148 
149  // Wait for a connection ack from the remote server. We should expect a response within timeout value.
150  // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
151  // StreamMetadata again.
152 
153  // Wait condition lock scope - Begin
154  {
155  std::unique_lock<std::mutex> lock(m_WaitMutex);
156 
157  bool timeout = m_WaitCondition.wait_for(lock,
158  std::chrono::milliseconds(std::max(m_Timeout, 1000)),
159  [&]{ return m_ReadyToRead; });
160  // If we get notified we need to flush the buffer again
161  if(timeout)
162  {
163  // Otherwise if we just timed out don't flush the buffer
164  continue;
165  }
166  //reset condition variable predicate for next use
167  m_ReadyToRead = false;
168  }
169  // Wait condition lock scope - End
170  break;
172  default:
173  // Wait condition lock scope - Begin
174  {
175  std::unique_lock<std::mutex> lock(m_WaitMutex);
176 
177  // Normal working state for the send thread
178  // Check if the send thread is required to enforce a timeout wait policy
179  if (m_Timeout < 0)
180  {
181  // Wait indefinitely until notified that something to read has become available in the buffer
182  m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
183  }
184  else
185  {
186  // Wait until the thread is notified of something to read from the buffer,
187  // or check anyway after the specified number of milliseconds
188  m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
189  }
190 
191  //reset condition variable predicate for next use
192  m_ReadyToRead = false;
193  }
194  // Wait condition lock scope - End
195  break;
196  }
197 
198  // Send all the available packets in the buffer
199  FlushBuffer(profilingConnection);
200  } while (m_KeepRunning.load());
201 
202  // Ensure that all readable data got written to the profiling connection before the thread is stopped
203  // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
204  FlushBuffer(profilingConnection, false);
205 
206  // Mark the send thread as not running
207  m_IsRunning.store(false);
208 }
209 
210 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
211 {
212  // Get the first available readable buffer
213  IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
214 
215  // Initialize the flag that indicates whether at least a packet has been sent
216  bool packetsSent = false;
217 
218  while (packetBuffer != nullptr)
219  {
220  // Get the data to send from the buffer
221  const unsigned char* readBuffer = packetBuffer->GetReadableData();
222  unsigned int readBufferSize = packetBuffer->GetSize();
223 
224  if (readBuffer == nullptr || readBufferSize == 0)
225  {
226  // Nothing to send, get the next available readable buffer and continue
227  m_BufferManager.MarkRead(packetBuffer);
228  packetBuffer = m_BufferManager.GetReadableBuffer();
229 
230  continue;
231  }
232 
233  // Check that the profiling connection is open, silently drop the data and continue if it's closed
234  if (profilingConnection.IsOpen())
235  {
236  // Write a packet to the profiling connection. Silently ignore any write error and continue
237  profilingConnection.WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
238 
239  // Set the flag that indicates whether at least a packet has been sent
240  packetsSent = true;
241  }
242 
243  // Mark the packet buffer as read
244  m_BufferManager.MarkRead(packetBuffer);
245 
246  // Get the next available readable buffer
247  packetBuffer = m_BufferManager.GetReadableBuffer();
248  }
249  // Check whether at least a packet has been sent
250  if (packetsSent && notifyWatchers)
251  {
252  // Wait for the parent thread to release its mutex if necessary
253  {
254  std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
255  m_PacketSent = true;
256  }
257  // Notify to any watcher that something has been sent
258  m_PacketSentWaitCondition.notify_one();
259  }
260 }
261 
262 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
263 {
264  std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
265  // Blocks until notified that at least a packet has been sent or until timeout expires.
266  bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
267  std::chrono::milliseconds(timeout),
268  [&] { return m_PacketSent; });
269 
270  m_PacketSent = false;
271 
272  return timedOut;
273 }
274 
275 } // namespace profiling
276 
277 } // namespace armnn
virtual IPacketBufferPtr GetReadableBuffer()=0
DataLayout::NHWC false
Copyright (c) 2020 ARM Limited.
virtual void MarkRead(IPacketBufferPtr &packetBuffer)=0
bool WaitForPacketSent(uint32_t timeout)
Definition: SendThread.cpp:262
SendThread(ProfilingStateMachine &profilingStateMachine, IBufferManager &buffer, ISendCounterPacket &sendCounterPacket, int timeout=1000)
Definition: SendThread.cpp:27
std::enable_if_t< std::is_unsigned< Source >::value &&std::is_unsigned< Dest >::value, Dest > numeric_cast(Source source)
Definition: NumericCast.hpp:33
void SetReadyToRead() override
Set a "ready to read" flag in the buffer to notify the reading thread to start reading it...
Definition: SendThread.cpp:40
virtual void SendStreamMetaDataPacket()=0
Create and write a StreamMetaDataPacket in the buffer.
virtual void SetConsumer(IConsumer *consumer)=0
void Stop(bool rethrowSendThreadExceptions=true) override
Stop the thread.
Definition: SendThread.cpp:81
std::unique_ptr< IPacketBuffer > IPacketBufferPtr
void Start(IProfilingConnection &profilingConnection) override
Start the thread.
Definition: SendThread.cpp:51
virtual bool WritePacket(const unsigned char *buffer, uint32_t length)=0