From 5d737fb3b06c17ff6b65fb307343ca1c0c680401 Mon Sep 17 00:00:00 2001 From: Matteo Martincigh Date: Mon, 7 Oct 2019 13:05:13 +0100 Subject: IVGCVSW-3937 Update the Send thread to send out the Metadata packet * The Send thread now automatically sends out Stream Metadata packets when the Profiling Service is in WaitingForAck state * Added a reference to the profiling state in the SendCounterPacket class * Moving the RuntimeException thrown in the Send thread to the main thread for rethrowing * The Stop method now rethrows the exception occurred in the send thread * The Stop method does not rethrow when destructing the object * Added unit tests Signed-off-by: Matteo Martincigh Change-Id: Ice7080bff63199eac84fc4fa1d37fb1a6fcdff89 --- src/profiling/SendCounterPacket.cpp | 91 +++++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 10 deletions(-) (limited to 'src/profiling/SendCounterPacket.cpp') diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp index dc5a950bea..b9f2b187b7 100644 --- a/src/profiling/SendCounterPacket.cpp +++ b/src/profiling/SendCounterPacket.cpp @@ -920,7 +920,7 @@ void SendCounterPacket::Start(IProfilingConnection& profilingConnection) m_SendThread = std::thread(&SendCounterPacket::Send, this, std::ref(profilingConnection)); } -void SendCounterPacket::Stop() +void SendCounterPacket::Stop(bool rethrowSendThreadExceptions) { // Signal the send thread to stop m_KeepRunning.store(false); @@ -934,6 +934,30 @@ void SendCounterPacket::Stop() // Wait for the send thread to complete operations m_SendThread.join(); } + + // Check if the send thread exception has to be rethrown + if (!rethrowSendThreadExceptions) + { + // No need to rethrow the send thread exception, return immediately + return; + } + + // Exception handling lock scope - Begin + { + // Lock the mutex to handle any exception coming from the send thread + std::unique_lock lock(m_WaitMutex); + + // Check if there's an exception to rethrow + if (m_SendThreadException) + { + // Rethrow the send thread exception + std::rethrow_exception(m_SendThreadException); + + // Nullify the exception as it has been rethrown + m_SendThreadException = nullptr; + } + } + // Exception handling lock scope - End } void SendCounterPacket::Send(IProfilingConnection& profilingConnection) @@ -946,20 +970,67 @@ void SendCounterPacket::Send(IProfilingConnection& profilingConnection) // Lock the mutex to wait on it std::unique_lock lock(m_WaitMutex); - if (m_Timeout < 0) + // Check the current state of the profiling service + ProfilingState currentState = m_StateMachine.GetCurrentState(); + switch (currentState) { - // Wait indefinitely until notified that something to read has become available in the buffer + case ProfilingState::Uninitialised: + case ProfilingState::NotConnected: + + // The send thread cannot be running when the profiling service is uninitialized or not connected, + // stop the thread immediately + m_KeepRunning.store(false); + m_IsRunning.store(false); + + // An exception should be thrown here, save it to be rethrown later from the main thread so that + // it can be caught by the consumer + m_SendThreadException = + std::make_exception_ptr(RuntimeException("The send thread should not be running with the " + "profiling service not yet initialized or connected")); + + return; + case ProfilingState::WaitingForAck: + + // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged. + // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically + // updated by the command handler + + // Prepare a StreamMetadata packet and write it to the Counter Stream buffer + SendStreamMetaDataPacket(); + + // Flush the buffer manually to send the packet + FlushBuffer(profilingConnection); + + // Wait indefinitely until notified otherwise (it could that the profiling state has changed due to the + // connection being acknowledged, or that new data is ready to be sent, or that the send thread is + // being shut down, etc.) m_WaitCondition.wait(lock); - } - else - { - // Wait until the thread is notified of something to read from the buffer, - // or check anyway after the specified number of milliseconds - m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout)); + + // Do not flush the buffer again + continue; + case ProfilingState::Active: + default: + // Normal working state for the send thread + + // Check if the send thread is required to enforce a timeout wait policy + if (m_Timeout < 0) + { + // Wait indefinitely until notified that something to read has become available in the buffer + m_WaitCondition.wait(lock); + } + else + { + // Wait until the thread is notified of something to read from the buffer, + // or check anyway after the specified number of milliseconds + m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout)); + } + + break; } } // Wait condition lock scope - End + // Send all the available packets in the buffer FlushBuffer(profilingConnection); } @@ -1000,7 +1071,7 @@ void SendCounterPacket::FlushBuffer(IProfilingConnection& profilingConnection) // Mark the packet buffer as read m_BufferManager.MarkRead(packetBuffer); - // Get next available readable buffer + // Get the next available readable buffer packetBuffer = m_BufferManager.GetReadableBuffer(); } } -- cgit v1.2.1