aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFinn Williams <Finn.Williams@arm.com>2019-12-03 11:55:31 +0000
committerFinn Williams <Finn.Williams@arm.com>2019-12-04 11:47:50 +0000
commit73ae7fa28d05babde16fa232c1d823b35c893e3e (patch)
tree002138055eec225f2017c37cca9dcac6400f6cc0
parent811ca5566ac4a166fb4d8d8ee4f235dda59b1eab (diff)
downloadarmnn-73ae7fa28d05babde16fa232c1d823b35c893e3e.tar.gz
IVGCVSW-4221 Fix SendCounterPacket hanging for indefinite time
Signed-off-by: Finn Williams <Finn.Williams@arm.com> Change-Id: I612f4d0162e7f35296f7d484350a937f6344fcfb
-rw-r--r--src/profiling/SendCounterPacket.cpp135
-rw-r--r--src/profiling/SendCounterPacket.hpp2
2 files changed, 75 insertions, 62 deletions
diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp
index 2d6458316f..e3a9b77f41 100644
--- a/src/profiling/SendCounterPacket.cpp
+++ b/src/profiling/SendCounterPacket.cpp
@@ -903,6 +903,11 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio
void SendCounterPacket::SetReadyToRead()
{
+ // We need to wait for the send thread to release its mutex
+ {
+ std::lock_guard<std::mutex> lck(m_WaitMutex);
+ m_ReadyToRead = true;
+ }
// Signal the send thread that there's something to read in the buffer
m_WaitCondition.notify_one();
}
@@ -927,6 +932,10 @@ void SendCounterPacket::Start(IProfilingConnection& profilingConnection)
// Keep the send procedure going until the send thread is signalled to stop
m_KeepRunning.store(true);
+ // Make sure the send thread will not flush the buffer until signaled to do so
+ // no need for a mutex as the send thread can not be running at this point
+ m_ReadyToRead = false;
+
// Start the send thread
m_SendThread = std::thread(&SendCounterPacket::Send, this, std::ref(profilingConnection));
}
@@ -940,8 +949,7 @@ void SendCounterPacket::Stop(bool rethrowSendThreadExceptions)
if (m_SendThread.joinable())
{
// Kick the send thread out of the wait condition
- m_WaitCondition.notify_one();
-
+ SetReadyToRead();
// Wait for the send thread to complete operations
m_SendThread.join();
}
@@ -953,22 +961,15 @@ void SendCounterPacket::Stop(bool rethrowSendThreadExceptions)
return;
}
- // Exception handling lock scope - Begin
+ // Check if there's an exception to rethrow
+ if (m_SendThreadException)
{
- // Lock the mutex to handle any exception coming from the send thread
- std::lock_guard<std::mutex> lock(m_WaitMutex);
-
- // Check if there's an exception to rethrow
- if (m_SendThreadException)
- {
- // Rethrow the send thread exception
- std::rethrow_exception(m_SendThreadException);
+ // Rethrow the send thread exception
+ std::rethrow_exception(m_SendThreadException);
- // Nullify the exception as it has been rethrown
- m_SendThreadException = nullptr;
- }
+ // Nullify the exception as it has been rethrown
+ m_SendThreadException = nullptr;
}
- // Exception handling lock scope - End
}
void SendCounterPacket::Send(IProfilingConnection& profilingConnection)
@@ -976,70 +977,80 @@ void SendCounterPacket::Send(IProfilingConnection& profilingConnection)
// Keep the sending procedure looping until the thread is signalled to stop
while (m_KeepRunning.load())
{
- // Wait condition lock scope - Begin
+ // Check the current state of the profiling service
+ ProfilingState currentState = m_StateMachine.GetCurrentState();
+ switch (currentState)
{
- // Lock the mutex to wait on it
- std::unique_lock<std::mutex> lock(m_WaitMutex);
+ case ProfilingState::Uninitialised:
+ case ProfilingState::NotConnected:
+
+ // The send thread cannot be running when the profiling service is uninitialized or not connected,
+ // stop the thread immediately
+ m_KeepRunning.store(false);
+ m_IsRunning.store(false);
+
+ // An exception should be thrown here, save it to be rethrown later from the main thread so that
+ // it can be caught by the consumer
+ m_SendThreadException =
+ std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
+ "profiling service not yet initialized or connected"));
+
+ return;
+ case ProfilingState::WaitingForAck:
+
+ // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
+ // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
+ // updated by the command handler
- // Check the current state of the profiling service
- ProfilingState currentState = m_StateMachine.GetCurrentState();
- switch (currentState)
+ // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
+ SendStreamMetaDataPacket();
+
+ // Flush the buffer manually to send the packet
+ FlushBuffer(profilingConnection);
+
+ // Wait for a connection ack from the remote server. We should expect a response within timeout value.
+ // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
+ // StreamMetadata again.
+
+ // Wait condition lock scope - Begin
{
- case ProfilingState::Uninitialised:
- case ProfilingState::NotConnected:
-
- // The send thread cannot be running when the profiling service is uninitialized or not connected,
- // stop the thread immediately
- m_KeepRunning.store(false);
- m_IsRunning.store(false);
-
- // An exception should be thrown here, save it to be rethrown later from the main thread so that
- // it can be caught by the consumer
- m_SendThreadException =
- std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
- "profiling service not yet initialized or connected"));
-
- return;
- case ProfilingState::WaitingForAck:
-
- // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
- // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
- // updated by the command handler
-
- // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
- SendStreamMetaDataPacket();
-
- // Flush the buffer manually to send the packet
- FlushBuffer(profilingConnection);
-
- // Wait for a connection ack from the remote server. We should expect a response within timeout value.
- // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
- // StreamMetadata again.
- m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout));
-
- // Do not flush the buffer again
- continue;
- case ProfilingState::Active:
- default:
- // Normal working state for the send thread
+ std::unique_lock<std::mutex> lock(m_WaitMutex);
+
+ m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&]{ return m_ReadyToRead; });
+ //reset condition variable predicate for next use
+ m_ReadyToRead = false;
+ }
+ // Wait condition lock scope - End
+
+ // Do not flush the buffer again
+ continue;
+ case ProfilingState::Active:
+ default:
+ // Wait condition lock scope - Begin
+ {
+ std::unique_lock<std::mutex> lock(m_WaitMutex);
+
+ // Normal working state for the send thread
// Check if the send thread is required to enforce a timeout wait policy
if (m_Timeout < 0)
{
// Wait indefinitely until notified that something to read has become available in the buffer
- m_WaitCondition.wait(lock);
+ m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
}
else
{
// Wait until the thread is notified of something to read from the buffer,
// or check anyway after the specified number of milliseconds
- m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout));
+ m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
}
- break;
+ //reset condition variable predicate for next use
+ m_ReadyToRead = false;
}
+ // Wait condition lock scope - End
+ break;
}
- // Wait condition lock scope - End
// Send all the available packets in the buffer
FlushBuffer(profilingConnection);
diff --git a/src/profiling/SendCounterPacket.hpp b/src/profiling/SendCounterPacket.hpp
index 42e84323e4..11587552b8 100644
--- a/src/profiling/SendCounterPacket.hpp
+++ b/src/profiling/SendCounterPacket.hpp
@@ -109,6 +109,8 @@ private:
std::thread m_SendThread;
std::atomic<bool> m_IsRunning;
std::atomic<bool> m_KeepRunning;
+ // m_ReadyToRead will be protected by m_WaitMutex
+ bool m_ReadyToRead;
std::exception_ptr m_SendThreadException;
std::mutex m_PacketSentWaitMutex;
std::condition_variable m_PacketSentWaitCondition;