1 /* Copyright 2012-present Facebook, Inc.
2 * Licensed under the Apache License, Version 2.0 */
3
4 #include "watchman.h"
5 #ifdef HAVE_LIBGIMLI_H
6 # include <libgimli.h>
7 #endif
8 #ifdef __APPLE__
9 #include <launch.h>
10 #endif
11
/* This needs to be recursive safe because we may log to clients
 * while we are dispatching subscriptions to clients */
pthread_mutex_t w_client_lock;
// Map of all connected clients (key == value == client pointer).
// Protected by w_client_lock.
w_ht_t *clients = NULL;
// The listening socket fd (unix domain socket on POSIX systems).
static int listener_fd;
static pthread_t reaper_thread;
static pthread_t listener_thread;
#ifdef _WIN32
// Signalled by w_request_shutdown() to knock the named pipe accept
// loop out of its blocking wait.
static HANDLE listener_thread_event;
#endif
// Set when shutdown is requested; polled by the listener, reaper and
// per-client threads.
static volatile bool stopping = false;
#ifdef HAVE_LIBGIMLI_H
// Gimli monitor heartbeat; non-NULL when running under the monitor.
static volatile struct gimli_heartbeat *hb = NULL;
#endif
26
make_response(void)27 json_t *make_response(void)
28 {
29 json_t *resp = json_object();
30
31 set_prop(resp, "version", json_string_nocheck(PACKAGE_VERSION));
32
33 return resp;
34 }
35
// Identity of this server process; baked into clock strings so that
// clients can detect when they are talking to a different incarnation
// of the server.
static int proc_pid;
static uint64_t proc_start_time;

/* Render a clock id of the form
 * "c:<start_time>:<pid>:<root_number>:<ticks>" into buf.
 * Returns true on success; false if the buffer was too small or an
 * encoding error occurred. */
bool clock_id_string(uint32_t root_number, uint32_t ticks, char *buf,
                     size_t bufsize)
{
  int res = snprintf(buf, bufsize, "c:%" PRIu64 ":%d:%u:%" PRIu32,
                     proc_start_time, proc_pid, root_number, ticks);

  // snprintf reports an encoding error with *any* negative value, not
  // necessarily exactly -1; a result >= bufsize indicates truncation.
  if (res < 0) {
    return false;
  }
  return (size_t)res < bufsize;
}
50
// Renders the current clock id string to the supplied buffer.
// Must be called with the root locked.
// Returns false if the buffer was too small to hold the string.
static bool current_clock_id_string(w_root_t *root,
    char *buf, size_t bufsize)
{
  return clock_id_string(root->number, root->ticks, buf, bufsize);
}
58
59 /* Add the current clock value to the response.
60 * must be called with the root locked */
annotate_with_clock(w_root_t * root,json_t * resp)61 void annotate_with_clock(w_root_t *root, json_t *resp)
62 {
63 char buf[128];
64
65 if (current_clock_id_string(root, buf, sizeof(buf))) {
66 set_prop(resp, "clock", json_string_nocheck(buf));
67 }
68 }
69
70 /* must be called with the w_client_lock held */
enqueue_response(struct watchman_client * client,json_t * json,bool ping)71 bool enqueue_response(struct watchman_client *client,
72 json_t *json, bool ping)
73 {
74 struct watchman_client_response *resp;
75
76 resp = calloc(1, sizeof(*resp));
77 if (!resp) {
78 return false;
79 }
80 resp->json = json;
81
82 if (client->tail) {
83 client->tail->next = resp;
84 } else {
85 client->head = resp;
86 }
87 client->tail = resp;
88
89 if (ping) {
90 w_event_set(client->ping);
91 }
92
93 return true;
94 }
95
/* Queue `response` for delivery to `client`, transferring our
 * reference to the queue. If queueing fails the response is simply
 * dropped (its reference released). */
