1 /* Copyright 2012-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 
4 #include "watchman.h"
5 #ifdef HAVE_LIBGIMLI_H
6 # include <libgimli.h>
7 #endif
8 #include <thread>
9 
10 using watchman::FileDescriptor;
11 
12 watchman::Synchronized<std::unordered_set<std::shared_ptr<watchman_client>>>
13     clients;
14 static FileDescriptor listener_fd;
15 #ifndef _WIN32
16 static std::unique_ptr<watchman_event> listener_thread_event;
17 #else
18 static HANDLE listener_thread_event;
19 #endif
20 static volatile bool stopping = false;
21 #ifdef HAVE_LIBGIMLI_H
22 static volatile struct gimli_heartbeat *hb = NULL;
23 #endif
24 
w_is_stopping(void)25 bool w_is_stopping(void) {
26   return stopping;
27 }
28 
make_response(void)29 json_ref make_response(void) {
30   auto resp = json_object();
31 
32   resp.set("version", typed_string_to_json(PACKAGE_VERSION, W_STRING_UNICODE));
33 
34   return resp;
35 }
36 
enqueue_response(struct watchman_client * client,json_ref && json,bool ping)37 bool enqueue_response(
38     struct watchman_client* client,
39     json_ref&& json,
40     bool ping) {
41   client->enqueueResponse(std::move(json), ping);
42   return true;
43 }
44 
send_and_dispose_response(struct watchman_client * client,json_ref && response)45 void send_and_dispose_response(
46     struct watchman_client* client,
47     json_ref&& response) {
48   enqueue_response(client, std::move(response), false);
49 }
50 
send_error_response(struct watchman_client * client,const char * fmt,...)51 void send_error_response(struct watchman_client *client,
52     const char *fmt, ...)
53 {
54   va_list ap;
55 
56   va_start(ap, fmt);
57   auto errorText = w_string::vprintf(fmt, ap);
58   va_end(ap);
59 
60   auto resp = make_response();
61   resp.set("error", w_string_to_json(errorText));
62 
63   if (client->perf_sample) {
64     client->perf_sample->add_meta("error", w_string_to_json(errorText));
65   }
66 
67   if (client->current_command) {
68     char *command = NULL;
69     command = json_dumps(client->current_command, 0);
70     watchman::log(
71         watchman::ERR,
72         "send_error_response: ",
73         command,
74         ", failed: ",
75         errorText,
76         "\n");
77     free(command);
78   } else {
79     watchman::log(watchman::ERR, "send_error_response: ", errorText, "\n");
80   }
81 
82   send_and_dispose_response(client, std::move(resp));
83 }
84 
watchman_client()85 watchman_client::watchman_client() : watchman_client(nullptr) {}
86 
watchman_client(std::unique_ptr<watchman_stream> && stm)87 watchman_client::watchman_client(std::unique_ptr<watchman_stream>&& stm)
88     : stm(std::move(stm)), ping(w_event_make()) {
89   w_log(W_LOG_DBG, "accepted client:stm=%p\n", this->stm.get());
90 }
91 
~watchman_client()92 watchman_client::~watchman_client() {
93   debugSub.reset();
94   errorSub.reset();
95 
96   w_log(W_LOG_DBG, "client_delete %p\n", this);
97 
98   if (stm) {
99     stm->shutdown();
100   }
101 }
102 
enqueueResponse(json_ref && resp,bool ping)103 void watchman_client::enqueueResponse(json_ref&& resp, bool ping) {
104   responses.emplace_back(std::move(resp));
105 
106   if (ping) {
107     this->ping->notify();
108   }
109 }
110 
w_request_shutdown(void)111 void w_request_shutdown(void) {
112   stopping = true;
113   // Knock listener thread out of poll/accept
114 #ifndef _WIN32
115   if (listener_thread_event) {
116     listener_thread_event->notify();
117   }
118 #else
119   SetEvent(listener_thread_event);
120 #endif
121 }
122 
123 // The client thread reads and decodes json packets,
124 // then dispatches the commands that it finds
client_thread(std::shared_ptr<watchman_client> client)125 static void client_thread(std::shared_ptr<watchman_client> client) {
126   struct watchman_event_poll pfd[2];
127   json_error_t jerr;
128   bool send_ok = true;
129   // Keep a persistent vector around so that we can avoid allocating
130   // and releasing heap memory when we collect items from the publisher
131   std::vector<std::shared_ptr<const watchman::Publisher::Item>> pending;
132 
133   client->stm->setNonBlock(true);
134   w_set_thread_name(
135       "client=%p:stm=%p:pid=%d",
136       client.get(),
137       client->stm.get(),
138       client->stm->getPeerProcessID());
139 
140   client->client_is_owner = client->stm->peerIsOwner();
141 
142   pfd[0].evt = client->stm->getEvents();
143   pfd[1].evt = client->ping.get();
144 
145   while (!stopping) {
146     // Wait for input from either the client socket or
147     // via the ping pipe, which signals that some other
148     // thread wants to unilaterally send data to the client
149 
150     ignore_result(w_poll_events(pfd, 2, 2000));
151     if (stopping) {
152       break;
153     }
154 
155     if (pfd[0].ready) {
156       auto request = client->reader.decodeNext(client->stm.get(), &jerr);
157 
158       if (!request && errno == EAGAIN) {
159         // That's fine
160       } else if (!request) {
161         // Not so cool
162         if (client->reader.wpos == client->reader.rpos) {
163           // If they disconnected in between PDUs, no need to log
164           // any error
165           goto disconnected;
166         }
167         send_error_response(
168             client.get(),
169             "invalid json at position %d: %s",
170             jerr.position,
171             jerr.text);
172         w_log(W_LOG_ERR, "invalid data from client: %s\n", jerr.text);
173 
174         goto disconnected;
175       } else if (request) {
176         client->pdu_type = client->reader.pdu_type;
177         client->capabilities = client->reader.capabilities;
178         dispatch_command(client.get(), request, CMD_DAEMON);
179       }
180     }
181 
182     if (pfd[1].ready) {
183       while (client->ping->testAndClear()) {
184         // Enqueue refs to pending log payloads
185         pending.clear();
186         getPending(pending, client->debugSub, client->errorSub);
187         for (auto& item : pending) {
188           client->enqueueResponse(json_ref(item->payload), false);
189         }
190 
191         // Maybe we have subscriptions to dispatch?
192         auto userClient =
193             std::dynamic_pointer_cast<watchman_user_client>(client);
194 
195         if (userClient) {
196           std::vector<w_string> subsToDelete;
197           for (auto& subiter : userClient->unilateralSub) {
198             auto sub = subiter.first;
199             auto subStream = subiter.second;
200 
201             watchman::log(
202                 watchman::DBG, "consider fan out sub ", sub->name, "\n");
203 
204             pending.clear();
205             subStream->getPending(pending);
206             bool seenSettle = false;
207             for (auto& item : pending) {
208               auto dumped = json_dumps(item->payload, 0);
209               watchman::log(
210                   watchman::DBG,
211                   "Unilateral payload for sub ",
212                   sub->name,
213                   " ",
214                   dumped ? dumped : "<<MISSING!!>>",
215                   "\n");
216               free(dumped);
217 
218               if (item->payload.get_default("canceled")) {
219                 auto resp = make_response();
220 
221                 watchman::log(
222                     watchman::ERR,
223                     "Cancel subscription ",
224                     sub->name,
225                     " due to root cancellation\n");
226 
227                 resp.set({{"root", item->payload.get_default("root")},
228                           {"unilateral", json_true()},
229                           {"canceled", json_true()},
230                           {"subscription", w_string_to_json(sub->name)}});
231                 client->enqueueResponse(std::move(resp), false);
232                 // Remember to cancel this subscription.
233                 // We can't do it in this loop because that would
234                 // invalidate the iterators and cause a headache.
235                 subsToDelete.push_back(sub->name);
236                 continue;
237               }
238 
239               if (item->payload.get_default("state-enter") ||
240                   item->payload.get_default("state-leave")) {
241                 auto resp = make_response();
242                 json_object_update(item->payload, resp);
243                 resp.set({{"unilateral", json_true()},
244                           {"subscription", w_string_to_json(sub->name)}});
245                 client->enqueueResponse(std::move(resp), false);
246 
247                 watchman::log(
248                     watchman::DBG,
249                     "Fan out subscription state change for ",
250                     sub->name,
251                     "\n");
252                 continue;
253               }
254 
255               if (!sub->debug_paused && item->payload.get_default("settled")) {
256                 seenSettle = true;
257                 continue;
258               }
259             }
260 
261             if (seenSettle) {
262               sub->processSubscription();
263             }
264           }
265 
266           for (auto& name : subsToDelete) {
267             userClient->unsubByName(name);
268           }
269         }
270       }
271     }
272 
273     /* now send our response(s) */
274     while (!client->responses.empty()) {
275       auto& response_to_send = client->responses.front();
276 
277       if (send_ok) {
278         client->stm->setNonBlock(false);
279         /* Return the data in the same format that was used to ask for it.
280          * Don't bother sending any more messages if the client disconnects,
281          * but still free their memory.
282          */
283         send_ok = client->writer.pduEncodeToStream(
284             client->pdu_type,
285             client->capabilities,
286             response_to_send,
287             client->stm.get());
288         client->stm->setNonBlock(true);
289       }
290 
291       client->responses.pop_front();
292     }
293   }
294 
295 disconnected:
296   w_set_thread_name(
297       "NOT_CONN:client=%p:stm=%p:pid=%d",
298       client.get(),
299       client->stm.get(),
300       client->stm->getPeerProcessID());
301   // Remove the client from the map before we tear it down, as this makes
302   // it easier to flush out pending writes on windows without worrying
303   // about w_log_to_clients contending for the write buffers
304   clients.wlock()->erase(client);
305 }
306 
307 // This is just a placeholder.
308 // This catches SIGUSR1 so we don't terminate.
309 // We use this to interrupt blocking syscalls
310 // on the worker threads
wakeme(int)311 static void wakeme(int) {}
312 
313 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
314 #ifdef __OpenBSD__
315 #include <sys/siginfo.h>
316 #endif
317 #include <sys/param.h>
318 #include <sys/sysctl.h>
319 #include <sys/time.h>
320 #include <sys/resource.h>
321 #endif
322 
323 #ifndef _WIN32
324 
325 // If we are running under inetd-style supervision, call this function
326 // to move the inetd provided socket descriptor(s) to a new descriptor
327 // number and remember that we can just use these when we're starting
328 // up the listener.
w_listener_prep_inetd()329 void w_listener_prep_inetd() {
330   if (listener_fd) {
331     throw std::runtime_error(
332         "w_listener_prep_inetd: listener_fd is already assigned");
333   }
334 
335   listener_fd = FileDescriptor(dup(STDIN_FILENO), "dup(stdin) for listener");
336 }
337 
get_listener_socket(const char * path)338 static FileDescriptor get_listener_socket(const char *path)
339 {
340   struct sockaddr_un un;
341   mode_t perms = cfg_get_perms(
342       "sock_access", true /* write bits */, false /* execute bits */);
343   FileDescriptor listener_fd;
344 
345 #ifdef __APPLE__
346   listener_fd = w_get_listener_socket_from_launchd();
347   if (listener_fd) {
348     w_log(W_LOG_ERR, "Using socket from launchd as listening socket\n");
349     return listener_fd;
350   }
351 #endif
352 
353   if (strlen(path) >= sizeof(un.sun_path) - 1) {
354     w_log(W_LOG_ERR, "%s: path is too long\n",
355         path);
356     return FileDescriptor();
357   }
358 
359   listener_fd = FileDescriptor(socket(PF_LOCAL, SOCK_STREAM, 0), "socket");
360 
361   un.sun_family = PF_LOCAL;
362   memcpy(un.sun_path, path, strlen(path) + 1);
363   unlink(path);
364 
365   if (bind(listener_fd.fd(), (struct sockaddr*)&un, sizeof(un)) != 0) {
366     w_log(W_LOG_ERR, "bind(%s): %s\n",
367       path, strerror(errno));
368     return FileDescriptor();
369   }
370 
371   // The permissions in the containing directory should be correct, so this
372   // should be correct as well. But set the permissions in any case.
373   if (chmod(path, perms) == -1) {
374     w_log(W_LOG_ERR, "chmod(%s, %#o): %s", path, perms, strerror(errno));
375     return FileDescriptor();
376   }
377 
378   // Double-check that the socket has the right permissions. This can happen
379   // when the containing directory was created in a previous run, with a group
380   // the user is no longer in.
381   struct stat st;
382   if (lstat(path, &st) == -1) {
383     watchman::log(watchman::ERR, "lstat(", path, "): ", strerror(errno), "\n");
384     return FileDescriptor();
385   }
386 
387   // This is for testing only
388   // (test_sock_perms.py:test_user_previously_in_sock_group). Do not document.
389   const char *sock_group_name = cfg_get_string("__sock_file_group", nullptr);
390   if (!sock_group_name) {
391     sock_group_name = cfg_get_string("sock_group", nullptr);
392   }
393 
394   if (sock_group_name) {
395     const struct group *sock_group = w_get_group(sock_group_name);
396     if (!sock_group) {
397       return FileDescriptor();
398     }
399     if (st.st_gid != sock_group->gr_gid) {
400       watchman::log(
401         watchman::ERR,
402         "for socket '", path, "', gid ", st.st_gid,
403         " doesn't match expected gid ", sock_group->gr_gid, " (group name ",
404         sock_group_name, "). Ensure that you are still a member of group ",
405         sock_group_name, ".\n");
406       return FileDescriptor();
407     }
408   }
409 
410   if (listen(listener_fd.fd(), 200) != 0) {
411     w_log(W_LOG_ERR, "listen(%s): %s\n",
412         path, strerror(errno));
413     return FileDescriptor();
414   }
415 
416   return listener_fd;
417 }
418 #endif
419 
make_new_client(std::unique_ptr<watchman_stream> && stm)420 static std::shared_ptr<watchman_client> make_new_client(
421     std::unique_ptr<watchman_stream>&& stm) {
422   auto client = std::make_shared<watchman_user_client>(std::move(stm));
423 
424   clients.wlock()->insert(client);
425 
426   // Start a thread for the client.
427   // We used to use libevent for this, but we have
428   // a low volume of concurrent clients and the json
429   // parse/encode APIs are not easily used in a non-blocking
430   // server architecture.
431   try {
432     std::thread thr([client]() { client_thread(client); });
433     thr.detach();
434   } catch (const std::exception& e) {
435     clients.wlock()->erase(client);
436     throw;
437   }
438 
439   return client;
440 }
441 
442 #ifdef _WIN32
named_pipe_accept_loop(const char * path)443 static void named_pipe_accept_loop(const char *path) {
444   HANDLE handles[2];
445   OVERLAPPED olap;
446   HANDLE connected_event = CreateEvent(NULL, FALSE, TRUE, NULL);
447 
448   if (!connected_event) {
449     w_log(W_LOG_ERR, "named_pipe_accept_loop: CreateEvent failed: %s\n",
450         win32_strerror(GetLastError()));
451     return;
452   }
453 
454   listener_thread_event = CreateEvent(NULL, FALSE, TRUE, NULL);
455 
456   handles[0] = connected_event;
457   handles[1] = listener_thread_event;
458   memset(&olap, 0, sizeof(olap));
459   olap.hEvent = connected_event;
460 
461   w_log(W_LOG_ERR, "waiting for pipe clients on %s\n", path);
462   while (!stopping) {
463     FileDescriptor client_fd;
464     DWORD res;
465 
466     client_fd = FileDescriptor(intptr_t(CreateNamedPipe(
467         path,
468         PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
469         PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_REJECT_REMOTE_CLIENTS,
470         PIPE_UNLIMITED_INSTANCES,
471         WATCHMAN_IO_BUF_SIZE,
472         512,
473         0,
474         nullptr)));
475 
476     if (!client_fd) {
477       w_log(W_LOG_ERR, "CreateNamedPipe(%s) failed: %s\n",
478           path, win32_strerror(GetLastError()));
479       continue;
480     }
481 
482     ResetEvent(connected_event);
483     if (!ConnectNamedPipe((HANDLE)client_fd.handle(), &olap)) {
484       res = GetLastError();
485 
486       if (res == ERROR_PIPE_CONNECTED) {
487         make_new_client(w_stm_fdopen(std::move(client_fd)));
488         continue;
489       }
490 
491       if (res != ERROR_IO_PENDING) {
492         w_log(W_LOG_ERR, "ConnectNamedPipe: %s\n",
493             win32_strerror(GetLastError()));
494         continue;
495       }
496 
497       res = WaitForMultipleObjectsEx(2, handles, false, INFINITE, true);
498       if (res == WAIT_OBJECT_0 + 1) {
499         // Signalled to stop
500         CancelIoEx((HANDLE)client_fd.handle(), &olap);
501         continue;
502       }
503 
504       if (res != WAIT_OBJECT_0) {
505         w_log(
506             W_LOG_ERR,
507             "WaitForMultipleObjectsEx: ConnectNamedPipe: "
508             "unexpected status %u\n",
509             res);
510         CancelIoEx((HANDLE)client_fd.handle(), &olap);
511         continue;
512       }
513     }
514     make_new_client(w_stm_fdopen(std::move(client_fd)));
515   }
516 }
517 #endif
518 
519 #ifndef _WIN32
accept_loop(FileDescriptor && listenerDescriptor)520 static void accept_loop(FileDescriptor&& listenerDescriptor) {
521   auto listener = w_stm_fdopen(std::move(listenerDescriptor));
522   while (!stopping) {
523     FileDescriptor client_fd;
524     struct watchman_event_poll pfd[2];
525     int bufsize;
526 
527 #ifdef HAVE_LIBGIMLI_H
528     if (hb) {
529       gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
530     }
531 #endif
532 
533     pfd[0].evt = listener->getEvents();
534     pfd[1].evt = listener_thread_event.get();
535 
536     if (w_poll_events(pfd, 2, 60000) == 0) {
537       if (stopping) {
538         break;
539       }
540       // Timed out, or error.
541       // Arrange to sanity check that we're working
542       w_check_my_sock();
543       continue;
544     }
545 
546     if (stopping) {
547       break;
548     }
549 
550 #ifdef HAVE_ACCEPT4
551     client_fd = FileDescriptor(
552         accept4(listener->getFileDescriptor().fd(), nullptr, 0, SOCK_CLOEXEC));
553 #else
554     client_fd =
555         FileDescriptor(accept(listener->getFileDescriptor().fd(), nullptr, 0));
556 #endif
557     if (!client_fd) {
558       continue;
559     }
560     client_fd.setCloExec();
561     bufsize = WATCHMAN_IO_BUF_SIZE;
562     setsockopt(
563         client_fd.fd(),
564         SOL_SOCKET,
565         SO_SNDBUF,
566         (void*)&bufsize,
567         sizeof(bufsize));
568 
569     make_new_client(w_stm_fdopen(std::move(client_fd)));
570   }
571 }
572 #endif
573 
w_start_listener(const char * path)574 bool w_start_listener(const char *path)
575 {
576 #ifndef _WIN32
577   struct sigaction sa;
578   sigset_t sigset;
579 #endif
580 
581 #ifdef HAVE_LIBGIMLI_H
582   hb = gimli_heartbeat_attach();
583 #endif
584 
585 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
586   {
587     struct rlimit limit;
588 # ifndef __OpenBSD__
589     int mib[2] = { CTL_KERN,
590 #  ifdef KERN_MAXFILESPERPROC
591       KERN_MAXFILESPERPROC
592 #  else
593       KERN_MAXFILES
594 #  endif
595     };
596 # endif
597     int maxperproc;
598 
599     getrlimit(RLIMIT_NOFILE, &limit);
600 
601 # ifndef __OpenBSD__
602     {
603       size_t len;
604 
605       len = sizeof(maxperproc);
606       sysctl(mib, 2, &maxperproc, &len, NULL, 0);
607       w_log(W_LOG_ERR, "file limit is %" PRIu64
608           " kern.maxfilesperproc=%i\n",
609           limit.rlim_cur, maxperproc);
610     }
611 # else
612     maxperproc = limit.rlim_max;
613     w_log(W_LOG_ERR, "openfiles-cur is %" PRIu64
614         " openfiles-max=%i\n",
615         limit.rlim_cur, maxperproc);
616 # endif
617 
618     if (limit.rlim_cur != RLIM_INFINITY &&
619         maxperproc > 0 &&
620         limit.rlim_cur < (rlim_t)maxperproc) {
621       limit.rlim_cur = maxperproc;
622 
623       if (setrlimit(RLIMIT_NOFILE, &limit)) {
624         w_log(W_LOG_ERR,
625           "failed to raise limit to %" PRIu64 " (%s).\n",
626           limit.rlim_cur,
627           strerror(errno));
628       } else {
629         w_log(W_LOG_ERR,
630             "raised file limit to %" PRIu64 "\n",
631             limit.rlim_cur);
632       }
633     }
634 
635     getrlimit(RLIMIT_NOFILE, &limit);
636 #ifndef HAVE_FSEVENTS
637     if (limit.rlim_cur < 10240) {
638       w_log(W_LOG_ERR,
639           "Your file descriptor limit is very low (%" PRIu64 "), "
640           "please consult the watchman docs on raising the limits\n",
641           limit.rlim_cur);
642     }
643 #endif
644   }
645 #endif
646 
647 #ifndef _WIN32
648   signal(SIGPIPE, SIG_IGN);
649 
650   /* allow SIGUSR1 and SIGCHLD to wake up a blocked thread, without restarting
651    * syscalls */
652   memset(&sa, 0, sizeof(sa));
653   sa.sa_handler = wakeme;
654   sa.sa_flags = 0;
655   sigaction(SIGUSR1, &sa, NULL);
656   sigaction(SIGCHLD, &sa, NULL);
657 
658   // Block SIGCHLD everywhere
659   sigemptyset(&sigset);
660   sigaddset(&sigset, SIGCHLD);
661   sigprocmask(SIG_BLOCK, &sigset, NULL);
662 
663   if (listener_fd) {
664     // Assume that it was prepped by w_listener_prep_inetd()
665     w_log(W_LOG_ERR, "Using socket from inetd as listening socket\n");
666   } else {
667     listener_fd = get_listener_socket(path);
668     if (!listener_fd) {
669       return false;
670     }
671   }
672   listener_fd.setCloExec();
673 #endif
674 
675 #ifdef HAVE_LIBGIMLI_H
676   if (hb) {
677     gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
678   } else {
679     w_setup_signal_handlers();
680   }
681 #else
682   w_setup_signal_handlers();
683 #endif
684   listener_fd.setNonBlock();
685 
686   // Now run the dispatch
687 #ifndef _WIN32
688   listener_thread_event = w_event_make();
689   accept_loop(std::move(listener_fd));
690 #else
691   named_pipe_accept_loop(path);
692 #endif
693 
694   // Wait for clients, waking any sleeping clients up in the process
695   {
696     int interval = 2000;
697     int last_count = 0, n_clients = 0;
698     const int max_interval = 1000000; // 1 second
699 
700     do {
701       {
702         auto clientsLock = clients.rlock();
703         n_clients = clientsLock->size();
704 
705         for (auto client : *clientsLock) {
706           client->ping->notify();
707         }
708       }
709 
710       if (n_clients != last_count) {
711         w_log(W_LOG_ERR, "waiting for %d clients to terminate\n", n_clients);
712       }
713       usleep(interval);
714       interval = std::min(interval * 2, max_interval);
715     } while (n_clients > 0);
716   }
717 
718   w_state_shutdown();
719 
720   return true;
721 }
722 
723 /* get-pid */
cmd_get_pid(struct watchman_client * client,const json_ref &)724 static void cmd_get_pid(struct watchman_client* client, const json_ref&) {
725   auto resp = make_response();
726 
727   resp.set("pid", json_integer(getpid()));
728 
729   send_and_dispose_response(client, std::move(resp));
730 }
731 W_CMD_REG("get-pid", cmd_get_pid, CMD_DAEMON, NULL)
732 
733 
734 /* vim:ts=2:sw=2:et:
735  */
736