1 /* Copyright 2012-present Facebook, Inc.
2 * Licensed under the Apache License, Version 2.0 */
3
#include "watchman.h"
#ifdef HAVE_LIBGIMLI_H
# include <libgimli.h>
#endif
#include <atomic>
#include <thread>
9
10 using watchman::FileDescriptor;
11
12 watchman::Synchronized<std::unordered_set<std::shared_ptr<watchman_client>>>
13 clients;
14 static FileDescriptor listener_fd;
15 #ifndef _WIN32
16 static std::unique_ptr<watchman_event> listener_thread_event;
17 #else
18 static HANDLE listener_thread_event;
19 #endif
20 static volatile bool stopping = false;
21 #ifdef HAVE_LIBGIMLI_H
22 static volatile struct gimli_heartbeat *hb = NULL;
23 #endif
24
w_is_stopping(void)25 bool w_is_stopping(void) {
26 return stopping;
27 }
28
make_response(void)29 json_ref make_response(void) {
30 auto resp = json_object();
31
32 resp.set("version", typed_string_to_json(PACKAGE_VERSION, W_STRING_UNICODE));
33
34 return resp;
35 }
36
enqueue_response(struct watchman_client * client,json_ref && json,bool ping)37 bool enqueue_response(
38 struct watchman_client* client,
39 json_ref&& json,
40 bool ping) {
41 client->enqueueResponse(std::move(json), ping);
42 return true;
43 }
44
send_and_dispose_response(struct watchman_client * client,json_ref && response)45 void send_and_dispose_response(
46 struct watchman_client* client,
47 json_ref&& response) {
48 enqueue_response(client, std::move(response), false);
49 }
50
send_error_response(struct watchman_client * client,const char * fmt,...)51 void send_error_response(struct watchman_client *client,
52 const char *fmt, ...)
53 {
54 va_list ap;
55
56 va_start(ap, fmt);
57 auto errorText = w_string::vprintf(fmt, ap);
58 va_end(ap);
59
60 auto resp = make_response();
61 resp.set("error", w_string_to_json(errorText));
62
63 if (client->perf_sample) {
64 client->perf_sample->add_meta("error", w_string_to_json(errorText));
65 }
66
67 if (client->current_command) {
68 char *command = NULL;
69 command = json_dumps(client->current_command, 0);
70 watchman::log(
71 watchman::ERR,
72 "send_error_response: ",
73 command,
74 ", failed: ",
75 errorText,
76 "\n");
77 free(command);
78 } else {
79 watchman::log(watchman::ERR, "send_error_response: ", errorText, "\n");
80 }
81
82 send_and_dispose_response(client, std::move(resp));
83 }
84
watchman_client()85 watchman_client::watchman_client() : watchman_client(nullptr) {}
86
watchman_client(std::unique_ptr<watchman_stream> && stm)87 watchman_client::watchman_client(std::unique_ptr<watchman_stream>&& stm)
88 : stm(std::move(stm)), ping(w_event_make()) {
89 w_log(W_LOG_DBG, "accepted client:stm=%p\n", this->stm.get());
90 }
91
~watchman_client()92 watchman_client::~watchman_client() {
93 debugSub.reset();
94 errorSub.reset();
95
96 w_log(W_LOG_DBG, "client_delete %p\n", this);
97
98 if (stm) {
99 stm->shutdown();
100 }
101 }
102
enqueueResponse(json_ref && resp,bool ping)103 void watchman_client::enqueueResponse(json_ref&& resp, bool ping) {
104 responses.emplace_back(std::move(resp));
105
106 if (ping) {
107 this->ping->notify();
108 }
109 }
110
w_request_shutdown(void)111 void w_request_shutdown(void) {
112 stopping = true;
113 // Knock listener thread out of poll/accept
114 #ifndef _WIN32
115 if (listener_thread_event) {
116 listener_thread_event->notify();
117 }
118 #else
119 SetEvent(listener_thread_event);
120 #endif
121 }
122
123 // The client thread reads and decodes json packets,
124 // then dispatches the commands that it finds
client_thread(std::shared_ptr<watchman_client> client)125 static void client_thread(std::shared_ptr<watchman_client> client) {
126 struct watchman_event_poll pfd[2];
127 json_error_t jerr;
128 bool send_ok = true;
129 // Keep a persistent vector around so that we can avoid allocating
130 // and releasing heap memory when we collect items from the publisher
131 std::vector<std::shared_ptr<const watchman::Publisher::Item>> pending;
132
133 client->stm->setNonBlock(true);
134 w_set_thread_name(
135 "client=%p:stm=%p:pid=%d",
136 client.get(),
137 client->stm.get(),
138 client->stm->getPeerProcessID());
139
140 client->client_is_owner = client->stm->peerIsOwner();
141
142 pfd[0].evt = client->stm->getEvents();
143 pfd[1].evt = client->ping.get();
144
145 while (!stopping) {
146 // Wait for input from either the client socket or
147 // via the ping pipe, which signals that some other
148 // thread wants to unilaterally send data to the client
149
150 ignore_result(w_poll_events(pfd, 2, 2000));
151 if (stopping) {
152 break;
153 }
154
155 if (pfd[0].ready) {
156 auto request = client->reader.decodeNext(client->stm.get(), &jerr);
157
158 if (!request && errno == EAGAIN) {
159 // That's fine
160 } else if (!request) {
161 // Not so cool
162 if (client->reader.wpos == client->reader.rpos) {
163 // If they disconnected in between PDUs, no need to log
164 // any error
165 goto disconnected;
166 }
167 send_error_response(
168 client.get(),
169 "invalid json at position %d: %s",
170 jerr.position,
171 jerr.text);
172 w_log(W_LOG_ERR, "invalid data from client: %s\n", jerr.text);
173
174 goto disconnected;
175 } else if (request) {
176 client->pdu_type = client->reader.pdu_type;
177 client->capabilities = client->reader.capabilities;
178 dispatch_command(client.get(), request, CMD_DAEMON);
179 }
180 }
181
182 if (pfd[1].ready) {
183 while (client->ping->testAndClear()) {
184 // Enqueue refs to pending log payloads
185 pending.clear();
186 getPending(pending, client->debugSub, client->errorSub);
187 for (auto& item : pending) {
188 client->enqueueResponse(json_ref(item->payload), false);
189 }
190
191 // Maybe we have subscriptions to dispatch?
192 auto userClient =
193 std::dynamic_pointer_cast<watchman_user_client>(client);
194
195 if (userClient) {
196 std::vector<w_string> subsToDelete;
197 for (auto& subiter : userClient->unilateralSub) {
198 auto sub = subiter.first;
199 auto subStream = subiter.second;
200
201 watchman::log(
202 watchman::DBG, "consider fan out sub ", sub->name, "\n");
203
204 pending.clear();
205 subStream->getPending(pending);
206 bool seenSettle = false;
207 for (auto& item : pending) {
208 auto dumped = json_dumps(item->payload, 0);
209 watchman::log(
210 watchman::DBG,
211 "Unilateral payload for sub ",
212 sub->name,
213 " ",
214 dumped ? dumped : "<<MISSING!!>>",
215 "\n");
216 free(dumped);
217
218 if (item->payload.get_default("canceled")) {
219 auto resp = make_response();
220
221 watchman::log(
222 watchman::ERR,
223 "Cancel subscription ",
224 sub->name,
225 " due to root cancellation\n");
226
227 resp.set({{"root", item->payload.get_default("root")},
228 {"unilateral", json_true()},
229 {"canceled", json_true()},
230 {"subscription", w_string_to_json(sub->name)}});
231 client->enqueueResponse(std::move(resp), false);
232 // Remember to cancel this subscription.
233 // We can't do it in this loop because that would
234 // invalidate the iterators and cause a headache.
235 subsToDelete.push_back(sub->name);
236 continue;
237 }
238
239 if (item->payload.get_default("state-enter") ||
240 item->payload.get_default("state-leave")) {
241 auto resp = make_response();
242 json_object_update(item->payload, resp);
243 resp.set({{"unilateral", json_true()},
244 {"subscription", w_string_to_json(sub->name)}});
245 client->enqueueResponse(std::move(resp), false);
246
247 watchman::log(
248 watchman::DBG,
249 "Fan out subscription state change for ",
250 sub->name,
251 "\n");
252 continue;
253 }
254
255 if (!sub->debug_paused && item->payload.get_default("settled")) {
256 seenSettle = true;
257 continue;
258 }
259 }
260
261 if (seenSettle) {
262 sub->processSubscription();
263 }
264 }
265
266 for (auto& name : subsToDelete) {
267 userClient->unsubByName(name);
268 }
269 }
270 }
271 }
272
273 /* now send our response(s) */
274 while (!client->responses.empty()) {
275 auto& response_to_send = client->responses.front();
276
277 if (send_ok) {
278 client->stm->setNonBlock(false);
279 /* Return the data in the same format that was used to ask for it.
280 * Don't bother sending any more messages if the client disconnects,
281 * but still free their memory.
282 */
283 send_ok = client->writer.pduEncodeToStream(
284 client->pdu_type,
285 client->capabilities,
286 response_to_send,
287 client->stm.get());
288 client->stm->setNonBlock(true);
289 }
290
291 client->responses.pop_front();
292 }
293 }
294
295 disconnected:
296 w_set_thread_name(
297 "NOT_CONN:client=%p:stm=%p:pid=%d",
298 client.get(),
299 client->stm.get(),
300 client->stm->getPeerProcessID());
301 // Remove the client from the map before we tear it down, as this makes
302 // it easier to flush out pending writes on windows without worrying
303 // about w_log_to_clients contending for the write buffers
304 clients.wlock()->erase(client);
305 }
306
// Deliberately empty signal handler.
// w_start_listener() installs it for SIGUSR1 and SIGCHLD so that those
// signals do not terminate the process; they are used to interrupt
// blocking syscalls on the worker threads.
static void wakeme(int /* signo */) {
}
312
313 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
314 #ifdef __OpenBSD__
315 #include <sys/siginfo.h>
316 #endif
317 #include <sys/param.h>
318 #include <sys/sysctl.h>
319 #include <sys/time.h>
320 #include <sys/resource.h>
321 #endif
322
323 #ifndef _WIN32
324
325 // If we are running under inetd-style supervision, call this function
326 // to move the inetd provided socket descriptor(s) to a new descriptor
327 // number and remember that we can just use these when we're starting
328 // up the listener.
w_listener_prep_inetd()329 void w_listener_prep_inetd() {
330 if (listener_fd) {
331 throw std::runtime_error(
332 "w_listener_prep_inetd: listener_fd is already assigned");
333 }
334
335 listener_fd = FileDescriptor(dup(STDIN_FILENO), "dup(stdin) for listener");
336 }
337
get_listener_socket(const char * path)338 static FileDescriptor get_listener_socket(const char *path)
339 {
340 struct sockaddr_un un;
341 mode_t perms = cfg_get_perms(
342 "sock_access", true /* write bits */, false /* execute bits */);
343 FileDescriptor listener_fd;
344
345 #ifdef __APPLE__
346 listener_fd = w_get_listener_socket_from_launchd();
347 if (listener_fd) {
348 w_log(W_LOG_ERR, "Using socket from launchd as listening socket\n");
349 return listener_fd;
350 }
351 #endif
352
353 if (strlen(path) >= sizeof(un.sun_path) - 1) {
354 w_log(W_LOG_ERR, "%s: path is too long\n",
355 path);
356 return FileDescriptor();
357 }
358
359 listener_fd = FileDescriptor(socket(PF_LOCAL, SOCK_STREAM, 0), "socket");
360
361 un.sun_family = PF_LOCAL;
362 memcpy(un.sun_path, path, strlen(path) + 1);
363 unlink(path);
364
365 if (bind(listener_fd.fd(), (struct sockaddr*)&un, sizeof(un)) != 0) {
366 w_log(W_LOG_ERR, "bind(%s): %s\n",
367 path, strerror(errno));
368 return FileDescriptor();
369 }
370
371 // The permissions in the containing directory should be correct, so this
372 // should be correct as well. But set the permissions in any case.
373 if (chmod(path, perms) == -1) {
374 w_log(W_LOG_ERR, "chmod(%s, %#o): %s", path, perms, strerror(errno));
375 return FileDescriptor();
376 }
377
378 // Double-check that the socket has the right permissions. This can happen
379 // when the containing directory was created in a previous run, with a group
380 // the user is no longer in.
381 struct stat st;
382 if (lstat(path, &st) == -1) {
383 watchman::log(watchman::ERR, "lstat(", path, "): ", strerror(errno), "\n");
384 return FileDescriptor();
385 }
386
387 // This is for testing only
388 // (test_sock_perms.py:test_user_previously_in_sock_group). Do not document.
389 const char *sock_group_name = cfg_get_string("__sock_file_group", nullptr);
390 if (!sock_group_name) {
391 sock_group_name = cfg_get_string("sock_group", nullptr);
392 }
393
394 if (sock_group_name) {
395 const struct group *sock_group = w_get_group(sock_group_name);
396 if (!sock_group) {
397 return FileDescriptor();
398 }
399 if (st.st_gid != sock_group->gr_gid) {
400 watchman::log(
401 watchman::ERR,
402 "for socket '", path, "', gid ", st.st_gid,
403 " doesn't match expected gid ", sock_group->gr_gid, " (group name ",
404 sock_group_name, "). Ensure that you are still a member of group ",
405 sock_group_name, ".\n");
406 return FileDescriptor();
407 }
408 }
409
410 if (listen(listener_fd.fd(), 200) != 0) {
411 w_log(W_LOG_ERR, "listen(%s): %s\n",
412 path, strerror(errno));
413 return FileDescriptor();
414 }
415
416 return listener_fd;
417 }
418 #endif
419
make_new_client(std::unique_ptr<watchman_stream> && stm)420 static std::shared_ptr<watchman_client> make_new_client(
421 std::unique_ptr<watchman_stream>&& stm) {
422 auto client = std::make_shared<watchman_user_client>(std::move(stm));
423
424 clients.wlock()->insert(client);
425
426 // Start a thread for the client.
427 // We used to use libevent for this, but we have
428 // a low volume of concurrent clients and the json
429 // parse/encode APIs are not easily used in a non-blocking
430 // server architecture.
431 try {
432 std::thread thr([client]() { client_thread(client); });
433 thr.detach();
434 } catch (const std::exception& e) {
435 clients.wlock()->erase(client);
436 throw;
437 }
438
439 return client;
440 }
441
442 #ifdef _WIN32
named_pipe_accept_loop(const char * path)443 static void named_pipe_accept_loop(const char *path) {
444 HANDLE handles[2];
445 OVERLAPPED olap;
446 HANDLE connected_event = CreateEvent(NULL, FALSE, TRUE, NULL);
447
448 if (!connected_event) {
449 w_log(W_LOG_ERR, "named_pipe_accept_loop: CreateEvent failed: %s\n",
450 win32_strerror(GetLastError()));
451 return;
452 }
453
454 listener_thread_event = CreateEvent(NULL, FALSE, TRUE, NULL);
455
456 handles[0] = connected_event;
457 handles[1] = listener_thread_event;
458 memset(&olap, 0, sizeof(olap));
459 olap.hEvent = connected_event;
460
461 w_log(W_LOG_ERR, "waiting for pipe clients on %s\n", path);
462 while (!stopping) {
463 FileDescriptor client_fd;
464 DWORD res;
465
466 client_fd = FileDescriptor(intptr_t(CreateNamedPipe(
467 path,
468 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
469 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_REJECT_REMOTE_CLIENTS,
470 PIPE_UNLIMITED_INSTANCES,
471 WATCHMAN_IO_BUF_SIZE,
472 512,
473 0,
474 nullptr)));
475
476 if (!client_fd) {
477 w_log(W_LOG_ERR, "CreateNamedPipe(%s) failed: %s\n",
478 path, win32_strerror(GetLastError()));
479 continue;
480 }
481
482 ResetEvent(connected_event);
483 if (!ConnectNamedPipe((HANDLE)client_fd.handle(), &olap)) {
484 res = GetLastError();
485
486 if (res == ERROR_PIPE_CONNECTED) {
487 make_new_client(w_stm_fdopen(std::move(client_fd)));
488 continue;
489 }
490
491 if (res != ERROR_IO_PENDING) {
492 w_log(W_LOG_ERR, "ConnectNamedPipe: %s\n",
493 win32_strerror(GetLastError()));
494 continue;
495 }
496
497 res = WaitForMultipleObjectsEx(2, handles, false, INFINITE, true);
498 if (res == WAIT_OBJECT_0 + 1) {
499 // Signalled to stop
500 CancelIoEx((HANDLE)client_fd.handle(), &olap);
501 continue;
502 }
503
504 if (res != WAIT_OBJECT_0) {
505 w_log(
506 W_LOG_ERR,
507 "WaitForMultipleObjectsEx: ConnectNamedPipe: "
508 "unexpected status %u\n",
509 res);
510 CancelIoEx((HANDLE)client_fd.handle(), &olap);
511 continue;
512 }
513 }
514 make_new_client(w_stm_fdopen(std::move(client_fd)));
515 }
516 }
517 #endif
518
519 #ifndef _WIN32
accept_loop(FileDescriptor && listenerDescriptor)520 static void accept_loop(FileDescriptor&& listenerDescriptor) {
521 auto listener = w_stm_fdopen(std::move(listenerDescriptor));
522 while (!stopping) {
523 FileDescriptor client_fd;
524 struct watchman_event_poll pfd[2];
525 int bufsize;
526
527 #ifdef HAVE_LIBGIMLI_H
528 if (hb) {
529 gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
530 }
531 #endif
532
533 pfd[0].evt = listener->getEvents();
534 pfd[1].evt = listener_thread_event.get();
535
536 if (w_poll_events(pfd, 2, 60000) == 0) {
537 if (stopping) {
538 break;
539 }
540 // Timed out, or error.
541 // Arrange to sanity check that we're working
542 w_check_my_sock();
543 continue;
544 }
545
546 if (stopping) {
547 break;
548 }
549
550 #ifdef HAVE_ACCEPT4
551 client_fd = FileDescriptor(
552 accept4(listener->getFileDescriptor().fd(), nullptr, 0, SOCK_CLOEXEC));
553 #else
554 client_fd =
555 FileDescriptor(accept(listener->getFileDescriptor().fd(), nullptr, 0));
556 #endif
557 if (!client_fd) {
558 continue;
559 }
560 client_fd.setCloExec();
561 bufsize = WATCHMAN_IO_BUF_SIZE;
562 setsockopt(
563 client_fd.fd(),
564 SOL_SOCKET,
565 SO_SNDBUF,
566 (void*)&bufsize,
567 sizeof(bufsize));
568
569 make_new_client(w_stm_fdopen(std::move(client_fd)));
570 }
571 }
572 #endif
573
w_start_listener(const char * path)574 bool w_start_listener(const char *path)
575 {
576 #ifndef _WIN32
577 struct sigaction sa;
578 sigset_t sigset;
579 #endif
580
581 #ifdef HAVE_LIBGIMLI_H
582 hb = gimli_heartbeat_attach();
583 #endif
584
585 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
586 {
587 struct rlimit limit;
588 # ifndef __OpenBSD__
589 int mib[2] = { CTL_KERN,
590 # ifdef KERN_MAXFILESPERPROC
591 KERN_MAXFILESPERPROC
592 # else
593 KERN_MAXFILES
594 # endif
595 };
596 # endif
597 int maxperproc;
598
599 getrlimit(RLIMIT_NOFILE, &limit);
600
601 # ifndef __OpenBSD__
602 {
603 size_t len;
604
605 len = sizeof(maxperproc);
606 sysctl(mib, 2, &maxperproc, &len, NULL, 0);
607 w_log(W_LOG_ERR, "file limit is %" PRIu64
608 " kern.maxfilesperproc=%i\n",
609 limit.rlim_cur, maxperproc);
610 }
611 # else
612 maxperproc = limit.rlim_max;
613 w_log(W_LOG_ERR, "openfiles-cur is %" PRIu64
614 " openfiles-max=%i\n",
615 limit.rlim_cur, maxperproc);
616 # endif
617
618 if (limit.rlim_cur != RLIM_INFINITY &&
619 maxperproc > 0 &&
620 limit.rlim_cur < (rlim_t)maxperproc) {
621 limit.rlim_cur = maxperproc;
622
623 if (setrlimit(RLIMIT_NOFILE, &limit)) {
624 w_log(W_LOG_ERR,
625 "failed to raise limit to %" PRIu64 " (%s).\n",
626 limit.rlim_cur,
627 strerror(errno));
628 } else {
629 w_log(W_LOG_ERR,
630 "raised file limit to %" PRIu64 "\n",
631 limit.rlim_cur);
632 }
633 }
634
635 getrlimit(RLIMIT_NOFILE, &limit);
636 #ifndef HAVE_FSEVENTS
637 if (limit.rlim_cur < 10240) {
638 w_log(W_LOG_ERR,
639 "Your file descriptor limit is very low (%" PRIu64 "), "
640 "please consult the watchman docs on raising the limits\n",
641 limit.rlim_cur);
642 }
643 #endif
644 }
645 #endif
646
647 #ifndef _WIN32
648 signal(SIGPIPE, SIG_IGN);
649
650 /* allow SIGUSR1 and SIGCHLD to wake up a blocked thread, without restarting
651 * syscalls */
652 memset(&sa, 0, sizeof(sa));
653 sa.sa_handler = wakeme;
654 sa.sa_flags = 0;
655 sigaction(SIGUSR1, &sa, NULL);
656 sigaction(SIGCHLD, &sa, NULL);
657
658 // Block SIGCHLD everywhere
659 sigemptyset(&sigset);
660 sigaddset(&sigset, SIGCHLD);
661 sigprocmask(SIG_BLOCK, &sigset, NULL);
662
663 if (listener_fd) {
664 // Assume that it was prepped by w_listener_prep_inetd()
665 w_log(W_LOG_ERR, "Using socket from inetd as listening socket\n");
666 } else {
667 listener_fd = get_listener_socket(path);
668 if (!listener_fd) {
669 return false;
670 }
671 }
672 listener_fd.setCloExec();
673 #endif
674
675 #ifdef HAVE_LIBGIMLI_H
676 if (hb) {
677 gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
678 } else {
679 w_setup_signal_handlers();
680 }
681 #else
682 w_setup_signal_handlers();
683 #endif
684 listener_fd.setNonBlock();
685
686 // Now run the dispatch
687 #ifndef _WIN32
688 listener_thread_event = w_event_make();
689 accept_loop(std::move(listener_fd));
690 #else
691 named_pipe_accept_loop(path);
692 #endif
693
694 // Wait for clients, waking any sleeping clients up in the process
695 {
696 int interval = 2000;
697 int last_count = 0, n_clients = 0;
698 const int max_interval = 1000000; // 1 second
699
700 do {
701 {
702 auto clientsLock = clients.rlock();
703 n_clients = clientsLock->size();
704
705 for (auto client : *clientsLock) {
706 client->ping->notify();
707 }
708 }
709
710 if (n_clients != last_count) {
711 w_log(W_LOG_ERR, "waiting for %d clients to terminate\n", n_clients);
712 }
713 usleep(interval);
714 interval = std::min(interval * 2, max_interval);
715 } while (n_clients > 0);
716 }
717
718 w_state_shutdown();
719
720 return true;
721 }
722
723 /* get-pid */
cmd_get_pid(struct watchman_client * client,const json_ref &)724 static void cmd_get_pid(struct watchman_client* client, const json_ref&) {
725 auto resp = make_response();
726
727 resp.set("pid", json_integer(getpid()));
728
729 send_and_dispose_response(client, std::move(resp));
730 }
731 W_CMD_REG("get-pid", cmd_get_pid, CMD_DAEMON, NULL)
732
733
734 /* vim:ts=2:sw=2:et:
735 */
736