diff options
Diffstat (limited to 'src/profiling/SendCounterPacket.cpp')
-rw-r--r-- | src/profiling/SendCounterPacket.cpp | 98 |
1 files changed, 47 insertions, 51 deletions
diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp index 0a2f08b095..9aafa2ccc8 100644 --- a/src/profiling/SendCounterPacket.cpp +++ b/src/profiling/SendCounterPacket.cpp @@ -67,17 +67,11 @@ void SendCounterPacket::SendStreamMetaDataPacket() std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } try @@ -795,19 +789,11 @@ void SendCounterPacket::SendCounterDirectoryPacket(const ICounterDirectory& coun uint32_t reserved = 0; std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - // Check that the reserved buffer size is enough to hold the counter directory packet - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes") - % totalSize)); - } - - // Check the buffer handle is valid - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory"); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Offset for writing to the buffer @@ -837,17 +823,11 @@ void SendCounterPacket::SendPeriodicCounterCapturePacket(uint64_t timestamp, con std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Create header. @@ -888,17 +868,11 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Create header. @@ -972,25 +946,47 @@ void SendCounterPacket::Send() // Lock the mutex to wait on it std::unique_lock<std::mutex> lock(m_WaitMutex); - // Wait until the thread is notified of something to read from the buffer, or check anyway after a second - m_WaitCondition.wait_for(lock, std::chrono::seconds(1)); + if (m_Timeout < 0) + { + // Wait indefinitely until notified that something to read has become available in the buffer + m_WaitCondition.wait(lock); + } + else + { + // Wait until the thread is notified of something to read from the buffer, + // or check anyway after a second + m_WaitCondition.wait_for(lock, std::chrono::seconds(m_Timeout)); + } } // Wait condition lock scope - End - // Get the buffer to read from - std::unique_ptr<IPacketBuffer> packetBuffer = m_BufferManager.GetReadableBuffer(); - if (packetBuffer == nullptr) - { - // Nothing to read from, ignore and continue - continue; - } + FlushBuffer(); + } + // Ensure that all readable data got written to the profiling connection before the thread is stopped + FlushBuffer(); + + // Mark the send thread as not running + m_IsRunning.store(false); +} + +void SendCounterPacket::FlushBuffer() +{ + // Get the first available readable buffer + std::unique_ptr<IPacketBuffer> packetBuffer = m_BufferManager.GetReadableBuffer(); + + while (packetBuffer != nullptr) + { // Get the data to send from the buffer const unsigned char* readBuffer = packetBuffer->GetReadableData(); unsigned int readBufferSize = packetBuffer->GetSize(); + if (readBuffer == nullptr || readBufferSize == 0) { - // Nothing to send, ignore and continue + // Nothing to send, get the next available readable buffer and continue + m_BufferManager.MarkRead(packetBuffer); + packetBuffer = m_BufferManager.GetReadableBuffer(); + continue; } @@ -1003,10 +999,10 @@ void SendCounterPacket::Send() // Mark the packet buffer as read m_BufferManager.MarkRead(packetBuffer); - } - // Mark the send thread as not running - m_IsRunning.store(false); + // Get next available readable buffer + packetBuffer = m_BufferManager.GetReadableBuffer(); + } } } // namespace profiling |