1 //===-- ThreadedCommunication.cpp -----------------------------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "lldb/Core/ThreadedCommunication.h" 10 11 #include "lldb/Host/ThreadLauncher.h" 12 #include "lldb/Utility/Connection.h" 13 #include "lldb/Utility/ConstString.h" 14 #include "lldb/Utility/Event.h" 15 #include "lldb/Utility/LLDBLog.h" 16 #include "lldb/Utility/Listener.h" 17 #include "lldb/Utility/Log.h" 18 #include "lldb/Utility/Status.h" 19 20 #include "llvm/Support/Compiler.h" 21 22 #include <algorithm> 23 #include <chrono> 24 #include <cstring> 25 #include <memory> 26 27 #include <cerrno> 28 #include <cinttypes> 29 #include <cstdio> 30 31 using namespace lldb; 32 using namespace lldb_private; 33 34 ConstString &ThreadedCommunication::GetStaticBroadcasterClass() { 35 static ConstString class_name("lldb.communication"); 36 return class_name; 37 } 38 39 ThreadedCommunication::ThreadedCommunication(const char *name) 40 : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false), 41 m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(), 42 m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) { 43 LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 44 "{0} ThreadedCommunication::ThreadedCommunication (name = {1})", 45 this, name); 46 47 SetEventName(eBroadcastBitDisconnected, "disconnected"); 48 SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); 49 SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); 50 SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); 51 SetEventName(eBroadcastBitPacketAvailable, "packet available"); 52 SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); 53 54 CheckInWithManager(); 55 } 56 57 ThreadedCommunication::~ThreadedCommunication() { 58 LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 59 "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})", 60 this, GetBroadcasterName().AsCString()); 61 } 62 63 void ThreadedCommunication::Clear() { 64 SetReadThreadBytesReceivedCallback(nullptr, nullptr); 65 StopReadThread(nullptr); 66 Communication::Clear(); 67 } 68 69 ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) { 70 assert((!m_read_thread_enabled || m_read_thread_did_exit) && 71 "Disconnecting while the read thread is running is racy!"); 72 return Communication::Disconnect(error_ptr); 73 } 74 75 size_t ThreadedCommunication::Read(void *dst, size_t dst_len, 76 const Timeout<std::micro> &timeout, 77 ConnectionStatus &status, 78 Status *error_ptr) { 79 Log *log = GetLog(LLDBLog::Communication); 80 LLDB_LOG( 81 log, 82 "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}", 83 this, dst, dst_len, timeout, m_connection_sp.get()); 84 85 if (m_read_thread_enabled) { 86 // We have a dedicated read thread that is getting data for us 87 size_t cached_bytes = GetCachedBytes(dst, dst_len); 88 if (cached_bytes > 0) { 89 status = eConnectionStatusSuccess; 90 return cached_bytes; 91 } 92 if (timeout && timeout->count() == 0) { 93 if (error_ptr) 94 error_ptr->SetErrorString("Timed out."); 95 status = eConnectionStatusTimedOut; 96 return 0; 97 } 98 99 if (!m_connection_sp) { 100 if (error_ptr) 101 error_ptr->SetErrorString("Invalid connection."); 102 status = eConnectionStatusNoConnection; 103 return 0; 104 } 105 106 // No data yet, we have to start listening. 107 ListenerSP listener_sp( 108 Listener::MakeListener("ThreadedCommunication::Read")); 109 listener_sp->StartListeningForEvents( 110 this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); 111 112 // Re-check for data, as it might have arrived while we were setting up our 113 // listener. 114 cached_bytes = GetCachedBytes(dst, dst_len); 115 if (cached_bytes > 0) { 116 status = eConnectionStatusSuccess; 117 return cached_bytes; 118 } 119 120 EventSP event_sp; 121 // Explicitly check for the thread exit, for the same reason. 122 if (m_read_thread_did_exit) { 123 // We've missed the event, lets just conjure one up. 124 event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit); 125 } else { 126 if (!listener_sp->GetEvent(event_sp, timeout)) { 127 if (error_ptr) 128 error_ptr->SetErrorString("Timed out."); 129 status = eConnectionStatusTimedOut; 130 return 0; 131 } 132 } 133 const uint32_t event_type = event_sp->GetType(); 134 if (event_type & eBroadcastBitReadThreadGotBytes) { 135 return GetCachedBytes(dst, dst_len); 136 } 137 138 if (event_type & eBroadcastBitReadThreadDidExit) { 139 // If the thread exited of its own accord, it either means it 140 // hit an end-of-file condition or an error. 141 status = m_pass_status; 142 if (error_ptr) 143 *error_ptr = std::move(m_pass_error); 144 145 if (GetCloseOnEOF()) 146 Disconnect(nullptr); 147 return 0; 148 } 149 llvm_unreachable("Got unexpected event type!"); 150 } 151 152 // We aren't using a read thread, just read the data synchronously in this 153 // thread. 154 return Communication::Read(dst, dst_len, timeout, status, error_ptr); 155 } 156 157 bool ThreadedCommunication::StartReadThread(Status *error_ptr) { 158 if (error_ptr) 159 error_ptr->Clear(); 160 161 if (m_read_thread.IsJoinable()) 162 return true; 163 164 LLDB_LOG(GetLog(LLDBLog::Communication), 165 "{0} ThreadedCommunication::StartReadThread ()", this); 166 167 const std::string thread_name = 168 llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName()); 169 170 m_read_thread_enabled = true; 171 m_read_thread_did_exit = false; 172 auto maybe_thread = ThreadLauncher::LaunchThread( 173 thread_name, [this] { return ReadThread(); }); 174 if (maybe_thread) { 175 m_read_thread = *maybe_thread; 176 } else { 177 if (error_ptr) 178 *error_ptr = Status(maybe_thread.takeError()); 179 else { 180 LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}", 181 llvm::toString(maybe_thread.takeError())); 182 } 183 } 184 185 if (!m_read_thread.IsJoinable()) 186 m_read_thread_enabled = false; 187 188 return m_read_thread_enabled; 189 } 190 191 bool ThreadedCommunication::StopReadThread(Status *error_ptr) { 192 if (!m_read_thread.IsJoinable()) 193 return true; 194 195 LLDB_LOG(GetLog(LLDBLog::Communication), 196 "{0} ThreadedCommunication::StopReadThread ()", this); 197 198 m_read_thread_enabled = false; 199 200 BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); 201 202 // error = m_read_thread.Cancel(); 203 204 Status error = m_read_thread.Join(nullptr); 205 return error.Success(); 206 } 207 208 bool ThreadedCommunication::JoinReadThread(Status *error_ptr) { 209 if (!m_read_thread.IsJoinable()) 210 return true; 211 212 Status error = m_read_thread.Join(nullptr); 213 return error.Success(); 214 } 215 216 size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) { 217 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 218 if (!m_bytes.empty()) { 219 // If DST is nullptr and we have a thread, then return the number of bytes 220 // that are available so the caller can call again 221 if (dst == nullptr) 222 return m_bytes.size(); 223 224 const size_t len = std::min<size_t>(dst_len, m_bytes.size()); 225 226 ::memcpy(dst, m_bytes.c_str(), len); 227 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); 228 229 return len; 230 } 231 return 0; 232 } 233 234 void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len, 235 bool broadcast, 236 ConnectionStatus status) { 237 LLDB_LOG(GetLog(LLDBLog::Communication), 238 "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len " 239 "= {2}, " 240 "broadcast = {3})", 241 this, bytes, (uint64_t)len, broadcast); 242 if ((bytes == nullptr || len == 0) && 243 (status != lldb::eConnectionStatusEndOfFile)) 244 return; 245 if (m_callback) { 246 // If the user registered a callback, then call it and do not broadcast 247 m_callback(m_callback_baton, bytes, len); 248 } else if (bytes != nullptr && len > 0) { 249 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 250 m_bytes.append((const char *)bytes, len); 251 if (broadcast) 252 BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes); 253 } 254 } 255 256 bool ThreadedCommunication::ReadThreadIsRunning() { 257 return m_read_thread_enabled; 258 } 259 260 lldb::thread_result_t ThreadedCommunication::ReadThread() { 261 Log *log = GetLog(LLDBLog::Communication); 262 263 LLDB_LOG(log, "Communication({0}) thread starting...", this); 264 265 uint8_t buf[1024]; 266 267 Status error; 268 ConnectionStatus status = eConnectionStatusSuccess; 269 bool done = false; 270 bool disconnect = false; 271 while (!done && m_read_thread_enabled) { 272 size_t bytes_read = ReadFromConnection( 273 buf, sizeof(buf), std::chrono::seconds(5), status, &error); 274 if (bytes_read > 0 || status == eConnectionStatusEndOfFile) 275 AppendBytesToCache(buf, bytes_read, true, status); 276 277 switch (status) { 278 case eConnectionStatusSuccess: 279 break; 280 281 case eConnectionStatusEndOfFile: 282 done = true; 283 disconnect = GetCloseOnEOF(); 284 break; 285 case eConnectionStatusError: // Check GetError() for details 286 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { 287 // EIO on a pipe is usually caused by remote shutdown 288 disconnect = GetCloseOnEOF(); 289 done = true; 290 } 291 if (error.Fail()) 292 LLDB_LOG(log, "error: {0}, status = {1}", error, 293 ThreadedCommunication::ConnectionStatusAsString(status)); 294 break; 295 case eConnectionStatusInterrupted: // Synchronization signal from 296 // SynchronizeWithReadThread() 297 // The connection returns eConnectionStatusInterrupted only when there is 298 // no input pending to be read, so we can signal that. 299 BroadcastEvent(eBroadcastBitNoMorePendingInput); 300 break; 301 case eConnectionStatusNoConnection: // No connection 302 case eConnectionStatusLostConnection: // Lost connection while connected to 303 // a valid connection 304 done = true; 305 [[fallthrough]]; 306 case eConnectionStatusTimedOut: // Request timed out 307 if (error.Fail()) 308 LLDB_LOG(log, "error: {0}, status = {1}", error, 309 ThreadedCommunication::ConnectionStatusAsString(status)); 310 break; 311 } 312 } 313 m_pass_status = status; 314 m_pass_error = std::move(error); 315 LLDB_LOG(log, "Communication({0}) thread exiting...", this); 316 317 // Start shutting down. We need to do this in a very specific order to ensure 318 // we don't race with threads wanting to read/synchronize with us. 319 320 // First, we signal our intent to exit. This ensures no new thread start 321 // waiting on events from us. 322 m_read_thread_did_exit = true; 323 324 // Unblock any existing thread waiting for the synchronization signal. 325 BroadcastEvent(eBroadcastBitNoMorePendingInput); 326 327 { 328 // Wait for the synchronization thread to finish... 329 std::lock_guard<std::mutex> guard(m_synchronize_mutex); 330 // ... and disconnect. 331 if (disconnect) 332 Disconnect(); 333 } 334 335 // Finally, unblock any readers waiting for us to exit. 336 BroadcastEvent(eBroadcastBitReadThreadDidExit); 337 return {}; 338 } 339 340 void ThreadedCommunication::SetReadThreadBytesReceivedCallback( 341 ReadThreadBytesReceived callback, void *callback_baton) { 342 m_callback = callback; 343 m_callback_baton = callback_baton; 344 } 345 346 void ThreadedCommunication::SynchronizeWithReadThread() { 347 // Only one thread can do the synchronization dance at a time. 348 std::lock_guard<std::mutex> guard(m_synchronize_mutex); 349 350 // First start listening for the synchronization event. 351 ListenerSP listener_sp(Listener::MakeListener( 352 "ThreadedCommunication::SyncronizeWithReadThread")); 353 listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); 354 355 // If the thread is not running, there is no point in synchronizing. 356 if (!m_read_thread_enabled || m_read_thread_did_exit) 357 return; 358 359 // Notify the read thread. 360 m_connection_sp->InterruptRead(); 361 362 // Wait for the synchronization event. 363 EventSP event_sp; 364 listener_sp->GetEvent(event_sp, std::nullopt); 365 } 366 367 void ThreadedCommunication::SetConnection( 368 std::unique_ptr<Connection> connection) { 369 StopReadThread(nullptr); 370 Communication::SetConnection(std::move(connection)); 371 } 372