// // Copyright © 2020 Arm Ltd. All rights reserved. // SPDX-License-Identifier: MIT // #include "SendThread.hpp" #include "EncodeVersion.hpp" #include "ProfilingUtils.hpp" #include #include #include #include #include #include namespace armnn { namespace profiling { using boost::numeric_cast; SendThread::SendThread(armnn::profiling::ProfilingStateMachine& profilingStateMachine, armnn::profiling::IBufferManager& buffer, armnn::profiling::ISendCounterPacket& sendCounterPacket, int timeout) : m_StateMachine(profilingStateMachine) , m_BufferManager(buffer) , m_SendCounterPacket(sendCounterPacket) , m_Timeout(timeout) , m_IsRunning(false) , m_KeepRunning(false) , m_SendThreadException(nullptr) { m_BufferManager.SetConsumer(this); } void SendThread::SetReadyToRead() { // We need to wait for the send thread to release its mutex { std::lock_guard lck(m_WaitMutex); m_ReadyToRead = true; } // Signal the send thread that there's something to read in the buffer m_WaitCondition.notify_one(); } void SendThread::Start(IProfilingConnection& profilingConnection) { // Check if the send thread is already running if (m_IsRunning.load()) { // The send thread is already running return; } if (m_SendThread.joinable()) { m_SendThread.join(); } // Mark the send thread as running m_IsRunning.store(true); // Keep the send procedure going until the send thread is signalled to stop m_KeepRunning.store(true); // Make sure the send thread will not flush the buffer until signaled to do so // no need for a mutex as the send thread can not be running at this point m_ReadyToRead = false; m_PacketSent = false; // Start the send thread m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection)); } void SendThread::Stop(bool rethrowSendThreadExceptions) { // Signal the send thread to stop m_KeepRunning.store(false); // Check that the send thread is running if (m_SendThread.joinable()) { // Kick the send thread out of the wait condition SetReadyToRead(); // Wait for the send thread to complete operations m_SendThread.join(); } // Check if the send thread exception has to be rethrown if (!rethrowSendThreadExceptions) { // No need to rethrow the send thread exception, return immediately return; } // Check if there's an exception to rethrow if (m_SendThreadException) { // Rethrow the send thread exception std::rethrow_exception(m_SendThreadException); // Nullify the exception as it has been rethrown m_SendThreadException = nullptr; } } void SendThread::Send(IProfilingConnection& profilingConnection) { // Run once and keep the sending procedure looping until the thread is signalled to stop do { // Check the current state of the profiling service ProfilingState currentState = m_StateMachine.GetCurrentState(); switch (currentState) { case ProfilingState::Uninitialised: case ProfilingState::NotConnected: // The send thread cannot be running when the profiling service is uninitialized or not connected, // stop the thread immediately m_KeepRunning.store(false); m_IsRunning.store(false); // An exception should be thrown here, save it to be rethrown later from the main thread so that // it can be caught by the consumer m_SendThreadException = std::make_exception_ptr(RuntimeException("The send thread should not be running with the " "profiling service not yet initialized or connected")); return; case ProfilingState::WaitingForAck: // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged. // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically // updated by the command handler // Prepare a StreamMetadata packet and write it to the Counter Stream buffer m_SendCounterPacket.SendStreamMetaDataPacket(); // Flush the buffer manually to send the packet FlushBuffer(profilingConnection); // Wait for a connection ack from the remote server. We should expect a response within timeout value. // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the // StreamMetadata again. // Wait condition lock scope - Begin { std::unique_lock lock(m_WaitMutex); bool timeout = m_WaitCondition.wait_for(lock, std::chrono::milliseconds(std::max(m_Timeout, 1000)), [&]{ return m_ReadyToRead; }); // If we get notified we need to flush the buffer again if(timeout) { // Otherwise if we just timed out don't flush the buffer continue; } //reset condition variable predicate for next use m_ReadyToRead = false; } // Wait condition lock scope - End break; case ProfilingState::Active: default: // Wait condition lock scope - Begin { std::unique_lock lock(m_WaitMutex); // Normal working state for the send thread // Check if the send thread is required to enforce a timeout wait policy if (m_Timeout < 0) { // Wait indefinitely until notified that something to read has become available in the buffer m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; }); } else { // Wait until the thread is notified of something to read from the buffer, // or check anyway after the specified number of milliseconds m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; }); } //reset condition variable predicate for next use m_ReadyToRead = false; } // Wait condition lock scope - End break; } // Send all the available packets in the buffer FlushBuffer(profilingConnection); } while (m_KeepRunning.load()); // Ensure that all readable data got written to the profiling connection before the thread is stopped // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread) FlushBuffer(profilingConnection, false); // Mark the send thread as not running m_IsRunning.store(false); } void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers) { // Get the first available readable buffer IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer(); // Initialize the flag that indicates whether at least a packet has been sent bool packetsSent = false; while (packetBuffer != nullptr) { // Get the data to send from the buffer const unsigned char* readBuffer = packetBuffer->GetReadableData(); unsigned int readBufferSize = packetBuffer->GetSize(); if (readBuffer == nullptr || readBufferSize == 0) { // Nothing to send, get the next available readable buffer and continue m_BufferManager.MarkRead(packetBuffer); packetBuffer = m_BufferManager.GetReadableBuffer(); continue; } // Check that the profiling connection is open, silently drop the data and continue if it's closed if (profilingConnection.IsOpen()) { // Write a packet to the profiling connection. Silently ignore any write error and continue profilingConnection.WritePacket(readBuffer, boost::numeric_cast(readBufferSize)); // Set the flag that indicates whether at least a packet has been sent packetsSent = true; } // Mark the packet buffer as read m_BufferManager.MarkRead(packetBuffer); // Get the next available readable buffer packetBuffer = m_BufferManager.GetReadableBuffer(); } // Check whether at least a packet has been sent if (packetsSent && notifyWatchers) { // Wait for the parent thread to release its mutex if necessary { std::lock_guard lck(m_PacketSentWaitMutex); m_PacketSent = true; } // Notify to any watcher that something has been sent m_PacketSentWaitCondition.notify_one(); } } bool SendThread::WaitForPacketSent(uint32_t timeout = 1000) { std::unique_lock lock(m_PacketSentWaitMutex); // Blocks until notified that at least a packet has been sent or until timeout expires. bool timedOut = m_PacketSentWaitCondition.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return m_PacketSent; }); m_PacketSent = false; return timedOut; } } // namespace profiling } // namespace armnn