aboutsummaryrefslogtreecommitdiff
path: root/src/profiling/SendThread.cpp
blob: d595c9d4a5c008e14438af0c48fb2e074994ed90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
//
// Copyright © 2020 Arm Ltd. All rights reserved.
// SPDX-License-Identifier: MIT
//

#include "SendThread.hpp"
#include "EncodeVersion.hpp"
#include "ProfilingUtils.hpp"

#include <armnn/Exceptions.hpp>
#include <armnn/Conversion.hpp>
#include <Processes.hpp>

#include <boost/format.hpp>
#include <boost/numeric/conversion/cast.hpp>
#include <boost/core/ignore_unused.hpp>

#include <cstring>

namespace armnn
{

namespace profiling
{

using boost::numeric_cast;

// Builds a send thread object around the collaborators it needs. No thread is launched here; that
// happens in Start().
//
// profilingStateMachine  state machine queried by Send() on every loop iteration
// buffer                 buffer manager from which FlushBuffer() pulls the readable packet buffers
// sendCounterPacket      used by Send() to emit the StreamMetadata packet
// timeout                wait period in milliseconds used by Send(); a negative value makes the
//                        Active-state wait in Send() block until notified
//
// NOTE(review): m_ReadyToRead and m_PacketSent are not part of this initializer list, Start() is what
// assigns them - confirm that the header gives them a defined initial value.
SendThread::SendThread(ProfilingStateMachine& profilingStateMachine,
                       IBufferManager& buffer,
                       ISendCounterPacket& sendCounterPacket,
                       int timeout)
    : m_StateMachine(profilingStateMachine)
    , m_BufferManager(buffer)
    , m_SendCounterPacket(sendCounterPacket)
    , m_Timeout(timeout)
    , m_IsRunning(false)
    , m_KeepRunning(false)
    , m_SendThreadException(nullptr)
{
    // Hand this object to the buffer manager as its consumer
    m_BufferManager.SetConsumer(this);
}

// Raises the "ready to read" flag that the waits in Send() use as their predicate, then wakes up one
// thread blocked on the wait condition.
void SendThread::SetReadyToRead()
{
    // The flag is only ever modified while holding the wait mutex
    std::unique_lock<std::mutex> waitLock(m_WaitMutex);
    m_ReadyToRead = true;

    // Release the mutex before notifying, so that the woken thread can re-acquire it straight away
    waitLock.unlock();

    // Tell the send thread that the buffer has something to read
    m_WaitCondition.notify_one();
}

// Launches the send thread, which executes Send() on the given profiling connection.
// Returns without doing anything when the thread is already flagged as running.
//
// profilingConnection  connection the send thread writes to; it is handed to the thread by reference,
//                      so it has to stay alive for as long as the thread runs
void SendThread::Start(IProfilingConnection& profilingConnection)
{
    // Nothing to do when the send thread is already up
    if (m_IsRunning.load())
    {
        return;
    }

    // A previous run may have finished on its own without being joined: reap it before the thread
    // object gets reused
    if (m_SendThread.joinable())
    {
        m_SendThread.join();
    }

    // Flag the thread as running, and let its send loop iterate until Stop() says otherwise
    m_IsRunning.store(true);
    m_KeepRunning.store(true);

    // Reset both wait predicates. No locking is required here, the send thread does not exist yet
    m_ReadyToRead = false;
    m_PacketSent = false;

    // Spawn the thread that runs the send procedure
    m_SendThread = std::thread([this, &profilingConnection] { Send(profilingConnection); });
}

// Stops the send thread and waits for it to finish.
//
// rethrowSendThreadExceptions  when true, an exception stored by the send thread is rethrown from
//                              this function and then forgotten; when false, any stored exception
//                              is left in place untouched
//
// Throws whatever exception the send thread stored (see Send()), only if rethrowSendThreadExceptions
// is true.
void SendThread::Stop(bool rethrowSendThreadExceptions)
{
    // Signal the send thread to stop
    m_KeepRunning.store(false);

    // Check that the send thread is running
    if (m_SendThread.joinable())
    {
        // Kick the send thread out of the wait condition
        SetReadyToRead();
        // Wait for the send thread to complete operations
        m_SendThread.join();
    }

    // Check if the send thread exception has to be rethrown
    if (!rethrowSendThreadExceptions)
    {
        // No need to rethrow the send thread exception, return immediately
        return;
    }

    // Check if there's an exception to rethrow
    if (m_SendThreadException)
    {
        // std::rethrow_exception never returns, so the stored exception has to be cleared BEFORE
        // rethrowing it: take a local copy, nullify the member, then rethrow the copy. This way a
        // later call to Stop does not report the same exception a second time
        std::exception_ptr pendingException = m_SendThreadException;
        m_SendThreadException = nullptr;

        // Rethrow the send thread exception
        std::rethrow_exception(pendingException);
    }
}

// Send procedure executed by the send thread (see Start()).
//
// Loops at least once, and then for as long as m_KeepRunning is set. What each iteration does depends
// on the current state of the profiling state machine:
//  - Uninitialised/NotConnected: stores a RuntimeException for Stop() to rethrow, clears the running
//    flags and returns immediately (no final flush)
//  - WaitingForAck: writes and flushes a StreamMetadata packet, then waits up to m_Timeout ms to be
//    notified; on timeout the loop restarts (resending the packet), on notification the buffer is flushed
//  - Active (and any other state): waits for a notification (indefinitely if m_Timeout is negative,
//    otherwise up to m_Timeout ms), then flushes the buffer
// Once the loop ends, the buffer is flushed one last time without notifying the watchers.
//
// profilingConnection  connection the packets are written to
void SendThread::Send(IProfilingConnection& profilingConnection)
{
    // Run once and keep the sending procedure looping until the thread is signalled to stop
    do
    {
        // Check the current state of the profiling service
        ProfilingState currentState = m_StateMachine.GetCurrentState();
        switch (currentState)
        {
        case ProfilingState::Uninitialised:
        case ProfilingState::NotConnected:

            // The send thread cannot be running when the profiling service is uninitialized or not connected,
            // stop the thread immediately
            m_KeepRunning.store(false);
            m_IsRunning.store(false);

            // An exception should be thrown here, save it to be rethrown later from the main thread so that
            // it can be caught by the consumer
            m_SendThreadException =
                    std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
                                                             "profiling service not yet initialized or connected"));

            return;
        case ProfilingState::WaitingForAck:

            // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
            // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
            // updated by the command handler

            // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
            m_SendCounterPacket.SendStreamMetaDataPacket();

            // Flush the buffer manually to send the packet
            FlushBuffer(profilingConnection);

            // Wait for a connection ack from the remote server. We should expect a response within timeout value.
            // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
            // StreamMetadata again.
            // NOTE(review): a negative m_Timeout presumably makes this wait return at once, turning this
            // state into a tight resend loop - confirm whether negative timeouts can reach this state

            // Wait condition lock scope - Begin
            {
                std::unique_lock<std::mutex> lock(m_WaitMutex);

                // With a predicate, wait_for returns the value of the predicate on exit: true means that
                // the thread has been notified (m_ReadyToRead is set), false means that the wait timed out
                const bool notified = m_WaitCondition.wait_for(lock,
                                                               std::chrono::milliseconds(m_Timeout),
                                                               [&]{ return m_ReadyToRead; });
                if (!notified)
                {
                    // We just timed out, don't flush the buffer: go back to the loop condition instead
                    continue;
                }

                // We got notified, the buffer needs to be flushed again (done after the switch)
                //reset condition variable predicate for next use
                m_ReadyToRead = false;
            }
            // Wait condition lock scope - End
            break;
        case ProfilingState::Active:
        default:
            // Wait condition lock scope - Begin
            {
                std::unique_lock<std::mutex> lock(m_WaitMutex);

                // Normal working state for the send thread
                // Check if the send thread is required to enforce a timeout wait policy
                if (m_Timeout < 0)
                {
                    // Wait indefinitely until notified that something to read has become available in the buffer
                    m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
                }
                else
                {
                    // Wait until the thread is notified of something to read from the buffer,
                    // or check anyway after the specified number of milliseconds
                    m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
                }

                //reset condition variable predicate for next use
                m_ReadyToRead = false;
            }
            // Wait condition lock scope - End
            break;
        }