void send_and_dispose_response(struct watchman_client *client,
    json_t *response)
{
  bool queued;

  pthread_mutex_lock(&w_client_lock);
  queued = enqueue_response(client, response, false);
  pthread_mutex_unlock(&w_client_lock);

  if (!queued) {
    json_decref(response);
  }
}
105
send_error_response(struct watchman_client * client,const char * fmt,...)106 void send_error_response(struct watchman_client *client,
107 const char *fmt, ...)
108 {
109 char buf[WATCHMAN_NAME_MAX];
110 va_list ap;
111 json_t *resp = make_response();
112
113 va_start(ap, fmt);
114 vsnprintf(buf, sizeof(buf), fmt, ap);
115 va_end(ap);
116
117 w_log(W_LOG_ERR, "send_error_response: %s\n", buf);
118 set_prop(resp, "error", json_string_nocheck(buf));
119
120 send_and_dispose_response(client, resp);
121 }
122
client_delete(struct watchman_client * client)123 static void client_delete(struct watchman_client *client)
124 {
125 struct watchman_client_response *resp;
126
127 w_log(W_LOG_DBG, "client_delete %p\n", client);
128
129 /* cancel subscriptions */
130 w_ht_free(client->subscriptions);
131
132 while (client->head) {
133 resp = client->head;
134 client->head = resp->next;
135 json_decref(resp->json);
136 free(resp);
137 }
138
139 w_json_buffer_free(&client->reader);
140 w_json_buffer_free(&client->writer);
141 w_event_destroy(client->ping);
142 w_stm_shutdown(client->stm);
143 w_stm_close(client->stm);
144 free(client);
145 }
146
// Hash-table value destructor for client subscriptions: releases the
// subscription name, its query, and the optional drop_or_defer state
// map before freeing the struct itself.
static void delete_subscription(w_ht_val_t val)
{
  struct watchman_client_subscription *sub = w_ht_val_ptr(val);

  w_string_delref(sub->name);
  w_query_delref(sub->query);
  if (sub->drop_or_defer) {
    w_ht_free(sub->drop_or_defer);
  }
  free(sub);
}
158
// Hash operations for the per-client subscription map. Keys are
// refcounted w_string_t subscription names; values are
// watchman_client_subscription instances destroyed via
// delete_subscription when removed. (Field order follows
// watchman_hash_funcs: key copy/del/equal/hash, then value copy/del
// -- confirm against the struct declaration in watchman.h.)
static const struct watchman_hash_funcs subscription_hash_funcs = {
  w_ht_string_copy,
  w_ht_string_del,
  w_ht_string_equal,
  w_ht_string_hash,
  NULL,               // values are not copied
  delete_subscription
};
167
w_clockspec_new_clock(uint32_t root_number,uint32_t ticks)168 struct w_clockspec *w_clockspec_new_clock(uint32_t root_number, uint32_t ticks)
169 {
170 struct w_clockspec *spec;
171 spec = calloc(1, sizeof(*spec));
172 if (!spec) {
173 return NULL;
174 }
175 spec->tag = w_cs_clock;
176 spec->clock.start_time = proc_start_time;
177 spec->clock.pid = proc_pid;
178 spec->clock.root_number = root_number;
179 spec->clock.ticks = ticks;
180 return spec;
181 }
182
/* Parse a json value into a clockspec.
 * Accepted forms:
 *   - an integer: interpreted as a unix timestamp
 *   - "n:<name>": a named cursor
 *   - "c:<start>:<pid>:<root>:<ticks>": a full clock value
 *   - "c:<pid>:<ticks>": an old-style (<= 2.8.2) clock value
 * Returns NULL on allocation failure or unrecognized input.
 * Caller releases the result with w_clockspec_free(). */
struct w_clockspec *w_clockspec_parse(json_t *value)
{
  const char *str;
  uint64_t start_time;
  int pid;
  uint32_t root_number;
  uint32_t ticks;

  struct w_clockspec *spec;

  spec = calloc(1, sizeof(*spec));
  if (!spec) {
    return NULL;
  }

  if (json_is_integer(value)) {
    spec->tag = w_cs_timestamp;
    spec->timestamp.tv_usec = 0;
    spec->timestamp.tv_sec = (time_t)json_integer_value(value);
    return spec;
  }

  str = json_string_value(value);
  if (!str) {
    free(spec);
    return NULL;
  }

  if (str[0] == 'n' && str[1] == ':') {
    spec->tag = w_cs_named_cursor;
    // spec owns the ref to the string
    spec->named_cursor.cursor = w_string_new(str);
    return spec;
  }

  if (sscanf(str, "c:%" PRIu64 ":%d:%" PRIu32 ":%" PRIu32,
             &start_time, &pid, &root_number, &ticks) == 4) {
    spec->tag = w_cs_clock;
    spec->clock.start_time = start_time;
    spec->clock.pid = pid;
    spec->clock.root_number = root_number;
    spec->clock.ticks = ticks;
    return spec;
  }

  if (sscanf(str, "c:%d:%" PRIu32, &pid, &ticks) == 2) {
    // old-style clock value (<= 2.8.2) -- by setting clock time and root number
    // to 0 we guarantee that this is treated as a fresh instance
    spec->tag = w_cs_clock;
    spec->clock.start_time = 0;
    spec->clock.pid = pid;
    // BUGFIX: this previously copied the local `root_number`, which is
    // uninitialized in this branch (only the 4-field sscanf above
    // fills it in). Per the comment above, it must be 0.
    spec->clock.root_number = 0;
    spec->clock.ticks = ticks;
    return spec;
  }

  free(spec);
  return NULL;
}
242
// must be called with the root locked
// spec can be null, in which case a fresh instance is assumed
/* Translate a clockspec into a w_query_since: resolves named cursors
 * against the root's cursor map, and detects when the spec refers to
 * a different server incarnation or an aged-out tick range (both
 * treated as a fresh instance). */
void w_clockspec_eval(w_root_t *root,
    const struct w_clockspec *spec,
    struct w_query_since *since)
{
  if (spec == NULL) {
    // No spec supplied: treat the query as a fresh instance
    since->is_timestamp = false;
    since->clock.is_fresh_instance = true;
    since->clock.ticks = 0;
    return;
  }

  if (spec->tag == w_cs_timestamp) {
    // just copy the values over
    since->is_timestamp = true;
    since->timestamp = spec->timestamp;
    return;
  }

  since->is_timestamp = false;

