27 : m_StateMachine(profilingStateMachine)
28 , m_BufferManager(buffer)
29 , m_SendCounterPacket(sendCounterPacket)
32 , m_KeepRunning(
false)
33 , m_SendThreadException(nullptr)
42 std::lock_guard<std::mutex> lck(m_WaitMutex);
46 m_WaitCondition.notify_one();
52 if (m_IsRunning.load())
58 if (m_SendThread.joinable())
64 m_IsRunning.store(
true);
67 m_KeepRunning.store(
true);
71 m_ReadyToRead =
false;
76 m_SendThread = std::thread(&SendThread::Send,
this, std::ref(profilingConnection));
82 m_KeepRunning.store(
false);
85 if (m_SendThread.joinable())
94 if (!rethrowSendThreadExceptions)
101 if (m_SendThreadException)
104 std::rethrow_exception(m_SendThreadException);
107 m_SendThreadException =
nullptr;
118 switch (currentState)
125 m_KeepRunning.store(
false);
126 m_IsRunning.store(
false);
130 m_SendThreadException =
131 std::make_exception_ptr(
RuntimeException(
"The send thread should not be running with the " 132 "profiling service not yet initialized or connected"));
145 FlushBuffer(profilingConnection);
153 std::unique_lock<std::mutex> lock(m_WaitMutex);
155 bool timeout = m_WaitCondition.wait_for(lock,
156 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
157 [&]{
return m_ReadyToRead; });
165 m_ReadyToRead =
false;
173 std::unique_lock<std::mutex> lock(m_WaitMutex);
180 m_WaitCondition.wait(lock, [&] {
return m_ReadyToRead; });
186 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] {
return m_ReadyToRead; });
190 m_ReadyToRead =
false;
197 FlushBuffer(profilingConnection);
198 }
while (m_KeepRunning.load());
202 FlushBuffer(profilingConnection,
false);
205 m_IsRunning.store(
false);
214 bool packetsSent =
false;
216 while (packetBuffer !=
nullptr)
219 const unsigned char* readBuffer = packetBuffer->GetReadableData();
220 unsigned int readBufferSize = packetBuffer->GetSize();
222 if (readBuffer ==
nullptr || readBufferSize == 0)
225 m_BufferManager.
MarkRead(packetBuffer);
232 if (profilingConnection.
IsOpen())
235 profilingConnection.
WritePacket(readBuffer, armnn::numeric_cast<uint32_t>(readBufferSize));
242 m_BufferManager.
MarkRead(packetBuffer);
248 if (packetsSent && notifyWatchers)
252 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
256 m_PacketSentWaitCondition.notify_one();
262 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
264 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
265 std::chrono::milliseconds(timeout),
266 [&] {
return m_PacketSent; });
268 m_PacketSent =
false;
ProfilingState GetCurrentState() const
virtual IPacketBufferPtr GetReadableBuffer()=0
Copyright (c) 2020 ARM Limited.
virtual void MarkRead(IPacketBufferPtr &packetBuffer)=0
bool WaitForPacketSent(uint32_t timeout)
SendThread(ProfilingStateMachine &profilingStateMachine, IBufferManager &buffer, ISendCounterPacket &sendCounterPacket, int timeout=1000)
void SetReadyToRead() override
Set a "ready to read" flag in the buffer to notify the reading thread to start reading it...
virtual void SendStreamMetaDataPacket()=0
Create and write a StreamMetaDataPacket in the buffer.
virtual void SetConsumer(IConsumer *consumer)=0
virtual bool IsOpen() const =0
void Stop(bool rethrowSendThreadExceptions=true) override
Stop the thread.
std::unique_ptr< IPacketBuffer > IPacketBufferPtr
void Start(IProfilingConnection &profilingConnection) override
Start the thread.
virtual bool WritePacket(const unsigned char *buffer, uint32_t length)=0