1 /* Copyright 2017-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 #include "ChildProcess.h"
4 #include <system_error>
5 #include <thread>
6 #include "Future.h"
7 #include "Logging.h"
8 #include "make_unique.h"
9 #include "watchman_scopeguard.h"
10 
11 namespace watchman {
12 
13 ChildProcess::Environment::Environment() {
14   // Construct the map from the current process environment
15   uint32_t nenv, i;
16   const char* eq;
17   const char* ent;
18 
19   for (i = 0, nenv = 0; environ[i]; i++) {
20     nenv++;
21   }
22 
23   map_.reserve(nenv);
24 
25   for (i = 0; environ[i]; i++) {
26     ent = environ[i];
27     eq = strchr(ent, '=');
28     if (!eq) {
29       continue;
30     }
31 
32     // slice name=value into a key and a value string
33     w_string str(ent, W_STRING_BYTE);
34     auto key = str.slice(0, (uint32_t)(eq - ent));
35     auto val = str.slice(
36         1 + (uint32_t)(eq - ent), (uint32_t)(str.size() - (key.size() + 1)));
37 
38     // Replace rather than set, just in case we somehow have duplicate
39     // keys in our environment array.
40     map_[key] = val;
41   }
42 }
43 
44 ChildProcess::Environment::Environment(
45     const std::unordered_map<w_string, w_string>& map)
46     : map_(map) {}
47 
48 /* Constructs an envp array from a hash table.
49  * The returned array occupies a single contiguous block of memory
50  * such that it can be released by a single call to free(3).
51  * The last element of the returned array is set to NULL for compatibility
52  * with posix_spawn() */
53 std::unique_ptr<char*, ChildProcess::Deleter>
54 ChildProcess::Environment::asEnviron(size_t* env_size) const {
55   size_t len = (1 + map_.size()) * sizeof(char*);
56 
57   // Make a pass through to compute the required memory size
58   for (const auto& it : map_) {
59     const auto& key = it.first;
60     const auto& val = it.second;
61 
62     // key=value\0
63     len += key.size() + 1 + val.size() + 1;
64   }
65 
66   auto envp = (char**)malloc(len);
67   if (!envp) {
68     throw std::bad_alloc();
69   }
70   auto result = std::unique_ptr<char*, Deleter>(envp, Deleter());
71 
72   // Now populate
73   auto buf = (char*)(envp + map_.size() + 1);
74   size_t i = 0;
75   for (const auto& it : map_) {
76     const auto& key = it.first;
77     const auto& val = it.second;
78 
79     envp[i++] = buf;
80 
81     // key=value\0
82     memcpy(buf, key.data(), key.size());
83     buf += key.size();
84 
85     memcpy(buf, "=", 1);
86     buf++;
87 
88     memcpy(buf, val.data(), val.size());
89     buf += val.size();
90 
91     *buf = 0;
92     buf++;
93   }
94 
95   envp[map_.size()] = nullptr;
96 
97   if (env_size) {
98     *env_size = len;
99   }
100   return result;
101 }
102 
103 void ChildProcess::Environment::set(const w_string& key, const w_string& val) {
104   map_[key] = val;
105 }
106 
107 void ChildProcess::Environment::set(const w_string& key, bool bval) {
108   if (bval) {
109     map_[key] = "true";
110   } else {
111     map_.erase(key);
112   }
113 }
114 
115 void ChildProcess::Environment::set(
116     std::initializer_list<std::pair<w_string_piece, w_string_piece>> pairs) {
117   for (auto& pair : pairs) {
118     set(pair.first.asWString(), pair.second.asWString());
119   }
120 }
121 
122 void ChildProcess::Environment::unset(const w_string& key) {
123   map_.erase(key);
124 }
125 
126 ChildProcess::Options::Options() : inner_(make_unique<Inner>()) {
127 #ifdef POSIX_SPAWN_CLOEXEC_DEFAULT
128   setFlags(POSIX_SPAWN_CLOEXEC_DEFAULT);
129 #endif
130 }
131 
132 ChildProcess::Options::Inner::Inner() {
133   posix_spawnattr_init(&attr);
134   posix_spawn_file_actions_init(&actions);
135 }
136 
137 ChildProcess::Options::Inner::~Inner() {
138   posix_spawn_file_actions_destroy(&actions);
139   posix_spawnattr_destroy(&attr);
140 }
141 
142 void ChildProcess::Options::setFlags(short flags) {
143   short currentFlags;
144   auto err = posix_spawnattr_getflags(&inner_->attr, &currentFlags);
145   if (err) {
146     throw std::system_error(
147         err, std::generic_category(), "posix_spawnattr_getflags");
148   }
149   err = posix_spawnattr_setflags(&inner_->attr, currentFlags | flags);
150   if (err) {
151     throw std::system_error(
152         err, std::generic_category(), "posix_spawnattr_setflags");
153   }
154 }
155 
#ifdef POSIX_SPAWN_SETSIGMASK
/** Records `mask` as the signal mask for the spawned child and enables
 * POSIX_SPAWN_SETSIGMASK so that the mask takes effect.
 * Throws std::system_error if the mask or the flag cannot be stored, in
 * line with the other Options setters. */
