diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/profiling/BufferManager.cpp | 7 | ||||
-rw-r--r-- | src/profiling/SendCounterPacket.cpp | 98 | ||||
-rw-r--r-- | src/profiling/SendCounterPacket.hpp | 14 | ||||
-rw-r--r-- | src/profiling/test/BufferTests.cpp | 33 | ||||
-rw-r--r-- | src/profiling/test/SendCounterPacketTests.cpp | 195 | ||||
-rw-r--r-- | src/profiling/test/SendCounterPacketTests.hpp | 29 |
6 files changed, 311 insertions, 65 deletions
diff --git a/src/profiling/BufferManager.cpp b/src/profiling/BufferManager.cpp index dbf4466bbb..6ac3ee17d1 100644 --- a/src/profiling/BufferManager.cpp +++ b/src/profiling/BufferManager.cpp @@ -29,16 +29,17 @@ BufferManager::BufferManager(unsigned int numberOfBuffers, unsigned int maxPacke std::unique_ptr<IPacketBuffer> BufferManager::Reserve(unsigned int requestedSize, unsigned int& reservedSize) { + reservedSize = 0; std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock); if (requestedSize > m_MaxBufferSize) { - throw armnn::InvalidArgumentException("The maximum buffer size that can be requested is [" + - std::to_string(m_MaxBufferSize) + "] bytes"); + return nullptr; } availableListLock.lock(); if (m_AvailableList.empty()) { - throw armnn::profiling::BufferExhaustion("Buffer not available"); + availableListLock.unlock(); + return nullptr; } std::unique_ptr<IPacketBuffer> buffer = std::move(m_AvailableList.back()); m_AvailableList.pop_back(); diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp index 0a2f08b095..9aafa2ccc8 100644 --- a/src/profiling/SendCounterPacket.cpp +++ b/src/profiling/SendCounterPacket.cpp @@ -67,17 +67,11 @@ void SendCounterPacket::SendStreamMetaDataPacket() std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } try @@ -795,19 +789,11 @@ void SendCounterPacket::SendCounterDirectoryPacket(const ICounterDirectory& coun uint32_t reserved = 0; std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - // Check that the reserved buffer size is enough to hold the counter directory packet - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes") - % totalSize)); - } - - // Check the buffer handle is valid - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory"); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Offset for writing to the buffer @@ -837,17 +823,11 @@ void SendCounterPacket::SendPeriodicCounterCapturePacket(uint64_t timestamp, con std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Create header. @@ -888,17 +868,11 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Create header. @@ -972,25 +946,47 @@ void SendCounterPacket::Send() // Lock the mutex to wait on it std::unique_lock<std::mutex> lock(m_WaitMutex); - // Wait until the thread is notified of something to read from the buffer, or check anyway after a second - m_WaitCondition.wait_for(lock, std::chrono::seconds(1)); + if (m_Timeout < 0) + { + // Wait indefinitely until notified that something to read has become available in the buffer + m_WaitCondition.wait(lock); + } + else + { + // Wait until the thread is notified of something to read from the buffer, + // or check anyway after a second + m_WaitCondition.wait_for(lock, std::chrono::seconds(m_Timeout)); + } } // Wait condition lock scope - End - // Get the buffer to read from - std::unique_ptr<IPacketBuffer> packetBuffer = m_BufferManager.GetReadableBuffer(); - if (packetBuffer == nullptr) - { - // Nothing to read from, ignore and continue - continue; - } + FlushBuffer(); + } + // Ensure that all readable data got written to the profiling connection before the thread is stopped + FlushBuffer(); + + // Mark the send thread as not running + m_IsRunning.store(false); +} + +void SendCounterPacket::FlushBuffer() +{ + // Get the first available readable buffer + std::unique_ptr<IPacketBuffer> packetBuffer = m_BufferManager.GetReadableBuffer(); + + while (packetBuffer != nullptr) + { // Get the data to send from the buffer const unsigned char* readBuffer = packetBuffer->GetReadableData(); unsigned int readBufferSize = packetBuffer->GetSize(); + if (readBuffer == nullptr || readBufferSize == 0) { - // Nothing to send, ignore and continue + // Nothing to send, get the next available readable buffer and continue + m_BufferManager.MarkRead(packetBuffer); + packetBuffer = m_BufferManager.GetReadableBuffer(); + continue; } @@ -1003,10 +999,10 @@ void SendCounterPacket::Send() // Mark the packet buffer as read m_BufferManager.MarkRead(packetBuffer); - } - // Mark the send thread as not running - m_IsRunning.store(false); + // Get next available readable buffer + packetBuffer = m_BufferManager.GetReadableBuffer(); + } } } // namespace profiling diff --git a/src/profiling/SendCounterPacket.hpp b/src/profiling/SendCounterPacket.hpp index c57546d907..748371b9fa 100644 --- a/src/profiling/SendCounterPacket.hpp +++ b/src/profiling/SendCounterPacket.hpp @@ -9,11 +9,13 @@ #include "ISendCounterPacket.hpp" #include "ICounterDirectory.hpp" #include "IProfilingConnection.hpp" +#include "ProfilingUtils.hpp" #include <atomic> +#include <condition_variable> #include <mutex> #include <thread> -#include <condition_variable> +#include <type_traits> namespace armnn { @@ -31,11 +33,12 @@ public: using IndexValuePairsVector = std::vector<std::pair<uint16_t, uint32_t>>; - SendCounterPacket(IProfilingConnection& profilingConnection, IBufferManager& buffer) + SendCounterPacket(IProfilingConnection& profilingConnection, IBufferManager& buffer, int timeout = 1) : m_ProfilingConnection(profilingConnection) , m_BufferManager(buffer) , m_IsRunning(false) , m_KeepRunning(false) + , m_Timeout(timeout) {} ~SendCounterPacket() { Stop(); } @@ -70,6 +73,10 @@ private: template <typename ExceptionType> void CancelOperationAndThrow(std::unique_ptr<IPacketBuffer>& writerBuffer, const std::string& errorMessage) { + if (std::is_same<ExceptionType, armnn::profiling::BufferExhaustion>::value) + { + SetReadyToRead(); + } if (writerBuffer != nullptr) { // Cancel the operation @@ -80,6 +87,8 @@ private: throw ExceptionType(errorMessage); } + void FlushBuffer(); + IProfilingConnection& m_ProfilingConnection; IBufferManager& m_BufferManager; std::mutex m_WaitMutex; @@ -87,6 +96,7 @@ private: std::thread m_SendThread; std::atomic<bool> m_IsRunning; std::atomic<bool> m_KeepRunning; + int m_Timeout; protected: // Helper methods, protected for testing diff --git a/src/profiling/test/BufferTests.cpp b/src/profiling/test/BufferTests.cpp index a2f3c30849..7a06843ab3 100644 --- a/src/profiling/test/BufferTests.cpp +++ b/src/profiling/test/BufferTests.cpp @@ -130,7 +130,9 @@ BOOST_AUTO_TEST_CASE(BufferReserveExceedingSpaceTest) unsigned int reservedSize = 0; // Cannot reserve buffer bigger than maximum buffer size - BOOST_CHECK_THROW(bufferManager.Reserve(1024, reservedSize), armnn::InvalidArgumentException); + auto reservedBuffer = bufferManager.Reserve(1024, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferExhaustionTest) @@ -144,7 +146,9 @@ BOOST_AUTO_TEST_CASE(BufferExhaustionTest) BOOST_TEST(packetBuffer.get()); // Cannot reserve buffer when buffer is not available - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferReserveMultipleTest) @@ -173,7 +177,9 @@ BOOST_AUTO_TEST_CASE(BufferReserveMultipleTest) // Cannot reserve when buffer is not available unsigned int reservedSize3 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize3), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize3); + BOOST_TEST(reservedSize3 == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferReleaseTest) @@ -195,7 +201,9 @@ BOOST_AUTO_TEST_CASE(BufferReleaseTest) // Cannot reserve when buffer is not available unsigned int reservedSize2 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.Release(packetBuffer0); @@ -222,7 +230,9 @@ BOOST_AUTO_TEST_CASE(BufferCommitTest) BOOST_TEST(packetBuffer1.get()); unsigned int reservedSize2 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.Commit(packetBuffer0, 256); @@ -232,7 +242,10 @@ BOOST_AUTO_TEST_CASE(BufferCommitTest) BOOST_TEST(packetBuffer2->GetSize() == 256); // Buffer not set back to available list after commit - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + unsigned int reservedSize = 0; + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferMarkReadTest) @@ -252,7 +265,9 @@ BOOST_AUTO_TEST_CASE(BufferMarkReadTest) // Cannot reserve when buffer is not available unsigned int reservedSize2 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.Commit(packetBuffer0, 256); @@ -262,7 +277,9 @@ BOOST_AUTO_TEST_CASE(BufferMarkReadTest) BOOST_TEST(packetBuffer2->GetSize() == 256); // Buffer not set back to available list after commit - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.MarkRead(packetBuffer2); diff --git a/src/profiling/test/SendCounterPacketTests.cpp b/src/profiling/test/SendCounterPacketTests.cpp index ba1d470a50..0d21ec0b5d 100644 --- a/src/profiling/test/SendCounterPacketTests.cpp +++ b/src/profiling/test/SendCounterPacketTests.cpp @@ -5,10 +5,11 @@ #include "SendCounterPacketTests.hpp" +#include <CounterDirectory.hpp> +#include <BufferManager.hpp> #include <EncodeVersion.hpp> #include <ProfilingUtils.hpp> #include <SendCounterPacket.hpp> -#include <CounterDirectory.hpp> #include <armnn/Exceptions.hpp> #include <armnn/Conversion.hpp> @@ -2044,4 +2045,196 @@ BOOST_AUTO_TEST_CASE(SendThreadTest3) BOOST_CHECK(mockStreamCounterBuffer.GetReadSize() <= mockStreamCounterBuffer.GetCommittedSize()); } +BOOST_AUTO_TEST_CASE(SendThreadBufferTest) +{ + MockProfilingConnection mockProfilingConnection; + BufferManager bufferManager(1, 1024); + SendCounterPacket sendCounterPacket(mockProfilingConnection, bufferManager, -1); + sendCounterPacket.Start(); + + // Interleaving writes and reads to/from the buffer with pauses to test that the send thread actually waits for + // something to become available for reading + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // SendStreamMetaDataPacket + sendCounterPacket.SendStreamMetaDataPacket(); + + // Read data from the buffer + // Buffer should become readable after commit by SendStreamMetaDataPacket + auto packetBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(packetBuffer.get()); + + std::string processName = GetProcessName().substr(0, 60); + unsigned int processNameSize = processName.empty() ? 0 : boost::numeric_cast<unsigned int>(processName.size()) + 1; + unsigned int streamMetadataPacketsize = 118 + processNameSize; + BOOST_TEST(packetBuffer->GetSize() == streamMetadataPacketsize); + + // Buffer is not available when SendStreamMetaDataPacket already occupied the buffer. + unsigned int reservedSize = 0; + auto reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(!reservedBuffer.get()); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(packetBuffer, streamMetadataPacketsize); + + sendCounterPacket.SetReadyToRead(); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // The buffer is read by the send thread so it should not be in the readable buffer. + auto readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + // Release the buffer to be used by sendCounterPacket + bufferManager.Release(reservedBuffer); + + // SendCounterDirectoryPacket + CounterDirectory counterDirectory; + sendCounterPacket.SendCounterDirectoryPacket(counterDirectory); + + // Read data from the buffer + // Buffer should become readable after commit by SendCounterDirectoryPacket + auto counterDirectoryPacketBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(counterDirectoryPacketBuffer.get()); + + // Get the size of the Counter Directory Packet + unsigned int counterDirectoryPacketSize = 32; + BOOST_TEST(counterDirectoryPacketBuffer->GetSize() == counterDirectoryPacketSize); + + // Buffer is not available when SendCounterDirectoryPacket already occupied the buffer. + reservedSize = 0; + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(counterDirectoryPacketBuffer, counterDirectoryPacketSize); + + sendCounterPacket.SetReadyToRead(); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // The buffer is read by the send thread so it should not be in the readable buffer. + readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + // Release the buffer to be used by sendCounterPacket + bufferManager.Release(reservedBuffer); + + // SendPeriodicCounterCapturePacket + sendCounterPacket.SendPeriodicCounterCapturePacket(123u, + { + { 1u, 23u }, + { 33u, 1207623u } + }); + + // Read data from the buffer + // Buffer should become readable after commit by SendPeriodicCounterCapturePacket + auto periodicCounterCapturePacketBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(periodicCounterCapturePacketBuffer.get()); + + // Get the size of the Periodic Counter Capture Packet + unsigned int periodicCounterCapturePacketSize = 28; + BOOST_TEST(periodicCounterCapturePacketBuffer->GetSize() == periodicCounterCapturePacketSize); + + // Buffer is not available when SendPeriodicCounterCapturePacket already occupied the buffer. + reservedSize = 0; + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(periodicCounterCapturePacketBuffer, periodicCounterCapturePacketSize); + + sendCounterPacket.SetReadyToRead(); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // The buffer is read by the send thread so it should not be in the readable buffer. + readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + sendCounterPacket.Stop(); +} + +BOOST_AUTO_TEST_CASE(SendThreadBufferTest1) +{ + MockWriteProfilingConnection mockProfilingConnection; + BufferManager bufferManager(3, 1024); + SendCounterPacket sendCounterPacket(mockProfilingConnection, bufferManager, -1); + sendCounterPacket.Start(); + + // SendStreamMetaDataPacket + sendCounterPacket.SendStreamMetaDataPacket(); + + // Read data from the buffer + // Buffer should become readable after commit by SendStreamMetaDataPacket + auto packetBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(packetBuffer.get()); + + std::string processName = GetProcessName().substr(0, 60); + unsigned int processNameSize = processName.empty() ? 0 : boost::numeric_cast<unsigned int>(processName.size()) + 1; + unsigned int streamMetadataPacketsize = 118 + processNameSize; + BOOST_TEST(packetBuffer->GetSize() == streamMetadataPacketsize); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(packetBuffer, streamMetadataPacketsize); + + sendCounterPacket.SetReadyToRead(); + + // SendCounterDirectoryPacket + CounterDirectory counterDirectory; + sendCounterPacket.SendCounterDirectoryPacket(counterDirectory); + + sendCounterPacket.SetReadyToRead(); + + // SendPeriodicCounterCapturePacket + sendCounterPacket.SendPeriodicCounterCapturePacket(123u, + { + { 1u, 23u }, + { 33u, 1207623u } + }); + + sendCounterPacket.SetReadyToRead(); + + sendCounterPacket.Stop(); + + // The buffer is read by the send thread so it should not be in the readable buffer. + auto readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + unsigned int reservedSize = 0; + auto reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + // Check that data was actually written to the profiling connection in any order + std::vector<uint32_t> writtenData = mockProfilingConnection.GetWrittenData(); + std::vector<uint32_t> expectedOutput{streamMetadataPacketsize, 32, 28}; + BOOST_TEST(writtenData.size() == 3); + bool foundStreamMetaDataPacket = + std::find(writtenData.begin(), writtenData.end(), streamMetadataPacketsize) != writtenData.end(); + bool foundCounterDirectoryPacket = std::find(writtenData.begin(), writtenData.end(), 32) != writtenData.end(); + bool foundPeriodicCounterCapturePacket = std::find(writtenData.begin(), writtenData.end(), 28) != writtenData.end(); + BOOST_TEST(foundStreamMetaDataPacket); + BOOST_TEST(foundCounterDirectoryPacket); + BOOST_TEST(foundPeriodicCounterCapturePacket); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/profiling/test/SendCounterPacketTests.hpp b/src/profiling/test/SendCounterPacketTests.hpp index 3c3427c4c7..584df5e520 100644 --- a/src/profiling/test/SendCounterPacketTests.hpp +++ b/src/profiling/test/SendCounterPacketTests.hpp @@ -42,6 +42,35 @@ private: bool m_IsOpen; }; +class MockWriteProfilingConnection : public IProfilingConnection +{ +public: + MockWriteProfilingConnection() + : m_IsOpen(true) + {} + + bool IsOpen() override { return m_IsOpen; } + + void Close() override { m_IsOpen = false; } + + bool WritePacket(const unsigned char* buffer, uint32_t length) override + { + m_WrittenData.push_back(length); + return buffer != nullptr && length > 0; + } + + Packet ReadPacket(uint32_t timeout) override { return Packet(); } + + std::vector<uint32_t> GetWrittenData() + { + return m_WrittenData; + } + +private: + bool m_IsOpen; + std::vector<uint32_t> m_WrittenData; +}; + class MockPacketBuffer : public IPacketBuffer { public: |