1 //===-- LaneBasedExecutionQueue.cpp ---------------------------------------===//
2 //
3 // This source file is part of the Swift.org open source project
4 //
5 // Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
6 // Licensed under Apache License v2.0 with Runtime Library Exception
7 //
8 // See http://swift.org/LICENSE.txt for license information
9 // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10 //
11 //===----------------------------------------------------------------------===//
12
13 #include "llbuild/BuildSystem/BuildExecutionQueue.h"
14
15 #include "POSIXEnvironment.h"
16
17 #include "llbuild/Basic/LLVM.h"
18 #include "llbuild/Basic/PlatformUtility.h"
19 #include "llbuild/Basic/Tracing.h"
20
21 #include "llbuild/BuildSystem/BuildDescription.h"
22 #include "llbuild/BuildSystem/CommandResult.h"
23
24 #include "llvm/ADT/ArrayRef.h"
25 #include "llvm/ADT/Hashing.h"
26 #include "llvm/ADT/SmallString.h"
27 #include "llvm/ADT/STLExtras.h"
28 #include "llvm/ADT/Twine.h"
29 #include "llvm/Support/Path.h"
30 #include "llvm/Support/Program.h"
31
32 #include <atomic>
33 #include <condition_variable>
34 #include <deque>
35 #include <mutex>
36 #include <memory>
37 #include <thread>
38 #include <vector>
39 #include <string>
40 #include <unordered_set>
41
42 #include <fcntl.h>
43 #include <pthread.h>
44 #include <unistd.h>
45 #include <signal.h>
46 #include <spawn.h>
47 #include <sys/resource.h>
48 #include <sys/wait.h>
49
50 using namespace llbuild;
51 using namespace llbuild::buildsystem;
52
53 namespace {
54
55 struct LaneBasedExecutionQueueJobContext {
56 uint32_t laneNumber;
57
58 QueueJob& job;
59 };
60
61 /// Build execution queue.
62 //
63 // FIXME: Consider trying to share this with the Ninja implementation.
64 class LaneBasedExecutionQueue : public BuildExecutionQueue {
65 /// The number of lanes the queue was configured with.
66 unsigned numLanes;
67
68 /// A thread for each lane.
69 std::vector<std::unique_ptr<std::thread>> lanes;
70
71 /// The ready queue of jobs to execute.
72 std::deque<QueueJob> readyJobs;
73 std::mutex readyJobsMutex;
74 std::condition_variable readyJobsCondition;
75 bool cancelled { false };
76 bool shutdown { false };
77
78 /// The set of spawned processes to terminate if we get cancelled.
79 std::unordered_set<pid_t> spawnedProcesses;
80 std::mutex spawnedProcessesMutex;
81
82 /// Management of cancellation and SIGKILL escalation
83 std::unique_ptr<std::thread> killAfterTimeoutThread = nullptr;
84 std::condition_variable queueCompleteCondition;
85 std::mutex queueCompleteMutex;
86 bool queueComplete { false };
87
88 /// The base environment.
89 const char* const* environment;
90
executeLane(unsigned laneNumber)91 void executeLane(unsigned laneNumber) {
92 // Set the thread name, if available.
93 #if defined(__APPLE__)
94 pthread_setname_np(
95 (llvm::Twine("org.swift.llbuild Lane-") +
96 llvm::Twine(laneNumber)).str().c_str());
97 #elif defined(__linux__)
98 pthread_setname_np(
99 pthread_self(),
100 (llvm::Twine("org.swift.llbuild Lane-") +
101 llvm::Twine(laneNumber)).str().c_str());
102 #endif
103
104 // Set the QoS class, if available.
105 #if defined(__APPLE__)
106 pthread_set_qos_class_self_np(QOS_CLASS_UTILITY, 0);
107 #endif
108
109 // Execute items from the queue until shutdown.
110 while (true) {
111 // Take a job from the ready queue.
112 QueueJob job{};
113 uint64_t readyJobsCount;
114 {
115 std::unique_lock<std::mutex> lock(readyJobsMutex);
116
117 // While the queue is empty, wait for an item.
118 while (!shutdown && readyJobs.empty()) {
119 readyJobsCondition.wait(lock);
120 }
121 if (shutdown && readyJobs.empty())
122 return;
123
124 // Take an item according to the chosen policy.
125 job = readyJobs.front();
126 readyJobs.pop_front();
127 readyJobsCount = readyJobs.size();
128 }
129
130 // If we got an empty job, the queue is shutting down.
131 if (!job.getForCommand())
132 break;
133
134 // Process the job.
135 LaneBasedExecutionQueueJobContext context{ laneNumber, job };
136 {
137 TracingPoint(TraceEventKind::ExecutionQueueDepth, readyJobsCount);
138 TracingString commandNameID(
139 TraceEventKind::ExecutionQueueJob,
140 job.getForCommand()->getName());
141 TracingInterval i(TraceEventKind::ExecutionQueueJob,
142 context.laneNumber, commandNameID);
143 getDelegate().commandJobStarted(job.getForCommand());
144 job.execute(reinterpret_cast<QueueJobContext*>(&context));
145 getDelegate().commandJobFinished(job.getForCommand());
146 }
147 }
148 }
149
killAfterTimeout()150 void killAfterTimeout() {
151 std::unique_lock<std::mutex> lock(queueCompleteMutex);
152
153 if (!queueComplete) {
154 // Shorten timeout if in testing context
155 if (getenv("LLBUILD_TEST") != nullptr) {
156 queueCompleteCondition.wait_for(lock, std::chrono::milliseconds(1000));
157 } else {
158 queueCompleteCondition.wait_for(lock, std::chrono::seconds(10));
159 }
160
161 sendSignalToProcesses(SIGKILL);
162 }
163 }
164
sendSignalToProcesses(int signal)165 void sendSignalToProcesses(int signal) {
166 std::unique_lock<std::mutex> lock(spawnedProcessesMutex);
167
168 for (pid_t pid: spawnedProcesses) {
169 // We are killing the whole process group here, this depends on us
170 // spawning each process in its own group earlier.
171 ::kill(-pid, signal);
172 }
173 }
174
175 public:
LaneBasedExecutionQueue(BuildExecutionQueueDelegate & delegate,unsigned numLanes,const char * const * environment)176 LaneBasedExecutionQueue(BuildExecutionQueueDelegate& delegate,
177 unsigned numLanes,
178 const char* const* environment)
179 : BuildExecutionQueue(delegate), numLanes(numLanes),
180 environment(environment)
181 {
182 for (unsigned i = 0; i != numLanes; ++i) {
183 lanes.push_back(std::unique_ptr<std::thread>(
184 new std::thread(
185 &LaneBasedExecutionQueue::executeLane, this, i)));
186 }
187 }
188
~LaneBasedExecutionQueue()189 virtual ~LaneBasedExecutionQueue() {
190 // Shut down the lanes.
191 {
192 std::unique_lock<std::mutex> lock(readyJobsMutex);
193 shutdown = true;
194 readyJobsCondition.notify_all();
195 }
196
197 for (unsigned i = 0; i != numLanes; ++i) {
198 lanes[i]->join();
199 }
200
201 if (killAfterTimeoutThread) {
202 {
203 std::unique_lock<std::mutex> lock(queueCompleteMutex);
204 queueComplete = true;
205 queueCompleteCondition.notify_all();
206 }
207 killAfterTimeoutThread->join();
208 }
209 }
210
addJob(QueueJob job)211 virtual void addJob(QueueJob job) override {
212 uint64_t readyJobsCount;
213 {
214 std::lock_guard<std::mutex> guard(readyJobsMutex);
215 readyJobs.push_back(job);
216 readyJobsCondition.notify_one();
217 readyJobsCount = readyJobs.size();
218 }
219 TracingPoint(TraceEventKind::ExecutionQueueDepth, readyJobsCount);
220 }
221
cancelAllJobs()222 virtual void cancelAllJobs() override {
223 {
224 std::lock_guard<std::mutex> lock(readyJobsMutex);
225 std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
226 if (cancelled) return;
227 cancelled = true;
228 readyJobsCondition.notify_all();
229 }
230
231 sendSignalToProcesses(SIGINT);
232 killAfterTimeoutThread = llvm::make_unique<std::thread>(
233 &LaneBasedExecutionQueue::killAfterTimeout, this);
234 }
235
236 virtual CommandResult
executeProcess(QueueJobContext * opaqueContext,ArrayRef<StringRef> commandLine,ArrayRef<std::pair<StringRef,StringRef>> environment,bool inheritEnvironment)237 executeProcess(QueueJobContext* opaqueContext,
238 ArrayRef<StringRef> commandLine,
239 ArrayRef<std::pair<StringRef,
240 StringRef>> environment,
241 bool inheritEnvironment) override {
242 LaneBasedExecutionQueueJobContext& context =
243 *reinterpret_cast<LaneBasedExecutionQueueJobContext*>(opaqueContext);
244 TracingInterval subprocessInterval(TraceEventKind::ExecutionQueueSubprocess,
245 context.laneNumber);
246
247 {
248 std::unique_lock<std::mutex> lock(readyJobsMutex);
249 // Do not execute new processes anymore after cancellation.
250 if (cancelled) {
251 return CommandResult::Cancelled;
252 }
253 }
254
255 // Assign a process handle, which just needs to be unique for as long as we
256 // are communicating with the delegate.
257 struct BuildExecutionQueueDelegate::ProcessHandle handle;
258 handle.id = reinterpret_cast<uintptr_t>(&handle);
259
260 // Whether or not we are capturing output.
261 const bool shouldCaptureOutput = true;
262
263 getDelegate().commandProcessStarted(context.job.getForCommand(), handle);
264
265 // Initialize the spawn attributes.
266 posix_spawnattr_t attributes;
267 posix_spawnattr_init(&attributes);
268
269 // Unmask all signals.
270 sigset_t noSignals;
271 sigemptyset(&noSignals);
272 posix_spawnattr_setsigmask(&attributes, &noSignals);
273
274 // Reset all signals to default behavior.
275 //
276 // On Linux, this can only be used to reset signals that are legal to
277 // modify, so we have to take care about the set we use.
278 #if defined(__linux__)
279 sigset_t mostSignals;
280 sigemptyset(&mostSignals);
281 for (int i = 1; i < SIGSYS; ++i) {
282 if (i == SIGKILL || i == SIGSTOP) continue;
283 sigaddset(&mostSignals, i);
284 }
285 posix_spawnattr_setsigdefault(&attributes, &mostSignals);
286 #else
287 sigset_t mostSignals;
288 sigfillset(&mostSignals);
289 sigdelset(&mostSignals, SIGKILL);
290 sigdelset(&mostSignals, SIGSTOP);
291 posix_spawnattr_setsigdefault(&attributes, &mostSignals);
292 #endif
293
294 // Establish a separate process group.
295 posix_spawnattr_setpgroup(&attributes, 0);
296
297 // Set the attribute flags.
298 unsigned flags = POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF;
299 flags |= POSIX_SPAWN_SETPGROUP;
300
301 // Close all other files by default.
302 //
303 // FIXME: Note that this is an Apple-specific extension, and we will have to
304 // do something else on other platforms (and unfortunately, there isn't
305 // really an easy answer other than using a stub executable).
306 #ifdef __APPLE__
307 flags |= POSIX_SPAWN_CLOEXEC_DEFAULT;
308 #endif
309
310 posix_spawnattr_setflags(&attributes, flags);
311
312 // Setup the file actions.
313 posix_spawn_file_actions_t fileActions;
314 posix_spawn_file_actions_init(&fileActions);
315
316 // Open /dev/null as stdin.
317 posix_spawn_file_actions_addopen(
318 &fileActions, 0, "/dev/null", O_RDONLY, 0);
319
320 // If we are capturing output, create a pipe and appropriate spawn actions.
321 int outputPipe[2]{ -1, -1 };
322 if (shouldCaptureOutput) {
323 if (basic::sys::pipe(outputPipe) < 0) {
324 getDelegate().commandProcessHadError(
325 context.job.getForCommand(), handle,
326 Twine("unable to open output pipe (") + strerror(errno) + ")");
327 getDelegate().commandProcessFinished(context.job.getForCommand(),
328 handle, CommandResult::Failed, -1);
329 return CommandResult::Failed;
330 }
331
332 // Open the write end of the pipe as stdout and stderr.
333 posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 1);
334 posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 2);
335
336 // Close the read and write ends of the pipe.
337 posix_spawn_file_actions_addclose(&fileActions, outputPipe[0]);
338 posix_spawn_file_actions_addclose(&fileActions, outputPipe[1]);
339 } else {
340 // Otherwise, propagate the current stdout/stderr.
341 posix_spawn_file_actions_adddup2(&fileActions, 1, 1);
342 posix_spawn_file_actions_adddup2(&fileActions, 2, 2);
343 }
344
345 // Form the complete C string command line.
346 std::vector<std::string> argsStorage(
347 commandLine.begin(), commandLine.end());
348 std::vector<const char*> args(argsStorage.size() + 1);
349 for (size_t i = 0; i != argsStorage.size(); ++i) {
350 args[i] = argsStorage[i].c_str();
351 }
352 args[argsStorage.size()] = nullptr;
353
354 // Form the complete environment.
355 //
356 // NOTE: We construct the environment in order of precedence, so
357 // overridden keys should be defined first.
358 POSIXEnvironment posixEnv;
359
360 // Export a task ID to subprocesses.
361 //
362 // We currently only export the lane ID, but eventually will export a unique
363 // task ID for SR-6053.
364 posixEnv.setIfMissing("LLBUILD_TASK_ID", Twine(context.laneNumber).str());
365
366 // Add the requested environment.
367 for (const auto& entry: environment) {
368 posixEnv.setIfMissing(entry.first, entry.second);
369 }
370
371 // Inherit the base environment, if desired.
372 //
373 // FIXME: This involves a lot of redundant allocation, currently. We could
374 // cache this for the common case of a directly inherited environment.
375 if (inheritEnvironment) {
376 for (const char* const* p = this->environment; *p != nullptr; ++p) {
377 auto pair = StringRef(*p).split('=');
378 posixEnv.setIfMissing(pair.first, pair.second);
379 }
380 }
381
382 // Resolve the executable path, if necessary.
383 //
384 // FIXME: This should be cached.
385 if (!llvm::sys::path::is_absolute(args[0])) {
386 auto res = llvm::sys::findProgramByName(args[0]);
387 if (!res.getError()) {
388 argsStorage[0] = *res;
389 args[0] = argsStorage[0].c_str();
390 }
391 }
392
393 // Spawn the command.
394 pid_t pid = -1;
395 bool wasCancelled;
396 {
397 // We need to hold the spawn processes lock when we spawn, to ensure that
398 // we don't create a process in between when we are cancelled.
399 std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
400 wasCancelled = cancelled;
401
402 // If we have been cancelled since we started, do nothing.
403 if (!wasCancelled) {
404 if (posix_spawn(&pid, args[0], /*file_actions=*/&fileActions,
405 /*attrp=*/&attributes, const_cast<char**>(args.data()),
406 const_cast<char* const*>(posixEnv.getEnvp())) != 0) {
407 getDelegate().commandProcessHadError(
408 context.job.getForCommand(), handle,
409 Twine("unable to spawn process (") + strerror(errno) + ")");
410 getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
411 CommandResult::Failed, -1);
412 pid = -1;
413 } else {
414 spawnedProcesses.insert(pid);
415 }
416 }
417 }
418
419 posix_spawn_file_actions_destroy(&fileActions);
420 posix_spawnattr_destroy(&attributes);
421
422 // If we failed to launch a process, clean up and abort.
423 if (pid == -1) {
424 if (shouldCaptureOutput) {
425 ::close(outputPipe[1]);
426 ::close(outputPipe[0]);
427 }
428 return wasCancelled ? CommandResult::Cancelled : CommandResult::Failed;
429 }
430
431 // Read the command output, if capturing.
432 if (shouldCaptureOutput) {
433 // Close the write end of the output pipe.
434 ::close(outputPipe[1]);
435
436 // Read all the data from the output pipe.
437 while (true) {
438 char buf[4096];
439 ssize_t numBytes = read(outputPipe[0], buf, sizeof(buf));
440 if (numBytes < 0) {
441 getDelegate().commandProcessHadError(
442 context.job.getForCommand(), handle,
443 Twine("unable to read process output (") + strerror(errno) + ")");
444 break;
445 }
446
447 if (numBytes == 0)
448 break;
449
450 // Notify the client of the output.
451 getDelegate().commandProcessHadOutput(
452 context.job.getForCommand(), handle,
453 StringRef(buf, numBytes));
454 }
455
456 // Close the read end of the pipe.
457 ::close(outputPipe[0]);
458 }
459
460 // Wait for the command to complete.
461 struct rusage usage;
462 int status, result = wait4(pid, &status, 0, &usage);
463 while (result == -1 && errno == EINTR)
464 result = wait4(pid, &status, 0, &usage);
465
466 // Update the set of spawned processes.
467 {
468 std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
469 spawnedProcesses.erase(pid);
470 }
471
472 if (result == -1) {
473 getDelegate().commandProcessHadError(
474 context.job.getForCommand(), handle,
475 Twine("unable to wait for process (") + strerror(errno) + ")");
476 getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
477 CommandResult::Failed, -1);
478 return CommandResult::Failed;
479 }
480
481 // We report additional info in the tracing interval
482 // arg2: user time, in us
483 // arg3: sys time, in us
484 // arg4: memory usage, in bytes
485 subprocessInterval.arg2 = (uint64_t(usage.ru_utime.tv_sec) * 1000000000 +
486 uint64_t(usage.ru_utime.tv_usec) * 1000);
487 subprocessInterval.arg3 = (uint64_t(usage.ru_stime.tv_sec) * 1000000000 +
488 uint64_t(usage.ru_stime.tv_usec) * 1000);
489 subprocessInterval.arg4 = usage.ru_maxrss;
490
491 // FIXME: We should report a statistic for how much output we read from the
492 // subprocess (probably as a new point sample).
493
494 // Notify of the process completion.
495 bool cancelled = WIFSIGNALED(status) && (WTERMSIG(status) == SIGINT || WTERMSIG(status) == SIGKILL);
496 CommandResult commandResult = cancelled ? CommandResult::Cancelled : (status == 0) ? CommandResult::Succeeded : CommandResult::Failed;
497 getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
498 commandResult, status);
499 return commandResult;
500 }
501 };
502
503 }
504
505 #if !defined(_WIN32)
506 extern "C" {
507 extern char **environ;
508 }
509 #endif
510
511 BuildExecutionQueue*
createLaneBasedExecutionQueue(BuildExecutionQueueDelegate & delegate,int numLanes,const char * const * environment)512 llbuild::buildsystem::createLaneBasedExecutionQueue(
513 BuildExecutionQueueDelegate& delegate, int numLanes,
514 const char* const* environment) {
515 if (!environment) {
516 environment = const_cast<const char* const*>(environ);
517 }
518 return new LaneBasedExecutionQueue(delegate, numLanes, environment);
519 }
520