void ChildProcess::Options::setSigMask(const sigset_t& mask) {
  auto err = posix_spawnattr_setsigmask(&inner_->attr, &mask);
  if (err) {
    // Without this check a failure would leave the flag enabled below
    // while the attributes still hold whatever mask they had before.
    throw std::system_error(
        err, std::generic_category(), "posix_spawnattr_setsigmask");
  }
  setFlags(POSIX_SPAWN_SETSIGMASK);
}
#endif
162 
163 ChildProcess::Environment& ChildProcess::Options::environment() {
164   return env_;
165 }
166 
167 void ChildProcess::Options::dup2(int fd, int targetFd) {
168   auto err = posix_spawn_file_actions_adddup2(&inner_->actions, fd, targetFd);
169   if (err) {
170     throw std::system_error(
171         err, std::generic_category(), "posix_spawn_file_actions_adddup2");
172   }
173 }
174 
175 void ChildProcess::Options::dup2(const FileDescriptor& fd, int targetFd) {
176 #ifdef _WIN32
177   auto err = posix_spawn_file_actions_adddup2_handle_np(
178       &inner_->actions, fd.handle(), targetFd);
179   if (err) {
180     throw std::system_error(
181         err,
182         std::generic_category(),
183         "posix_spawn_file_actions_adddup2_handle_np");
184   }
185 #else
186   auto err =
187       posix_spawn_file_actions_adddup2(&inner_->actions, fd.fd(), targetFd);
188   if (err) {
189     throw std::system_error(
190         err, std::generic_category(), "posix_spawn_file_actions_adddup2");
191   }
192 #endif
193 }
194 
195 void ChildProcess::Options::open(
196     int targetFd,
197     const char* path,
198     int flags,
199     int mode) {
200   auto err = posix_spawn_file_actions_addopen(
201       &inner_->actions, targetFd, path, flags, mode);
202   if (err) {
203     throw std::system_error(
204         err, std::generic_category(), "posix_spawn_file_actions_addopen");
205   }
206 }
207 
208 void ChildProcess::Options::pipe(int targetFd, bool childRead) {
209   if (pipes_.find(targetFd) != pipes_.end()) {
210     throw std::runtime_error("targetFd is already present in pipes map");
211   }
212 
213   auto result = pipes_.emplace(std::make_pair(targetFd, make_unique<Pipe>()));
214   auto pipe = result.first->second.get();
215 
216 #ifndef _WIN32
217   pipe->read.clearNonBlock();
218   pipe->write.clearNonBlock();
219 #endif
220 
221   dup2(childRead ? pipe->read : pipe->write, targetFd);
222 }
223 
224 void ChildProcess::Options::pipeStdin() {
225   pipe(STDIN_FILENO, true);
226 }
227 
228 void ChildProcess::Options::pipeStdout() {
229   pipe(STDOUT_FILENO, false);
230 }
231 
232 void ChildProcess::Options::pipeStderr() {
233   pipe(STDERR_FILENO, false);
234 }
235 
236 void ChildProcess::Options::nullStdin() {
237 #ifdef _WIN32
238   open(STDIN_FILENO, "NUL", O_RDONLY, 0);
239 #else
240   open(STDIN_FILENO, "/dev/null", O_RDONLY, 0);
241 #endif
242 }
243 
244 void ChildProcess::Options::chdir(w_string_piece path) {
245   cwd_ = std::string(path.data(), path.size());
246 #ifdef _WIN32
247   posix_spawnattr_setcwd_np(&inner_->attr, cwd_.c_str());
248 #endif
249 }
250 
251 static std::vector<w_string_piece> json_args_to_string_vec(
252     const json_ref& args) {
253   std::vector<w_string_piece> vec;
254 
255   for (auto& arg : args.array()) {
256     vec.emplace_back(json_to_w_string(arg));
257   }
258 
259   return vec;
260 }
261 
262 ChildProcess::ChildProcess(const json_ref& args, Options&& options)
263     : ChildProcess(json_args_to_string_vec(args), std::move(options)) {}
264 
265 ChildProcess::ChildProcess(std::vector<w_string_piece> args, Options&& options)
266     : pipes_(std::move(options.pipes_)) {
267   std::vector<char*> argv;
268   std::vector<std::string> argStrings;
269 
270   argStrings.reserve(args.size());
271   argv.reserve(args.size() + 1);
272 
273   for (auto& str : args) {
274     argStrings.emplace_back(str.data(), str.size());
275     argv.emplace_back(&argStrings.back()[0]);
276   }
277   argv.emplace_back(nullptr);
278 
279 #ifndef _WIN32
280   auto lock = lockCwdMutex();
281   char savedCwd[WATCHMAN_NAME_MAX];
282   if (!getcwd(savedCwd, sizeof(savedCwd))) {
283     throw std::system_error(errno, std::generic_category(), "failed to getcwd");
284   }
285   SCOPE_EXIT {
286     if (!options.cwd_.empty()) {
287       if (chdir(savedCwd) != 0) {
288         // log(FATAL) rather than throw because SCOPE_EXIT is
289         // a noexcept destructor and will call std::terminate
290         // in this case anyway.
291         log(FATAL, "failed to restore cwd of ", savedCwd);
292       }
293     }
294   };
295 
296   if (!options.cwd_.empty()) {
297     if (chdir(options.cwd_.c_str()) != 0) {
298       throw std::system_error(
299           errno,
300           std::generic_category(),
301           watchman::to<std::string>("failed to chdir to ", options.cwd_));
302     }
303   }
304 #endif
305 
306   auto envp = options.env_.asEnviron();
307   auto ret = posix_spawnp(
308       &pid_,
309       argv[0],
310       &options.inner_->actions,
311       &options.inner_->attr,
312       &argv[0],
313       envp.get());
314 
315   if (ret) {
316     // Failed, so the creator cannot call wait() on us.
317     // mark us as already done.
318     waited_ = true;
319   }
320 
321   // Log some info
322   auto level = ret == 0 ? watchman::DBG : watchman::ERR;
323   watchman::log(level, "ChildProcess: pid=", pid_, "\n");
324   for (size_t i = 0; i < args.size(); ++i) {
325     watchman::log(level, "argv[", i, "] ", args[i], "\n");
326   }
327   for (size_t i = 0; envp.get()[i]; ++i) {
328     watchman::log(level, "envp[", i, "] ", envp.get()[i], "\n");
329   }
330 
331   // Close the other ends of the pipes
332   for (auto& it : pipes_) {
333     if (it.first == STDIN_FILENO) {
334       it.second->read.close();
335     } else {
336       it.second->write.close();
337     }
338   }
339 
340   if (ret) {
341     throw std::system_error(ret, std::generic_category(), "posix_spawnp");
342   }
343 }
344 
/** Returns the single mutex used to guard changes to the working
 * directory.  It is a function-local static, so every call yields the
 * same object. */
