1 /* Copyright 2012-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 
4 #include "watchman.h"
5 #ifdef HAVE_LIBGIMLI_H
6 # include <libgimli.h>
7 #endif
8 #ifdef __APPLE__
9 #include <launch.h>
10 #endif
11 
12 /* This needs to be recursive safe because we may log to clients
13  * while we are dispatching subscriptions to clients */
14 pthread_mutex_t w_client_lock;
15 w_ht_t *clients = NULL;
16 static int listener_fd;
17 static pthread_t reaper_thread;
18 static pthread_t listener_thread;
19 #ifdef _WIN32
20 static HANDLE listener_thread_event;
21 #endif
22 static volatile bool stopping = false;
23 #ifdef HAVE_LIBGIMLI_H
24 static volatile struct gimli_heartbeat *hb = NULL;
25 #endif
26 
make_response(void)27 json_t *make_response(void)
28 {
29   json_t *resp = json_object();
30 
31   set_prop(resp, "version", json_string_nocheck(PACKAGE_VERSION));
32 
33   return resp;
34 }
35 
36 static int proc_pid;
37 static uint64_t proc_start_time;
38 
clock_id_string(uint32_t root_number,uint32_t ticks,char * buf,size_t bufsize)39 bool clock_id_string(uint32_t root_number, uint32_t ticks, char *buf,
40     size_t bufsize)
41 {
42   int res = snprintf(buf, bufsize, "c:%" PRIu64 ":%d:%u:%" PRIu32,
43                      proc_start_time, proc_pid, root_number, ticks);
44 
45   if (res == -1) {
46     return false;
47   }
48   return (size_t)res < bufsize;
49 }
50 
51 // Renders the current clock id string to the supplied buffer.
52 // Must be called with the root locked.
current_clock_id_string(w_root_t * root,char * buf,size_t bufsize)53 static bool current_clock_id_string(w_root_t *root,
54     char *buf, size_t bufsize)
55 {
56   return clock_id_string(root->number, root->ticks, buf, bufsize);
57 }
58 
59 /* Add the current clock value to the response.
60  * must be called with the root locked */
annotate_with_clock(w_root_t * root,json_t * resp)61 void annotate_with_clock(w_root_t *root, json_t *resp)
62 {
63   char buf[128];
64 
65   if (current_clock_id_string(root, buf, sizeof(buf))) {
66     set_prop(resp, "clock", json_string_nocheck(buf));
67   }
68 }
69 
70 /* must be called with the w_client_lock held */
enqueue_response(struct watchman_client * client,json_t * json,bool ping)71 bool enqueue_response(struct watchman_client *client,
72     json_t *json, bool ping)
73 {
74   struct watchman_client_response *resp;
75 
76   resp = calloc(1, sizeof(*resp));
77   if (!resp) {
78     return false;
79   }
80   resp->json = json;
81 
82   if (client->tail) {
83     client->tail->next = resp;
84   } else {
85     client->head = resp;
86   }
87   client->tail = resp;
88 
89   if (ping) {
90     w_event_set(client->ping);
91   }
92 
93   return true;
94 }
95 
send_and_dispose_response(struct watchman_client * client,json_t * response)96 void send_and_dispose_response(struct watchman_client *client,
97     json_t *response)
98 {
99   pthread_mutex_lock(&w_client_lock);
100   if (!enqueue_response(client, response, false)) {
101     json_decref(response);
102   }
103   pthread_mutex_unlock(&w_client_lock);
104 }
105 
send_error_response(struct watchman_client * client,const char * fmt,...)106 void send_error_response(struct watchman_client *client,
107     const char *fmt, ...)
108 {
109   char buf[WATCHMAN_NAME_MAX];
110   va_list ap;
111   json_t *resp = make_response();
112 
113   va_start(ap, fmt);
114   vsnprintf(buf, sizeof(buf), fmt, ap);
115   va_end(ap);
116 
117   w_log(W_LOG_ERR, "send_error_response: %s\n", buf);
118   set_prop(resp, "error", json_string_nocheck(buf));
119 
120   send_and_dispose_response(client, resp);
121 }
122 
client_delete(struct watchman_client * client)123 static void client_delete(struct watchman_client *client)
124 {
125   struct watchman_client_response *resp;
126 
127   w_log(W_LOG_DBG, "client_delete %p\n", client);
128 
129   /* cancel subscriptions */
130   w_ht_free(client->subscriptions);
131 
132   while (client->head) {
133     resp = client->head;
134     client->head = resp->next;
135     json_decref(resp->json);
136     free(resp);
137   }
138 
139   w_json_buffer_free(&client->reader);
140   w_json_buffer_free(&client->writer);
141   w_event_destroy(client->ping);
142   w_stm_shutdown(client->stm);
143   w_stm_close(client->stm);
144   free(client);
145 }
146 
delete_subscription(w_ht_val_t val)147 static void delete_subscription(w_ht_val_t val)
148 {
149   struct watchman_client_subscription *sub = w_ht_val_ptr(val);
150 
151   w_string_delref(sub->name);
152   w_query_delref(sub->query);
153   if (sub->drop_or_defer) {
154     w_ht_free(sub->drop_or_defer);
155   }
156   free(sub);
157 }
158 
159 static const struct watchman_hash_funcs subscription_hash_funcs = {
160   w_ht_string_copy,
161   w_ht_string_del,
162   w_ht_string_equal,
163   w_ht_string_hash,
164   NULL,
165   delete_subscription
166 };
167 
w_clockspec_new_clock(uint32_t root_number,uint32_t ticks)168 struct w_clockspec *w_clockspec_new_clock(uint32_t root_number, uint32_t ticks)
169 {
170   struct w_clockspec *spec;
171   spec = calloc(1, sizeof(*spec));
172   if (!spec) {
173     return NULL;
174   }
175   spec->tag = w_cs_clock;
176   spec->clock.start_time = proc_start_time;
177   spec->clock.pid = proc_pid;
178   spec->clock.root_number = root_number;
179   spec->clock.ticks = ticks;
180   return spec;
181 }
182 
w_clockspec_parse(json_t * value)183 struct w_clockspec *w_clockspec_parse(json_t *value)
184 {
185   const char *str;
186   uint64_t start_time;
187   int pid;
188   uint32_t root_number;
189   uint32_t ticks;
190 
191   struct w_clockspec *spec;
192 
193   spec = calloc(1, sizeof(*spec));
194   if (!spec) {
195     return NULL;
196   }
197 
198   if (json_is_integer(value)) {
199     spec->tag = w_cs_timestamp;
200     spec->timestamp.tv_usec = 0;
201     spec->timestamp.tv_sec = (time_t)json_integer_value(value);
202     return spec;
203   }
204 
205   str = json_string_value(value);
206   if (!str) {
207     free(spec);
208     return NULL;
209   }
210 
211   if (str[0] == 'n' && str[1] == ':') {
212     spec->tag = w_cs_named_cursor;
213     // spec owns the ref to the string
214     spec->named_cursor.cursor = w_string_new(str);
215     return spec;
216   }
217 
218   if (sscanf(str, "c:%" PRIu64 ":%d:%" PRIu32 ":%" PRIu32,
219              &start_time, &pid, &root_number, &ticks) == 4) {
220     spec->tag = w_cs_clock;
221     spec->clock.start_time = start_time;
222     spec->clock.pid = pid;
223     spec->clock.root_number = root_number;
224     spec->clock.ticks = ticks;
225     return spec;
226   }
227 
228   if (sscanf(str, "c:%d:%" PRIu32, &pid, &ticks) == 2) {
229     // old-style clock value (<= 2.8.2) -- by setting clock time and root number
230     // to 0 we guarantee that this is treated as a fresh instance
231     spec->tag = w_cs_clock;
232     spec->clock.start_time = 0;
233     spec->clock.pid = pid;
234     spec->clock.root_number = root_number;
235     spec->clock.ticks = ticks;
236     return spec;
237   }
238 
239   free(spec);
240   return NULL;
241 }
242 
243 // must be called with the root locked
244 // spec can be null, in which case a fresh instance is assumed
w_clockspec_eval(w_root_t * root,const struct w_clockspec * spec,struct w_query_since * since)245 void w_clockspec_eval(w_root_t *root,
246     const struct w_clockspec *spec,
247     struct w_query_since *since)
248 {
249   if (spec == NULL) {
250     since->is_timestamp = false;
251     since->clock.is_fresh_instance = true;
252     since->clock.ticks = 0;
253     return;
254   }
255 
256   if (spec->tag == w_cs_timestamp) {
257     // just copy the values over
258     since->is_timestamp = true;
259     since->timestamp = spec->timestamp;
260     return;
261   }
262 
263   since->is_timestamp = false;
264 
265   if (spec->tag == w_cs_named_cursor) {
266     w_ht_val_t ticks_val;
267     w_string_t *cursor = spec->named_cursor.cursor;
268     since->clock.is_fresh_instance = !w_ht_lookup(root->cursors,
269                                                   w_ht_ptr_val(cursor),
270                                                   &ticks_val, false);
271     if (!since->clock.is_fresh_instance) {
272       since->clock.is_fresh_instance = ticks_val < root->last_age_out_tick;
273     }
274     if (since->clock.is_fresh_instance) {
275       since->clock.ticks = 0;
276     } else {
277       since->clock.ticks = (uint32_t)ticks_val;
278     }
279 
280     // Bump the tick value and record it against the cursor.
281     // We need to bump the tick value so that repeated queries
282     // when nothing has changed in the filesystem won't continue
283     // to return the same set of files; we only want the first
284     // of these to return the files and the rest to return nothing
285     // until something subsequently changes
286     w_ht_replace(root->cursors, w_ht_ptr_val(cursor), ++root->ticks);
287 
288     w_log(W_LOG_DBG, "resolved cursor %.*s -> %" PRIu32 "\n",
289         cursor->len, cursor->buf, since->clock.ticks);
290     return;
291   }
292 
293   // spec->tag == w_cs_clock
294   if (spec->clock.start_time == proc_start_time &&
295       spec->clock.pid == proc_pid &&
296       spec->clock.root_number == root->number) {
297 
298     since->clock.is_fresh_instance =
299       spec->clock.ticks < root->last_age_out_tick;
300     if (since->clock.is_fresh_instance) {
301       since->clock.ticks = 0;
302     } else {
303       since->clock.ticks = spec->clock.ticks;
304     }
305     if (spec->clock.ticks == root->ticks) {
306       /* Force ticks to increment.  This avoids returning and querying the
307        * same tick value over and over when no files have changed in the
308        * meantime */
309       root->ticks++;
310     }
311     return;
312   }
313 
314   // If the pid, start time or root number don't match, they asked a different
315   // incarnation of the server or a different instance of this root, so we treat
316   // them as having never spoken to us before
317   since->clock.is_fresh_instance = true;
318   since->clock.ticks = 0;
319 }
320 
w_clockspec_free(struct w_clockspec * spec)321 void w_clockspec_free(struct w_clockspec *spec)
322 {
323   if (spec->tag == w_cs_named_cursor) {
324     w_string_delref(spec->named_cursor.cursor);
325   }
326   free(spec);
327 }
328 
add_root_warnings_to_response(json_t * response,w_root_t * root)329 void add_root_warnings_to_response(json_t *response, w_root_t *root) {
330   char *str = NULL;
331   char *full = NULL;
332 
333   if (!root->last_recrawl_reason && !root->warning) {
334     return;
335   }
336 
337   if (root->last_recrawl_reason) {
338     ignore_result(asprintf(&str,
339           "Recrawled this watch %d times, most recently because:\n"
340           "%.*s\n"
341           "To resolve, please review the information on\n"
342           "%s#recrawl",
343           root->recrawl_count,
344           root->last_recrawl_reason->len,
345           root->last_recrawl_reason->buf,
346           cfg_get_trouble_url()));
347   }
348 
349   ignore_result(asprintf(&full,
350       "%.*s%s"   // root->warning
351       "%s\n"     // str (last recrawl reason)
352       "To clear this warning, run:\n"
353       "`watchman watch-del %.*s ; watchman watch-project %.*s`\n",
354       root->warning ? root->warning->len : 0,
355       root->warning ? root->warning->buf : "",
356       root->warning && str ? "\n" : "", // newline if we have both strings
357       str ? str : "",
358       root->root_path->len,
359       root->root_path->buf,
360       root->root_path->len,
361       root->root_path->buf));
362 
363   if (full) {
364     set_prop(response, "warning", json_string_nocheck(full));
365   }
366   free(str);
367   free(full);
368 }
369 
resolve_root_or_err(struct watchman_client * client,json_t * args,int root_index,bool create)370 w_root_t *resolve_root_or_err(
371     struct watchman_client *client,
372     json_t *args,
373     int root_index,
374     bool create)
375 {
376   w_root_t *root;
377   const char *root_name;
378   char *errmsg = NULL;
379   json_t *ele;
380 
381   ele = json_array_get(args, root_index);
382   if (!ele) {
383     send_error_response(client, "wrong number of arguments");
384     return NULL;
385   }
386 
387   root_name = json_string_value(ele);
388   if (!root_name) {
389     send_error_response(client,
390         "invalid value for argument %d, expected "
391         "a string naming the root dir",
392         root_index);
393     return NULL;
394   }
395 
396   if (client->client_mode) {
397     root = w_root_resolve_for_client_mode(root_name, &errmsg);
398   } else {
399     root = w_root_resolve(root_name, create, &errmsg);
400   }
401 
402   if (!root) {
403     send_error_response(client,
404         "unable to resolve root %s: %s",
405         root_name, errmsg);
406     free(errmsg);
407   }
408   return root;
409 }
410 
w_request_shutdown(void)411 void w_request_shutdown(void) {
412   stopping = true;
413   // Knock listener thread out of poll/accept
414 #ifndef _WIN32
415   pthread_kill(listener_thread, SIGUSR1);
416   pthread_kill(reaper_thread, SIGUSR1);
417 #else
418   SetEvent(listener_thread_event);
419 #endif
420 }
421 
cmd_shutdown(struct watchman_client * client,json_t * args)422 static void cmd_shutdown(
423     struct watchman_client *client,
424     json_t *args)
425 {
426   json_t *resp = make_response();
427   unused_parameter(args);
428 
429   w_log(W_LOG_ERR, "shutdown-server was requested, exiting!\n");
430   stopping = true;
431 
432   w_request_shutdown();
433 
434   set_prop(resp, "shutdown-server", json_true());
435   send_and_dispose_response(client, resp);
436 }
437 W_CMD_REG("shutdown-server", cmd_shutdown, CMD_DAEMON|CMD_POISON_IMMUNE, NULL)
438 
439 // The client thread reads and decodes json packets,
440 // then dispatches the commands that it finds
client_thread(void * ptr)441 static void *client_thread(void *ptr)
442 {
443   struct watchman_client *client = ptr;
444   struct watchman_event_poll pfd[2];
445   struct watchman_client_response *queued_responses_to_send;
446   json_t *request;
447   json_error_t jerr;
448   bool send_ok = true;
449 
450   w_stm_set_nonblock(client->stm, true);
451   w_set_thread_name("client:stm=%p", client->stm);
452 
453   w_stm_get_events(client->stm, &pfd[0].evt);
454   pfd[1].evt = client->ping;
455 
456   while (!stopping) {
457     // Wait for input from either the client socket or
458     // via the ping pipe, which signals that some other
459     // thread wants to unilaterally send data to the client
460 
461     ignore_result(w_poll_events(pfd, 2, 2000));
462 
463     if (stopping) {
464       break;
465     }
466 
467     if (pfd[0].ready) {
468       request = w_json_buffer_next(&client->reader, client->stm, &jerr);
469 
470       if (!request && errno == EAGAIN) {
471         // That's fine
472       } else if (!request) {
473         // Not so cool
474         if (client->reader.wpos == client->reader.rpos) {
475           // If they disconnected in between PDUs, no need to log
476           // any error
477           goto disconected;
478         }
479         send_error_response(client, "invalid json at position %d: %s",
480             jerr.position, jerr.text);
481         w_log(W_LOG_ERR, "invalid data from client: %s\n", jerr.text);
482 
483         goto disconected;
484       } else if (request) {
485         client->pdu_type = client->reader.pdu_type;
486         dispatch_command(client, request, CMD_DAEMON);
487         json_decref(request);
488       }
489     }
490 
491     if (pfd[1].ready) {
492       w_event_test_and_clear(client->ping);
493     }
494 
495     /* de-queue the pending responses under the lock */
496     pthread_mutex_lock(&w_client_lock);
497     queued_responses_to_send = client->head;
498     client->head = NULL;
499     client->tail = NULL;
500     pthread_mutex_unlock(&w_client_lock);
501 
502     /* now send our response(s) */
503     while (queued_responses_to_send) {
504       struct watchman_client_response *response_to_send =
505         queued_responses_to_send;
506 
507       if (send_ok) {
508         w_stm_set_nonblock(client->stm, false);
509         /* Return the data in the same format that was used to ask for it.
510          * Don't bother sending any more messages if the client disconnects,
511          * but still free their memory.
512          */
513         send_ok = w_ser_write_pdu(client->pdu_type, &client->writer,
514                                   client->stm, response_to_send->json);
515         w_stm_set_nonblock(client->stm, true);
516       }
517 
518       queued_responses_to_send = response_to_send->next;
519 
520       json_decref(response_to_send->json);
521       free(response_to_send);
522     }
523   }
524 
525 disconected:
526   // Remove the client from the map before we tear it down, as this makes
527   // it easier to flush out pending writes on windows without worrying
528   // about w_log_to_clients contending for the write buffers
529   pthread_mutex_lock(&w_client_lock);
530   w_ht_del(clients, w_ht_ptr_val(client));
531   pthread_mutex_unlock(&w_client_lock);
532 
533   w_client_vacate_states(client);
534   client_delete(client);
535 
536   return NULL;
537 }
538 
w_should_log_to_clients(int level)539 bool w_should_log_to_clients(int level)
540 {
541   w_ht_iter_t iter;
542   bool result = false;
543 
544   pthread_mutex_lock(&w_client_lock);
545 
546   if (!clients) {
547     pthread_mutex_unlock(&w_client_lock);
548     return false;
549   }
550 
551   if (w_ht_first(clients, &iter)) do {
552     struct watchman_client *client = w_ht_val_ptr(iter.value);
553 
554     if (client->log_level != W_LOG_OFF && client->log_level >= level) {
555       result = true;
556       break;
557     }
558 
559   } while (w_ht_next(clients, &iter));
560   pthread_mutex_unlock(&w_client_lock);
561 
562   return result;
563 }
564 
w_log_to_clients(int level,const char * buf)565 void w_log_to_clients(int level, const char *buf)
566 {
567   json_t *json = NULL;
568   w_ht_iter_t iter;
569 
570   if (!clients) {
571     return;
572   }
573 
574   pthread_mutex_lock(&w_client_lock);
575   if (w_ht_first(clients, &iter)) do {
576     struct watchman_client *client = w_ht_val_ptr(iter.value);
577 
578     if (client->log_level != W_LOG_OFF && client->log_level >= level) {
579       json = make_response();
580       if (json) {
581         set_prop(json, "log", json_string_nocheck(buf));
582         if (!enqueue_response(client, json, true)) {
583           json_decref(json);
584         }
585       }
586     }
587 
588   } while (w_ht_next(clients, &iter));
589   pthread_mutex_unlock(&w_client_lock);
590 }
591 
child_reaper(void * arg)592 static void *child_reaper(void *arg)
593 {
594 #ifndef _WIN32
595   sigset_t sigset;
596 
597   // By default, keep both SIGCHLD and SIGUSR1 blocked
598   sigemptyset(&sigset);
599   sigaddset(&sigset, SIGUSR1);
600   sigaddset(&sigset, SIGCHLD);
601   pthread_sigmask(SIG_BLOCK, &sigset, NULL);
602 
603   // SIGCHLD is ordinarily blocked, so we listen for it only in
604   // sigsuspend, when we're also listening for the SIGUSR1 that tells
605   // us to exit.
606   pthread_sigmask(SIG_BLOCK, NULL, &sigset);
607   sigdelset(&sigset, SIGCHLD);
608   sigdelset(&sigset, SIGUSR1);
609 
610 #endif
611   unused_parameter(arg);
612   w_set_thread_name("child_reaper");
613 
614 #ifdef _WIN32
615   while (!stopping) {
616     usleep(200000);
617     w_reap_children(true);
618   }
619 #else
620   while (!stopping) {
621     w_reap_children(false);
622     sigsuspend(&sigset);
623   }
624 #endif
625 
626   return 0;
627 }
628 
629 // This is just a placeholder.
630 // This catches SIGUSR1 so we don't terminate.
631 // We use this to interrupt blocking syscalls
632 // on the worker threads
wakeme(int signo)633 static void wakeme(int signo)
634 {
635   unused_parameter(signo);
636 }
637 
638 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
639 #ifdef __OpenBSD__
640 #include <sys/siginfo.h>
641 #endif
642 #include <sys/param.h>
643 #include <sys/sysctl.h>
644 #include <sys/time.h>
645 #include <sys/resource.h>
646 #endif
647 
648 #ifdef __APPLE__
649 /* When running under launchd, we prefer to obtain our listening
650  * socket from it.  We don't strictly need to run this way, but if we didn't,
651  * when the user runs `watchman shutdown-server` the launchd job is left in
652  * a waiting state and needs to be explicitly triggered to get it working
653  * again.
654  * By having the socket registered in our job description, launchd knows
655  * that we want to be activated in this way and takes care of it for us.
656  *
657  * This is made more fun because Yosemite introduces launch_activate_socket()
658  * as a shortcut for this flow and deprecated pretty much everything else
659  * in launch.h.  We use the deprecated functions so that we can run on
660  * older releases.
661  * */
662 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
get_listener_socket_from_launchd(void)663 static int get_listener_socket_from_launchd(void)
664 {
665   launch_data_t req, resp, socks;
666 
667   req = launch_data_new_string(LAUNCH_KEY_CHECKIN);
668   if (req == NULL) {
669     w_log(W_LOG_ERR, "unable to create LAUNCH_KEY_CHECKIN\n");
670     return -1;
671   }
672 
673   resp = launch_msg(req);
674   launch_data_free(req);
675 
676   if (resp == NULL) {
677     w_log(W_LOG_ERR, "launchd checkin failed %s\n", strerror(errno));
678     return -1;
679   }
680 
681   if (launch_data_get_type(resp) == LAUNCH_DATA_ERRNO) {
682     w_log(W_LOG_ERR, "launchd checkin failed: %s\n",
683         strerror(launch_data_get_errno(resp)));
684     launch_data_free(resp);
685     return -1;
686   }
687 
688   socks = launch_data_dict_lookup(resp, LAUNCH_JOBKEY_SOCKETS);
689   if (socks == NULL) {
690     w_log(W_LOG_ERR, "launchd didn't provide any sockets\n");
691     launch_data_free(resp);
692     return -1;
693   }
694 
695   // the "sock" name here is coupled with the plist in main.c
696   socks = launch_data_dict_lookup(socks, "sock");
697   if (socks == NULL) {
698     w_log(W_LOG_ERR, "launchd: \"sock\" wasn't present in Sockets\n");
699     launch_data_free(resp);
700     return -1;
701   }
702 
703   return launch_data_get_fd(launch_data_array_get_index(socks, 0));
704 }
705 #endif
706 
707 #ifndef _WIN32
get_listener_socket(const char * path)708 static int get_listener_socket(const char *path)
709 {
710   struct sockaddr_un un;
711 
712 #ifdef __APPLE__
713   listener_fd = get_listener_socket_from_launchd();
714   if (listener_fd != -1) {
715     w_log(W_LOG_ERR, "Using socket from launchd as listening socket\n");
716     return listener_fd;
717   }
718 #endif
719 
720   if (strlen(path) >= sizeof(un.sun_path) - 1) {
721     w_log(W_LOG_ERR, "%s: path is too long\n",
722         path);
723     return -1;
724   }
725 
726   listener_fd = socket(PF_LOCAL, SOCK_STREAM, 0);
727   if (listener_fd == -1) {
728     w_log(W_LOG_ERR, "socket: %s\n",
729         strerror(errno));
730     return -1;
731   }
732 
733   un.sun_family = PF_LOCAL;
734   strcpy(un.sun_path, path);
735   unlink(path);
736 
737   if (bind(listener_fd, (struct sockaddr*)&un, sizeof(un)) != 0) {
738     w_log(W_LOG_ERR, "bind(%s): %s\n",
739       path, strerror(errno));
740     close(listener_fd);
741     return -1;
742   }
743 
744   if (listen(listener_fd, 200) != 0) {
745     w_log(W_LOG_ERR, "listen(%s): %s\n",
746         path, strerror(errno));
747     close(listener_fd);
748     return -1;
749   }
750 
751   return listener_fd;
752 }
753 #endif
754 
make_new_client(w_stm_t stm)755 static struct watchman_client *make_new_client(w_stm_t stm) {
756   struct watchman_client *client;
757   pthread_attr_t attr;
758   pthread_t thr;
759 
760   pthread_attr_init(&attr);
761   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
762 
763   client = calloc(1, sizeof(*client));
764   if (!client) {
765     pthread_attr_destroy(&attr);
766     return NULL;
767   }
768   client->stm = stm;
769   w_log(W_LOG_DBG, "accepted client:stm=%p\n", client->stm);
770 
771   if (!w_json_buffer_init(&client->reader)) {
772     // FIXME: error handling
773   }
774   if (!w_json_buffer_init(&client->writer)) {
775     // FIXME: error handling
776   }
777   client->ping = w_event_make();
778   if (!client->ping) {
779     // FIXME: error handling
780   }
781   client->subscriptions = w_ht_new(2, &subscription_hash_funcs);
782 
783   pthread_mutex_lock(&w_client_lock);
784   w_ht_set(clients, w_ht_ptr_val(client), w_ht_ptr_val(client));
785   pthread_mutex_unlock(&w_client_lock);
786 
787   // Start a thread for the client.
788   // We used to use libevent for this, but we have
789   // a low volume of concurrent clients and the json
790   // parse/encode APIs are not easily used in a non-blocking
791   // server architecture.
792   if (pthread_create(&thr, &attr, client_thread, client)) {
793     // It didn't work out, sorry!
794     pthread_mutex_lock(&w_client_lock);
795     w_ht_del(clients, w_ht_ptr_val(client));
796     pthread_mutex_unlock(&w_client_lock);
797     client_delete(client);
798   }
799 
800   pthread_attr_destroy(&attr);
801 
802   return client;
803 }
804 
805 #ifdef _WIN32
named_pipe_accept_loop(const char * path)806 static void named_pipe_accept_loop(const char *path) {
807   HANDLE handles[2];
808   OVERLAPPED olap;
809   HANDLE connected_event = CreateEvent(NULL, FALSE, TRUE, NULL);
810 
811   if (!connected_event) {
812     w_log(W_LOG_ERR, "named_pipe_accept_loop: CreateEvent failed: %s\n",
813         win32_strerror(GetLastError()));
814     return;
815   }
816 
817   listener_thread_event = CreateEvent(NULL, FALSE, TRUE, NULL);
818 
819   handles[0] = connected_event;
820   handles[1] = listener_thread_event;
821   memset(&olap, 0, sizeof(olap));
822   olap.hEvent = connected_event;
823 
824   w_log(W_LOG_ERR, "waiting for pipe clients on %s\n", path);
825   while (!stopping) {
826     w_stm_t stm;
827     HANDLE client_fd;
828     DWORD res;
829 
830     client_fd = CreateNamedPipe(
831         path,
832         PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED,
833         PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|
834         PIPE_REJECT_REMOTE_CLIENTS,
835         PIPE_UNLIMITED_INSTANCES,
836         WATCHMAN_IO_BUF_SIZE,
837         512, 0, NULL);
838 
839     if (client_fd == INVALID_HANDLE_VALUE) {
840       w_log(W_LOG_ERR, "CreateNamedPipe(%s) failed: %s\n",
841           path, win32_strerror(GetLastError()));
842       continue;
843     }
844 
845     ResetEvent(connected_event);
846     if (!ConnectNamedPipe(client_fd, &olap)) {
847       res = GetLastError();
848 
849       if (res == ERROR_PIPE_CONNECTED) {
850         goto good_client;
851       }
852 
853       if (res != ERROR_IO_PENDING) {
854         w_log(W_LOG_ERR, "ConnectNamedPipe: %s\n",
855             win32_strerror(GetLastError()));
856         CloseHandle(client_fd);
857         continue;
858       }
859 
860       res = WaitForMultipleObjectsEx(2, handles, false, INFINITE, true);
861       if (res == WAIT_OBJECT_0 + 1) {
862         // Signalled to stop
863         CancelIoEx(client_fd, &olap);
864         CloseHandle(client_fd);
865         continue;
866       }
867 
868       if (res == WAIT_OBJECT_0) {
869         goto good_client;
870       }
871 
872       w_log(W_LOG_ERR, "WaitForMultipleObjectsEx: ConnectNamedPipe: "
873           "unexpected status %u\n", res);
874       CancelIoEx(client_fd, &olap);
875       CloseHandle(client_fd);
876     } else {
877 good_client:
878       stm = w_stm_handleopen(client_fd);
879       if (!stm) {
880         w_log(W_LOG_ERR, "Failed to allocate stm for pipe handle: %s\n",
881             strerror(errno));
882         CloseHandle(client_fd);
883         continue;
884       }
885 
886       make_new_client(stm);
887     }
888   }
889 }
890 #endif
891 
892 #ifndef _WIN32
accept_loop()893 static void accept_loop() {
894   while (!stopping) {
895     int client_fd;
896     struct pollfd pfd;
897     int bufsize;
898     w_stm_t stm;
899 
900 #ifdef HAVE_LIBGIMLI_H
901     if (hb) {
902       gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
903     }
904 #endif
905 
906     pfd.events = POLLIN;
907     pfd.fd = listener_fd;
908     if (poll(&pfd, 1, 60000) < 1 || (pfd.revents & POLLIN) == 0) {
909       if (stopping) {
910         break;
911       }
912       // Timed out, or error.
913       // Arrange to sanity check that we're working
914       w_check_my_sock();
915       continue;
916     }
917 
918 #ifdef HAVE_ACCEPT4
919     client_fd = accept4(listener_fd, NULL, 0, SOCK_CLOEXEC);
920 #else
921     client_fd = accept(listener_fd, NULL, 0);
922 #endif
923     if (client_fd == -1) {
924       continue;
925     }
926     w_set_cloexec(client_fd);
927     bufsize = WATCHMAN_IO_BUF_SIZE;
928     setsockopt(client_fd, SOL_SOCKET, SO_SNDBUF,
929         (void*)&bufsize, sizeof(bufsize));
930 
931     stm = w_stm_fdopen(client_fd);
932     if (!stm) {
933       w_log(W_LOG_ERR, "Failed to allocate stm for fd: %s\n",
934           strerror(errno));
935       close(client_fd);
936       continue;
937     }
938     make_new_client(stm);
939   }
940 }
941 #endif
942 
w_start_listener(const char * path)943 bool w_start_listener(const char *path)
944 {
945 #ifndef _WIN32
946   struct sigaction sa;
947   sigset_t sigset;
948 #endif
949   void *ignored;
950   struct timeval tv;
951 
952   listener_thread = pthread_self();
953 
954 #ifdef HAVE_LIBGIMLI_H
955   hb = gimli_heartbeat_attach();
956 #endif
957 
958 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
959   {
960     struct rlimit limit;
961 # ifndef __OpenBSD__
962     int mib[2] = { CTL_KERN,
963 #  ifdef KERN_MAXFILESPERPROC
964       KERN_MAXFILESPERPROC
965 #  else
966       KERN_MAXFILES
967 #  endif
968     };
969 # endif
970     int maxperproc;
971 
972     getrlimit(RLIMIT_NOFILE, &limit);
973 
974 # ifndef __OpenBSD__
975     {
976       size_t len;
977 
978       len = sizeof(maxperproc);
979       sysctl(mib, 2, &maxperproc, &len, NULL, 0);
980       w_log(W_LOG_ERR, "file limit is %" PRIu64
981           " kern.maxfilesperproc=%i\n",
982           limit.rlim_cur, maxperproc);
983     }
984 # else
985     maxperproc = limit.rlim_max;
986     w_log(W_LOG_ERR, "openfiles-cur is %" PRIu64
987         " openfiles-max=%i\n",
988         limit.rlim_cur, maxperproc);
989 # endif
990 
991     if (limit.rlim_cur != RLIM_INFINITY &&
992         maxperproc > 0 &&
993         limit.rlim_cur < (rlim_t)maxperproc) {
994       limit.rlim_cur = maxperproc;
995 
996       if (setrlimit(RLIMIT_NOFILE, &limit)) {
997         w_log(W_LOG_ERR,
998           "failed to raise limit to %" PRIu64 " (%s).\n",
999           limit.rlim_cur,
1000           strerror(errno));
1001       } else {
1002         w_log(W_LOG_ERR,
1003             "raised file limit to %" PRIu64 "\n",
1004             limit.rlim_cur);
1005       }
1006     }
1007 
1008     getrlimit(RLIMIT_NOFILE, &limit);
1009 #ifndef HAVE_FSEVENTS
1010     if (limit.rlim_cur < 10240) {
1011       w_log(W_LOG_ERR,
1012           "Your file descriptor limit is very low (%" PRIu64 "), "
1013           "please consult the watchman docs on raising the limits\n",
1014           limit.rlim_cur);
1015     }
1016 #endif
1017   }
1018 #endif
1019 
1020   proc_pid = (int)getpid();
1021   if (gettimeofday(&tv, NULL) == -1) {
1022     w_log(W_LOG_ERR, "gettimeofday failed: %s\n", strerror(errno));
1023     return false;
1024   }
1025   proc_start_time = (uint64_t)tv.tv_sec;
1026 
1027 #ifndef _WIN32
1028   signal(SIGPIPE, SIG_IGN);
1029 
1030   /* allow SIGUSR1 and SIGCHLD to wake up a blocked thread, without restarting
1031    * syscalls */
1032   memset(&sa, 0, sizeof(sa));
1033   sa.sa_handler = wakeme;
1034   sa.sa_flags = 0;
1035   sigaction(SIGUSR1, &sa, NULL);
1036   sigaction(SIGCHLD, &sa, NULL);
1037 
1038   // Block SIGCHLD everywhere
1039   sigemptyset(&sigset);
1040   sigaddset(&sigset, SIGCHLD);
1041   sigprocmask(SIG_BLOCK, &sigset, NULL);
1042 
1043   listener_fd = get_listener_socket(path);
1044   if (listener_fd == -1) {
1045     return false;
1046   }
1047   w_set_cloexec(listener_fd);
1048 #endif
1049 
1050   if (pthread_create(&reaper_thread, NULL, child_reaper, NULL)) {
1051     w_log(W_LOG_FATAL, "pthread_create(reaper): %s\n",
1052         strerror(errno));
1053     return false;
1054   }
1055 
1056   if (!clients) {
1057     clients = w_ht_new(2, NULL);
1058   }
1059 
1060   w_state_load();
1061 
1062 #ifdef HAVE_LIBGIMLI_H
1063   if (hb) {
1064     gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
1065   } else {
1066     w_setup_signal_handlers();
1067   }
1068 #else
1069   w_setup_signal_handlers();
1070 #endif
1071   w_set_nonblock(listener_fd);
1072 
1073   // Now run the dispatch
1074 #ifndef _WIN32
1075   accept_loop();
1076 #else
1077   named_pipe_accept_loop(path);
1078 #endif
1079 
1080 #ifndef _WIN32
1081   /* close out some resources to persuade valgrind to run clean */
1082   close(listener_fd);
1083   listener_fd = -1;
1084 #endif
1085 
1086   // Wait for clients, waking any sleeping clients up in the process
1087   {
1088     int interval = 2000;
1089     int last_count = 0, n_clients = 0;
1090 
1091     do {
1092       w_ht_iter_t iter;
1093 
1094       pthread_mutex_lock(&w_client_lock);
1095       n_clients = w_ht_size(clients);
1096 
1097       if (w_ht_first(clients, &iter)) do {
1098         struct watchman_client *client = w_ht_val_ptr(iter.value);
1099         w_event_set(client->ping);
1100       } while (w_ht_next(clients, &iter));
1101 
1102       pthread_mutex_unlock(&w_client_lock);
1103 
1104       if (n_clients != last_count) {
1105         w_log(W_LOG_ERR, "waiting for %d clients to terminate\n", n_clients);
1106       }
1107       usleep(interval);
1108       interval = MIN(interval * 2, 1000000);
1109     } while (n_clients > 0);
1110   }
1111 
1112   w_root_free_watched_roots();
1113 
1114   pthread_join(reaper_thread, &ignored);
1115   cfg_shutdown();
1116 
1117   return true;
1118 }
1119 
1120 /* vim:ts=2:sw=2:et:
1121  */
1122