diff options
Diffstat (limited to 'src/profiling/SendCounterPacket.cpp')
-rw-r--r-- | src/profiling/SendCounterPacket.cpp | 44 |
1 files changed, 29 insertions, 15 deletions
diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp index b222270546..33eaeabc3e 100644 --- a/src/profiling/SendCounterPacket.cpp +++ b/src/profiling/SendCounterPacket.cpp @@ -65,11 +65,12 @@ void SendCounterPacket::SendStreamMetaDataPacket() uint32_t offset = 0; uint32_t reserved = 0; - unsigned char *writeBuffer = m_Buffer.Reserve(totalSize, reserved); + std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); if (reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( + writeBuffer, boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } @@ -120,25 +121,25 @@ void SendCounterPacket::SendStreamMetaDataPacket() if (infoSize) { - memcpy(&writeBuffer[offset], info.c_str(), infoSize); + memcpy(&writeBuffer->GetWritableData()[offset], info.c_str(), infoSize); offset += infoSize; } if (hardwareVersionSize) { - memcpy(&writeBuffer[offset], hardwareVersion.c_str(), hardwareVersionSize); + memcpy(&writeBuffer->GetWritableData()[offset], hardwareVersion.c_str(), hardwareVersionSize); offset += hardwareVersionSize; } if (softwareVersionSize) { - memcpy(&writeBuffer[offset], softwareVersion.c_str(), softwareVersionSize); + memcpy(&writeBuffer->GetWritableData()[offset], softwareVersion.c_str(), softwareVersionSize); offset += softwareVersionSize; } if (processNameSize) { - memcpy(&writeBuffer[offset], processName.c_str(), processNameSize); + memcpy(&writeBuffer->GetWritableData()[offset], processName.c_str(), processNameSize); offset += processNameSize; } @@ -170,10 +171,10 @@ void SendCounterPacket::SendStreamMetaDataPacket() } catch(...) { - CancelOperationAndThrow<RuntimeException>("Error processing packet."); + CancelOperationAndThrow<RuntimeException>(writeBuffer, "Error processing packet."); } - m_Buffer.Commit(totalSize); + m_BufferManager.Commit(writeBuffer, totalSize); } bool SendCounterPacket::CreateCategoryRecord(const CategoryPtr& category, @@ -792,12 +793,13 @@ void SendCounterPacket::SendCounterDirectoryPacket(const ICounterDirectory& coun // Reserve space in the buffer for the packet uint32_t reserved = 0; - unsigned char* writeBuffer = m_Buffer.Reserve(totalSize, reserved); + std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); // Check that the reserved buffer size is enough to hold the counter directory packet if (reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( + writeBuffer, boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes") % totalSize)); } @@ -818,7 +820,7 @@ void SendCounterPacket::SendCounterDirectoryPacket(const ICounterDirectory& coun offset += numeric_cast<uint32_t>(uint32_t_size); } - m_Buffer.Commit(totalSize); + m_BufferManager.Commit(writeBuffer, totalSize); } void SendCounterPacket::SendPeriodicCounterCapturePacket(uint64_t timestamp, const IndexValuePairsVector& values) @@ -833,11 +835,12 @@ void SendCounterPacket::SendPeriodicCounterCapturePacket(uint64_t timestamp, con uint32_t offset = 0; uint32_t reserved = 0; - unsigned char* writeBuffer = m_Buffer.Reserve(totalSize, reserved); + std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); if (reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( + writeBuffer, boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } @@ -868,7 +871,7 @@ void SendCounterPacket::SendPeriodicCounterCapturePacket(uint64_t timestamp, con offset += numeric_cast<uint32_t>(sizeof(uint32_t)); } - m_Buffer.Commit(totalSize); + m_BufferManager.Commit(writeBuffer, totalSize); } void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePeriod, @@ -883,11 +886,12 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio uint32_t offset = 0; uint32_t reserved = 0; - unsigned char* writeBuffer = m_Buffer.Reserve(totalSize, reserved); + std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved); if (reserved < totalSize) { CancelOperationAndThrow<BufferExhaustion>( + writeBuffer, boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } @@ -914,7 +918,7 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio offset += numeric_cast<uint32_t>(sizeof(uint16_t)); } - m_Buffer.Commit(totalSize); + m_BufferManager.Commit(writeBuffer, totalSize); } void SendCounterPacket::SetReadyToRead() @@ -973,9 +977,17 @@ void SendCounterPacket::Send() } // Wait condition lock scope - End + + + std::unique_ptr<IPacketBuffer> packetBuffer = m_BufferManager.GetReadableBuffer(); + if (packetBuffer == nullptr) + { + continue; + } + const unsigned char* readBuffer = packetBuffer->GetReadableData(); // Get the data to send from the buffer - unsigned int readBufferSize = 0; - const unsigned char* readBuffer = m_Buffer.GetReadBuffer(readBufferSize); + unsigned int readBufferSize = packetBuffer->GetSize(); + if (readBuffer == nullptr || readBufferSize == 0) { // Nothing to send, ignore and continue @@ -988,6 +1000,8 @@ void SendCounterPacket::Send() // Write a packet to the profiling connection. Silently ignore any write error and continue m_ProfilingConnection.WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize)); } + + m_BufferManager.MarkRead(packetBuffer); } // Mark the send thread as not running |