static std::mutex& getCwdMutex() {
  static std::mutex cwdMutex;
  return cwdMutex;
}
350 
351 std::unique_lock<std::mutex> ChildProcess::lockCwdMutex() {
352   return std::unique_lock<std::mutex>(getCwdMutex());
353 }
354 
355 ChildProcess::~ChildProcess() {
356   if (!waited_) {
357     watchman::log(
358         watchman::FATAL,
359         "you must call ChildProcess.wait() before destroying a ChildProcess\n");
360   }
361 }
362 
363 void ChildProcess::disown() {
364   waited_ = true;
365 }
366 
367 bool ChildProcess::terminated() {
368   if (waited_) {
369     return true;
370   }
371 
372   auto pid = waitpid(pid_, &status_, WNOHANG);
373   if (pid == pid_) {
374     waited_ = true;
375   }
376 
377   return waited_;
378 }
379 
380 int ChildProcess::wait() {
381   if (waited_) {
382     return status_;
383   }
384 
385   while (true) {
386     auto pid = waitpid(pid_, &status_, 0);
387     if (pid == pid_) {
388       waited_ = true;
389       return status_;
390     }
391 
392     if (errno != EINTR) {
393       throw std::system_error(errno, std::generic_category(), "waitpid");
394     }
395   }
396 }
397 
398 void ChildProcess::kill(
399 #ifndef _WIN32
400     int signo
401 #endif
402     ) {
403 #ifndef _WIN32
404   if (!waited_) {
405     ::kill(pid_, signo);
406   }
407 #endif
408 }
409 
410 std::pair<w_string, w_string> ChildProcess::communicate(
411     pipeWriteCallback writeCallback) {
412 #ifdef _WIN32
413   return threadedCommunicate(writeCallback);
414 #else
415   return pollingCommunicate(writeCallback);
416 #endif
417 }
418 
419 #ifndef _WIN32
420 std::pair<w_string, w_string> ChildProcess::pollingCommunicate(
421     pipeWriteCallback writeCallback) {
422   std::unordered_map<int, std::string> outputs;
423 
424   for (auto& it : pipes_) {
425     if (it.first != STDIN_FILENO) {
426       // We only want output streams here
427       continue;
428     }
429     watchman::log(
430         watchman::DBG, "Setting up output buffer for fd ", it.first, "\n");
431     outputs.emplace(std::make_pair(it.first, ""));
432   }
433 
434   std::vector<pollfd> pfds;
435   std::unordered_map<int, int> revmap;
436   pfds.reserve(pipes_.size());
437   revmap.reserve(pipes_.size());
438 
439   while (!pipes_.empty()) {
440     revmap.clear();
441     pfds.clear();
442 
443     watchman::log(
444         watchman::DBG, "Setting up pollfds for ", pipes_.size(), " fds\n");
445 
446     for (auto& it : pipes_) {
447       pollfd pfd;
448       if (it.first == STDIN_FILENO) {
449         pfd.fd = it.second->write.fd();
450         pfd.events = POLLOUT;
451       } else {
452         pfd.fd = it.second->read.fd();
453         pfd.events = POLLIN;
454       }
455       pfds.emplace_back(std::move(pfd));
456       revmap[pfd.fd] = it.first;
457     }
458 
459     int r;
460     do {
461       watchman::log(watchman::DBG, "waiting for ", pfds.size(), " fds\n");
462       r = ::poll(pfds.data(), pfds.size(), -1);
463     } while (r == -1 && errno == EINTR);
464     if (r == -1) {
465       watchman::log(watchman::ERR, "poll error\n");
466       throw std::system_error(errno, std::generic_category(), "poll");
467     }
468 
469     for (auto& pfd : pfds) {
470       watchman::log(
471           watchman::DBG,
472           "fd ",
473           pfd.fd,
474           " revmap to ",
475           revmap[pfd.fd],
476           " has events ",
477           pfd.revents,
478           "\n");
479       if ((pfd.revents & (POLLHUP | POLLIN)) &&
480           revmap[pfd.fd] != STDIN_FILENO) {
481         watchman::log(
482             watchman::DBG,
483             "fd ",
484             pfd.fd,
485             " rev=",
486             revmap[pfd.fd],
487             " is readable\n");
488         char buf[BUFSIZ];
489         auto l = ::read(pfd.fd, buf, sizeof(buf));
490         if (l == -1 && (errno == EAGAIN || errno == EINTR)) {
491           watchman::log(
492               watchman::DBG,
493               "fd ",
494               pfd.fd,
495               " rev=",
496               revmap[pfd.fd],
497               " read give EAGAIN\n");
498           continue;
499         }
500         if (l == -1) {
501           int err = errno;
502           watchman::log(
503               watchman::ERR,
504               "failed to read from pipe fd ",
505               pfd.fd,
506               " err ",
507               strerror(err),
508               "\n");
509           throw std::system_error(
510               err, std::generic_category(), "reading from child process");
511         }
512         watchman::log(
513             watchman::DBG,
514             "fd ",
515             pfd.fd,
516             " rev=",
517             revmap[pfd.fd],
518             " read ",
519             l,
520             " bytes\n");
521         if (l == 0) {
522           // Stream is done; close it out.
523           pipes_.erase(revmap[pfd.fd]);
524           continue;
525         }
526         outputs[revmap[pfd.fd]].append(buf, l);
527       }
528 
529       if ((pfd.revents & POLLOUT) && revmap[pfd.fd] == STDIN_FILENO &&
530           writeCallback(pipes_.at(revmap[pfd.fd])->write)) {
531         // We should close it
532         watchman::log(
533             watchman::DBG,
534             "fd ",
535             pfd.fd,
536             " rev ",
537             revmap[pfd.fd],
538             " writer says to close\n");
539         pipes_.erase(revmap[pfd.fd]);
540         continue;
541       }
542 
543       if (pfd.revents & (POLLHUP | POLLERR)) {
544         // Something wrong with it, so close it
545         pipes_.erase(revmap[pfd.fd]);
546         watchman::log(
547             watchman::DBG,
548             "fd ",
549             pfd.fd,
550             " rev ",
551             revmap[pfd.fd],
552             " error status, so closing\n");
553         continue;
554       }
555     }
556 
557     watchman::log(watchman::DBG, "remaining pipes ", pipes_.size(), "\n");
558   }
559 
560   auto optBuffer = [&](int fd) -> w_string {
561     auto it = outputs.find(fd);
562     if (it == outputs.end()) {
563       watchman::log(watchman::DBG, "communicate fd ", fd, " nullptr\n");
564       return nullptr;
565     }
566     watchman::log(
567         watchman::DBG, "communicate fd ", fd, " gives ", it->second, "\n");
568     return w_string(it->second.data(), it->second.size());
569   };
570 
571   return std::make_pair(optBuffer(STDOUT_FILENO), optBuffer(STDERR_FILENO));
572 }
573 #endif
574 
575 /** Spawn a thread to read from the pipe connected to the specified fd.
576  * Returns a Future that will hold a string with the entire output from
577  * that stream. */
578 Future<w_string> ChildProcess::readPipe(int fd) {
579   auto it = pipes_.find(fd);
580   if (it == pipes_.end()) {
581     return makeFuture(w_string(nullptr));
582   }
583 
584   auto p = std::make_shared<Promise<w_string>>();
585   std::thread thr([this, fd, p] {
586     std::string result;
587     try {
588       auto& pipe = pipes_[fd];
589       while (true) {
590         char buf[4096];
591         auto readResult = pipe->read.read(buf, sizeof(buf));
592         if (readResult.hasError()) {
593           p->setException(
594               std::make_exception_ptr(std::system_error(readResult.error())));
595           return;
596         }
597         auto len = readResult.value();
598         if (len == 0) {
599           // all done
600           break;
601         }
602         result.append(buf, len);
603       }
604       p->setValue(w_string(result.data(), result.size()));
605     } catch (const std::exception& exc) {
606       p->setException(std::current_exception());
607     }
608   });
609 
610   thr.detach();
611   return p->getFuture();
612 }
613 
614 /** threadedCommunicate uses threads to read from the output streams.
615  * It is intended to be used on Windows where there is no reasonable
616  * way to carry out a non-blocking read on a pipe.  We compile and
617  * test it on all platforms to make it easier to avoid regressions. */
618 std::pair<w_string, w_string> ChildProcess::threadedCommunicate(
619     pipeWriteCallback writeCallback) {
620   auto outFuture = readPipe(STDOUT_FILENO);
621   auto errFuture = readPipe(STDERR_FILENO);
622 
623   auto it = pipes_.find(STDIN_FILENO);
624   if (it != pipes_.end()) {
625     auto& inPipe = pipes_[STDIN_FILENO];
626     while (!writeCallback(inPipe->write)) {
627       ; // keep trying to greedily write to the pipe
628     }
629     // Close the input stream; this typically signals the child
630     // process that we're done and allows us to safely block
631     // on the reads below.
632     pipes_.erase(STDIN_FILENO);
633   }
634 
635   return std::make_pair(outFuture.get(), errFuture.get());
636 }
637 
638 }
639