  if (spec->tag == w_cs_named_cursor) {
    w_ht_val_t ticks_val;
    w_string_t *cursor = spec->named_cursor.cursor;
    // Fresh if we have never seen this cursor name before...
    since->clock.is_fresh_instance = !w_ht_lookup(root->cursors,
                                                  w_ht_ptr_val(cursor),
                                                  &ticks_val, false);
    if (!since->clock.is_fresh_instance) {
      // ...or if its recorded tick pre-dates the last age-out pass
      since->clock.is_fresh_instance = ticks_val < root->last_age_out_tick;
    }
    if (since->clock.is_fresh_instance) {
      since->clock.ticks = 0;
    } else {
      since->clock.ticks = (uint32_t)ticks_val;
    }

    // Bump the tick value and record it against the cursor.
    // We need to bump the tick value so that repeated queries
    // when nothing has changed in the filesystem won't continue
    // to return the same set of files; we only want the first
    // of these to return the files and the rest to return nothing
    // until something subsequently changes
    w_ht_replace(root->cursors, w_ht_ptr_val(cursor), ++root->ticks);

    w_log(W_LOG_DBG, "resolved cursor %.*s -> %" PRIu32 "\n",
        cursor->len, cursor->buf, since->clock.ticks);
    return;
  }

  // spec->tag == w_cs_clock
  if (spec->clock.start_time == proc_start_time &&
      spec->clock.pid == proc_pid &&
      spec->clock.root_number == root->number) {
    // Same server incarnation and same root instance; fresh only if
    // the requested ticks pre-date the last age-out pass.
    since->clock.is_fresh_instance =
      spec->clock.ticks < root->last_age_out_tick;
    if (since->clock.is_fresh_instance) {
      since->clock.ticks = 0;
    } else {
      since->clock.ticks = spec->clock.ticks;
    }
    if (spec->clock.ticks == root->ticks) {
      /* Force ticks to increment. This avoids returning and querying the
       * same tick value over and over when no files have changed in the
       * meantime */
      root->ticks++;
    }
    return;
  }

  // If the pid, start time or root number don't match, they asked a different
  // incarnation of the server or a different instance of this root, so we treat
  // them as having never spoken to us before
  since->clock.is_fresh_instance = true;
  since->clock.ticks = 0;
}
320
w_clockspec_free(struct w_clockspec * spec)321 void w_clockspec_free(struct w_clockspec *spec)
322 {
323 if (spec->tag == w_cs_named_cursor) {
324 w_string_delref(spec->named_cursor.cursor);
325 }
326 free(spec);
327 }
328
/* If the root has recrawled or carries a warning string, attach a
 * human-readable "warning" property to the response describing the
 * condition and how to clear it. No-op otherwise. */
void add_root_warnings_to_response(json_t *response, w_root_t *root) {
  char *str = NULL;
  char *full = NULL;

  if (!root->last_recrawl_reason && !root->warning) {
    return;
  }

  if (root->last_recrawl_reason) {
    // asprintf failure leaves str == NULL, which is tolerated below
    ignore_result(asprintf(&str,
          "Recrawled this watch %d times, most recently because:\n"
          "%.*s\n"
          "To resolve, please review the information on\n"
          "%s#recrawl",
          root->recrawl_count,
          root->last_recrawl_reason->len,
          root->last_recrawl_reason->buf,
          cfg_get_trouble_url()));
  }

  // NOTE(review): the %.*s precision argument must be an int per the
  // printf contract; the w_string len fields appear to be unsigned --
  // confirm they cannot exceed INT_MAX.
  ignore_result(asprintf(&full,
        "%.*s%s" // root->warning
        "%s\n"   // str (last recrawl reason)
        "To clear this warning, run:\n"
        "`watchman watch-del %.*s ; watchman watch-project %.*s`\n",
        root->warning ? root->warning->len : 0,
        root->warning ? root->warning->buf : "",
        root->warning && str ? "\n" : "", // newline if we have both strings
        str ? str : "",
        root->root_path->len,
        root->root_path->buf,
        root->root_path->len,
        root->root_path->buf));

  if (full) {
    set_prop(response, "warning", json_string_nocheck(full));
  }
  // free(NULL) is a no-op, so unconditional free is safe here
  free(str);
  free(full);
}
369
resolve_root_or_err(struct watchman_client * client,json_t * args,int root_index,bool create)370 w_root_t *resolve_root_or_err(
371 struct watchman_client *client,
372 json_t *args,
373 int root_index,
374 bool create)
375 {
376 w_root_t *root;
377 const char *root_name;
378 char *errmsg = NULL;
379 json_t *ele;
380
381 ele = json_array_get(args, root_index);
382 if (!ele) {
383 send_error_response(client, "wrong number of arguments");
384 return NULL;
385 }
386
387 root_name = json_string_value(ele);
388 if (!root_name) {
389 send_error_response(client,
390 "invalid value for argument %d, expected "
391 "a string naming the root dir",
392 root_index);
393 return NULL;
394 }
395
396 if (client->client_mode) {
397 root = w_root_resolve_for_client_mode(root_name, &errmsg);
398 } else {
399 root = w_root_resolve(root_name, create, &errmsg);
400 }
401
402 if (!root) {
403 send_error_response(client,
404 "unable to resolve root %s: %s",
405 root_name, errmsg);
406 free(errmsg);
407 }
408 return root;
409 }
410
/* Request a server shutdown: raise the stopping flag and wake the
 * listener and reaper threads so they observe it promptly. */