        // Send all the available packets in the buffer
        FlushBuffer(profilingConnection);
    } while (m_KeepRunning.load());

    // Ensure that all readable data got written to the profiling connection before the thread is stopped
    // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
    FlushBuffer(profilingConnection, false);

    // Mark the send thread as not running
    m_IsRunning.store(false);
}

// Drains the buffer manager: every readable packet buffer is written to the profiling connection
// (as long as the connection is open) and then marked as read.
//
// profilingConnection  connection the packets are written to
// notifyWatchers       when true, and at least one packet was written, the "packet sent" flag is
//                      raised and one thread waiting in WaitForPacketSent() is woken up
void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
{
    // Remembers whether anything at all has been written to the connection during this flush
    bool anyPacketWritten = false;

    // Walk through the readable buffers, until the buffer manager has no more to offer
    for (IPacketBufferPtr currentBuffer = m_BufferManager.GetReadableBuffer();
         currentBuffer != nullptr;
         currentBuffer = m_BufferManager.GetReadableBuffer())
    {
        const unsigned char* payload = currentBuffer->GetReadableData();
        unsigned int payloadSize = currentBuffer->GetSize();

        // Buffers without data have nothing to send. Data for a closed connection is silently dropped
        const bool hasPayload = (payload != nullptr) && (payloadSize != 0);
        if (hasPayload && profilingConnection.IsOpen())
        {
            // Write the packet to the profiling connection, any write error is silently ignored
            profilingConnection.WritePacket(payload, boost::numeric_cast<uint32_t>(payloadSize));
            anyPacketWritten = true;
        }

        // Sent, dropped or empty: in all cases the buffer is done with
        m_BufferManager.MarkRead(currentBuffer);
    }

    // Watchers are only told about a flush that has actually sent something
    if (!(packetsWereSent(anyPacketWritten) && notifyWatchers))
    {
        return;
    }

    // The flag is only ever modified while holding the "packet sent" mutex
    std::unique_lock<std::mutex> sentLock(m_PacketSentWaitMutex);
    m_PacketSent = true;
    sentLock.unlock();

    // Wake up a watcher waiting for a packet to be sent
    m_PacketSentWaitCondition.notify_one();
}

bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
{
    std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
    // Blocks until notified that at least a packet has been sent or until timeout expires.
    bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
                                                       std::chrono::milliseconds(timeout),
                                                       [&] { return m_PacketSent; });

    m_PacketSent = false;

    return timedOut;
}

} // namespace profiling

} // namespace armnn