1 //===-- LaneBasedExecutionQueue.cpp ---------------------------------------===//
2 //
3 // This source file is part of the Swift.org open source project
4 //
5 // Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
6 // Licensed under Apache License v2.0 with Runtime Library Exception
7 //
8 // See http://swift.org/LICENSE.txt for license information
9 // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10 //
11 //===----------------------------------------------------------------------===//
12 
13 #include "llbuild/BuildSystem/BuildExecutionQueue.h"
14 
15 #include "POSIXEnvironment.h"
16 
17 #include "llbuild/Basic/LLVM.h"
18 #include "llbuild/Basic/PlatformUtility.h"
19 #include "llbuild/Basic/Tracing.h"
20 
21 #include "llbuild/BuildSystem/BuildDescription.h"
22 #include "llbuild/BuildSystem/CommandResult.h"
23 
24 #include "llvm/ADT/ArrayRef.h"
25 #include "llvm/ADT/Hashing.h"
26 #include "llvm/ADT/SmallString.h"
27 #include "llvm/ADT/STLExtras.h"
28 #include "llvm/ADT/Twine.h"
29 #include "llvm/Support/Path.h"
30 #include "llvm/Support/Program.h"
31 
32 #include <atomic>
33 #include <condition_variable>
34 #include <deque>
35 #include <mutex>
36 #include <memory>
37 #include <thread>
38 #include <vector>
39 #include <string>
40 #include <unordered_set>
41 
42 #include <fcntl.h>
43 #include <pthread.h>
44 #include <unistd.h>
45 #include <signal.h>
46 #include <spawn.h>
47 #include <sys/resource.h>
48 #include <sys/wait.h>
49 
50 using namespace llbuild;
51 using namespace llbuild::buildsystem;
52 
53 namespace {
54 
/// Per-job state made available to a job while it runs on a lane.
///
/// A pointer to an instance of this struct is handed to QueueJob::execute()
/// as the opaque QueueJobContext, and is converted back to this type in
/// executeProcess().
struct LaneBasedExecutionQueueJobContext {
  /// The index of the lane which is running the job.
  uint32_t laneNumber;

  /// The job being run; the context does not own it.
  QueueJob& job;
};
60 
61 /// Build execution queue.
62 //
63 // FIXME: Consider trying to share this with the Ninja implementation.
64 class LaneBasedExecutionQueue : public BuildExecutionQueue {
65   /// The number of lanes the queue was configured with.
66   unsigned numLanes;
67 
68   /// A thread for each lane.
69   std::vector<std::unique_ptr<std::thread>> lanes;
70 
71   /// The ready queue of jobs to execute.
72   std::deque<QueueJob> readyJobs;
73   std::mutex readyJobsMutex;
74   std::condition_variable readyJobsCondition;
75   bool cancelled { false };
76   bool shutdown { false };
77 
78   /// The set of spawned processes to terminate if we get cancelled.
79   std::unordered_set<pid_t> spawnedProcesses;
80   std::mutex spawnedProcessesMutex;
81 
82   /// Management of cancellation and SIGKILL escalation
83   std::unique_ptr<std::thread> killAfterTimeoutThread = nullptr;
84   std::condition_variable queueCompleteCondition;
85   std::mutex queueCompleteMutex;
86   bool queueComplete { false };
87 
88   /// The base environment.
89   const char* const* environment;
90 
executeLane(unsigned laneNumber)91   void executeLane(unsigned laneNumber) {
92     // Set the thread name, if available.
93 #if defined(__APPLE__)
94     pthread_setname_np(
95         (llvm::Twine("org.swift.llbuild Lane-") +
96          llvm::Twine(laneNumber)).str().c_str());
97 #elif defined(__linux__)
98     pthread_setname_np(
99         pthread_self(),
100         (llvm::Twine("org.swift.llbuild Lane-") +
101          llvm::Twine(laneNumber)).str().c_str());
102 #endif
103 
104     // Set the QoS class, if available.
105 #if defined(__APPLE__)
106     pthread_set_qos_class_self_np(QOS_CLASS_UTILITY, 0);
107 #endif
108 
109     // Execute items from the queue until shutdown.
110     while (true) {
111       // Take a job from the ready queue.
112       QueueJob job{};
113       uint64_t readyJobsCount;
114       {
115         std::unique_lock<std::mutex> lock(readyJobsMutex);
116 
117         // While the queue is empty, wait for an item.
118         while (!shutdown && readyJobs.empty()) {
119           readyJobsCondition.wait(lock);
120         }
121         if (shutdown && readyJobs.empty())
122           return;
123 
124         // Take an item according to the chosen policy.
125         job = readyJobs.front();
126         readyJobs.pop_front();
127         readyJobsCount = readyJobs.size();
128       }
129 
130       // If we got an empty job, the queue is shutting down.
131       if (!job.getForCommand())
132         break;
133 
134       // Process the job.
135       LaneBasedExecutionQueueJobContext context{ laneNumber, job };
136       {
137         TracingPoint(TraceEventKind::ExecutionQueueDepth, readyJobsCount);
138         TracingString commandNameID(
139             TraceEventKind::ExecutionQueueJob,
140             job.getForCommand()->getName());
141         TracingInterval i(TraceEventKind::ExecutionQueueJob,
142                           context.laneNumber, commandNameID);
143         getDelegate().commandJobStarted(job.getForCommand());
144         job.execute(reinterpret_cast<QueueJobContext*>(&context));
145         getDelegate().commandJobFinished(job.getForCommand());
146       }
147     }
148   }
149 
killAfterTimeout()150   void killAfterTimeout() {
151     std::unique_lock<std::mutex> lock(queueCompleteMutex);
152 
153     if (!queueComplete) {
154       // Shorten timeout if in testing context
155       if (getenv("LLBUILD_TEST") != nullptr) {
156         queueCompleteCondition.wait_for(lock, std::chrono::milliseconds(1000));
157       } else {
158         queueCompleteCondition.wait_for(lock, std::chrono::seconds(10));
159       }
160 
161       sendSignalToProcesses(SIGKILL);
162     }
163   }
164 
sendSignalToProcesses(int signal)165   void sendSignalToProcesses(int signal) {
166     std::unique_lock<std::mutex> lock(spawnedProcessesMutex);
167 
168     for (pid_t pid: spawnedProcesses) {
169       // We are killing the whole process group here, this depends on us
170       // spawning each process in its own group earlier.
171       ::kill(-pid, signal);
172     }
173   }
174 
175 public:
LaneBasedExecutionQueue(BuildExecutionQueueDelegate & delegate,unsigned numLanes,const char * const * environment)176   LaneBasedExecutionQueue(BuildExecutionQueueDelegate& delegate,
177                           unsigned numLanes,
178                           const char* const* environment)
179       : BuildExecutionQueue(delegate), numLanes(numLanes),
180         environment(environment)
181   {
182     for (unsigned i = 0; i != numLanes; ++i) {
183       lanes.push_back(std::unique_ptr<std::thread>(
184                           new std::thread(
185                               &LaneBasedExecutionQueue::executeLane, this, i)));
186     }
187   }
188 
~LaneBasedExecutionQueue()189   virtual ~LaneBasedExecutionQueue() {
190     // Shut down the lanes.
191     {
192       std::unique_lock<std::mutex> lock(readyJobsMutex);
193       shutdown = true;
194       readyJobsCondition.notify_all();
195     }
196 
197     for (unsigned i = 0; i != numLanes; ++i) {
198       lanes[i]->join();
199     }
200 
201     if (killAfterTimeoutThread) {
202       {
203         std::unique_lock<std::mutex> lock(queueCompleteMutex);
204         queueComplete = true;
205         queueCompleteCondition.notify_all();
206       }
207       killAfterTimeoutThread->join();
208     }
209   }
210 
addJob(QueueJob job)211   virtual void addJob(QueueJob job) override {
212     uint64_t readyJobsCount;
213     {
214       std::lock_guard<std::mutex> guard(readyJobsMutex);
215       readyJobs.push_back(job);
216       readyJobsCondition.notify_one();
217       readyJobsCount = readyJobs.size();
218     }
219     TracingPoint(TraceEventKind::ExecutionQueueDepth, readyJobsCount);
220   }
221 
cancelAllJobs()222   virtual void cancelAllJobs() override {
223     {
224       std::lock_guard<std::mutex> lock(readyJobsMutex);
225       std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
226       if (cancelled) return;
227       cancelled = true;
228       readyJobsCondition.notify_all();
229     }
230 
231     sendSignalToProcesses(SIGINT);
232     killAfterTimeoutThread = llvm::make_unique<std::thread>(
233         &LaneBasedExecutionQueue::killAfterTimeout, this);
234   }
235 
236   virtual CommandResult
executeProcess(QueueJobContext * opaqueContext,ArrayRef<StringRef> commandLine,ArrayRef<std::pair<StringRef,StringRef>> environment,bool inheritEnvironment)237   executeProcess(QueueJobContext* opaqueContext,
238                  ArrayRef<StringRef> commandLine,
239                  ArrayRef<std::pair<StringRef,
240                                     StringRef>> environment,
241                  bool inheritEnvironment) override {
242     LaneBasedExecutionQueueJobContext& context =
243       *reinterpret_cast<LaneBasedExecutionQueueJobContext*>(opaqueContext);
244     TracingInterval subprocessInterval(TraceEventKind::ExecutionQueueSubprocess,
245                                        context.laneNumber);
246 
247     {
248       std::unique_lock<std::mutex> lock(readyJobsMutex);
249       // Do not execute new processes anymore after cancellation.
250       if (cancelled) {
251         return CommandResult::Cancelled;
252       }
253     }
254 
255     // Assign a process handle, which just needs to be unique for as long as we
256     // are communicating with the delegate.
257     struct BuildExecutionQueueDelegate::ProcessHandle handle;
258     handle.id = reinterpret_cast<uintptr_t>(&handle);
259 
260     // Whether or not we are capturing output.
261     const bool shouldCaptureOutput = true;
262 
263     getDelegate().commandProcessStarted(context.job.getForCommand(), handle);
264 
265     // Initialize the spawn attributes.
266     posix_spawnattr_t attributes;
267     posix_spawnattr_init(&attributes);
268 
269     // Unmask all signals.
270     sigset_t noSignals;
271     sigemptyset(&noSignals);
272     posix_spawnattr_setsigmask(&attributes, &noSignals);
273 
274     // Reset all signals to default behavior.
275     //
276     // On Linux, this can only be used to reset signals that are legal to
277     // modify, so we have to take care about the set we use.
278 #if defined(__linux__)
279     sigset_t mostSignals;
280     sigemptyset(&mostSignals);
281     for (int i = 1; i < SIGSYS; ++i) {
282       if (i == SIGKILL || i == SIGSTOP) continue;
283       sigaddset(&mostSignals, i);
284     }
285     posix_spawnattr_setsigdefault(&attributes, &mostSignals);
286 #else
287     sigset_t mostSignals;
288     sigfillset(&mostSignals);
289     sigdelset(&mostSignals, SIGKILL);
290     sigdelset(&mostSignals, SIGSTOP);
291     posix_spawnattr_setsigdefault(&attributes, &mostSignals);
292 #endif
293 
294     // Establish a separate process group.
295     posix_spawnattr_setpgroup(&attributes, 0);
296 
297     // Set the attribute flags.
298     unsigned flags = POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF;
299     flags |= POSIX_SPAWN_SETPGROUP;
300 
301     // Close all other files by default.
302     //
303     // FIXME: Note that this is an Apple-specific extension, and we will have to
304     // do something else on other platforms (and unfortunately, there isn't
305     // really an easy answer other than using a stub executable).
306 #ifdef __APPLE__
307     flags |= POSIX_SPAWN_CLOEXEC_DEFAULT;
308 #endif
309 
310     posix_spawnattr_setflags(&attributes, flags);
311 
312     // Setup the file actions.
313     posix_spawn_file_actions_t fileActions;
314     posix_spawn_file_actions_init(&fileActions);
315 
316     // Open /dev/null as stdin.
317     posix_spawn_file_actions_addopen(
318         &fileActions, 0, "/dev/null", O_RDONLY, 0);
319 
320     // If we are capturing output, create a pipe and appropriate spawn actions.
321     int outputPipe[2]{ -1, -1 };
322     if (shouldCaptureOutput) {
323       if (basic::sys::pipe(outputPipe) < 0) {
324         getDelegate().commandProcessHadError(
325             context.job.getForCommand(), handle,
326             Twine("unable to open output pipe (") + strerror(errno) + ")");
327         getDelegate().commandProcessFinished(context.job.getForCommand(),
328                                              handle, CommandResult::Failed, -1);
329         return CommandResult::Failed;
330       }
331 
332       // Open the write end of the pipe as stdout and stderr.
333       posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 1);
334       posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 2);
335 
336       // Close the read and write ends of the pipe.
337       posix_spawn_file_actions_addclose(&fileActions, outputPipe[0]);
338       posix_spawn_file_actions_addclose(&fileActions, outputPipe[1]);
339     } else {
340       // Otherwise, propagate the current stdout/stderr.
341       posix_spawn_file_actions_adddup2(&fileActions, 1, 1);
342       posix_spawn_file_actions_adddup2(&fileActions, 2, 2);
343     }
344 
345     // Form the complete C string command line.
346     std::vector<std::string> argsStorage(
347         commandLine.begin(), commandLine.end());
348     std::vector<const char*> args(argsStorage.size() + 1);
349     for (size_t i = 0; i != argsStorage.size(); ++i) {
350       args[i] = argsStorage[i].c_str();
351     }
352     args[argsStorage.size()] = nullptr;
353 
354     // Form the complete environment.
355     //
356     // NOTE: We construct the environment in order of precedence, so
357     // overridden keys should be defined first.
358     POSIXEnvironment posixEnv;
359 
360     // Export a task ID to subprocesses.
361     //
362     // We currently only export the lane ID, but eventually will export a unique
363     // task ID for SR-6053.
364     posixEnv.setIfMissing("LLBUILD_TASK_ID", Twine(context.laneNumber).str());
365 
366     // Add the requested environment.
367     for (const auto& entry: environment) {
368       posixEnv.setIfMissing(entry.first, entry.second);
369     }
370 
371     // Inherit the base environment, if desired.
372     //
373     // FIXME: This involves a lot of redundant allocation, currently. We could
374     // cache this for the common case of a directly inherited environment.
375     if (inheritEnvironment) {
376       for (const char* const* p = this->environment; *p != nullptr; ++p) {
377         auto pair = StringRef(*p).split('=');
378         posixEnv.setIfMissing(pair.first, pair.second);
379       }
380     }
381 
382     // Resolve the executable path, if necessary.
383     //
384     // FIXME: This should be cached.
385     if (!llvm::sys::path::is_absolute(args[0])) {
386       auto res = llvm::sys::findProgramByName(args[0]);
387       if (!res.getError()) {
388         argsStorage[0] = *res;
389         args[0] = argsStorage[0].c_str();
390       }
391     }
392 
393     // Spawn the command.
394     pid_t pid = -1;
395     bool wasCancelled;
396     {
397       // We need to hold the spawn processes lock when we spawn, to ensure that
398       // we don't create a process in between when we are cancelled.
399       std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
400       wasCancelled = cancelled;
401 
402       // If we have been cancelled since we started, do nothing.
403       if (!wasCancelled) {
404         if (posix_spawn(&pid, args[0], /*file_actions=*/&fileActions,
405                         /*attrp=*/&attributes, const_cast<char**>(args.data()),
406                         const_cast<char* const*>(posixEnv.getEnvp())) != 0) {
407           getDelegate().commandProcessHadError(
408               context.job.getForCommand(), handle,
409               Twine("unable to spawn process (") + strerror(errno) + ")");
410           getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
411                                                CommandResult::Failed, -1);
412           pid = -1;
413         } else {
414           spawnedProcesses.insert(pid);
415         }
416       }
417     }
418 
419     posix_spawn_file_actions_destroy(&fileActions);
420     posix_spawnattr_destroy(&attributes);
421 
422     // If we failed to launch a process, clean up and abort.
423     if (pid == -1) {
424       if (shouldCaptureOutput) {
425         ::close(outputPipe[1]);
426         ::close(outputPipe[0]);
427       }
428       return wasCancelled ? CommandResult::Cancelled : CommandResult::Failed;
429     }
430 
431     // Read the command output, if capturing.
432     if (shouldCaptureOutput) {
433       // Close the write end of the output pipe.
434       ::close(outputPipe[1]);
435 
436       // Read all the data from the output pipe.
437       while (true) {
438         char buf[4096];
439         ssize_t numBytes = read(outputPipe[0], buf, sizeof(buf));
440         if (numBytes < 0) {
441           getDelegate().commandProcessHadError(
442               context.job.getForCommand(), handle,
443               Twine("unable to read process output (") + strerror(errno) + ")");
444           break;
445         }
446 
447         if (numBytes == 0)
448           break;
449 
450         // Notify the client of the output.
451         getDelegate().commandProcessHadOutput(
452             context.job.getForCommand(), handle,
453             StringRef(buf, numBytes));
454       }
455 
456       // Close the read end of the pipe.
457       ::close(outputPipe[0]);
458     }
459 
460     // Wait for the command to complete.
461     struct rusage usage;
462     int status, result = wait4(pid, &status, 0, &usage);
463     while (result == -1 && errno == EINTR)
464       result = wait4(pid, &status, 0, &usage);
465 
466     // Update the set of spawned processes.
467     {
468         std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
469         spawnedProcesses.erase(pid);
470     }
471 
472     if (result == -1) {
473       getDelegate().commandProcessHadError(
474           context.job.getForCommand(), handle,
475           Twine("unable to wait for process (") + strerror(errno) + ")");
476       getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
477                                            CommandResult::Failed, -1);
478       return CommandResult::Failed;
479     }
480 
481     // We report additional info in the tracing interval
482     //   arg2: user time, in us
483     //   arg3: sys time, in us
484     //   arg4: memory usage, in bytes
485     subprocessInterval.arg2 = (uint64_t(usage.ru_utime.tv_sec) * 1000000000 +
486                                uint64_t(usage.ru_utime.tv_usec) * 1000);
487     subprocessInterval.arg3 = (uint64_t(usage.ru_stime.tv_sec) * 1000000000 +
488                                uint64_t(usage.ru_stime.tv_usec) * 1000);
489     subprocessInterval.arg4 = usage.ru_maxrss;
490 
491     // FIXME: We should report a statistic for how much output we read from the
492     // subprocess (probably as a new point sample).
493 
494     // Notify of the process completion.
495     bool cancelled = WIFSIGNALED(status) && (WTERMSIG(status) == SIGINT || WTERMSIG(status) == SIGKILL);
496     CommandResult commandResult = cancelled ? CommandResult::Cancelled : (status == 0) ? CommandResult::Succeeded : CommandResult::Failed;
497     getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
498                                          commandResult, status);
499     return commandResult;
500   }
501 };
502 
503 }
504 
#if !defined(_WIN32)
// The environment of the current process, used as the default base
// environment when a queue is created without an explicit one.
extern "C" char** environ;
#endif
510 
511 BuildExecutionQueue*
createLaneBasedExecutionQueue(BuildExecutionQueueDelegate & delegate,int numLanes,const char * const * environment)512 llbuild::buildsystem::createLaneBasedExecutionQueue(
513     BuildExecutionQueueDelegate& delegate, int numLanes,
514     const char* const* environment) {
515   if (!environment) {
516     environment = const_cast<const char* const*>(environ);
517   }
518   return new LaneBasedExecutionQueue(delegate, numLanes, environment);
519 }
520