void w_request_shutdown(void) {
  stopping = true;
  // Knock listener thread out of poll/accept
#ifndef _WIN32
  // SIGUSR1 is handled by wakeme() below; its only effect is to
  // interrupt the threads' blocking syscalls with EINTR.
  pthread_kill(listener_thread, SIGUSR1);
  pthread_kill(reaper_thread, SIGUSR1);
#else
  SetEvent(listener_thread_event);
#endif
}
421
cmd_shutdown(struct watchman_client * client,json_t * args)422 static void cmd_shutdown(
423 struct watchman_client *client,
424 json_t *args)
425 {
426 json_t *resp = make_response();
427 unused_parameter(args);
428
429 w_log(W_LOG_ERR, "shutdown-server was requested, exiting!\n");
430 stopping = true;
431
432 w_request_shutdown();
433
434 set_prop(resp, "shutdown-server", json_true());
435 send_and_dispose_response(client, resp);
436 }
437 W_CMD_REG("shutdown-server", cmd_shutdown, CMD_DAEMON|CMD_POISON_IMMUNE, NULL)
438
// The client thread reads and decodes json packets,
// then dispatches the commands that it finds.
// One such thread exists per connected client; it also drains the
// response queue that other threads populate via enqueue_response().
static void *client_thread(void *ptr)
{
  struct watchman_client *client = ptr;
  struct watchman_event_poll pfd[2];
  struct watchman_client_response *queued_responses_to_send;
  json_t *request;
  json_error_t jerr;
  bool send_ok = true;

  w_stm_set_nonblock(client->stm, true);
  w_set_thread_name("client:stm=%p", client->stm);

  // Poll on both the client stream (slot 0) and the ping event
  // (slot 1); the ping is signalled when another thread queues a
  // response for this client.
  w_stm_get_events(client->stm, &pfd[0].evt);
  pfd[1].evt = client->ping;

  while (!stopping) {
    // Wait for input from either the client socket or
    // via the ping pipe, which signals that some other
    // thread wants to unilaterally send data to the client

    ignore_result(w_poll_events(pfd, 2, 2000));

    if (stopping) {
      break;
    }

    if (pfd[0].ready) {
      request = w_json_buffer_next(&client->reader, client->stm, &jerr);

      if (!request && errno == EAGAIN) {
        // That's fine
      } else if (!request) {
        // Not so cool
        if (client->reader.wpos == client->reader.rpos) {
          // If they disconnected in between PDUs, no need to log
          // any error
          goto disconected;
        }
        send_error_response(client, "invalid json at position %d: %s",
            jerr.position, jerr.text);
        w_log(W_LOG_ERR, "invalid data from client: %s\n", jerr.text);

        goto disconected;
      } else if (request) {
        // Remember the PDU encoding so responses use the same format
        client->pdu_type = client->reader.pdu_type;
        dispatch_command(client, request, CMD_DAEMON);
        json_decref(request);
      }
    }

    if (pfd[1].ready) {
      // Consume the ping; the queued responses are drained below
      w_event_test_and_clear(client->ping);
    }

    /* de-queue the pending responses under the lock */
    pthread_mutex_lock(&w_client_lock);
    queued_responses_to_send = client->head;
    client->head = NULL;
    client->tail = NULL;
    pthread_mutex_unlock(&w_client_lock);

    /* now send our response(s) */
    while (queued_responses_to_send) {
      struct watchman_client_response *response_to_send =
        queued_responses_to_send;

      if (send_ok) {
        // Writes happen in blocking mode; restore non-blocking for
        // the read path afterwards
        w_stm_set_nonblock(client->stm, false);
        /* Return the data in the same format that was used to ask for it.
         * Don't bother sending any more messages if the client disconnects,
         * but still free their memory.
         */
        send_ok = w_ser_write_pdu(client->pdu_type, &client->writer,
            client->stm, response_to_send->json);
        w_stm_set_nonblock(client->stm, true);
      }

      queued_responses_to_send = response_to_send->next;

      json_decref(response_to_send->json);
      free(response_to_send);
    }
  }

disconected: /* [sic] long-standing typo; label is local to this function */
  // Remove the client from the map before we tear it down, as this makes
  // it easier to flush out pending writes on windows without worrying
  // about w_log_to_clients contending for the write buffers
  pthread_mutex_lock(&w_client_lock);
  w_ht_del(clients, w_ht_ptr_val(client));
  pthread_mutex_unlock(&w_client_lock);

  w_client_vacate_states(client);
  client_delete(client);

  return NULL;
}
538
w_should_log_to_clients(int level)539 bool w_should_log_to_clients(int level)
540 {
541 w_ht_iter_t iter;
542 bool result = false;
543
544 pthread_mutex_lock(&w_client_lock);
545
546 if (!clients) {
547 pthread_mutex_unlock(&w_client_lock);
548 return false;
549 }
550
551 if (w_ht_first(clients, &iter)) do {
552 struct watchman_client *client = w_ht_val_ptr(iter.value);
553
554 if (client->log_level != W_LOG_OFF && client->log_level >= level) {
555 result = true;
556 break;
557 }
558
559 } while (w_ht_next(clients, &iter));
560 pthread_mutex_unlock(&w_client_lock);
561
562 return result;
563 }
564
w_log_to_clients(int level,const char * buf)565 void w_log_to_clients(int level, const char *buf)
566 {
567 json_t *json = NULL;
568 w_ht_iter_t iter;
569
570 if (!clients) {
571 return;
572 }
573
574 pthread_mutex_lock(&w_client_lock);
575 if (w_ht_first(clients, &iter)) do {
576 struct watchman_client *client = w_ht_val_ptr(iter.value);
577
578 if (client->log_level != W_LOG_OFF && client->log_level >= level) {
579 json = make_response();
580 if (json) {
581 set_prop(json, "log", json_string_nocheck(buf));
582 if (!enqueue_response(client, json, true)) {
583 json_decref(json);
584 }
585 }
586 }
587
588 } while (w_ht_next(clients, &iter));
589 pthread_mutex_unlock(&w_client_lock);
590 }
591
/* Reaper thread: collects exited child processes (e.g. trigger
 * commands) until shutdown is requested. On POSIX it sleeps in
 * sigsuspend waiting for SIGCHLD/SIGUSR1; on Windows it polls. */
