aboutsummaryrefslogtreecommitdiff
path: root/src/profiling/SendThread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/profiling/SendThread.cpp')
-rw-r--r--src/profiling/SendThread.cpp278
1 files changed, 278 insertions, 0 deletions
diff --git a/src/profiling/SendThread.cpp b/src/profiling/SendThread.cpp
new file mode 100644
index 0000000000..d595c9d4a5
--- /dev/null
+++ b/src/profiling/SendThread.cpp
@@ -0,0 +1,278 @@
+//
+// Copyright © 2020 Arm Ltd. All rights reserved.
+// SPDX-License-Identifier: MIT
+//
+
+#include "SendThread.hpp"
+#include "EncodeVersion.hpp"
+#include "ProfilingUtils.hpp"
+
+#include <armnn/Exceptions.hpp>
+#include <armnn/Conversion.hpp>
+#include <Processes.hpp>
+
+#include <boost/format.hpp>
+#include <boost/numeric/conversion/cast.hpp>
+#include <boost/core/ignore_unused.hpp>
+
+#include <cstring>
+
+namespace armnn
+{
+
+namespace profiling
+{
+
+using boost::numeric_cast;
+
+SendThread::SendThread(armnn::profiling::ProfilingStateMachine& profilingStateMachine,
+ armnn::profiling::IBufferManager& buffer, armnn::profiling::ISendCounterPacket& sendCounterPacket, int timeout)
+ : m_StateMachine(profilingStateMachine)
+ , m_BufferManager(buffer)
+ , m_SendCounterPacket(sendCounterPacket)
+ , m_Timeout(timeout)
+ , m_IsRunning(false)
+ , m_KeepRunning(false)
+ , m_SendThreadException(nullptr)
+{
+ m_BufferManager.SetConsumer(this);
+}
+
+void SendThread::SetReadyToRead()
+{
+ // We need to wait for the send thread to release its mutex
+ {
+ std::lock_guard<std::mutex> lck(m_WaitMutex);
+ m_ReadyToRead = true;
+ }
+ // Signal the send thread that there's something to read in the buffer
+ m_WaitCondition.notify_one();
+}
+
+void SendThread::Start(IProfilingConnection& profilingConnection)
+{
+ // Check if the send thread is already running
+ if (m_IsRunning.load())
+ {
+ // The send thread is already running
+ return;
+ }
+
+ if (m_SendThread.joinable())
+ {
+ m_SendThread.join();
+ }
+
+ // Mark the send thread as running
+ m_IsRunning.store(true);
+
+ // Keep the send procedure going until the send thread is signalled to stop
+ m_KeepRunning.store(true);
+
+ // Make sure the send thread will not flush the buffer until signaled to do so
+ // no need for a mutex as the send thread can not be running at this point
+ m_ReadyToRead = false;
+
+ m_PacketSent = false;
+
+ // Start the send thread
+ m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
+}
+
+void SendThread::Stop(bool rethrowSendThreadExceptions)
+{
+ // Signal the send thread to stop
+ m_KeepRunning.store(false);
+
+ // Check that the send thread is running
+ if (m_SendThread.joinable())
+ {
+ // Kick the send thread out of the wait condition
+ SetReadyToRead();
+ // Wait for the send thread to complete operations
+ m_SendThread.join();
+ }
+
+ // Check if the send thread exception has to be rethrown
+ if (!rethrowSendThreadExceptions)
+ {
+ // No need to rethrow the send thread exception, return immediately
+ return;
+ }
+
+ // Check if there's an exception to rethrow
+ if (m_SendThreadException)
+ {
+ // Rethrow the send thread exception
+ std::rethrow_exception(m_SendThreadException);
+
+ // Nullify the exception as it has been rethrown
+ m_SendThreadException = nullptr;
+ }
+}
+
+void SendThread::Send(IProfilingConnection& profilingConnection)
+{
+ // Run once and keep the sending procedure looping until the thread is signalled to stop
+ do
+ {
+ // Check the current state of the profiling service
+ ProfilingState currentState = m_StateMachine.GetCurrentState();
+ switch (currentState)
+ {
+ case ProfilingState::Uninitialised:
+ case ProfilingState::NotConnected:
+
+ // The send thread cannot be running when the profiling service is uninitialized or not connected,
+ // stop the thread immediately
+ m_KeepRunning.store(false);
+ m_IsRunning.store(false);
+
+ // An exception should be thrown here, save it to be rethrown later from the main thread so that
+ // it can be caught by the consumer
+ m_SendThreadException =
+ std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
+ "profiling service not yet initialized or connected"));
+
+ return;
+ case ProfilingState::WaitingForAck:
+
+ // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
+ // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
+ // updated by the command handler
+
+ // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
+ m_SendCounterPacket.SendStreamMetaDataPacket();
+
+ // Flush the buffer manually to send the packet
+ FlushBuffer(profilingConnection);
+
+ // Wait for a connection ack from the remote server. We should expect a response within timeout value.
+ // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
+ // StreamMetadata again.
+
+ // Wait condition lock scope - Begin
+ {
+ std::unique_lock<std::mutex> lock(m_WaitMutex);
+
+ bool timeout = m_WaitCondition.wait_for(lock,
+ std::chrono::milliseconds(m_Timeout),
+ [&]{ return m_ReadyToRead; });
+ // If we get notified we need to flush the buffer again
+ if(timeout)
+ {
+ // Otherwise if we just timed out don't flush the buffer
+ continue;
+ }
+ //reset condition variable predicate for next use
+ m_ReadyToRead = false;
+ }
+ // Wait condition lock scope - End
+ break;
+ case ProfilingState::Active:
+ default:
+ // Wait condition lock scope - Begin
+ {
+ std::unique_lock<std::mutex> lock(m_WaitMutex);
+
+ // Normal working state for the send thread
+ // Check if the send thread is required to enforce a timeout wait policy
+ if (m_Timeout < 0)
+ {
+ // Wait indefinitely until notified that something to read has become available in the buffer
+ m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
+ }
+ else
+ {
+ // Wait until the thread is notified of something to read from the buffer,
+ // or check anyway after the specified number of milliseconds
+ m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
+ }
+
+ //reset condition variable predicate for next use
+ m_ReadyToRead = false;
+ }
+ // Wait condition lock scope - End
+ break;
+ }
+
+ // Send all the available packets in the buffer
+ FlushBuffer(profilingConnection);
+ } while (m_KeepRunning.load());
+
+ // Ensure that all readable data got written to the profiling connection before the thread is stopped
+ // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
+ FlushBuffer(profilingConnection, false);
+
+ // Mark the send thread as not running
+ m_IsRunning.store(false);
+}
+
+void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
+{
+ // Get the first available readable buffer
+ IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
+
+ // Initialize the flag that indicates whether at least a packet has been sent
+ bool packetsSent = false;
+
+ while (packetBuffer != nullptr)
+ {
+ // Get the data to send from the buffer
+ const unsigned char* readBuffer = packetBuffer->GetReadableData();
+ unsigned int readBufferSize = packetBuffer->GetSize();
+
+ if (readBuffer == nullptr || readBufferSize == 0)
+ {
+ // Nothing to send, get the next available readable buffer and continue
+ m_BufferManager.MarkRead(packetBuffer);
+ packetBuffer = m_BufferManager.GetReadableBuffer();
+
+ continue;
+ }
+
+ // Check that the profiling connection is open, silently drop the data and continue if it's closed
+ if (profilingConnection.IsOpen())
+ {
+ // Write a packet to the profiling connection. Silently ignore any write error and continue
+ profilingConnection.WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
+
+ // Set the flag that indicates whether at least a packet has been sent
+ packetsSent = true;
+ }
+
+ // Mark the packet buffer as read
+ m_BufferManager.MarkRead(packetBuffer);
+
+ // Get the next available readable buffer
+ packetBuffer = m_BufferManager.GetReadableBuffer();
+ }
+ // Check whether at least a packet has been sent
+ if (packetsSent && notifyWatchers)
+ {
+ // Wait for the parent thread to release its mutex if necessary
+ {
+ std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
+ m_PacketSent = true;
+ }
+ // Notify to any watcher that something has been sent
+ m_PacketSentWaitCondition.notify_one();
+ }
+}
+
+bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
+{
+ std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
+ // Blocks until notified that at least a packet has been sent or until timeout expires.
+ bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
+ std::chrono::milliseconds(timeout),
+ [&] { return m_PacketSent; });
+
+ m_PacketSent = false;
+
+ return timedOut;
+}
+
+} // namespace profiling
+
+} // namespace armnn