1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24 
25 #include "src/core/lib/iomgr/ev_poll_posix.h"
26 
27 #include <assert.h>
28 #include <errno.h>
29 #include <limits.h>
30 #include <poll.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34 
35 #include <string>
36 
37 #include "absl/strings/str_cat.h"
38 
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 
42 #include "src/core/lib/debug/stats.h"
43 #include "src/core/lib/gpr/murmur_hash.h"
44 #include "src/core/lib/gpr/tls.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/thd.h"
47 #include "src/core/lib/iomgr/block_annotate.h"
48 #include "src/core/lib/iomgr/iomgr_internal.h"
49 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
50 #include "src/core/lib/profiling/timers.h"
51 
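/* Sentinel value for the specific_worker argument of pollset_kick /
   pollset_kick_ext: it means "wake every worker currently blocked in
   pollset_work" rather than one particular worker. */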
52 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
53 
54 /*******************************************************************************
55  * FD declarations
56  */
57 typedef struct grpc_fd_watcher {
58   struct grpc_fd_watcher* next;
59   struct grpc_fd_watcher* prev;
60   grpc_pollset* pollset;
61   grpc_pollset_worker* worker;
62   grpc_fd* fd;
63 } grpc_fd_watcher;
64 
65 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
66 
67 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
68 struct grpc_fork_fd_list {
69   /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
70   set to nullptr. */
71   grpc_fd* fd;
72   grpc_cached_wakeup_fd* cached_wakeup_fd;
73 
74   grpc_fork_fd_list* next;
75   grpc_fork_fd_list* prev;
76 };
77 
78 struct grpc_fd {
79   int fd;
80   /* refst format:
81      bit0:   1=active/0=orphaned
82      bit1-n: refcount
83      meaning that mostly we ref by two to avoid altering the orphaned bit,
84      and just unref by 1 when we're ready to flag the object as orphaned */
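  /* Example: a freshly created fd stores refst == 1 (active bit set, no extra
     refs); each GRPC_FD_REF adds 2, and fd_orphan does a net -1 (REF_BY 1
     followed by UNREF_BY 2), which clears the active bit while leaving any
     outstanding references intact. */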
85   gpr_atm refst;
86 
87   gpr_mu mu;
88   int shutdown;
89   int closed;
90   int released;
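  /* Set to 1 (no-barrier atomics) once poll() reports POLLHUP for this fd;
     pollset_work then drops the fd from its fd list, and notify_on_locked
     immediately schedules newly registered closures with an error, as in the
     shutdown path. */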
91   gpr_atm pollhup;
92   grpc_error* shutdown_error;
93 
94   /* The watcher list.
95 
96      The following watcher-related fields are protected by mu.
97 
98      An fd_watcher is an ephemeral object created when an fd wants to
99      begin polling, and destroyed after the poll.
100 
101      It records whether the watcher should poll this fd for reads, for
102      writes, for both, or for neither.
103 
104      If a watcher is asked to poll for reads or writes, the read_watcher
105      or write_watcher fields are set respectively. A watcher may be asked
106      to poll for both, in which case both fields will be set.
107 
108      read_watcher and write_watcher may be NULL if no watcher has been
109      asked to poll for reads or writes.
110 
111      If an fd_watcher is not asked to poll for reads or writes, it's added
112      to a linked list of inactive watchers, rooted at inactive_watcher_root.
113      If a poller is needed at a later time, one of the inactive watchers
114      may be kicked out of its poll loop to take over that
115      responsibility. */
116   grpc_fd_watcher inactive_watcher_root;
117   grpc_fd_watcher* read_watcher;
118   grpc_fd_watcher* write_watcher;
119 
120   grpc_closure* read_closure;
121   grpc_closure* write_closure;
122 
123   grpc_closure* on_done_closure;
124 
125   grpc_iomgr_object iomgr_object;
126 
127   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
128   grpc_fork_fd_list* fork_fd_list;
129 };
130 
131 /* True when GRPC_ENABLE_FORK_SUPPORT=1. */
132 static bool track_fds_for_fork = false;
133 
134 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
135 static grpc_fork_fd_list* fork_fd_list_head = nullptr;
136 static gpr_mu fork_fd_list_mu;
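/* When fork support is enabled, every grpc_fd and cached wakeup fd is linked
   into this list so that reset_event_manager_on_fork() (at the bottom of this
   file) can close all of them in the child process after a fork. */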
137 
138 /* Begin polling on an fd.
139    Registers that the given pollset is interested in this fd - so that if
140    readability or writability interest changes, the pollset can be kicked to
141    pick up that new interest.
142    Return value is:
143      (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
144    i.e. a combination of read_mask and write_mask determined by the fd's current
145    interest in said events.
146    Polling strategies that do not need to alter their behavior depending on the
147    fd's current interest (such as epoll) do not need to call this function.
148    MUST NOT be called with a pollset lock taken */
149 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
150                               grpc_pollset_worker* worker, uint32_t read_mask,
151                               uint32_t write_mask, grpc_fd_watcher* rec);
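/* For example, the poll()-based loop in pollset_work below calls this with
   read_mask=POLLIN and write_mask=POLLOUT and stores the returned mask
   directly in pollfd.events, so fds that nobody currently needs polled
   contribute no events to the poll() call. */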
152 /* Complete polling previously started with fd_begin_poll
153    MUST NOT be called with a pollset lock taken
154    if got_read or got_write are 1, also does the become_{readable,writable} as
155    appropriate. */
156 static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write);
157 
158 /* Return 1 if this fd is orphaned, 0 otherwise */
159 static bool fd_is_orphaned(grpc_fd* fd);
160 
161 #ifndef NDEBUG
162 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
163 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
164                      int line);
165 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
166 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
167 #else
168 static void fd_ref(grpc_fd* fd);
169 static void fd_unref(grpc_fd* fd);
170 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
171 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
172 #endif
173 
174 #define CLOSURE_NOT_READY ((grpc_closure*)0)
175 #define CLOSURE_READY ((grpc_closure*)1)
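/* Each of read_closure/write_closure acts as a small state machine: the slot
   holds CLOSURE_NOT_READY (no event, no waiter), CLOSURE_READY (an event
   arrived with no waiter), or a pointer to the closure waiting to run.
   notify_on_locked() and set_ready_locked() below perform the transitions. */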
176 
177 /*******************************************************************************
178  * pollset declarations
179  */
180 
181 typedef struct grpc_cached_wakeup_fd {
182   grpc_wakeup_fd fd;
183   struct grpc_cached_wakeup_fd* next;
184 
185   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
186   grpc_fork_fd_list* fork_fd_list;
187 } grpc_cached_wakeup_fd;
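/* Wakeup fds are pooled per pollset: pollset_work takes one from
   local_wakeup_cache (or creates it) on entry and returns it to the cache on
   exit; pollset_destroy frees whatever remains in the cache. */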
188 
189 struct grpc_pollset_worker {
190   grpc_cached_wakeup_fd* wakeup_fd;
191   int reevaluate_polling_on_wakeup;
192   int kicked_specifically;
193   struct grpc_pollset_worker* next;
194   struct grpc_pollset_worker* prev;
195 };
196 
197 struct grpc_pollset {
198   gpr_mu mu;
199   grpc_pollset_worker root_worker;
200   int shutting_down;
201   int called_shutdown;
202   int kicked_without_pollers;
203   grpc_closure* shutdown_done;
204   int pollset_set_count;
205   /* all polled fds */
206   size_t fd_count;
207   size_t fd_capacity;
208   grpc_fd** fds;
209   /* Local cache of eventfds for workers */
210   grpc_cached_wakeup_fd* local_wakeup_cache;
211 };
212 
213 /* Add an fd to a pollset */
214 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
215 
216 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
217 
218 /* Convert a deadline (grpc_millis) to a poll() timeout in milliseconds:
219    - very small or negative poll times are clamped to zero to do a
220      non-blocking poll (which becomes spin polling)
221    - other small values are rounded up to one millisecond
222    - longer than a millisecond polls are rounded up to the next nearest
223      millisecond to avoid spinning
224    - infinite timeouts are converted to -1 */
225 static int poll_deadline_to_millis_timeout(grpc_millis deadline);
226 
227 /* Allow kick to wake up the currently polling worker */
228 #define GRPC_POLLSET_CAN_KICK_SELF 1
229 /* Force the wakee to repoll when awoken */
230 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
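/* For example, pollset_kick_locked() (used when an fd's interest changes)
   kicks with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP so that a worker
   already blocked in poll() rebuilds its pollfd set instead of returning to
   its caller. */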
231 /* As per pollset_kick, with an extended set of flags (defined above)
232    -- mostly for fd_posix's use. */
233 static grpc_error* pollset_kick_ext(grpc_pollset* p,
234                                     grpc_pollset_worker* specific_worker,
235                                     uint32_t flags) GRPC_MUST_USE_RESULT;
236 
237 /* Return 1 if the pollset has active threads in pollset_work (pollset must
238  * be locked) */
239 static bool pollset_has_workers(grpc_pollset* pollset);
240 
241 /*******************************************************************************
242  * pollset_set definitions
243  */
244 
245 struct grpc_pollset_set {
246   gpr_mu mu;
247 
248   size_t pollset_count;
249   size_t pollset_capacity;
250   grpc_pollset** pollsets;
251 
252   size_t pollset_set_count;
253   size_t pollset_set_capacity;
254   struct grpc_pollset_set** pollset_sets;
255 
256   size_t fd_count;
257   size_t fd_capacity;
258   grpc_fd** fds;
259 };
260 
261 /*******************************************************************************
262  * functions to track opened fds. No-ops unless track_fds_for_fork is true.
263  */
264 
265 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
266   if (track_fds_for_fork) {
267     gpr_mu_lock(&fork_fd_list_mu);
268     if (fork_fd_list_head == node) {
269       fork_fd_list_head = node->next;
270     }
271     if (node->prev != nullptr) {
272       node->prev->next = node->next;
273     }
274     if (node->next != nullptr) {
275       node->next->prev = node->prev;
276     }
277     gpr_free(node);
278     gpr_mu_unlock(&fork_fd_list_mu);
279   }
280 }
281 
282 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
283   gpr_mu_lock(&fork_fd_list_mu);
284   node->next = fork_fd_list_head;
285   node->prev = nullptr;
286   if (fork_fd_list_head != nullptr) {
287     fork_fd_list_head->prev = node;
288   }
289   fork_fd_list_head = node;
290   gpr_mu_unlock(&fork_fd_list_mu);
291 }
292 
293 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
294   if (track_fds_for_fork) {
295     fd->fork_fd_list =
296         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
297     fd->fork_fd_list->fd = fd;
298     fd->fork_fd_list->cached_wakeup_fd = nullptr;
299     fork_fd_list_add_node(fd->fork_fd_list);
300   }
301 }
302 
303 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
304   if (track_fds_for_fork) {
305     fd->fork_fd_list =
306         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
307     fd->fork_fd_list->cached_wakeup_fd = fd;
308     fd->fork_fd_list->fd = nullptr;
309     fork_fd_list_add_node(fd->fork_fd_list);
310   }
311 }
312 
313 /*******************************************************************************
314  * fd_posix.c
315  */
316 
317 #ifndef NDEBUG
318 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
319 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
320 static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
321                    int line) {
322   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
323     gpr_log(GPR_DEBUG,
324             "FD %d %p   ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
325             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
326             gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
327   }
328 #else
329 #define REF_BY(fd, n, reason) \
330   do {                        \
331     ref_by(fd, n);            \
332     (void)(reason);           \
333   } while (0)
334 #define UNREF_BY(fd, n, reason) \
335   do {                          \
336     unref_by(fd, n);            \
337     (void)(reason);             \
338   } while (0)
339 static void ref_by(grpc_fd* fd, int n) {
340 #endif
341   GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
342 }
343 
344 #ifndef NDEBUG
345 static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
346                      int line) {
347   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
348     gpr_log(GPR_DEBUG,
349             "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
350             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
351             gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
352   }
353 #else
354 static void unref_by(grpc_fd* fd, int n) {
355 #endif
356   gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
357   if (old == n) {
358     gpr_mu_destroy(&fd->mu);
359     grpc_iomgr_unregister_object(&fd->iomgr_object);
360     fork_fd_list_remove_node(fd->fork_fd_list);
361     if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
362     gpr_free(fd);
363   } else {
364     GPR_ASSERT(old > n);
365   }
366 }
367 
368 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
369   // Avoid unused-parameter warning for debug-only parameter
370   (void)track_err;
371   GPR_DEBUG_ASSERT(track_err == false);
372   grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
373   gpr_mu_init(&r->mu);
374   gpr_atm_rel_store(&r->refst, 1);
375   r->shutdown = 0;
376   r->read_closure = CLOSURE_NOT_READY;
377   r->write_closure = CLOSURE_NOT_READY;
378   r->fd = fd;
379   r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
380       &r->inactive_watcher_root;
381   r->read_watcher = r->write_watcher = nullptr;
382   r->on_done_closure = nullptr;
383   r->closed = 0;
384   r->released = 0;
385   gpr_atm_no_barrier_store(&r->pollhup, 0);
386 
387   std::string name2 = absl::StrCat(name, " fd=", fd);
388   grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
389   fork_fd_list_add_grpc_fd(r);
390   return r;
391 }
392 
393 static bool fd_is_orphaned(grpc_fd* fd) {
394   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
395 }
396 
397 static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
398   gpr_mu_lock(&watcher->pollset->mu);
399   GPR_ASSERT(watcher->worker);
400   grpc_error* err = pollset_kick_ext(watcher->pollset, watcher->worker,
401                                      GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
402   gpr_mu_unlock(&watcher->pollset->mu);
403   return err;
404 }
405 
406 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
407   if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
408     pollset_kick_locked(fd->inactive_watcher_root.next);
409   } else if (fd->read_watcher) {
410     pollset_kick_locked(fd->read_watcher);
411   } else if (fd->write_watcher) {
412     pollset_kick_locked(fd->write_watcher);
413   }
414 }
415 
416 static void wake_all_watchers_locked(grpc_fd* fd) {
417   grpc_fd_watcher* watcher;
418   for (watcher = fd->inactive_watcher_root.next;
419        watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
420     pollset_kick_locked(watcher);
421   }
422   if (fd->read_watcher) {
423     pollset_kick_locked(fd->read_watcher);
424   }
425   if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
426     pollset_kick_locked(fd->write_watcher);
427   }
428 }
429 
430 static int has_watchers(grpc_fd* fd) {
431   return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
432          fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
433 }
434 
435 static void close_fd_locked(grpc_fd* fd) {
436   fd->closed = 1;
437   if (!fd->released) {
438     close(fd->fd);
439   }
440   grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
441 }
442 
443 static int fd_wrapped_fd(grpc_fd* fd) {
444   if (fd->released || fd->closed) {
445     return -1;
446   } else {
447     return fd->fd;
448   }
449 }
450 
451 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
452                       const char* reason) {
453   fd->on_done_closure = on_done;
454   fd->released = release_fd != nullptr;
455   if (release_fd != nullptr) {
456     *release_fd = fd->fd;
457     fd->released = true;
458   }
459   gpr_mu_lock(&fd->mu);
460   REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
461   if (!has_watchers(fd)) {
462     close_fd_locked(fd);
463   } else {
464     wake_all_watchers_locked(fd);
465   }
466   gpr_mu_unlock(&fd->mu);
467   UNREF_BY(fd, 2, reason); /* drop the reference */
468 }
469 
470 /* increment refcount by two to avoid changing the orphan bit */
471 #ifndef NDEBUG
472 static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
473                    int line) {
474   ref_by(fd, 2, reason, file, line);
475 }
476 
477 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
478                      int line) {
479   unref_by(fd, 2, reason, file, line);
480 }
481 #else
482 static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
483 
484 static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
485 #endif
486 
487 static grpc_error* fd_shutdown_error(grpc_fd* fd) {
488   if (!fd->shutdown) {
489     return GRPC_ERROR_NONE;
490   } else {
491     return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
492                                   "FD shutdown", &fd->shutdown_error, 1),
493                               GRPC_ERROR_INT_GRPC_STATUS,
494                               GRPC_STATUS_UNAVAILABLE);
495   }
496 }
497 
498 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
499                              grpc_closure* closure) {
500   if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
501     grpc_core::ExecCtx::Run(
502         DEBUG_LOCATION, closure,
503         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
504                            GRPC_ERROR_INT_GRPC_STATUS,
505                            GRPC_STATUS_UNAVAILABLE));
506   } else if (*st == CLOSURE_NOT_READY) {
507     /* not ready ==> switch to a waiting state by setting the closure */
508     *st = closure;
509   } else if (*st == CLOSURE_READY) {
510     /* already ready ==> queue the closure to run immediately */
511     *st = CLOSURE_NOT_READY;
512     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
513     maybe_wake_one_watcher_locked(fd);
514   } else {
515     /* a different closure is already pending on this fd - this is an error */
516     gpr_log(GPR_ERROR,
517             "User called a notify_on function with a previous callback still "
518             "pending");
519     abort();
520   }
521 }
522 
523 /* returns 1 if state becomes not ready */
524 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
525   if (*st == CLOSURE_READY) {
526     /* duplicate ready ==> ignore */
527     return 0;
528   } else if (*st == CLOSURE_NOT_READY) {
529     /* not ready, and not waiting ==> flag ready */
530     *st = CLOSURE_READY;
531     return 0;
532   } else {
533     /* waiting ==> queue closure */
534     grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
535     *st = CLOSURE_NOT_READY;
536     return 1;
537   }
538 }
539 
540 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
541   gpr_mu_lock(&fd->mu);
542   /* only shutdown once */
543   if (!fd->shutdown) {
544     fd->shutdown = 1;
545     fd->shutdown_error = why;
546     /* signal read/write closed to OS so that future operations fail */
547     shutdown(fd->fd, SHUT_RDWR);
548     set_ready_locked(fd, &fd->read_closure);
549     set_ready_locked(fd, &fd->write_closure);
550   } else {
551     GRPC_ERROR_UNREF(why);
552   }
553   gpr_mu_unlock(&fd->mu);
554 }
555 
556 static bool fd_is_shutdown(grpc_fd* fd) {
557   gpr_mu_lock(&fd->mu);
558   bool r = fd->shutdown;
559   gpr_mu_unlock(&fd->mu);
560   return r;
561 }
562 
563 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
564   gpr_mu_lock(&fd->mu);
565   notify_on_locked(fd, &fd->read_closure, closure);
566   gpr_mu_unlock(&fd->mu);
567 }
568 
569 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
570   gpr_mu_lock(&fd->mu);
571   notify_on_locked(fd, &fd->write_closure, closure);
572   gpr_mu_unlock(&fd->mu);
573 }
574 
575 static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
576   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
577     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
578   }
579   grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
580 }
581 
582 static void fd_set_readable(grpc_fd* fd) {
583   gpr_mu_lock(&fd->mu);
584   set_ready_locked(fd, &fd->read_closure);
585   gpr_mu_unlock(&fd->mu);
586 }
587 
588 static void fd_set_writable(grpc_fd* fd) {
589   gpr_mu_lock(&fd->mu);
590   set_ready_locked(fd, &fd->write_closure);
591   gpr_mu_unlock(&fd->mu);
592 }
593 
594 static void fd_set_error(grpc_fd* /*fd*/) {
595   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
596     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
597   }
598 }
599 
600 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
601                               grpc_pollset_worker* worker, uint32_t read_mask,
602                               uint32_t write_mask, grpc_fd_watcher* watcher) {
603   uint32_t mask = 0;
604   grpc_closure* cur;
605   int requested;
606   /* keep track of pollers that have requested our events, in case they change
607    */
608   GRPC_FD_REF(fd, "poll");
609 
610   gpr_mu_lock(&fd->mu);
611 
612   /* if we are shutdown, then don't add to the watcher set */
613   if (fd->shutdown) {
614     watcher->fd = nullptr;
615     watcher->pollset = nullptr;
616     watcher->worker = nullptr;
617     gpr_mu_unlock(&fd->mu);
618     GRPC_FD_UNREF(fd, "poll");
619     return 0;
620   }
621 
622   /* if there is nobody polling for read, but we need to, then start doing so */
623   cur = fd->read_closure;
624   requested = cur != CLOSURE_READY;
625   if (read_mask && fd->read_watcher == nullptr && requested) {
626     fd->read_watcher = watcher;
627     mask |= read_mask;
628   }
629   /* if there is nobody polling for write, but we need to, then start doing so
630    */
631   cur = fd->write_closure;
632   requested = cur != CLOSURE_READY;
633   if (write_mask && fd->write_watcher == nullptr && requested) {
634     fd->write_watcher = watcher;
635     mask |= write_mask;
636   }
637   /* if not polling, remember this watcher in case we need someone to later */
638   if (mask == 0 && worker != nullptr) {
639     watcher->next = &fd->inactive_watcher_root;
640     watcher->prev = watcher->next->prev;
641     watcher->next->prev = watcher->prev->next = watcher;
642   }
643   watcher->pollset = pollset;
644   watcher->worker = worker;
645   watcher->fd = fd;
646   gpr_mu_unlock(&fd->mu);
647 
648   return mask;
649 }
650 
651 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
652   int was_polling = 0;
653   int kick = 0;
654   grpc_fd* fd = watcher->fd;
655 
656   if (fd == nullptr) {
657     return;
658   }
659 
660   gpr_mu_lock(&fd->mu);
661 
662   if (watcher == fd->read_watcher) {
663     /* remove read watcher, kick if we still need a read */
664     was_polling = 1;
665     if (!got_read) {
666       kick = 1;
667     }
668     fd->read_watcher = nullptr;
669   }
670   if (watcher == fd->write_watcher) {
671     /* remove write watcher, kick if we still need a write */
672     was_polling = 1;
673     if (!got_write) {
674       kick = 1;
675     }
676     fd->write_watcher = nullptr;
677   }
678   if (!was_polling && watcher->worker != nullptr) {
679     /* remove from inactive list */
680     watcher->next->prev = watcher->prev;
681     watcher->prev->next = watcher->next;
682   }
683   if (got_read) {
684     if (set_ready_locked(fd, &fd->read_closure)) {
685       kick = 1;
686     }
687   }
688   if (got_write) {
689     if (set_ready_locked(fd, &fd->write_closure)) {
690       kick = 1;
691     }
692   }
693   if (kick) {
694     maybe_wake_one_watcher_locked(fd);
695   }
696   if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
697     close_fd_locked(fd);
698   }
699   gpr_mu_unlock(&fd->mu);
700 
701   GRPC_FD_UNREF(fd, "poll");
702 }
703 
704 /*******************************************************************************
705  * pollset_posix.c
706  */
707 
708 GPR_TLS_DECL(g_current_thread_poller);
709 GPR_TLS_DECL(g_current_thread_worker);
710 
711 static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
712   worker->prev->next = worker->next;
713   worker->next->prev = worker->prev;
714 }
715 
716 static bool pollset_has_workers(grpc_pollset* p) {
717   return p->root_worker.next != &p->root_worker;
718 }
719 
720 static bool pollset_in_pollset_sets(grpc_pollset* p) {
721   return p->pollset_set_count;
722 }
723 
724 static bool pollset_has_observers(grpc_pollset* p) {
725   return pollset_has_workers(p) || pollset_in_pollset_sets(p);
726 }
727 
728 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
729   if (pollset_has_workers(p)) {
730     grpc_pollset_worker* w = p->root_worker.next;
731     remove_worker(p, w);
732     return w;
733   } else {
734     return nullptr;
735   }
736 }
737 
738 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
739   worker->next = &p->root_worker;
740   worker->prev = worker->next->prev;
741   worker->prev->next = worker->next->prev = worker;
742 }
743 
744 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
745   worker->prev = &p->root_worker;
746   worker->next = worker->prev->next;
747   worker->prev->next = worker->next->prev = worker;
748 }
749 
750 static void kick_append_error(grpc_error** composite, grpc_error* error) {
751   if (error == GRPC_ERROR_NONE) return;
752   if (*composite == GRPC_ERROR_NONE) {
753     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
754   }
755   *composite = grpc_error_add_child(*composite, error);
756 }
757 
758 static grpc_error* pollset_kick_ext(grpc_pollset* p,
759                                     grpc_pollset_worker* specific_worker,
760                                     uint32_t flags) {
761   GPR_TIMER_SCOPE("pollset_kick_ext", 0);
762   grpc_error* error = GRPC_ERROR_NONE;
763   GRPC_STATS_INC_POLLSET_KICK();
764 
765   /* pollset->mu already held */
766   if (specific_worker != nullptr) {
767     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
768       GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
769       GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
770       for (specific_worker = p->root_worker.next;
771            specific_worker != &p->root_worker;
772            specific_worker = specific_worker->next) {
773         kick_append_error(
774             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
775       }
776       p->kicked_without_pollers = true;
777     } else if (gpr_tls_get(&g_current_thread_worker) !=
778                (intptr_t)specific_worker) {
779       GPR_TIMER_MARK("different_thread_worker", 0);
780       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
781         specific_worker->reevaluate_polling_on_wakeup = true;
782       }
783       specific_worker->kicked_specifically = true;
784       kick_append_error(&error,
785                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
786     } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
787       GPR_TIMER_MARK("kick_yoself", 0);
788       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
789         specific_worker->reevaluate_polling_on_wakeup = true;
790       }
791       specific_worker->kicked_specifically = true;
792       kick_append_error(&error,
793                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
794     }
795   } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
796     GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
797     GPR_TIMER_MARK("kick_anonymous", 0);
798     specific_worker = pop_front_worker(p);
799     if (specific_worker != nullptr) {
800       if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
801         GPR_TIMER_MARK("kick_anonymous_not_self", 0);
802         push_back_worker(p, specific_worker);
803         specific_worker = pop_front_worker(p);
804         if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
805             gpr_tls_get(&g_current_thread_worker) ==
806                 (intptr_t)specific_worker) {
807           push_back_worker(p, specific_worker);
808           specific_worker = nullptr;
809         }
810       }
811       if (specific_worker != nullptr) {
812         GPR_TIMER_MARK("finally_kick", 0);
813         push_back_worker(p, specific_worker);
814         kick_append_error(
815             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
816       }
817     } else {
818       GPR_TIMER_MARK("kicked_no_pollers", 0);
819       p->kicked_without_pollers = true;
820     }
821   }
822 
823   GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
824   return error;
825 }
826 
827 static grpc_error* pollset_kick(grpc_pollset* p,
828                                 grpc_pollset_worker* specific_worker) {
829   return pollset_kick_ext(p, specific_worker, 0);
830 }
831 
832 /* global state management */
833 
834 static grpc_error* pollset_global_init(void) {
835   gpr_tls_init(&g_current_thread_poller);
836   gpr_tls_init(&g_current_thread_worker);
837   return GRPC_ERROR_NONE;
838 }
839 
840 static void pollset_global_shutdown(void) {
841   gpr_tls_destroy(&g_current_thread_poller);
842   gpr_tls_destroy(&g_current_thread_worker);
843 }
844 
845 /* main interface */
846 
847 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
848   gpr_mu_init(&pollset->mu);
849   *mu = &pollset->mu;
850   pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
851   pollset->shutting_down = 0;
852   pollset->called_shutdown = 0;
853   pollset->kicked_without_pollers = 0;
854   pollset->local_wakeup_cache = nullptr;
855   pollset->kicked_without_pollers = 0;
856   pollset->fd_count = 0;
857   pollset->fd_capacity = 0;
858   pollset->fds = nullptr;
859   pollset->pollset_set_count = 0;
860 }
861 
862 static void pollset_destroy(grpc_pollset* pollset) {
863   GPR_ASSERT(!pollset_has_workers(pollset));
864   while (pollset->local_wakeup_cache) {
865     grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
866     fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
867     grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
868     gpr_free(pollset->local_wakeup_cache);
869     pollset->local_wakeup_cache = next;
870   }
871   gpr_free(pollset->fds);
872   gpr_mu_destroy(&pollset->mu);
873 }
874 
875 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
876   gpr_mu_lock(&pollset->mu);
877   size_t i;
878   /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
879   for (i = 0; i < pollset->fd_count; i++) {
880     if (pollset->fds[i] == fd) goto exit;
881   }
882   if (pollset->fd_count == pollset->fd_capacity) {
883     pollset->fd_capacity =
884         GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
885     pollset->fds = static_cast<grpc_fd**>(
886         gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
887   }
888   pollset->fds[pollset->fd_count++] = fd;
889   GRPC_FD_REF(fd, "multipoller");
890   pollset_kick(pollset, nullptr);
891 exit:
892   gpr_mu_unlock(&pollset->mu);
893 }
894 
895 static void finish_shutdown(grpc_pollset* pollset) {
896   size_t i;
897   for (i = 0; i < pollset->fd_count; i++) {
898     GRPC_FD_UNREF(pollset->fds[i], "multipoller");
899   }
900   pollset->fd_count = 0;
901   grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
902                           GRPC_ERROR_NONE);
903 }
904 
905 static void work_combine_error(grpc_error** composite, grpc_error* error) {
906   if (error == GRPC_ERROR_NONE) return;
907   if (*composite == GRPC_ERROR_NONE) {
908     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
909   }
910   *composite = grpc_error_add_child(*composite, error);
911 }
912 
913 static grpc_error* pollset_work(grpc_pollset* pollset,
914                                 grpc_pollset_worker** worker_hdl,
915                                 grpc_millis deadline) {
916   GPR_TIMER_SCOPE("pollset_work", 0);
917   grpc_pollset_worker worker;
918   if (worker_hdl) *worker_hdl = &worker;
919   grpc_error* error = GRPC_ERROR_NONE;
920 
921   /* Avoid malloc for small number of elements. */
922   enum { inline_elements = 96 };
923   struct pollfd pollfd_space[inline_elements];
924   struct grpc_fd_watcher watcher_space[inline_elements];
925 
926   /* pollset->mu already held */
927   int added_worker = 0;
928   int locked = 1;
929   int queued_work = 0;
930   int keep_polling = 0;
931   /* this must happen before we (potentially) drop pollset->mu */
932   worker.next = worker.prev = nullptr;
933   worker.reevaluate_polling_on_wakeup = 0;
934   if (pollset->local_wakeup_cache != nullptr) {
935     worker.wakeup_fd = pollset->local_wakeup_cache;
936     pollset->local_wakeup_cache = worker.wakeup_fd->next;
937   } else {
938     worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
939         gpr_malloc(sizeof(*worker.wakeup_fd)));
940     error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
941     fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
942     if (error != GRPC_ERROR_NONE) {
943       GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
944       return error;
945     }
946   }
947   worker.kicked_specifically = 0;
948   /* If we're shutting down then we don't execute any extended work */
949   if (pollset->shutting_down) {
950     GPR_TIMER_MARK("pollset_work.shutting_down", 0);
951     goto done;
952   }
953   /* Start polling, and keep doing so while we're being asked to
954      re-evaluate our pollers (this allows poll() based pollers to
955      ensure they don't miss wakeups) */
956   keep_polling = 1;
957   gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
958   while (keep_polling) {
959     keep_polling = 0;
960     if (!pollset->kicked_without_pollers ||
961         deadline <= grpc_core::ExecCtx::Get()->Now()) {
962       if (!added_worker) {
963         push_front_worker(pollset, &worker);
964         added_worker = 1;
965         gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
966       }
967       GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
968 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
969 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
970 
971       int timeout;
972       int r;
973       size_t i, fd_count;
974       nfds_t pfd_count;
975       grpc_fd_watcher* watchers;
976       struct pollfd* pfds;
977 
978       timeout = poll_deadline_to_millis_timeout(deadline);
979 
980       if (pollset->fd_count + 2 <= inline_elements) {
981         pfds = pollfd_space;
982         watchers = watcher_space;
983       } else {
984         /* Allocate one buffer to hold both pfds and watchers arrays */
985         const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
986         const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
987         void* buf = gpr_malloc(pfd_size + watch_size);
988         pfds = static_cast<struct pollfd*>(buf);
989         watchers = static_cast<grpc_fd_watcher*>(
990             (void*)(static_cast<char*>(buf) + pfd_size));
991       }
992 
993       fd_count = 0;
994       pfd_count = 1;
995       pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
996       pfds[0].events = POLLIN;
997       pfds[0].revents = 0;
998       for (i = 0; i < pollset->fd_count; i++) {
999         if (fd_is_orphaned(pollset->fds[i]) ||
1000             gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
1001           GRPC_FD_UNREF(pollset->fds[i], "multipoller");
1002         } else {
1003           pollset->fds[fd_count++] = pollset->fds[i];
1004           watchers[pfd_count].fd = pollset->fds[i];
1005           GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
1006           pfds[pfd_count].fd = pollset->fds[i]->fd;
1007           pfds[pfd_count].revents = 0;
1008           pfd_count++;
1009         }
1010       }
1011       pollset->fd_count = fd_count;
1012       gpr_mu_unlock(&pollset->mu);
1013 
1014       for (i = 1; i < pfd_count; i++) {
1015         grpc_fd* fd = watchers[i].fd;
1016         pfds[i].events = static_cast<short>(
1017             fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1018         GRPC_FD_UNREF(fd, "multipoller_start");
1019       }
1020 
1021       /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1022          even going into the blocking annotation if possible */
1023       GRPC_SCHEDULING_START_BLOCKING_REGION;
1024       GRPC_STATS_INC_SYSCALL_POLL();
1025       r = grpc_poll_function(pfds, pfd_count, timeout);
1026       GRPC_SCHEDULING_END_BLOCKING_REGION;
1027 
1028       if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1029         gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1030       }
1031 
1032       if (r < 0) {
1033         if (errno != EINTR) {
1034           work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1035         }
1036 
1037         for (i = 1; i < pfd_count; i++) {
1038           if (watchers[i].fd == nullptr) {
1039             fd_end_poll(&watchers[i], 0, 0);
1040           } else {
1041             // Wake up all the file descriptors, if we have an invalid one
1042             // we can identify it on the next pollset_work()
1043             fd_end_poll(&watchers[i], 1, 1);
1044           }
1045         }
1046       } else if (r == 0) {
1047         for (i = 1; i < pfd_count; i++) {
1048           fd_end_poll(&watchers[i], 0, 0);
1049         }
1050       } else {
1051         if (pfds[0].revents & POLLIN_CHECK) {
1052           if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1053             gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1054           }
1055           work_combine_error(
1056               &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1057         }
1058         for (i = 1; i < pfd_count; i++) {
1059           if (watchers[i].fd == nullptr) {
1060             fd_end_poll(&watchers[i], 0, 0);
1061           } else {
1062             if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1063               gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1064                       pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1065                       (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1066             }
1067             /* This is a mitigation to prevent poll() from spinning on a
1068              ** POLLHUP https://github.com/grpc/grpc/pull/13665
1069              */
1070             if (pfds[i].revents & POLLHUP) {
1071               gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1072             }
1073             fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1074                         pfds[i].revents & POLLOUT_CHECK);
1075           }
1076         }
1077       }
1078 
1079       if (pfds != pollfd_space) {
1080         /* pfds and watchers are in the same memory block pointed to by pfds */
1081         gpr_free(pfds);
1082       }
1083 
1084       locked = 0;
1085     } else {
1086       GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1087       pollset->kicked_without_pollers = 0;
1088     }
1089   /* Finished execution - start cleaning up.
1090      Note that we may arrive here from outside the enclosing while() loop.
1091      In that case we won't loop, though, since we haven't added the worker to
1092      the worker list, which means nobody can ask us to re-evaluate polling. */
1093   done:
1094     if (!locked) {
1095       queued_work |= grpc_core::ExecCtx::Get()->Flush();
1096       gpr_mu_lock(&pollset->mu);
1097       locked = 1;
1098     }
1099     /* If we're forced to re-evaluate polling (via pollset_kick with
1100        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1101        a loop */
1102     if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
1103       worker.reevaluate_polling_on_wakeup = 0;
1104       pollset->kicked_without_pollers = 0;
1105       if (queued_work || worker.kicked_specifically) {
1106         /* If there's queued work on the list, then set the deadline to be
1107            immediate so we get back out of the polling loop quickly */
1108         deadline = 0;
1109       }
1110       keep_polling = 1;
1111     }
1112   }
1113   gpr_tls_set(&g_current_thread_poller, 0);
1114   if (added_worker) {
1115     remove_worker(pollset, &worker);
1116     gpr_tls_set(&g_current_thread_worker, 0);
1117   }
1118   /* release wakeup fd to the local pool */
1119   worker.wakeup_fd->next = pollset->local_wakeup_cache;
1120   pollset->local_wakeup_cache = worker.wakeup_fd;
1121   /* check shutdown conditions */
1122   if (pollset->shutting_down) {
1123     if (pollset_has_workers(pollset)) {
1124       pollset_kick(pollset, nullptr);
1125     } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1126       pollset->called_shutdown = 1;
1127       gpr_mu_unlock(&pollset->mu);
1128       finish_shutdown(pollset);
1129       grpc_core::ExecCtx::Get()->Flush();
1130       /* Continuing to access pollset here is safe -- it is the caller's
1131        * responsibility to not destroy when it has outstanding calls to
1132        * pollset_work.
1133        * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1134       gpr_mu_lock(&pollset->mu);
1135     }
1136   }
1137   if (worker_hdl) *worker_hdl = nullptr;
1138   GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1139   return error;
1140 }
1141 
1142 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1143   GPR_ASSERT(!pollset->shutting_down);
1144   pollset->shutting_down = 1;
1145   pollset->shutdown_done = closure;
1146   pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1147   if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1148     pollset->called_shutdown = 1;
1149     finish_shutdown(pollset);
1150   }
1151 }
1152 
1153 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1154   if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1155   if (deadline == 0) return 0;
1156   grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1157   if (n < 0) return 0;
1158   if (n > INT_MAX) return -1;
1159   return static_cast<int>(n);
1160 }
1161 
1162 /*******************************************************************************
1163  * pollset_set_posix.c
1164  */
1165 
1166 static grpc_pollset_set* pollset_set_create(void) {
1167   grpc_pollset_set* pollset_set =
1168       static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1169   gpr_mu_init(&pollset_set->mu);
1170   return pollset_set;
1171 }
1172 
1173 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1174   size_t i;
1175   gpr_mu_destroy(&pollset_set->mu);
1176   for (i = 0; i < pollset_set->fd_count; i++) {
1177     GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1178   }
1179   for (i = 0; i < pollset_set->pollset_count; i++) {
1180     grpc_pollset* pollset = pollset_set->pollsets[i];
1181     gpr_mu_lock(&pollset->mu);
1182     pollset->pollset_set_count--;
1183     /* check shutdown */
1184     if (pollset->shutting_down && !pollset->called_shutdown &&
1185         !pollset_has_observers(pollset)) {
1186       pollset->called_shutdown = 1;
1187       gpr_mu_unlock(&pollset->mu);
1188       finish_shutdown(pollset);
1189     } else {
1190       gpr_mu_unlock(&pollset->mu);
1191     }
1192   }
1193   gpr_free(pollset_set->pollsets);
1194   gpr_free(pollset_set->pollset_sets);
1195   gpr_free(pollset_set->fds);
1196   gpr_free(pollset_set);
1197 }
1198 
1199 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1200                                     grpc_pollset* pollset) {
1201   size_t i, j;
1202   gpr_mu_lock(&pollset->mu);
1203   pollset->pollset_set_count++;
1204   gpr_mu_unlock(&pollset->mu);
1205   gpr_mu_lock(&pollset_set->mu);
1206   if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1207     pollset_set->pollset_capacity =
1208         GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1209     pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1210         pollset_set->pollsets,
1211         pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1212   }
1213   pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1214   for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1215     if (fd_is_orphaned(pollset_set->fds[i])) {
1216       GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1217     } else {
1218       pollset_add_fd(pollset, pollset_set->fds[i]);
1219       pollset_set->fds[j++] = pollset_set->fds[i];
1220     }
1221   }
1222   pollset_set->fd_count = j;
1223   gpr_mu_unlock(&pollset_set->mu);
1224 }
1225 
1226 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1227                                     grpc_pollset* pollset) {
1228   size_t i;
1229   gpr_mu_lock(&pollset_set->mu);
1230   for (i = 0; i < pollset_set->pollset_count; i++) {
1231     if (pollset_set->pollsets[i] == pollset) {
1232       pollset_set->pollset_count--;
1233       GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1234                pollset_set->pollsets[pollset_set->pollset_count]);
1235       break;
1236     }
1237   }
1238   gpr_mu_unlock(&pollset_set->mu);
1239   gpr_mu_lock(&pollset->mu);
1240   pollset->pollset_set_count--;
1241   /* check shutdown */
1242   if (pollset->shutting_down && !pollset->called_shutdown &&
1243       !pollset_has_observers(pollset)) {
1244     pollset->called_shutdown = 1;
1245     gpr_mu_unlock(&pollset->mu);
1246     finish_shutdown(pollset);
1247   } else {
1248     gpr_mu_unlock(&pollset->mu);
1249   }
1250 }
1251 
1252 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1253                                         grpc_pollset_set* item) {
1254   size_t i, j;
1255   gpr_mu_lock(&bag->mu);
1256   if (bag->pollset_set_count == bag->pollset_set_capacity) {
1257     bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1258     bag->pollset_sets = static_cast<grpc_pollset_set**>(
1259         gpr_realloc(bag->pollset_sets,
1260                     bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1261   }
1262   bag->pollset_sets[bag->pollset_set_count++] = item;
1263   for (i = 0, j = 0; i < bag->fd_count; i++) {
1264     if (fd_is_orphaned(bag->fds[i])) {
1265       GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1266     } else {
1267       pollset_set_add_fd(item, bag->fds[i]);
1268       bag->fds[j++] = bag->fds[i];
1269     }
1270   }
1271   bag->fd_count = j;
1272   gpr_mu_unlock(&bag->mu);
1273 }
1274 
1275 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1276                                         grpc_pollset_set* item) {
1277   size_t i;
1278   gpr_mu_lock(&bag->mu);
1279   for (i = 0; i < bag->pollset_set_count; i++) {
1280     if (bag->pollset_sets[i] == item) {
1281       bag->pollset_set_count--;
1282       GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1283                bag->pollset_sets[bag->pollset_set_count]);
1284       break;
1285     }
1286   }
1287   gpr_mu_unlock(&bag->mu);
1288 }
1289 
1290 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1291   size_t i;
1292   gpr_mu_lock(&pollset_set->mu);
1293   if (pollset_set->fd_count == pollset_set->fd_capacity) {
1294     pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1295     pollset_set->fds = static_cast<grpc_fd**>(
1296         gpr_realloc(pollset_set->fds,
1297                     pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1298   }
1299   GRPC_FD_REF(fd, "pollset_set");
1300   pollset_set->fds[pollset_set->fd_count++] = fd;
1301   for (i = 0; i < pollset_set->pollset_count; i++) {
1302     pollset_add_fd(pollset_set->pollsets[i], fd);
1303   }
1304   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1305     pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1306   }
1307   gpr_mu_unlock(&pollset_set->mu);
1308 }
1309 
1310 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1311   size_t i;
1312   gpr_mu_lock(&pollset_set->mu);
1313   for (i = 0; i < pollset_set->fd_count; i++) {
1314     if (pollset_set->fds[i] == fd) {
1315       pollset_set->fd_count--;
1316       GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1317                pollset_set->fds[pollset_set->fd_count]);
1318       GRPC_FD_UNREF(fd, "pollset_set");
1319       break;
1320     }
1321   }
1322   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1323     pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1324   }
1325   gpr_mu_unlock(&pollset_set->mu);
1326 }
1327 
1328 /*******************************************************************************
1329  * event engine binding
1330  */
1331 
1332 static bool is_any_background_poller_thread(void) { return false; }
1333 
1334 static void shutdown_background_closure(void) {}
1335 
1336 static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
1337                                              grpc_error* /*error*/) {
1338   return false;
1339 }
1340 
1341 static void shutdown_engine(void) {
1342   pollset_global_shutdown();
1343   if (track_fds_for_fork) {
1344     gpr_mu_destroy(&fork_fd_list_mu);
1345     grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1346   }
1347 }
1348 
1349 static const grpc_event_engine_vtable vtable = {
1350     sizeof(grpc_pollset),
1351     false,
1352     false,
1353 
1354     fd_create,
1355     fd_wrapped_fd,
1356     fd_orphan,
1357     fd_shutdown,
1358     fd_notify_on_read,
1359     fd_notify_on_write,
1360     fd_notify_on_error,
1361     fd_set_readable,
1362     fd_set_writable,
1363     fd_set_error,
1364     fd_is_shutdown,
1365 
1366     pollset_init,
1367     pollset_shutdown,
1368     pollset_destroy,
1369     pollset_work,
1370     pollset_kick,
1371     pollset_add_fd,
1372 
1373     pollset_set_create,
1374     pollset_set_destroy,
1375     pollset_set_add_pollset,
1376     pollset_set_del_pollset,
1377     pollset_set_add_pollset_set,
1378     pollset_set_del_pollset_set,
1379     pollset_set_add_fd,
1380     pollset_set_del_fd,
1381 
1382     is_any_background_poller_thread,
1383     shutdown_background_closure,
1384     shutdown_engine,
1385     add_closure_to_background_poller,
1386 };
1387 
1388 /* Called by the child process's post-fork handler to close open fds, including
1389  * worker wakeup fds. This allows gRPC to shut down in the child process without
1390  * interfering with connections or RPCs ongoing in the parent. */
1391 static void reset_event_manager_on_fork() {
1392   gpr_mu_lock(&fork_fd_list_mu);
1393   while (fork_fd_list_head != nullptr) {
1394     if (fork_fd_list_head->fd != nullptr) {
1395       if (!fork_fd_list_head->fd->closed) {
1396         close(fork_fd_list_head->fd->fd);
1397       }
1398       fork_fd_list_head->fd->fd = -1;
1399     } else {
1400       close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1401       fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1402       close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1403       fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1404     }
1405     fork_fd_list_head = fork_fd_list_head->next;
1406   }
1407   gpr_mu_unlock(&fork_fd_list_mu);
1408 }
1409 
1410 const grpc_event_engine_vtable* grpc_init_poll_posix(
1411     bool /*explicit_request*/) {
1412   if (!grpc_has_wakeup_fd()) {
1413     gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
1414     return nullptr;
1415   }
1416   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1417     return nullptr;
1418   }
1419   if (grpc_core::Fork::Enabled()) {
1420     track_fds_for_fork = true;
1421     gpr_mu_init(&fork_fd_list_mu);
1422     grpc_core::Fork::SetResetChildPollingEngineFunc(
1423         reset_event_manager_on_fork);
1424   }
1425   return &vtable;
1426 }
1427 
1428 #endif /* GRPC_POSIX_SOCKET_EV_POLL */
1429