static void *child_reaper(void *arg)
{
#ifndef _WIN32
  sigset_t sigset;

  // By default, keep both SIGCHLD and SIGUSR1 blocked
  sigemptyset(&sigset);
  sigaddset(&sigset, SIGUSR1);
  sigaddset(&sigset, SIGCHLD);
  pthread_sigmask(SIG_BLOCK, &sigset, NULL);

  // SIGCHLD is ordinarily blocked, so we listen for it only in
  // sigsuspend, when we're also listening for the SIGUSR1 that tells
  // us to exit.
  // Build the sigsuspend mask from the current (blocking) mask with
  // just those two signals removed.
  pthread_sigmask(SIG_BLOCK, NULL, &sigset);
  sigdelset(&sigset, SIGCHLD);
  sigdelset(&sigset, SIGUSR1);

#endif
  unused_parameter(arg);
  w_set_thread_name("child_reaper");

#ifdef _WIN32
  // No SIGCHLD on Windows: poll for exited children every 200ms
  while (!stopping) {
    usleep(200000);
    w_reap_children(true);
  }
#else
  while (!stopping) {
    w_reap_children(false);
    // Atomically unblock SIGCHLD/SIGUSR1 and wait for one of them
    sigsuspend(&sigset);
  }
#endif

  return 0;
}
628
// This is just a placeholder.
// This catches SIGUSR1 so we don't terminate.
// We use this to interrupt blocking syscalls
// on the worker threads
static void wakeme(int signo)
{
  // Intentionally empty: the useful side effect is that the
  // interrupted thread's blocking syscall returns with EINTR.
  unused_parameter(signo);
}
637
638 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
639 #ifdef __OpenBSD__
640 #include <sys/siginfo.h>
641 #endif
642 #include <sys/param.h>
643 #include <sys/sysctl.h>
644 #include <sys/time.h>
645 #include <sys/resource.h>
646 #endif
647
648 #ifdef __APPLE__
649 /* When running under launchd, we prefer to obtain our listening
650 * socket from it. We don't strictly need to run this way, but if we didn't,
651 * when the user runs `watchman shutdown-server` the launchd job is left in
652 * a waiting state and needs to be explicitly triggered to get it working
653 * again.
654 * By having the socket registered in our job description, launchd knows
655 * that we want to be activated in this way and takes care of it for us.
656 *
657 * This is made more fun because Yosemite introduces launch_activate_socket()
658 * as a shortcut for this flow and deprecated pretty much everything else
659 * in launch.h. We use the deprecated functions so that we can run on
660 * older releases.
661 * */
662 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
/* Check in with launchd and return the listening socket fd it is
 * holding for us, or -1 if unavailable (not running under launchd,
 * checkin failed, or no "sock" entry was configured). Uses the
 * pre-Yosemite launch.h API deliberately, per the comment above. */
