From 09ad6f909f25aef02b7f53bba320b534b9260786 Mon Sep 17 00:00:00 2001 From: Finn Williams Date: Thu, 19 Dec 2019 17:05:18 +0000 Subject: IVGCVSW-4229 Fix Intermittent failures in ExternalProfiling * Added a BufferManager.Reset() method to prevent packets being retained after a test * Fixed a bug causing the send thread to wait needlessly before moving to active state * Refactored SendCoundPacketTests and ProfilingTests test helper classes * Fixed issue where WaitForPacketSent could miss a notification and timeout Signed-off-by: Finn Williams Change-Id: I353a652260c2f7dd465baa9e979e22f50f3ca6a7 --- src/profiling/BufferManager.cpp | 38 ++- src/profiling/BufferManager.hpp | 9 +- src/profiling/FileOnlyProfilingConnection.cpp | 13 +- src/profiling/ProfilingService.cpp | 8 +- src/profiling/ProfilingService.hpp | 2 +- src/profiling/SendCounterPacket.cpp | 36 ++- src/profiling/SendCounterPacket.hpp | 10 +- .../test/FileOnlyProfilingDecoratorTests.cpp | 47 ++- src/profiling/test/ProfilingTests.cpp | 316 ++++++--------------- src/profiling/test/ProfilingTests.hpp | 50 +++- src/profiling/test/SendCounterPacketTests.cpp | 63 ++-- src/profiling/test/SendCounterPacketTests.hpp | 67 ++++- 12 files changed, 321 insertions(+), 338 deletions(-) diff --git a/src/profiling/BufferManager.cpp b/src/profiling/BufferManager.cpp index 6481c5e9cb..b24bf4b5b0 100644 --- a/src/profiling/BufferManager.cpp +++ b/src/profiling/BufferManager.cpp @@ -5,9 +5,6 @@ #include "BufferManager.hpp" #include "PacketBuffer.hpp" -#include "ProfilingUtils.hpp" - -#include namespace armnn { @@ -16,15 +13,10 @@ namespace profiling { BufferManager::BufferManager(unsigned int numberOfBuffers, unsigned int maxPacketSize) - : m_MaxBufferSize(maxPacketSize) + : m_MaxBufferSize(maxPacketSize), + m_NumberOfBuffers(numberOfBuffers) { - m_AvailableList.reserve(numberOfBuffers); - for (unsigned int i = 0; i < numberOfBuffers; ++i) - { - IPacketBufferPtr buffer = std::make_unique(maxPacketSize); - m_AvailableList.emplace_back(std::move(buffer)); - } - m_ReadableList.reserve(numberOfBuffers); + Initialize(); } IPacketBufferPtr BufferManager::Reserve(unsigned int requestedSize, unsigned int& reservedSize) @@ -55,7 +47,17 @@ void BufferManager::Commit(IPacketBufferPtr& packetBuffer, unsigned int size) readableListLock.lock(); m_ReadableList.push_back(std::move(packetBuffer)); readableListLock.unlock(); - m_ReadDataAvailable.notify_one(); +} + +void BufferManager::Initialize() +{ + m_AvailableList.reserve(m_NumberOfBuffers); + for (unsigned int i = 0; i < m_NumberOfBuffers; ++i) + { + IPacketBufferPtr buffer = std::make_unique(m_MaxBufferSize); + m_AvailableList.emplace_back(std::move(buffer)); + } + m_ReadableList.reserve(m_NumberOfBuffers); } void BufferManager::Release(IPacketBufferPtr& packetBuffer) @@ -67,6 +69,18 @@ void BufferManager::Release(IPacketBufferPtr& packetBuffer) availableListLock.unlock(); } +void BufferManager::Reset() +{ + //This method should only be called once all threads have been joined + std::lock_guard readableListLock(m_ReadableMutex); + std::lock_guard availableListLock(m_AvailableMutex); + + m_AvailableList.clear(); + m_ReadableList.clear(); + + Initialize(); +} + IPacketBufferPtr BufferManager::GetReadableBuffer() { std::unique_lock readableListLock(m_ReadableMutex); diff --git a/src/profiling/BufferManager.hpp b/src/profiling/BufferManager.hpp index afa4240f14..495b113867 100644 --- a/src/profiling/BufferManager.hpp +++ b/src/profiling/BufferManager.hpp @@ -26,6 +26,8 @@ public: IPacketBufferPtr Reserve(unsigned int requestedSize, unsigned int& reservedSize) override; + void Reset(); + void Commit(IPacketBufferPtr& packetBuffer, unsigned int size) override; void Release(IPacketBufferPtr& packetBuffer) override; @@ -35,8 +37,12 @@ public: void MarkRead(IPacketBufferPtr& packetBuffer) override; private: + void Initialize(); + // Maximum buffer size unsigned int m_MaxBufferSize; + // Number of buffers + unsigned int m_NumberOfBuffers; // List of available packet buffers std::vector m_AvailableList; @@ -49,9 +55,6 @@ private: // Mutex for readable packet buffer list std::mutex m_ReadableMutex; - - // Condition to notify when data is availabe to be read - std::condition_variable m_ReadDataAvailable; }; } // namespace profiling diff --git a/src/profiling/FileOnlyProfilingConnection.cpp b/src/profiling/FileOnlyProfilingConnection.cpp index 75862616b9..1db8030313 100644 --- a/src/profiling/FileOnlyProfilingConnection.cpp +++ b/src/profiling/FileOnlyProfilingConnection.cpp @@ -166,13 +166,16 @@ bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint3 Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout) { std::unique_lock lck(m_PacketAvailableMutex); - if (m_PacketQueue.empty()) + + // Here we are using m_PacketQueue.empty() as a predicate variable + // The conditional variable will wait until packetQueue is not empty or until a timeout + if(!m_ConditionPacketAvailable.wait_for(lck, + std::chrono::milliseconds(timeout), + [&]{return !m_PacketQueue.empty();})) { - if(m_ConditionPacketAvailable.wait_for(lck, std::chrono::milliseconds(timeout)) == std::cv_status::timeout) - { - throw armnn::TimeoutException("Thread has timed out as per requested time limit"); - } + throw armnn::TimeoutException("Thread has timed out as per requested time limit"); } + Packet returnedPacket = std::move(m_PacketQueue.front()); m_PacketQueue.pop(); return returnedPacket; diff --git a/src/profiling/ProfilingService.cpp b/src/profiling/ProfilingService.cpp index 9119597ceb..cc41fb2d69 100644 --- a/src/profiling/ProfilingService.cpp +++ b/src/profiling/ProfilingService.cpp @@ -371,6 +371,7 @@ void ProfilingService::Reset() m_CounterIndex.clear(); m_CounterValues.clear(); m_CounterDirectory.Clear(); + m_BufferManager.Reset(); // ...finally reset the profiling state machine m_StateMachine.Reset(); @@ -379,11 +380,12 @@ void ProfilingService::Reset() void ProfilingService::Stop() { // The order in which we reset/stop the components is not trivial! - - // First stop the threads (Command Handler first)... + // First stop the producing threads + // Command Handler first as it is responsible for launching then Periodic Counter capture thread m_CommandHandler.Stop(); - m_SendCounterPacket.Stop(false); m_PeriodicCounterCapture.Stop(); + // The the consuming thread + m_SendCounterPacket.Stop(false); // ...then close and destroy the profiling connection... if (m_ProfilingConnection != nullptr && m_ProfilingConnection->IsOpen()) diff --git a/src/profiling/ProfilingService.hpp b/src/profiling/ProfilingService.hpp index 746edb8a6c..24748bb69b 100644 --- a/src/profiling/ProfilingService.hpp +++ b/src/profiling/ProfilingService.hpp @@ -206,7 +206,7 @@ protected: { instance.m_StateMachine.TransitionToState(newState); } - void WaitForPacketSent(ProfilingService& instance, uint32_t timeout = 1000) + bool WaitForPacketSent(ProfilingService& instance, uint32_t timeout = 1000) { return instance.m_SendCounterPacket.WaitForPacketSent(timeout); } diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp index 5badea9d83..5128331c46 100644 --- a/src/profiling/SendCounterPacket.cpp +++ b/src/profiling/SendCounterPacket.cpp @@ -936,6 +936,8 @@ void SendCounterPacket::Start(IProfilingConnection& profilingConnection) // no need for a mutex as the send thread can not be running at this point m_ReadyToRead = false; + m_PacketSent = false; + // Start the send thread m_SendThread = std::thread(&SendCounterPacket::Send, this, std::ref(profilingConnection)); } @@ -1016,15 +1018,20 @@ void SendCounterPacket::Send(IProfilingConnection& profilingConnection) { std::unique_lock lock(m_WaitMutex); - m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&]{ return m_ReadyToRead; }); - + bool timeout = m_WaitCondition.wait_for(lock, + std::chrono::milliseconds(m_Timeout), + [&]{ return m_ReadyToRead; }); + // If we get notified we need to flush the buffer again + if(timeout) + { + // Otherwise if we just timed out don't flush the buffer + continue; + } //reset condition variable predicate for next use m_ReadyToRead = false; } // Wait condition lock scope - End - - // Do not flush the buffer again - continue; + break; case ProfilingState::Active: default: // Wait condition lock scope - Begin @@ -1103,15 +1110,32 @@ void SendCounterPacket::FlushBuffer(IProfilingConnection& profilingConnection, b // Get the next available readable buffer packetBuffer = m_BufferManager.GetReadableBuffer(); } - // Check whether at least a packet has been sent if (packetsSent && notifyWatchers) { + // Wait for the parent thread to release its mutex if necessary + { + std::lock_guard lck(m_PacketSentWaitMutex); + m_PacketSent = true; + } // Notify to any watcher that something has been sent m_PacketSentWaitCondition.notify_one(); } } +bool SendCounterPacket::WaitForPacketSent(uint32_t timeout = 1000) +{ + std::unique_lock lock(m_PacketSentWaitMutex); + // Blocks until notified that at least a packet has been sent or until timeout expires. + bool timedOut = m_PacketSentWaitCondition.wait_for(lock, + std::chrono::milliseconds(timeout), + [&] { return m_PacketSent; }); + + m_PacketSent = false; + + return timedOut; +} + } // namespace profiling } // namespace armnn diff --git a/src/profiling/SendCounterPacket.hpp b/src/profiling/SendCounterPacket.hpp index 11587552b8..80d6f8437a 100644 --- a/src/profiling/SendCounterPacket.hpp +++ b/src/profiling/SendCounterPacket.hpp @@ -63,13 +63,7 @@ public: void Start(IProfilingConnection& profilingConnection); void Stop(bool rethrowSendThreadExceptions = true); bool IsRunning() { return m_IsRunning.load(); } - - void WaitForPacketSent(uint32_t timeout = 1000) - { - std::unique_lock lock(m_PacketSentWaitMutex); - // Blocks until notified that at least a packet has been sent or until timeout expires. - m_PacketSentWaitCondition.wait_for(lock, std::chrono::milliseconds(timeout)); - } + bool WaitForPacketSent(uint32_t timeout); private: void Send(IProfilingConnection& profilingConnection); @@ -111,6 +105,8 @@ private: std::atomic m_KeepRunning; // m_ReadyToRead will be protected by m_WaitMutex bool m_ReadyToRead; + // m_PacketSent will be protected by m_PacketSentWaitMutex + bool m_PacketSent; std::exception_ptr m_SendThreadException; std::mutex m_PacketSentWaitMutex; std::condition_variable m_PacketSentWaitCondition; diff --git a/src/profiling/test/FileOnlyProfilingDecoratorTests.cpp b/src/profiling/test/FileOnlyProfilingDecoratorTests.cpp index b30b38f2e4..26634704ae 100644 --- a/src/profiling/test/FileOnlyProfilingDecoratorTests.cpp +++ b/src/profiling/test/FileOnlyProfilingDecoratorTests.cpp @@ -23,6 +23,16 @@ using namespace armnn; using namespace std::chrono_literals; +class FileOnlyHelperService : public ProfilingService +{ + public: + // Wait for a notification from the send thread + bool WaitForPacketsSent(uint32_t timeout = 1000) + { + return ProfilingService::WaitForPacketSent(ProfilingService::Instance(), timeout); + } +}; + BOOST_AUTO_TEST_SUITE(FileOnlyProfilingDecoratorTests) BOOST_AUTO_TEST_CASE(DumpOutgoingValidFileEndToEnd) @@ -38,6 +48,8 @@ BOOST_AUTO_TEST_CASE(DumpOutgoingValidFileEndToEnd) options.m_OutgoingCaptureFile = tempPath.string(); options.m_CapturePeriod = 100; + FileOnlyHelperService helper; + // Enable the profiling service ProfilingService& profilingService = ProfilingService::Instance(); profilingService.ResetExternalProfilingOptions(options, true); @@ -45,35 +57,24 @@ BOOST_AUTO_TEST_CASE(DumpOutgoingValidFileEndToEnd) profilingService.Update(); profilingService.Update(); - uint32_t timeout = 1000; // Wait for a maximum of 1000mSec - uint32_t sleepTime = 2; // in 2mSec intervals. - uint32_t timeSlept = 0; - // Give the profiling service sending thread time start executing and send the stream metadata. - while (profilingService.GetCurrentState() != ProfilingState::WaitingForAck) - { - if (timeSlept >= timeout) - { - BOOST_FAIL("Timeout: Profiling service did not switch to WaitingForAck state"); - } - std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); - timeSlept += sleepTime; - } + BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::WaitingForAck); profilingService.Update(); - - timeSlept = 0; - - while (profilingService.GetCurrentState() != profiling::ProfilingState::Active) + // First packet sent will be the SendStreamMetaDataPacket, it's possible though unlikely that it will be sent twice + // The second or possibly third packet will be the CounterDirectoryPacket which means the + // ConnectionAcknowledgedCommandHandler has set the state to active + uint32_t packetCount = 0; + while(profilingService.GetCurrentState() != ProfilingState::Active && packetCount < 3) { - if (timeSlept >= timeout) + if(!helper.WaitForPacketsSent()) { - BOOST_FAIL("Timeout: Profiling service did not switch to Active state"); + BOOST_FAIL("Timeout waiting for packets"); } - std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); - timeSlept += sleepTime; + packetCount++; } + BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); // Minimum test here is to check that the file was created. BOOST_CHECK(boost::filesystem::exists(tempPath.c_str()) == true); @@ -86,13 +87,11 @@ BOOST_AUTO_TEST_CASE(DumpOutgoingValidFileEndToEnd) // period should be enough to have some data in the file. // Wait for 1 collection period plus a bit of overhead.. - std::this_thread::sleep_for(std::chrono::milliseconds(150)); + helper.WaitForPacketsSent(); // In order to flush the files we need to gracefully close the profiling service. options.m_EnableProfiling = false; profilingService.ResetExternalProfilingOptions(options, true); - // Wait a short time to allow the threads to clean themselves up. - std::this_thread::sleep_for(std::chrono::milliseconds(500)); // The output file size should be greater than 0. struct stat statusBuffer; diff --git a/src/profiling/test/ProfilingTests.cpp b/src/profiling/test/ProfilingTests.cpp index 1519cd42fd..13863b6074 100644 --- a/src/profiling/test/ProfilingTests.cpp +++ b/src/profiling/test/ProfilingTests.cpp @@ -41,6 +41,7 @@ #include using namespace armnn::profiling; +using PacketType = MockProfilingConnection::PacketType; BOOST_AUTO_TEST_SUITE(ExternalProfiling) @@ -143,7 +144,6 @@ BOOST_AUTO_TEST_CASE(CheckCommandHandler) profilingStateMachine.TransitionToState(ProfilingState::NotConnected); profilingStateMachine.TransitionToState(ProfilingState::WaitingForAck); - // A 1mSec timeout should be enough to slow the command handler thread a little. CommandHandler commandHandler0(1, true, commandHandlerRegistry, packetVersionResolver); // This should start the command handler thread return the connection ack and put the profiling @@ -184,16 +184,32 @@ BOOST_AUTO_TEST_CASE(CheckCommandHandler) } std::this_thread::sleep_for(std::chrono::milliseconds(2)); } - // and leave another short period for the timeout exception to be processed and the loop to break. - std::this_thread::sleep_for(std::chrono::milliseconds(3)); // The command handler loop should have stopped after the timeout. - BOOST_CHECK(!commandHandler1.IsRunning()); + // wait for the timeout exception to be processed and the loop to break. + uint32_t timeout = 50; + uint32_t timeSlept = 0; + while (commandHandler1.IsRunning()) + { + if (timeSlept >= timeout) + { + BOOST_FAIL("Timeout: The command handler loop did not stop after the timeout"); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + timeSlept ++; + } commandHandler1.Stop(); // The state machine should never have received the ack so will still be in WaitingForAck. BOOST_CHECK(profilingStateMachine.GetCurrentState() == ProfilingState::WaitingForAck); + // Now try sending a bad connection acknowledged packet + TestProfilingConnectionBadAckPacket testProfilingConnectionBadAckPacket; + commandHandler1.Start(testProfilingConnectionBadAckPacket); + commandHandler1.Stop(); + // This should also not change the state machine + BOOST_CHECK(profilingStateMachine.GetCurrentState() == ProfilingState::WaitingForAck); + // Disable stop after timeout and now commandHandler1 should persist after a timeout commandHandler1.SetStopAfterTimeout(false); // Restart the thread. @@ -2362,24 +2378,8 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodConnectionAcknowledgedPacket) profilingService.Update(); // Start the command handler and the send thread // Wait for the Stream Metadata packet to be sent - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - - // Check that the mock profiling connection contains one Stream Metadata packet - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - if (writtenData.size() > 1) - { - // If this thread has been blocked for some time a second or more Stream Metadata packet could have been sent. - // In these cases make sure all packet are of length streamMetadataPacketsize - for(uint32_t packetLength : writtenData) - { - BOOST_TEST(packetLength == streamMetadataPacketsize); - } - } - else - { - BOOST_TEST(writtenData.size() == 1); - BOOST_TEST(writtenData[0] == streamMetadataPacketsize); - } + BOOST_CHECK(helper.WaitForPacketsSent( + mockProfilingConnection, PacketType::StreamMetaData, streamMetadataPacketsize) >= 1); // Write a valid "Connection Acknowledged" packet into the mock profiling connection, to simulate a valid // reply from an external profiling service @@ -2400,7 +2400,7 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodConnectionAcknowledgedPacket) mockProfilingConnection->WritePacket(std::move(connectionAcknowledgedPacket)); // Wait for the counter directory packet to ensure the ConnectionAcknowledgedCommandHandler has run. - helper.WaitForProfilingPacketsSent(mockProfilingConnection, 5000); + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::CounterDirectory) == 1); // The Connection Acknowledged Command Handler should have updated the profiling state accordingly BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); @@ -2433,17 +2433,10 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodRequestCounterDirectoryPacket) MockProfilingConnection* mockProfilingConnection = helper.GetMockProfilingConnection(); BOOST_CHECK(mockProfilingConnection); - // Wait for the Stream Metadata packet the be sent - // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - // Force the profiling service to the "Active" state helper.ForceTransitionToState(ProfilingState::Active); BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - // Remove the packets received so far - mockProfilingConnection->Clear(); - // Write a valid "Request Counter Directory" packet into the mock profiling connection, to simulate a valid // reply from an external profiling service @@ -2462,14 +2455,10 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodRequestCounterDirectoryPacket) // Write the packet to the mock profiling connection mockProfilingConnection->WritePacket(std::move(requestCounterDirectoryPacket)); - // Wait for the Counter Directory packet to be sent - helper.WaitForProfilingPacketsSent(mockProfilingConnection, 5000); - - // Check that the mock profiling connection contains one Counter Directory packet - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - BOOST_TEST(writtenData.size() == 2); - BOOST_TEST(writtenData[0] == 427); // The size of the expected Timeline Directory packet - BOOST_TEST(writtenData[1] ==656); // The size of the expected Counter Directory packet + // Expecting one CounterDirectory Packet of length 656 + // and one TimelineMessageDirectory packet of length 427 + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::CounterDirectory, 656) == 1); + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::TimelineMessageDirectory, 427) == 1); // The Request Counter Directory Command Handler should not have updated the profiling state BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); @@ -2502,10 +2491,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadPeriodicCounterSelectionPacketInval MockProfilingConnection* mockProfilingConnection = helper.GetMockProfilingConnection(); BOOST_CHECK(mockProfilingConnection); - // Wait for the Stream Metadata packet the be sent - // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - // Force the profiling service to the "Active" state helper.ForceTransitionToState(ProfilingState::Active); BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); @@ -2548,29 +2533,10 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadPeriodicCounterSelectionPacketInval // Write the packet to the mock profiling connection mockProfilingConnection->WritePacket(std::move(periodicCounterSelectionPacket)); - // Expecting one Periodic Counter Selection packet and at least one Periodic Counter Capture packet - int expectedPackets = 2; - std::vector receivedPackets; - - // Keep waiting until all the expected packets have been received - do - { - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - if (writtenData.empty()) - { - BOOST_ERROR("Packets should be available for reading at this point"); - return; - } - receivedPackets.insert(receivedPackets.end(), writtenData.begin(), writtenData.end()); - expectedPackets -= boost::numeric_cast(writtenData.size()); - } while (expectedPackets > 0); - BOOST_TEST(!receivedPackets.empty()); - - // The size of the expected Periodic Counter Selection packet - BOOST_TEST((std::find(receivedPackets.begin(), receivedPackets.end(), 14) != receivedPackets.end())); - // The size of the expected Periodic Counter Capture packet - BOOST_TEST((std::find(receivedPackets.begin(), receivedPackets.end(), 22) != receivedPackets.end())); + // Expecting one Periodic Counter Selection packet of length 14 + // and at least one Periodic Counter Capture packet of length 22 + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterSelection, 14) == 1); + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterCapture, 22) >= 1); // The Periodic Counter Selection Handler should not have updated the profiling state BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); @@ -2605,15 +2571,12 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPeriodicCounterSelectionPacketNoCo // Wait for the Stream Metadata packet the be sent // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); + helper.WaitForPacketsSent(mockProfilingConnection, PacketType::StreamMetaData); // Force the profiling service to the "Active" state helper.ForceTransitionToState(ProfilingState::Active); BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - // Remove the packets received so far - mockProfilingConnection->Clear(); - // Write a "Periodic Counter Selection" packet into the mock profiling connection, to simulate an input from an // external profiling service @@ -2632,16 +2595,15 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPeriodicCounterSelectionPacketNoCo // Write the packet to the mock profiling connection mockProfilingConnection->WritePacket(std::move(periodicCounterSelectionPacket)); - // Wait for the Periodic Counter Selection packet to be sent - helper.WaitForProfilingPacketsSent(mockProfilingConnection, 5000); + // Wait for the Periodic Counter Selection packet of length 12 to be sent + // The size of the expected Periodic Counter Selection (echos the sent one) + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterSelection, 12) == 1); // The Periodic Counter Selection Handler should not have updated the profiling state BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - // Check that the mock profiling connection contains one Periodic Counter Selection - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - BOOST_TEST(writtenData.size() == 1); // Only one packet is expected (no Periodic Counter packets) - BOOST_TEST(writtenData[0] == 12); // The size of the expected Periodic Counter Selection (echos the sent one) + // No Periodic Counter packets are expected + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterCapture, 0, 0) == 0); // Reset the profiling service to stop any running thread options.m_EnableProfiling = false; @@ -2671,17 +2633,14 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPeriodicCounterSelectionPacketSing MockProfilingConnection* mockProfilingConnection = helper.GetMockProfilingConnection(); BOOST_CHECK(mockProfilingConnection); - // Wait for the Stream Metadata packet the be sent + // Wait for the Stream Metadata packet to be sent // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); + helper.WaitForPacketsSent(mockProfilingConnection, PacketType::StreamMetaData); // Force the profiling service to the "Active" state helper.ForceTransitionToState(ProfilingState::Active); BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - // Remove the packets received so far - mockProfilingConnection->Clear(); - // Write a "Periodic Counter Selection" packet into the mock profiling connection, to simulate an input from an // external profiling service @@ -2715,29 +2674,10 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPeriodicCounterSelectionPacketSing // Write the packet to the mock profiling connection mockProfilingConnection->WritePacket(std::move(periodicCounterSelectionPacket)); - // Expecting one Periodic Counter Selection packet and at least one Periodic Counter Capture packet - int expectedPackets = 2; - std::vector receivedPackets; - - // Keep waiting until all the expected packets have been received - do - { - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - if (writtenData.empty()) - { - BOOST_ERROR("Packets should be available for reading at this point"); - return; - } - receivedPackets.insert(receivedPackets.end(), writtenData.begin(), writtenData.end()); - expectedPackets -= boost::numeric_cast(writtenData.size()); - } while (expectedPackets > 0); - BOOST_TEST(!receivedPackets.empty()); - - // The size of the expected Periodic Counter Selection packet (echos the sent one) - BOOST_TEST((std::find(receivedPackets.begin(), receivedPackets.end(), 14) != receivedPackets.end())); - // The size of the expected Periodic Counter Capture packet - BOOST_TEST((std::find(receivedPackets.begin(), receivedPackets.end(), 22) != receivedPackets.end())); + // Expecting one Periodic Counter Selection packet of length 14 + // and at least one Periodic Counter Capture packet of length 22 + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterSelection, 14) == 1); + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterCapture, 22) >= 1); // The Periodic Counter Selection Handler should not have updated the profiling state BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); @@ -2771,15 +2711,12 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPeriodicCounterSelectionPacketMult // Wait for the Stream Metadata packet the be sent // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); + helper.WaitForPacketsSent(mockProfilingConnection, PacketType::StreamMetaData); // Force the profiling service to the "Active" state helper.ForceTransitionToState(ProfilingState::Active); BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - // Remove the packets received so far - mockProfilingConnection->Clear(); - // Write a "Periodic Counter Selection" packet into the mock profiling connection, to simulate an input from an // external profiling service @@ -2815,29 +2752,10 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPeriodicCounterSelectionPacketMult // Write the packet to the mock profiling connection mockProfilingConnection->WritePacket(std::move(periodicCounterSelectionPacket)); - // Expecting one Periodic Counter Selection packet and at least one Periodic Counter Capture packet - int expectedPackets = 2; - std::vector receivedPackets; - - // Keep waiting until all the expected packets have been received - do - { - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - if (writtenData.empty()) - { - BOOST_ERROR("Packets should be available for reading at this point"); - return; - } - receivedPackets.insert(receivedPackets.end(), writtenData.begin(), writtenData.end()); - expectedPackets -= boost::numeric_cast(writtenData.size()); - } while (expectedPackets > 0); - BOOST_TEST(!receivedPackets.empty()); - - // The size of the expected Periodic Counter Selection packet (echos the sent one) - BOOST_TEST((std::find(receivedPackets.begin(), receivedPackets.end(), 16) != receivedPackets.end())); - // The size of the expected Periodic Counter Capture packet - BOOST_TEST((std::find(receivedPackets.begin(), receivedPackets.end(), 28) != receivedPackets.end())); + // Expecting one PeriodicCounterSelection Packet with a length of 16 + // And at least one PeriodicCounterCapture Packet with a length of 28 + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterSelection, 16) == 1); + BOOST_CHECK(helper.WaitForPacketsSent(mockProfilingConnection, PacketType::PeriodicCounterCapture, 28) >= 1); // The Periodic Counter Selection Handler should not have updated the profiling state BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); @@ -2883,7 +2801,7 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceDisconnect) // Wait for the Stream Metadata packet the be sent // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); + helper.WaitForPacketsSent(mockProfilingConnection, PacketType::StreamMetaData); // Force the profiling service to the "Active" state helper.ForceTransitionToState(ProfilingState::Active); @@ -2928,15 +2846,12 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPerJobCounterSelectionPacket) // Wait for the Stream Metadata packet the be sent // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); + helper.WaitForPacketsSent(mockProfilingConnection, PacketType::StreamMetaData); // Force the profiling service to the "Active" state helper.ForceTransitionToState(ProfilingState::Active); BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - // Remove the packets received so far - mockProfilingConnection->Clear(); - // Write a "Per-Job Counter Selection" packet into the mock profiling connection, to simulate an input from an // external profiling service @@ -2959,14 +2874,15 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceGoodPerJobCounterSelectionPacket) // the Per-Job Counter Selection packet gets processed by the profiling service std::this_thread::sleep_for(std::chrono::milliseconds(5)); - // The Per-Job Counter Selection packets are dropped silently, so there should be no reply coming - // from the profiling service - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - BOOST_TEST(writtenData.empty()); - // The Per-Job Counter Selection Command Handler should not have updated the profiling state BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); + // The Per-Job Counter Selection packets are dropped silently, so there should be no reply coming + // from the profiling service + const auto StreamMetaDataSize = static_cast( + helper.WaitForPacketsSent(mockProfilingConnection, PacketType::StreamMetaData, 0, 0)); + BOOST_CHECK(StreamMetaDataSize == mockProfilingConnection->GetWrittenDataSize()); + // Reset the profiling service to stop any running thread options.m_EnableProfiling = false; profilingService.ResetExternalProfilingOptions(options, true); @@ -3015,6 +2931,11 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceEnabled) std::stringstream ss; StreamRedirector streamRedirector(std::cout, ss.rdbuf()); profilingService.Update(); + + // Reset the profiling service to stop any running thread + options.m_EnableProfiling = false; + profilingService.ResetExternalProfilingOptions(options, true); + streamRedirector.CancelRedirect(); // Check that the expected error has occurred and logged to the standard output @@ -3023,9 +2944,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceEnabled) std::cout << ss.str(); BOOST_FAIL("Expected string not found."); } - // Reset the profiling service to stop any running thread - options.m_EnableProfiling = false; - profilingService.ResetExternalProfilingOptions(options, true); } BOOST_AUTO_TEST_CASE(CheckProfilingServiceEnabledRuntime) @@ -3049,6 +2967,10 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceEnabledRuntime) StreamRedirector streamRedirector(std::cout, ss.rdbuf()); profilingService.Update(); + // Reset the profiling service to stop any running thread + options.m_EnableProfiling = false; + profilingService.ResetExternalProfilingOptions(options, true); + streamRedirector.CancelRedirect(); // Check that the expected error has occurred and logged to the standard output @@ -3057,9 +2979,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceEnabledRuntime) std::cout << ss.str(); BOOST_FAIL("Expected string not found."); } - // Reset the profiling service to stop any running thread - options.m_EnableProfiling = false; - profilingService.ResetExternalProfilingOptions(options, true); } BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadConnectionAcknowledgedPacket) @@ -3073,11 +2992,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadConnectionAcknowledgedPacket) std::stringstream ss; StreamRedirector streamRedirector(std::cout, ss.rdbuf()); - // Calculate the size of a Stream Metadata packet - std::string processName = GetProcessName().substr(0, 60); - unsigned int processNameSize = processName.empty() ? 0 : boost::numeric_cast(processName.size()) + 1; - unsigned int streamMetadataPacketsize = 118 + processNameSize; - // Reset the profiling service to the uninitialized state armnn::Runtime::CreationOptions::ExternalProfilingOptions options; options.m_EnableProfiling = true; @@ -3094,34 +3008,7 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadConnectionAcknowledgedPacket) MockProfilingConnection* mockProfilingConnection = helper.GetMockProfilingConnection(); BOOST_CHECK(mockProfilingConnection); - // Remove the packets received so far - mockProfilingConnection->Clear(); - BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::WaitingForAck); - profilingService.Update(); - - // Wait for the Stream Metadata packet to be sent - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - - // Check that the mock profiling connection contains one Stream Metadata packet - const std::vector writtenData = mockProfilingConnection->GetWrittenData(); - if (writtenData.size() > 1) - { - // If this thread has been blocked for some time a second or more Stream Metadata packet could have been sent. - // In these cases make sure all packet are of length streamMetadataPacketsize - for(uint32_t packetLength : writtenData) - { - BOOST_TEST(packetLength == streamMetadataPacketsize); - } - } - else - { - BOOST_TEST(writtenData.size() == 1); - BOOST_TEST(writtenData[0] == streamMetadataPacketsize); - } - - // Write a valid "Connection Acknowledged" packet into the mock profiling connection, to simulate a valid - // reply from an external profiling service // Connection Acknowledged Packet header (word 0, word 1 is always zero): // 26:31 [6] packet_family: Control Packet Family, value 0b000000 @@ -3134,13 +3021,16 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadConnectionAcknowledgedPacket) // Create the Connection Acknowledged Packet Packet connectionAcknowledgedPacket(header); - - // Write the packet to the mock profiling connection + // Write an invalid "Connection Acknowledged" packet into the mock profiling connection, to simulate an invalid + // reply from an external profiling service mockProfilingConnection->WritePacket(std::move(connectionAcknowledgedPacket)); - // Wait for a bit (must at least be the delay value of the mock profiling connection) to make sure that - // the Connection Acknowledged packet gets processed by the profiling service - std::this_thread::sleep_for(std::chrono::milliseconds(15)); + // Start the command thread + profilingService.Update(); + + // Wait for the command thread to join + options.m_EnableProfiling = false; + profilingService.ResetExternalProfilingOptions(options, true); streamRedirector.CancelRedirect(); @@ -3150,13 +3040,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadConnectionAcknowledgedPacket) std::cout << ss.str(); BOOST_FAIL("Expected string not found."); } - - // The Connection Acknowledged Command Handler should not have updated the profiling state - BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::WaitingForAck); - - // Reset the profiling service to stop any running thread - options.m_EnableProfiling = false; - profilingService.ResetExternalProfilingOptions(options, true); } BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadRequestCounterDirectoryPacket) @@ -3182,23 +3065,11 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadRequestCounterDirectoryPacket) BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::NotConnected); profilingService.Update(); // Create the profiling connection BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::WaitingForAck); - profilingService.Update(); // Start the command handler and the send thread // Get the mock profiling connection MockProfilingConnection* mockProfilingConnection = helper.GetMockProfilingConnection(); BOOST_CHECK(mockProfilingConnection); - // Wait for the Stream Metadata packet the be sent - // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - - // Force the profiling service to the "Active" state - helper.ForceTransitionToState(ProfilingState::Active); - BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - - // Remove the packets received so far - mockProfilingConnection->Clear(); - // Write a valid "Request Counter Directory" packet into the mock profiling connection, to simulate a valid // reply from an external profiling service @@ -3217,9 +3088,12 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadRequestCounterDirectoryPacket) // Write the packet to the mock profiling connection mockProfilingConnection->WritePacket(std::move(requestCounterDirectoryPacket)); - // Wait for a bit (must at least be the delay value of the mock profiling connection) to make sure that - // the Create the Request Counter packet gets processed by the profiling service - std::this_thread::sleep_for(std::chrono::milliseconds(15)); + // Start the command handler and the send thread + profilingService.Update(); + + // Reset the profiling service to stop and join any running thread + options.m_EnableProfiling = false; + profilingService.ResetExternalProfilingOptions(options, true); streamRedirector.CancelRedirect(); @@ -3229,13 +3103,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadRequestCounterDirectoryPacket) std::cout << ss.str(); BOOST_FAIL("Expected string not found."); } - - // The Request Counter Directory Command Handler should not have updated the profiling state - BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - - // Reset the profiling service to stop any running thread - options.m_EnableProfiling = false; - profilingService.ResetExternalProfilingOptions(options, true); } BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadPeriodicCounterSelectionPacket) @@ -3267,17 +3134,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadPeriodicCounterSelectionPacket) MockProfilingConnection* mockProfilingConnection = helper.GetMockProfilingConnection(); BOOST_CHECK(mockProfilingConnection); - // Wait for the Stream Metadata packet the be sent - // (we are not testing the connection acknowledgement here so it will be ignored by this test) - helper.WaitForProfilingPacketsSent(mockProfilingConnection); - - // Force the profiling service to the "Active" state - helper.ForceTransitionToState(ProfilingState::Active); - BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - - // Remove the packets received so far - mockProfilingConnection->Clear(); - // Write a "Periodic Counter Selection" packet into the mock profiling connection, to simulate an input from an // external profiling service @@ -3295,10 +3151,11 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadPeriodicCounterSelectionPacket) // Write the packet to the mock profiling connection mockProfilingConnection->WritePacket(std::move(periodicCounterSelectionPacket)); + profilingService.Update(); - // Wait for a bit (must at least be the delay value of the mock profiling connection) to make sure that - // the Periodic Counter Selection packet gets processed by the profiling service - std::this_thread::sleep_for(std::chrono::milliseconds(7)); + // Reset the profiling service to stop any running thread + options.m_EnableProfiling = false; + profilingService.ResetExternalProfilingOptions(options, true); // Check that the expected error has occurred and logged to the standard output streamRedirector.CancelRedirect(); @@ -3309,13 +3166,6 @@ BOOST_AUTO_TEST_CASE(CheckProfilingServiceBadPeriodicCounterSelectionPacket) std::cout << ss.str(); BOOST_FAIL("Expected string not found."); } - - // The Periodic Counter Selection Handler should not have updated the profiling state - BOOST_CHECK(profilingService.GetCurrentState() == ProfilingState::Active); - - // Reset the profiling service to stop any running thread - options.m_EnableProfiling = false; - profilingService.ResetExternalProfilingOptions(options, true); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/profiling/test/ProfilingTests.hpp b/src/profiling/test/ProfilingTests.hpp index 208fb80865..d6300e6429 100644 --- a/src/profiling/test/ProfilingTests.hpp +++ b/src/profiling/test/ProfilingTests.hpp @@ -153,6 +153,25 @@ private: std::atomic m_ReadRequests; }; +class TestProfilingConnectionBadAckPacket : public TestProfilingConnectionBase +{ +public: + Packet ReadPacket(uint32_t timeout) override + { + boost::ignore_unused(timeout); + // Connection Acknowledged Packet header (word 0, word 1 is always zero): + // 26:31 [6] packet_family: Control Packet Family, value 0b000000 + // 16:25 [10] packet_id: Packet identifier, value 0b0000000001 + // 8:15 [8] reserved: Reserved, value 0b00000000 + // 0:7 [8] reserved: Reserved, value 0b00000000 + uint32_t packetFamily = 0; + uint32_t packetId = 37; // Wrong packet id!!! + uint32_t header = ((packetFamily & 0x0000003F) << 26) | ((packetId & 0x000003FF) << 16); + + return Packet(header); + } +}; + class TestFunctorA : public CommandHandlerFunctor { public: @@ -216,17 +235,36 @@ public: TransitionToState(ProfilingService::Instance(), newState); } - void WaitForProfilingPacketsSent(MockProfilingConnection* mockProfilingConnection, uint32_t timeout = 1000) + long WaitForPacketsSent(MockProfilingConnection* mockProfilingConnection, + MockProfilingConnection::PacketType packetType, + uint32_t length = 0, + uint32_t timeout = 1000) { - if (!mockProfilingConnection->HasWrittenData()) + long packetCount = mockProfilingConnection->CheckForPacket({packetType, length}); + // The first packet we receive may not be the one we are looking for, so keep looping until till we find it, + // or until WaitForPacketsSent times out + while(packetCount == 0 && timeout != 0) { - WaitForPacketSent(ProfilingService::Instance(), timeout); - // It's possible the wait has timed out. Check there is some data. - if (!mockProfilingConnection->HasWrittenData()) + std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); + // Wait for a notification from the send thread + ProfilingService::WaitForPacketSent(ProfilingService::Instance(), timeout); + + std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); + + // We need to make sure the timeout does not reset each time we call WaitForPacketsSent + uint32_t elapsedTime = static_cast( + std::chrono::duration_cast(end - start).count()); + + packetCount = mockProfilingConnection->CheckForPacket({packetType, length}); + + if (elapsedTime > timeout) { - throw RuntimeException("ProfilingTests::WaitForProfilingPacketsSent timeout waiting for packet."); + break; } + + timeout -= elapsedTime; } + return packetCount; } private: diff --git a/src/profiling/test/SendCounterPacketTests.cpp b/src/profiling/test/SendCounterPacketTests.cpp index 951b652c3b..19423165a9 100644 --- a/src/profiling/test/SendCounterPacketTests.cpp +++ b/src/profiling/test/SendCounterPacketTests.cpp @@ -26,7 +26,7 @@ namespace { // A short delay to wait for the thread to process a packet. -uint16_t constexpr WAIT_UNTIL_READABLE_MS = 100; +uint16_t constexpr WAIT_UNTIL_READABLE_MS = 20; void SetNotConnectedProfilingState(ProfilingStateMachine& profilingStateMachine) { @@ -93,6 +93,8 @@ void SetActiveProfilingState(ProfilingStateMachine& profilingStateMachine) BOOST_AUTO_TEST_SUITE(SendCounterPacketTests) +using PacketType = MockProfilingConnection::PacketType; + BOOST_AUTO_TEST_CASE(MockSendCounterPacketTest) { MockBufferManager mockBuffer(512); @@ -2049,8 +2051,6 @@ BOOST_AUTO_TEST_CASE(SendThreadTest2) // To test an exact value of the "read size" in the mock buffer, wait to allow the send thread to // read all what's remaining in the buffer - std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_UNTIL_READABLE_MS)); - sendCounterPacket.Stop(); BOOST_CHECK(mockStreamCounterBuffer.GetCommittedSize() == totalWrittenSize); @@ -2201,7 +2201,9 @@ BOOST_AUTO_TEST_CASE(SendThreadBufferTest) sendCounterPacket.SetReadyToRead(); - std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_UNTIL_READABLE_MS)); + // Join the send thread to make sure it has read the buffer + sendCounterPacket.Stop(); + sendCounterPacket.Start(mockProfilingConnection); // The buffer is read by the send thread so it should not be in the readable buffer. auto readBuffer = bufferManager.GetReadableBuffer(); @@ -2239,7 +2241,9 @@ BOOST_AUTO_TEST_CASE(SendThreadBufferTest) sendCounterPacket.SetReadyToRead(); - std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_UNTIL_READABLE_MS)); + // Join the send thread to make sure it has read the buffer + sendCounterPacket.Stop(); + sendCounterPacket.Start(mockProfilingConnection); // The buffer is read by the send thread so it should not be in the readable buffer. readBuffer = bufferManager.GetReadableBuffer(); @@ -2280,7 +2284,9 @@ BOOST_AUTO_TEST_CASE(SendThreadBufferTest) sendCounterPacket.SetReadyToRead(); - std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_UNTIL_READABLE_MS)); + // Join the send thread to make sure it has read the buffer + sendCounterPacket.Stop(); + sendCounterPacket.Start(mockProfilingConnection); // The buffer is read by the send thread so it should not be in the readable buffer. readBuffer = bufferManager.GetReadableBuffer(); @@ -2349,16 +2355,15 @@ BOOST_AUTO_TEST_CASE(SendThreadBufferTest1) BOOST_TEST(reservedSize == 512); BOOST_TEST(reservedBuffer.get()); - // Check that data was actually written to the profiling connection in any order - const std::vector writtenData = mockProfilingConnection.GetWrittenData(); - BOOST_TEST(writtenData.size() == 3); - bool foundStreamMetaDataPacket = - std::find(writtenData.begin(), writtenData.end(), streamMetadataPacketsize) != writtenData.end(); - bool foundCounterDirectoryPacket = std::find(writtenData.begin(), writtenData.end(), 32) != writtenData.end(); - bool foundPeriodicCounterCapturePacket = std::find(writtenData.begin(), writtenData.end(), 28) != writtenData.end(); - BOOST_TEST(foundStreamMetaDataPacket); - BOOST_TEST(foundCounterDirectoryPacket); - BOOST_TEST(foundPeriodicCounterCapturePacket); + const auto writtenDataSize = mockProfilingConnection.GetWrittenDataSize(); + const auto metaDataPacketCount = + mockProfilingConnection.CheckForPacket({PacketType::StreamMetaData, streamMetadataPacketsize}); + + BOOST_TEST(metaDataPacketCount >= 1); + BOOST_TEST(mockProfilingConnection.CheckForPacket({PacketType::CounterDirectory, 32}) == 1); + BOOST_TEST(mockProfilingConnection.CheckForPacket({PacketType::PeriodicCounterCapture, 28}) == 1); + // Check that we only received the packets we expected + BOOST_TEST(metaDataPacketCount + 2 == writtenDataSize); } BOOST_AUTO_TEST_CASE(SendThreadSendStreamMetadataPacket1) @@ -2407,10 +2412,12 @@ BOOST_AUTO_TEST_CASE(SendThreadSendStreamMetadataPacket3) // Wait for sendCounterPacket to join BOOST_CHECK_NO_THROW(sendCounterPacket.Stop()); - // Check that the buffer contains at least one Stream Metadata packet - const std::vector writtenData = mockProfilingConnection.GetWrittenData(); - BOOST_TEST(writtenData.size() >= 1); - BOOST_TEST(writtenData[0] == streamMetadataPacketsize); + // Check that the buffer contains at least one Stream Metadata packet and no other packets + const auto writtenDataSize = mockProfilingConnection.GetWrittenDataSize(); + + BOOST_TEST(writtenDataSize >= 1); + BOOST_TEST(mockProfilingConnection.CheckForPacket( + {PacketType::StreamMetaData, streamMetadataPacketsize}) == writtenDataSize); } BOOST_AUTO_TEST_CASE(SendThreadSendStreamMetadataPacket4) @@ -2437,12 +2444,13 @@ BOOST_AUTO_TEST_CASE(SendThreadSendStreamMetadataPacket4) BOOST_TEST((profilingStateMachine.GetCurrentState() == ProfilingState::WaitingForAck)); // Check that the buffer contains at least one Stream Metadata packet - const std::vector writtenData = mockProfilingConnection.GetWrittenData(); - BOOST_TEST(writtenData.size() >= 1); - BOOST_TEST(writtenData[0] == streamMetadataPacketsize); + BOOST_TEST(mockProfilingConnection.CheckForPacket({PacketType::StreamMetaData, streamMetadataPacketsize}) >= 1); mockProfilingConnection.Clear(); + sendCounterPacket.Stop(); + sendCounterPacket.Start(mockProfilingConnection); + // Try triggering a new buffer read sendCounterPacket.SetReadyToRead(); @@ -2452,9 +2460,12 @@ BOOST_AUTO_TEST_CASE(SendThreadSendStreamMetadataPacket4) // Check that the profiling state is still "WaitingForAck" BOOST_TEST((profilingStateMachine.GetCurrentState() == ProfilingState::WaitingForAck)); - // Check that the buffer contains at least one Stream Metadata packet - BOOST_TEST(writtenData.size() >= 1); - BOOST_TEST(writtenData[0] == streamMetadataPacketsize); + // Check that the buffer contains at least one Stream Metadata packet and no other packets + const auto writtenDataSize = mockProfilingConnection.GetWrittenDataSize(); + + BOOST_TEST(writtenDataSize >= 1); + BOOST_TEST(mockProfilingConnection.CheckForPacket( + {PacketType::StreamMetaData, streamMetadataPacketsize}) == writtenDataSize); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/profiling/test/SendCounterPacketTests.hpp b/src/profiling/test/SendCounterPacketTests.hpp index 6b244be0b4..4d517e5829 100644 --- a/src/profiling/test/SendCounterPacketTests.hpp +++ b/src/profiling/test/SendCounterPacketTests.hpp @@ -32,6 +32,19 @@ public: , m_Packet() {} + enum class PacketType + { + StreamMetaData, + ConnectionAcknowledge, + CounterDirectory, + ReqCounterDirectory, + PeriodicCounterSelection, + PerJobCounterSelection, + TimelineMessageDirectory, + PeriodicCounterCapture, + Unknown + }; + bool IsOpen() const override { std::lock_guard lock(m_Mutex); @@ -53,11 +66,49 @@ public: return false; } + uint32_t header = ReadUint32(buffer, 0); + + uint32_t packetFamily = (header >> 26); + uint32_t packetId = ((header >> 16) & 1023); + + PacketType packetType; + + switch (packetFamily) + { + case 0: + packetType = packetId < 6 ? PacketType(packetId) : PacketType::Unknown; + break; + case 1: + packetType = packetId == 0 ? PacketType::TimelineMessageDirectory : PacketType::Unknown; + break; + case 3: + packetType = packetId == 0 ? PacketType::PeriodicCounterCapture : PacketType::Unknown; + break; + default: + packetType = PacketType::Unknown; + } + std::lock_guard lock(m_Mutex); - m_WrittenData.push_back(length); + m_WrittenData.push_back({ packetType, length }); return true; } + + long CheckForPacket(const std::pair packetInfo) + { + std::lock_guard lock(m_Mutex); + + if(packetInfo.second != 0) + { + return std::count(m_WrittenData.begin(), m_WrittenData.end(), packetInfo); + } + else + { + return std::count_if(m_WrittenData.begin(), m_WrittenData.end(), + [&packetInfo](const std::pair pair) { return packetInfo.first == pair.first; }); + } + } + bool WritePacket(Packet&& packet) { std::lock_guard lock(m_Mutex); @@ -76,19 +127,11 @@ public: return std::move(m_Packet); } - std::vector GetWrittenData() + unsigned long GetWrittenDataSize() { std::lock_guard lock(m_Mutex); - std::vector writtenData = m_WrittenData; - m_WrittenData.clear(); - return writtenData; - } - - bool HasWrittenData() const - { - std::lock_guard lock(m_Mutex); - return !m_WrittenData.empty(); + return m_WrittenData.size(); } void Clear() @@ -100,7 +143,7 @@ public: private: bool m_IsOpen; - std::vector m_WrittenData; + std::vector> m_WrittenData; Packet m_Packet; mutable std::mutex m_Mutex; }; -- cgit v1.2.1