From 73ae7fa28d05babde16fa232c1d823b35c893e3e Mon Sep 17 00:00:00 2001 From: Finn Williams Date: Tue, 3 Dec 2019 11:55:31 +0000 Subject: IVGCVSW-4221 Fix SendCounterPacket hanging for indefinite time Signed-off-by: Finn Williams Change-Id: I612f4d0162e7f35296f7d484350a937f6344fcfb --- src/profiling/SendCounterPacket.cpp | 135 +++++++++++++++++++----------------- src/profiling/SendCounterPacket.hpp | 2 + 2 files changed, 75 insertions(+), 62 deletions(-) diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp index 2d6458316f..e3a9b77f41 100644 --- a/src/profiling/SendCounterPacket.cpp +++ b/src/profiling/SendCounterPacket.cpp @@ -903,6 +903,11 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio void SendCounterPacket::SetReadyToRead() { + // We need to wait for the send thread to release its mutex + { + std::lock_guard lck(m_WaitMutex); + m_ReadyToRead = true; + } // Signal the send thread that there's something to read in the buffer m_WaitCondition.notify_one(); } @@ -927,6 +932,10 @@ void SendCounterPacket::Start(IProfilingConnection& profilingConnection) // Keep the send procedure going until the send thread is signalled to stop m_KeepRunning.store(true); + // Make sure the send thread will not flush the buffer until signaled to do so + // no need for a mutex as the send thread can not be running at this point + m_ReadyToRead = false; + // Start the send thread m_SendThread = std::thread(&SendCounterPacket::Send, this, std::ref(profilingConnection)); } @@ -940,8 +949,7 @@ void SendCounterPacket::Stop(bool rethrowSendThreadExceptions) if (m_SendThread.joinable()) { // Kick the send thread out of the wait condition - m_WaitCondition.notify_one(); - + SetReadyToRead(); // Wait for the send thread to complete operations m_SendThread.join(); } @@ -953,22 +961,15 @@ void SendCounterPacket::Stop(bool rethrowSendThreadExceptions) return; } - // Exception handling lock scope - Begin + // Check if there's an exception to rethrow + if (m_SendThreadException) { - // Lock the mutex to handle any exception coming from the send thread - std::lock_guard lock(m_WaitMutex); - - // Check if there's an exception to rethrow - if (m_SendThreadException) - { - // Rethrow the send thread exception - std::rethrow_exception(m_SendThreadException); + // Rethrow the send thread exception + std::rethrow_exception(m_SendThreadException); - // Nullify the exception as it has been rethrown - m_SendThreadException = nullptr; - } + // Nullify the exception as it has been rethrown + m_SendThreadException = nullptr; } - // Exception handling lock scope - End } void SendCounterPacket::Send(IProfilingConnection& profilingConnection) @@ -976,70 +977,80 @@ void SendCounterPacket::Send(IProfilingConnection& profilingConnection) // Keep the sending procedure looping until the thread is signalled to stop while (m_KeepRunning.load()) { - // Wait condition lock scope - Begin + // Check the current state of the profiling service + ProfilingState currentState = m_StateMachine.GetCurrentState(); + switch (currentState) { - // Lock the mutex to wait on it - std::unique_lock lock(m_WaitMutex); + case ProfilingState::Uninitialised: + case ProfilingState::NotConnected: + + // The send thread cannot be running when the profiling service is uninitialized or not connected, + // stop the thread immediately + m_KeepRunning.store(false); + m_IsRunning.store(false); + + // An exception should be thrown here, save it to be rethrown later from the main thread so that + // it can be caught by the consumer + m_SendThreadException = + std::make_exception_ptr(RuntimeException("The send thread should not be running with the " + "profiling service not yet initialized or connected")); + + return; + case ProfilingState::WaitingForAck: + + // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged. + // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically + // updated by the command handler - // Check the current state of the profiling service - ProfilingState currentState = m_StateMachine.GetCurrentState(); - switch (currentState) + // Prepare a StreamMetadata packet and write it to the Counter Stream buffer + SendStreamMetaDataPacket(); + + // Flush the buffer manually to send the packet + FlushBuffer(profilingConnection); + + // Wait for a connection ack from the remote server. We should expect a response within timeout value. + // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the + // StreamMetadata again. + + // Wait condition lock scope - Begin { - case ProfilingState::Uninitialised: - case ProfilingState::NotConnected: - - // The send thread cannot be running when the profiling service is uninitialized or not connected, - // stop the thread immediately - m_KeepRunning.store(false); - m_IsRunning.store(false); - - // An exception should be thrown here, save it to be rethrown later from the main thread so that - // it can be caught by the consumer - m_SendThreadException = - std::make_exception_ptr(RuntimeException("The send thread should not be running with the " - "profiling service not yet initialized or connected")); - - return; - case ProfilingState::WaitingForAck: - - // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged. - // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically - // updated by the command handler - - // Prepare a StreamMetadata packet and write it to the Counter Stream buffer - SendStreamMetaDataPacket(); - - // Flush the buffer manually to send the packet - FlushBuffer(profilingConnection); - - // Wait for a connection ack from the remote server. We should expect a response within timeout value. - // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the - // StreamMetadata again. - m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout)); - - // Do not flush the buffer again - continue; - case ProfilingState::Active: - default: - // Normal working state for the send thread + std::unique_lock lock(m_WaitMutex); + + m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&]{ return m_ReadyToRead; }); + //reset condition variable predicate for next use + m_ReadyToRead = false; + } + // Wait condition lock scope - End + + // Do not flush the buffer again + continue; + case ProfilingState::Active: + default: + // Wait condition lock scope - Begin + { + std::unique_lock lock(m_WaitMutex); + + // Normal working state for the send thread // Check if the send thread is required to enforce a timeout wait policy if (m_Timeout < 0) { // Wait indefinitely until notified that something to read has become available in the buffer - m_WaitCondition.wait(lock); + m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; }); } else { // Wait until the thread is notified of something to read from the buffer, // or check anyway after the specified number of milliseconds - m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout)); + m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; }); } - break; + //reset condition variable predicate for next use + m_ReadyToRead = false; } + // Wait condition lock scope - End + break; } - // Wait condition lock scope - End // Send all the available packets in the buffer FlushBuffer(profilingConnection); diff --git a/src/profiling/SendCounterPacket.hpp b/src/profiling/SendCounterPacket.hpp index 42e84323e4..11587552b8 100644 --- a/src/profiling/SendCounterPacket.hpp +++ b/src/profiling/SendCounterPacket.hpp @@ -109,6 +109,8 @@ private: std::thread m_SendThread; std::atomic m_IsRunning; std::atomic m_KeepRunning; + // m_ReadyToRead will be protected by m_WaitMutex + bool m_ReadyToRead; std::exception_ptr m_SendThreadException; std::mutex m_PacketSentWaitMutex; std::condition_variable m_PacketSentWaitCondition; -- cgit v1.2.1