static int get_listener_socket_from_launchd(void)
{
  launch_data_t req, resp, socks;

  req = launch_data_new_string(LAUNCH_KEY_CHECKIN);
  if (req == NULL) {
    w_log(W_LOG_ERR, "unable to create LAUNCH_KEY_CHECKIN\n");
    return -1;
  }

  resp = launch_msg(req);
  launch_data_free(req);

  if (resp == NULL) {
    w_log(W_LOG_ERR, "launchd checkin failed %s\n", strerror(errno));
    return -1;
  }

  if (launch_data_get_type(resp) == LAUNCH_DATA_ERRNO) {
    w_log(W_LOG_ERR, "launchd checkin failed: %s\n",
        strerror(launch_data_get_errno(resp)));
    launch_data_free(resp);
    return -1;
  }

  socks = launch_data_dict_lookup(resp, LAUNCH_JOBKEY_SOCKETS);
  if (socks == NULL) {
    w_log(W_LOG_ERR, "launchd didn't provide any sockets\n");
    launch_data_free(resp);
    return -1;
  }

  // the "sock" name here is coupled with the plist in main.c
  socks = launch_data_dict_lookup(socks, "sock");
  if (socks == NULL) {
    w_log(W_LOG_ERR, "launchd: \"sock\" wasn't present in Sockets\n");
    launch_data_free(resp);
    return -1;
  }

  // NOTE(review): on this success path `resp` (which owns `socks`) is
  // never freed. Startup runs this once so the leak is bounded, but
  // confirm whether launch_data_free(resp) after extracting the fd is
  // safe with this API.
  return launch_data_get_fd(launch_data_array_get_index(socks, 0));
}
705 #endif
706
707 #ifndef _WIN32
get_listener_socket(const char * path)708 static int get_listener_socket(const char *path)
709 {
710 struct sockaddr_un un;
711
712 #ifdef __APPLE__
713 listener_fd = get_listener_socket_from_launchd();
714 if (listener_fd != -1) {
715 w_log(W_LOG_ERR, "Using socket from launchd as listening socket\n");
716 return listener_fd;
717 }
718 #endif
719
720 if (strlen(path) >= sizeof(un.sun_path) - 1) {
721 w_log(W_LOG_ERR, "%s: path is too long\n",
722 path);
723 return -1;
724 }
725
726 listener_fd = socket(PF_LOCAL, SOCK_STREAM, 0);
727 if (listener_fd == -1) {
728 w_log(W_LOG_ERR, "socket: %s\n",
729 strerror(errno));
730 return -1;
731 }
732
733 un.sun_family = PF_LOCAL;
734 strcpy(un.sun_path, path);
735 unlink(path);
736
737 if (bind(listener_fd, (struct sockaddr*)&un, sizeof(un)) != 0) {
738 w_log(W_LOG_ERR, "bind(%s): %s\n",
739 path, strerror(errno));
740 close(listener_fd);
741 return -1;
742 }
743
744 if (listen(listener_fd, 200) != 0) {
745 w_log(W_LOG_ERR, "listen(%s): %s\n",
746 path, strerror(errno));
747 close(listener_fd);
748 return -1;
749 }
750
751 return listener_fd;
752 }
753 #endif
754
/* Allocate client state for a newly connected stream, register it in
 * the clients map and start its (detached) service thread.
 * Returns the client, or NULL if allocation or thread creation
 * failed; on failure the client has already been torn down and the
 * stream closed. */
static struct watchman_client *make_new_client(w_stm_t stm) {
  struct watchman_client *client;
  pthread_attr_t attr;
  pthread_t thr;

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

  client = calloc(1, sizeof(*client));
  if (!client) {
    pthread_attr_destroy(&attr);
    return NULL;
  }
  client->stm = stm;
  w_log(W_LOG_DBG, "accepted client:stm=%p\n", client->stm);

  if (!w_json_buffer_init(&client->reader)) {
    // FIXME: error handling
  }
  if (!w_json_buffer_init(&client->writer)) {
    // FIXME: error handling
  }
  client->ping = w_event_make();
  if (!client->ping) {
    // FIXME: error handling
  }
  client->subscriptions = w_ht_new(2, &subscription_hash_funcs);

  pthread_mutex_lock(&w_client_lock);
  w_ht_set(clients, w_ht_ptr_val(client), w_ht_ptr_val(client));
  pthread_mutex_unlock(&w_client_lock);

  // Start a thread for the client.
  // We used to use libevent for this, but we have
  // a low volume of concurrent clients and the json
  // parse/encode APIs are not easily used in a non-blocking
  // server architecture.
  if (pthread_create(&thr, &attr, client_thread, client)) {
    // It didn't work out, sorry!
    pthread_mutex_lock(&w_client_lock);
    w_ht_del(clients, w_ht_ptr_val(client));
    pthread_mutex_unlock(&w_client_lock);
    client_delete(client);
    pthread_attr_destroy(&attr);
    // BUGFIX: previously fell through and returned the pointer that
    // client_delete() just freed; return NULL so callers never see a
    // dangling client.
    return NULL;
  }

  pthread_attr_destroy(&attr);

  return client;
}
804
805 #ifdef _WIN32
/* Windows accept loop: repeatedly create a named pipe instance at
 * `path`, wait for a client to connect (or for the shutdown event),
 * and hand each connected pipe to make_new_client(). Returns when
 * `stopping` is observed. */
