1 //===-- ThreadedCommunication.cpp -----------------------------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "lldb/Core/ThreadedCommunication.h"
10 
11 #include "lldb/Host/ThreadLauncher.h"
12 #include "lldb/Utility/Connection.h"
13 #include "lldb/Utility/ConstString.h"
14 #include "lldb/Utility/Event.h"
15 #include "lldb/Utility/LLDBLog.h"
16 #include "lldb/Utility/Listener.h"
17 #include "lldb/Utility/Log.h"
18 #include "lldb/Utility/Status.h"
19 
20 #include "llvm/Support/Compiler.h"
21 
22 #include <algorithm>
23 #include <chrono>
24 #include <cstring>
25 #include <memory>
26 
27 #include <cerrno>
28 #include <cinttypes>
29 #include <cstdio>
30 
31 using namespace lldb;
32 using namespace lldb_private;
33 
34 ConstString &ThreadedCommunication::GetStaticBroadcasterClass() {
35   static ConstString class_name("lldb.communication");
36   return class_name;
37 }
38 
39 ThreadedCommunication::ThreadedCommunication(const char *name)
40     : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false),
41       m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(),
42       m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) {
43   LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
44            "{0} ThreadedCommunication::ThreadedCommunication (name = {1})",
45            this, name);
46 
47   SetEventName(eBroadcastBitDisconnected, "disconnected");
48   SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
49   SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
50   SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
51   SetEventName(eBroadcastBitPacketAvailable, "packet available");
52   SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
53 
54   CheckInWithManager();
55 }
56 
57 ThreadedCommunication::~ThreadedCommunication() {
58   LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
59            "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})",
60            this, GetBroadcasterName());
61 }
62 
63 void ThreadedCommunication::Clear() {
64   SetReadThreadBytesReceivedCallback(nullptr, nullptr);
65   StopReadThread(nullptr);
66   Communication::Clear();
67 }
68 
69 ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) {
70   assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
71          "Disconnecting while the read thread is running is racy!");
72   return Communication::Disconnect(error_ptr);
73 }
74 
75 size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
76                                    const Timeout<std::micro> &timeout,
77                                    ConnectionStatus &status,
78                                    Status *error_ptr) {
79   Log *log = GetLog(LLDBLog::Communication);
80   LLDB_LOG(
81       log,
82       "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
83       this, dst, dst_len, timeout, m_connection_sp.get());
84 
85   if (m_read_thread_enabled) {
86     // We have a dedicated read thread that is getting data for us
87     size_t cached_bytes = GetCachedBytes(dst, dst_len);
88     if (cached_bytes > 0) {
89       status = eConnectionStatusSuccess;
90       return cached_bytes;
91     }
92     if (timeout && timeout->count() == 0) {
93       if (error_ptr)
94         error_ptr->SetErrorString("Timed out.");
95       status = eConnectionStatusTimedOut;
96       return 0;
97     }
98 
99     if (!m_connection_sp) {
100       if (error_ptr)
101         error_ptr->SetErrorString("Invalid connection.");
102       status = eConnectionStatusNoConnection;
103       return 0;
104     }
105 
106     // No data yet, we have to start listening.
107     ListenerSP listener_sp(
108         Listener::MakeListener("ThreadedCommunication::Read"));
109     listener_sp->StartListeningForEvents(
110         this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
111 
112     // Re-check for data, as it might have arrived while we were setting up our
113     // listener.
114     cached_bytes = GetCachedBytes(dst, dst_len);
115     if (cached_bytes > 0) {
116       status = eConnectionStatusSuccess;
117       return cached_bytes;
118     }
119 
120     EventSP event_sp;
121     // Explicitly check for the thread exit, for the same reason.
122     if (m_read_thread_did_exit) {
123       // We've missed the event, lets just conjure one up.
124       event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit);
125     } else {
126       if (!listener_sp->GetEvent(event_sp, timeout)) {
127         if (error_ptr)
128           error_ptr->SetErrorString("Timed out.");
129         status = eConnectionStatusTimedOut;
130         return 0;
131       }
132     }
133     const uint32_t event_type = event_sp->GetType();
134     if (event_type & eBroadcastBitReadThreadGotBytes) {
135       return GetCachedBytes(dst, dst_len);
136     }
137 
138     if (event_type & eBroadcastBitReadThreadDidExit) {
139       // If the thread exited of its own accord, it either means it
140       // hit an end-of-file condition or an error.
141       status = m_pass_status;
142       if (error_ptr)
143         *error_ptr = std::move(m_pass_error);
144 
145       if (GetCloseOnEOF())
146         Disconnect(nullptr);
147       return 0;
148     }
149     llvm_unreachable("Got unexpected event type!");
150   }
151 
152   // We aren't using a read thread, just read the data synchronously in this
153   // thread.
154   return Communication::Read(dst, dst_len, timeout, status, error_ptr);
155 }
156 
157 bool ThreadedCommunication::StartReadThread(Status *error_ptr) {
158   if (error_ptr)
159     error_ptr->Clear();
160 
161   if (m_read_thread.IsJoinable())
162     return true;
163 
164   LLDB_LOG(GetLog(LLDBLog::Communication),
165            "{0} ThreadedCommunication::StartReadThread ()", this);
166 
167   const std::string thread_name =
168       llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
169 
170   m_read_thread_enabled = true;
171   m_read_thread_did_exit = false;
172   auto maybe_thread = ThreadLauncher::LaunchThread(
173       thread_name, [this] { return ReadThread(); });
174   if (maybe_thread) {
175     m_read_thread = *maybe_thread;
176   } else {
177     if (error_ptr)
178       *error_ptr = Status(maybe_thread.takeError());
179     else {
180       LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(),
181                      "failed to launch host thread: {0}");
182     }
183   }
184 
185   if (!m_read_thread.IsJoinable())
186     m_read_thread_enabled = false;
187 
188   return m_read_thread_enabled;
189 }
190 
191 bool ThreadedCommunication::StopReadThread(Status *error_ptr) {
192   if (!m_read_thread.IsJoinable())
193     return true;
194 
195   LLDB_LOG(GetLog(LLDBLog::Communication),
196            "{0} ThreadedCommunication::StopReadThread ()", this);
197 
198   m_read_thread_enabled = false;
199 
200   BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
201 
202   // error = m_read_thread.Cancel();
203 
204   Status error = m_read_thread.Join(nullptr);
205   return error.Success();
206 }
207 
208 bool ThreadedCommunication::JoinReadThread(Status *error_ptr) {
209   if (!m_read_thread.IsJoinable())
210     return true;
211 
212   Status error = m_read_thread.Join(nullptr);
213   return error.Success();
214 }
215 
216 size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) {
217   std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
218   if (!m_bytes.empty()) {
219     // If DST is nullptr and we have a thread, then return the number of bytes
220     // that are available so the caller can call again
221     if (dst == nullptr)
222       return m_bytes.size();
223 
224     const size_t len = std::min<size_t>(dst_len, m_bytes.size());
225 
226     ::memcpy(dst, m_bytes.c_str(), len);
227     m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
228 
229     return len;
230   }
231   return 0;
232 }
233 
234 void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len,
235                                                bool broadcast,
236                                                ConnectionStatus status) {
237   LLDB_LOG(GetLog(LLDBLog::Communication),
238            "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len "
239            "= {2}, "
240            "broadcast = {3})",
241            this, bytes, (uint64_t)len, broadcast);
242   if ((bytes == nullptr || len == 0) &&
243       (status != lldb::eConnectionStatusEndOfFile))
244     return;
245   if (m_callback) {
246     // If the user registered a callback, then call it and do not broadcast
247     m_callback(m_callback_baton, bytes, len);
248   } else if (bytes != nullptr && len > 0) {
249     std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
250     m_bytes.append((const char *)bytes, len);
251     if (broadcast)
252       BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
253   }
254 }
255 
256 bool ThreadedCommunication::ReadThreadIsRunning() {
257   return m_read_thread_enabled;
258 }
259 
260 lldb::thread_result_t ThreadedCommunication::ReadThread() {
261   Log *log = GetLog(LLDBLog::Communication);
262 
263   LLDB_LOG(log, "Communication({0}) thread starting...", this);
264 
265   uint8_t buf[1024];
266 
267   Status error;
268   ConnectionStatus status = eConnectionStatusSuccess;
269   bool done = false;
270   bool disconnect = false;
271   while (!done && m_read_thread_enabled) {
272     size_t bytes_read = ReadFromConnection(
273         buf, sizeof(buf), std::chrono::seconds(5), status, &error);
274     if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
275       AppendBytesToCache(buf, bytes_read, true, status);
276 
277     switch (status) {
278     case eConnectionStatusSuccess:
279       break;
280 
281     case eConnectionStatusEndOfFile:
282       done = true;
283       disconnect = GetCloseOnEOF();
284       break;
285     case eConnectionStatusError: // Check GetError() for details
286       if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
287         // EIO on a pipe is usually caused by remote shutdown
288         disconnect = GetCloseOnEOF();
289         done = true;
290       }
291       if (error.Fail())
292         LLDB_LOG(log, "error: {0}, status = {1}", error,
293                  ThreadedCommunication::ConnectionStatusAsString(status));
294       break;
295     case eConnectionStatusInterrupted: // Synchronization signal from
296                                        // SynchronizeWithReadThread()
297       // The connection returns eConnectionStatusInterrupted only when there is
298       // no input pending to be read, so we can signal that.
299       BroadcastEvent(eBroadcastBitNoMorePendingInput);
300       break;
301     case eConnectionStatusNoConnection:   // No connection
302     case eConnectionStatusLostConnection: // Lost connection while connected to
303                                           // a valid connection
304       done = true;
305       [[fallthrough]];
306     case eConnectionStatusTimedOut: // Request timed out
307       if (error.Fail())
308         LLDB_LOG(log, "error: {0}, status = {1}", error,
309                  ThreadedCommunication::ConnectionStatusAsString(status));
310       break;
311     }
312   }
313   m_pass_status = status;
314   m_pass_error = std::move(error);
315   LLDB_LOG(log, "Communication({0}) thread exiting...", this);
316 
317   // Start shutting down. We need to do this in a very specific order to ensure
318   // we don't race with threads wanting to read/synchronize with us.
319 
320   // First, we signal our intent to exit. This ensures no new thread start
321   // waiting on events from us.
322   m_read_thread_did_exit = true;
323 
324   // Unblock any existing thread waiting for the synchronization signal.
325   BroadcastEvent(eBroadcastBitNoMorePendingInput);
326 
327   {
328     // Wait for the synchronization thread to finish...
329     std::lock_guard<std::mutex> guard(m_synchronize_mutex);
330     // ... and disconnect.
331     if (disconnect)
332       Disconnect();
333   }
334 
335   // Finally, unblock any readers waiting for us to exit.
336   BroadcastEvent(eBroadcastBitReadThreadDidExit);
337   return {};
338 }
339 
340 void ThreadedCommunication::SetReadThreadBytesReceivedCallback(
341     ReadThreadBytesReceived callback, void *callback_baton) {
342   m_callback = callback;
343   m_callback_baton = callback_baton;
344 }
345 
346 void ThreadedCommunication::SynchronizeWithReadThread() {
347   // Only one thread can do the synchronization dance at a time.
348   std::lock_guard<std::mutex> guard(m_synchronize_mutex);
349 
350   // First start listening for the synchronization event.
351   ListenerSP listener_sp(Listener::MakeListener(
352       "ThreadedCommunication::SyncronizeWithReadThread"));
353   listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
354 
355   // If the thread is not running, there is no point in synchronizing.
356   if (!m_read_thread_enabled || m_read_thread_did_exit)
357     return;
358 
359   // Notify the read thread.
360   m_connection_sp->InterruptRead();
361 
362   // Wait for the synchronization event.
363   EventSP event_sp;
364   listener_sp->GetEvent(event_sp, std::nullopt);
365 }
366 
367 void ThreadedCommunication::SetConnection(
368     std::unique_ptr<Connection> connection) {
369   StopReadThread(nullptr);
370   Communication::SetConnection(std::move(connection));
371 }
372