aboutsummaryrefslogtreecommitdiff
path: root/src/profiling/SendCounterPacket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/profiling/SendCounterPacket.cpp')
-rw-r--r--src/profiling/SendCounterPacket.cpp238
1 files changed, 1 insertions, 237 deletions
diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp
index 4d305af951..942caec295 100644
--- a/src/profiling/SendCounterPacket.cpp
+++ b/src/profiling/SendCounterPacket.cpp
@@ -5,7 +5,6 @@
#include "SendCounterPacket.hpp"
#include "EncodeVersion.hpp"
-#include "ProfilingUtils.hpp"
#include <armnn/Exceptions.hpp>
#include <armnn/Conversion.hpp>
@@ -169,7 +168,7 @@ void SendCounterPacket::SendStreamMetaDataPacket()
CancelOperationAndThrow<RuntimeException>(writeBuffer, "Error processing packet.");
}
- m_BufferManager.Commit(writeBuffer, totalSize);
+ m_BufferManager.Commit(writeBuffer, totalSize, false);
}
bool SendCounterPacket::CreateCategoryRecord(const CategoryPtr& category,
@@ -903,241 +902,6 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio
m_BufferManager.Commit(writeBuffer, totalSize);
}
-void SendCounterPacket::SetReadyToRead()
-{
- // We need to wait for the send thread to release its mutex
- {
- std::lock_guard<std::mutex> lck(m_WaitMutex);
- m_ReadyToRead = true;
- }
- // Signal the send thread that there's something to read in the buffer
- m_WaitCondition.notify_one();
-}
-
-void SendCounterPacket::Start(IProfilingConnection& profilingConnection)
-{
- // Check if the send thread is already running
- if (m_IsRunning.load())
- {
- // The send thread is already running
- return;
- }
-
- if (m_SendThread.joinable())
- {
- m_SendThread.join();
- }
-
- // Mark the send thread as running
- m_IsRunning.store(true);
-
- // Keep the send procedure going until the send thread is signalled to stop
- m_KeepRunning.store(true);
-
- // Make sure the send thread will not flush the buffer until signaled to do so
- // no need for a mutex as the send thread can not be running at this point
- m_ReadyToRead = false;
-
- m_PacketSent = false;
-
- // Start the send thread
- m_SendThread = std::thread(&SendCounterPacket::Send, this, std::ref(profilingConnection));
-}
-
-void SendCounterPacket::Stop(bool rethrowSendThreadExceptions)
-{
- // Signal the send thread to stop
- m_KeepRunning.store(false);
-
- // Check that the send thread is running
- if (m_SendThread.joinable())
- {
- // Kick the send thread out of the wait condition
- SetReadyToRead();
- // Wait for the send thread to complete operations
- m_SendThread.join();
- }
-
- // Check if the send thread exception has to be rethrown
- if (!rethrowSendThreadExceptions)
- {
- // No need to rethrow the send thread exception, return immediately
- return;
- }
-
- // Check if there's an exception to rethrow
- if (m_SendThreadException)
- {
- // Rethrow the send thread exception
- std::rethrow_exception(m_SendThreadException);
-
- // Nullify the exception as it has been rethrown
- m_SendThreadException = nullptr;
- }
-}
-
-void SendCounterPacket::Send(IProfilingConnection& profilingConnection)
-{
- // Run once and keep the sending procedure looping until the thread is signalled to stop
- do
- {
- // Check the current state of the profiling service
- ProfilingState currentState = m_StateMachine.GetCurrentState();
- switch (currentState)
- {
- case ProfilingState::Uninitialised:
- case ProfilingState::NotConnected:
-
- // The send thread cannot be running when the profiling service is uninitialized or not connected,
- // stop the thread immediately
- m_KeepRunning.store(false);
- m_IsRunning.store(false);
-
- // An exception should be thrown here, save it to be rethrown later from the main thread so that
- // it can be caught by the consumer
- m_SendThreadException =
- std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
- "profiling service not yet initialized or connected"));
-
- return;
- case ProfilingState::WaitingForAck:
-
- // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
- // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
- // updated by the command handler
-
- // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
- SendStreamMetaDataPacket();
-
- // Flush the buffer manually to send the packet
- FlushBuffer(profilingConnection);
-
- // Wait for a connection ack from the remote server. We should expect a response within timeout value.
- // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
- // StreamMetadata again.
-
- // Wait condition lock scope - Begin
- {
- std::unique_lock<std::mutex> lock(m_WaitMutex);
-
- bool timeout = m_WaitCondition.wait_for(lock,
- std::chrono::milliseconds(m_Timeout),
- [&]{ return m_ReadyToRead; });
- // If we get notified we need to flush the buffer again
- if(timeout)
- {
- // Otherwise if we just timed out don't flush the buffer
- continue;
- }
- //reset condition variable predicate for next use
- m_ReadyToRead = false;
- }
- // Wait condition lock scope - End
- break;
- case ProfilingState::Active:
- default:
- // Wait condition lock scope - Begin
- {
- std::unique_lock<std::mutex> lock(m_WaitMutex);
-
- // Normal working state for the send thread
- // Check if the send thread is required to enforce a timeout wait policy
- if (m_Timeout < 0)
- {
- // Wait indefinitely until notified that something to read has become available in the buffer
- m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
- }
- else
- {
- // Wait until the thread is notified of something to read from the buffer,
- // or check anyway after the specified number of milliseconds
- m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
- }
-
- //reset condition variable predicate for next use
- m_ReadyToRead = false;
- }
- // Wait condition lock scope - End
- break;
- }
-
- // Send all the available packets in the buffer
- FlushBuffer(profilingConnection);
- } while (m_KeepRunning.load());
-
- // Ensure that all readable data got written to the profiling connection before the thread is stopped
- // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
- FlushBuffer(profilingConnection, false);
-
- // Mark the send thread as not running
- m_IsRunning.store(false);
-}
-
-void SendCounterPacket::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
-{
- // Get the first available readable buffer
- IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
-
- // Initialize the flag that indicates whether at least a packet has been sent
- bool packetsSent = false;
-
- while (packetBuffer != nullptr)
- {
- // Get the data to send from the buffer
- const unsigned char* readBuffer = packetBuffer->GetReadableData();
- unsigned int readBufferSize = packetBuffer->GetSize();
-
- if (readBuffer == nullptr || readBufferSize == 0)
- {
- // Nothing to send, get the next available readable buffer and continue
- m_BufferManager.MarkRead(packetBuffer);
- packetBuffer = m_BufferManager.GetReadableBuffer();
-
- continue;
- }
-
- // Check that the profiling connection is open, silently drop the data and continue if it's closed
- if (profilingConnection.IsOpen())
- {
- // Write a packet to the profiling connection. Silently ignore any write error and continue
- profilingConnection.WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
-
- // Set the flag that indicates whether at least a packet has been sent
- packetsSent = true;
- }
-
- // Mark the packet buffer as read
- m_BufferManager.MarkRead(packetBuffer);
-
- // Get the next available readable buffer
- packetBuffer = m_BufferManager.GetReadableBuffer();
- }
- // Check whether at least a packet has been sent
- if (packetsSent && notifyWatchers)
- {
- // Wait for the parent thread to release its mutex if necessary
- {
- std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
- m_PacketSent = true;
- }
- // Notify to any watcher that something has been sent
- m_PacketSentWaitCondition.notify_one();
- }
-}
-
-bool SendCounterPacket::WaitForPacketSent(uint32_t timeout = 1000)
-{
- std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
- // Blocks until notified that at least a packet has been sent or until timeout expires.
- bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
- std::chrono::milliseconds(timeout),
- [&] { return m_PacketSent; });
-
- m_PacketSent = false;
-
- return timedOut;
-}
-
} // namespace profiling
} // namespace armnn