static void named_pipe_accept_loop(const char *path) {
  HANDLE handles[2];
  OVERLAPPED olap;
  HANDLE connected_event = CreateEvent(NULL, FALSE, TRUE, NULL);

  if (!connected_event) {
    w_log(W_LOG_ERR, "named_pipe_accept_loop: CreateEvent failed: %s\n",
        win32_strerror(GetLastError()));
    return;
  }

  // Signalled by w_request_shutdown() to break us out of the wait
  listener_thread_event = CreateEvent(NULL, FALSE, TRUE, NULL);

  // NOTE(review): neither connected_event nor listener_thread_event
  // is ever CloseHandle'd; bounded because this function runs once
  // for the process lifetime, but worth confirming.
  handles[0] = connected_event;
  handles[1] = listener_thread_event;
  memset(&olap, 0, sizeof(olap));
  olap.hEvent = connected_event;

  w_log(W_LOG_ERR, "waiting for pipe clients on %s\n", path);
  while (!stopping) {
    w_stm_t stm;
    HANDLE client_fd;
    DWORD res;

    // A new pipe instance must be created for each client connection
    client_fd = CreateNamedPipe(
        path,
        PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|
        PIPE_REJECT_REMOTE_CLIENTS,
        PIPE_UNLIMITED_INSTANCES,
        WATCHMAN_IO_BUF_SIZE,
        512, 0, NULL);

    if (client_fd == INVALID_HANDLE_VALUE) {
      w_log(W_LOG_ERR, "CreateNamedPipe(%s) failed: %s\n",
          path, win32_strerror(GetLastError()));
      continue;
    }

    ResetEvent(connected_event);
    if (!ConnectNamedPipe(client_fd, &olap)) {
      res = GetLastError();

      // Client connected between CreateNamedPipe and ConnectNamedPipe
      if (res == ERROR_PIPE_CONNECTED) {
        goto good_client;
      }

      if (res != ERROR_IO_PENDING) {
        w_log(W_LOG_ERR, "ConnectNamedPipe: %s\n",
            win32_strerror(GetLastError()));
        CloseHandle(client_fd);
        continue;
      }

      // Overlapped connect in flight: wait for either the connect
      // completion (index 0) or the shutdown event (index 1)
      res = WaitForMultipleObjectsEx(2, handles, false, INFINITE, true);
      if (res == WAIT_OBJECT_0 + 1) {
        // Signalled to stop
        CancelIoEx(client_fd, &olap);
        CloseHandle(client_fd);
        continue;
      }

      if (res == WAIT_OBJECT_0) {
        goto good_client;
      }

      w_log(W_LOG_ERR, "WaitForMultipleObjectsEx: ConnectNamedPipe: "
          "unexpected status %u\n", res);
      CancelIoEx(client_fd, &olap);
      CloseHandle(client_fd);
    } else {
good_client:
      // Wrap the pipe handle in a stream and spin up a client thread
      stm = w_stm_handleopen(client_fd);
      if (!stm) {
        w_log(W_LOG_ERR, "Failed to allocate stm for pipe handle: %s\n",
            strerror(errno));
        CloseHandle(client_fd);
        continue;
      }

      make_new_client(stm);
    }
  }
}
890 #endif
891
892 #ifndef _WIN32
/* POSIX accept loop: poll the listening socket, accept incoming
 * connections and hand each one to make_new_client(). Returns when
 * `stopping` is observed. */
static void accept_loop() {
  while (!stopping) {
    int client_fd;
    struct pollfd pfd;
    int bufsize;
    w_stm_t stm;

#ifdef HAVE_LIBGIMLI_H
    // Tell the gimli monitor we're alive on every iteration
    if (hb) {
      gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
    }
#endif

    pfd.events = POLLIN;
    pfd.fd = listener_fd;
    // 60s timeout so we periodically self-check even when idle
    if (poll(&pfd, 1, 60000) < 1 || (pfd.revents & POLLIN) == 0) {
      if (stopping) {
        break;
      }
      // Timed out, or error.
      // Arrange to sanity check that we're working
      w_check_my_sock();
      continue;
    }

#ifdef HAVE_ACCEPT4
    // accept4 sets CLOEXEC atomically with the accept
    client_fd = accept4(listener_fd, NULL, 0, SOCK_CLOEXEC);
#else
    client_fd = accept(listener_fd, NULL, 0);
#endif
    if (client_fd == -1) {
      continue;
    }
    // No-op when accept4 already applied SOCK_CLOEXEC
    w_set_cloexec(client_fd);
    bufsize = WATCHMAN_IO_BUF_SIZE;
    setsockopt(client_fd, SOL_SOCKET, SO_SNDBUF,
        (void*)&bufsize, sizeof(bufsize));

    stm = w_stm_fdopen(client_fd);
    if (!stm) {
      w_log(W_LOG_ERR, "Failed to allocate stm for fd: %s\n",
          strerror(errno));
      close(client_fd);
      continue;
    }
    make_new_client(stm);
  }
}
941 #endif
942
w_start_listener(const char * path)943 bool w_start_listener(const char *path)
944 {
945 #ifndef _WIN32
946 struct sigaction sa;
947 sigset_t sigset;
948 #endif
949 void *ignored;
950 struct timeval tv;
951
952 listener_thread = pthread_self();
953
954 #ifdef HAVE_LIBGIMLI_H
955 hb = gimli_heartbeat_attach();
956 #endif
957
958 #if defined(HAVE_KQUEUE) || defined(HAVE_FSEVENTS)
959 {
960 struct rlimit limit;
961 # ifndef __OpenBSD__
962 int mib[2] = { CTL_KERN,
963 # ifdef KERN_MAXFILESPERPROC
964 KERN_MAXFILESPERPROC
965 # else
966 KERN_MAXFILES
967 # endif
968 };
969 # endif
970 int maxperproc;
971
972 getrlimit(RLIMIT_NOFILE, &limit);
973
974 # ifndef __OpenBSD__
975 {
976 size_t len;
977
978 len = sizeof(maxperproc);
979 sysctl(mib, 2, &maxperproc, &len, NULL, 0);
980 w_log(W_LOG_ERR, "file limit is %" PRIu64
981 " kern.maxfilesperproc=%i\n",
982 limit.rlim_cur, maxperproc);
983 }
984 # else
985 maxperproc = limit.rlim_max;
986 w_log(W_LOG_ERR, "openfiles-cur is %" PRIu64
987 " openfiles-max=%i\n",
988 limit.rlim_cur, maxperproc);
989 # endif
990
991 if (limit.rlim_cur != RLIM_INFINITY &&
992 maxperproc > 0 &&
993 limit.rlim_cur < (rlim_t)maxperproc) {
994 limit.rlim_cur = maxperproc;
995
996 if (setrlimit(RLIMIT_NOFILE, &limit)) {
997 w_log(W_LOG_ERR,
998 "failed to raise limit to %" PRIu64 " (%s).\n",
999 limit.rlim_cur,
1000 strerror(errno));
1001 } else {
1002 w_log(W_LOG_ERR,
1003 "raised file limit to %" PRIu64 "\n",
1004 limit.rlim_cur);
1005 }
1006 }
1007
1008 getrlimit(RLIMIT_NOFILE, &limit);
1009 #ifndef HAVE_FSEVENTS
1010 if (limit.rlim_cur < 10240) {
1011 w_log(W_LOG_ERR,
1012 "Your file descriptor limit is very low (%" PRIu64 "), "
1013 "please consult the watchman docs on raising the limits\n",
1014 limit.rlim_cur);
1015 }
1016 #endif
1017 }
1018 #endif
1019
1020 proc_pid = (int)getpid();
1021 if (gettimeofday(&tv, NULL) == -1) {
1022 w_log(W_LOG_ERR, "gettimeofday failed: %s\n", strerror(errno));
1023 return false;
1024 }
1025 proc_start_time = (uint64_t)tv.tv_sec;
1026
1027 #ifndef _WIN32
1028 signal(SIGPIPE, SIG_IGN);
1029
1030 /* allow SIGUSR1 and SIGCHLD to wake up a blocked thread, without restarting
1031 * syscalls */
1032 memset(&sa, 0, sizeof(sa));
1033 sa.sa_handler = wakeme;
1034 sa.sa_flags = 0;
1035 sigaction(SIGUSR1, &sa, NULL);
1036 sigaction(SIGCHLD, &sa, NULL);
1037
1038 // Block SIGCHLD everywhere
1039 sigemptyset(&sigset);
1040 sigaddset(&sigset, SIGCHLD);
1041 sigprocmask(SIG_BLOCK, &sigset, NULL);
1042
1043 listener_fd = get_listener_socket(path);
1044 if (listener_fd == -1) {
1045 return false;
1046 }
1047 w_set_cloexec(listener_fd);
1048 #endif
1049
1050 if (pthread_create(&reaper_thread, NULL, child_reaper, NULL)) {
1051 w_log(W_LOG_FATAL, "pthread_create(reaper): %s\n",
1052 strerror(errno));
1053 return false;
1054 }
1055
1056 if (!clients) {
1057 clients = w_ht_new(2, NULL);
1058 }
1059
1060 w_state_load();
1061
1062 #ifdef HAVE_LIBGIMLI_H
1063 if (hb) {
1064 gimli_heartbeat_set(hb, GIMLI_HB_RUNNING);
1065 } else {
1066 w_setup_signal_handlers();
1067 }
1068 #else
1069 w_setup_signal_handlers();
1070 #endif
1071 w_set_nonblock(listener_fd);
1072
1073 // Now run the dispatch
1074 #ifndef _WIN32
1075 accept_loop();
1076 #else
1077 named_pipe_accept_loop(path);
1078 #endif
1079
1080 #ifndef _WIN32
1081 /* close out some resources to persuade valgrind to run clean */
1082 close(listener_fd);
1083 listener_fd = -1;
1084 #endif
1085
1086 // Wait for clients, waking any sleeping clients up in the process
1087 {
1088 int interval = 2000;
1089 int last_count = 0, n_clients = 0;
1090
1091 do {
1092 w_ht_iter_t iter;
1093
1094 pthread_mutex_lock(&w_client_lock);
1095 n_clients = w_ht_size(clients);
1096
1097 if (w_ht_first(clients, &iter)) do {
1098 struct watchman_client *client = w_ht_val_ptr(iter.value);
1099 w_event_set(client->ping);
1100 } while (w_ht_next(clients, &iter));
1101
1102 pthread_mutex_unlock(&w_client_lock);
1103
1104 if (n_clients != last_count) {
1105 w_log(W_LOG_ERR, "waiting for %d clients to terminate\n", n_clients);
1106 }
1107 usleep(interval);
1108 interval = MIN(interval * 2, 1000000);
1109 } while (n_clients > 0);
1110 }
1111
1112 w_root_free_watched_roots();
1113
1114 pthread_join(reaper_thread, &ignored);
1115 cfg_shutdown();
1116
1117 return true;
1118 }
1119
1120 /* vim:ts=2:sw=2:et:
1121 */
1122