1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24
25 #include "src/core/lib/iomgr/ev_poll_posix.h"
26
27 #include <assert.h>
28 #include <errno.h>
29 #include <limits.h>
30 #include <poll.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 #include <string>
36
37 #include "absl/strings/str_cat.h"
38
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41
42 #include "src/core/lib/debug/stats.h"
43 #include "src/core/lib/gpr/murmur_hash.h"
44 #include "src/core/lib/gpr/tls.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/thd.h"
47 #include "src/core/lib/iomgr/block_annotate.h"
48 #include "src/core/lib/iomgr/iomgr_internal.h"
49 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
50 #include "src/core/lib/profiling/timers.h"
51
52 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
53
54 /*******************************************************************************
55 * FD declarations
56 */
/* Records one pollset/worker's interest in polling a given fd.
   Watchers are ephemeral: created by fd_begin_poll and destroyed by
   fd_end_poll.  When not actively polling for read/write they live in the
   doubly-linked list rooted at the fd's inactive_watcher_root. */
typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher* next; /* next link in the intrusive watcher list */
  struct grpc_fd_watcher* prev; /* previous link in the intrusive list */
  grpc_pollset* pollset;        /* pollset on whose behalf we are polling */
  grpc_pollset_worker* worker;  /* polling worker; may be NULL (see
                                   fd_begin_poll) */
  grpc_fd* fd;                  /* watched fd; NULL if the fd was already
                                   shut down when polling began */
} grpc_fd_watcher;
64
typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;

/* Node in the global list of fds tracked across fork().
   Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
struct grpc_fork_fd_list {
  /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
     set to nullptr. */
  grpc_fd* fd;
  grpc_cached_wakeup_fd* cached_wakeup_fd;

  /* doubly-linked list links; the list is rooted at fork_fd_list_head and
     guarded by fork_fd_list_mu */
  grpc_fork_fd_list* next;
  grpc_fork_fd_list* prev;
};
77
struct grpc_fd {
  int fd; /* the underlying file descriptor */
  /* refst format:
       bit0:   1=active/0=orphaned
       bit1-n: refcount
     meaning that mostly we ref by two to avoid altering the orphaned bit,
     and just unref by 1 when we're ready to flag the object as orphaned */
  gpr_atm refst;

  gpr_mu mu;    /* guards all the mutable state below */
  int shutdown; /* 1 once fd_shutdown has run */
  int closed;   /* 1 once close_fd_locked has run */
  int released; /* 1 if the raw fd was handed back to the caller via
                   fd_orphan's release_fd (we must not close() it) */
  /* nonzero once the fd has hung up.  NOTE(review): only loads are visible in
     this file chunk — confirm where the store happens. */
  gpr_atm pollhup;
  grpc_error* shutdown_error; /* reason passed to fd_shutdown; owned and
                                 unref'd at destruction iff shutdown != 0 */

  /* The watcher list.

     The following watcher related fields are protected by watcher_mu.

     An fd_watcher is an ephemeral object created when an fd wants to
     begin polling, and destroyed after the poll.

     It denotes the fd's interest in whether to read poll or write poll
     or both or neither on this fd.

     If a watcher is asked to poll for reads or writes, the read_watcher
     or write_watcher fields are set respectively. A watcher may be asked
     to poll for both, in which case both fields will be set.

     read_watcher and write_watcher may be NULL if no watcher has been
     asked to poll for reads or writes.

     If an fd_watcher is not asked to poll for reads or writes, it's added
     to a linked list of inactive watchers, rooted at inactive_watcher_root.
     If at a later time there becomes need of a poller to poll, one of
     the inactive pollers may be kicked out of their poll loops to take
     that responsibility. */
  grpc_fd_watcher inactive_watcher_root; /* sentinel root of inactive list */
  grpc_fd_watcher* read_watcher;
  grpc_fd_watcher* write_watcher;

  /* CLOSURE_NOT_READY / CLOSURE_READY sentinels, or a parked user closure
     (see notify_on_locked / set_ready_locked) */
  grpc_closure* read_closure;
  grpc_closure* write_closure;

  grpc_closure* on_done_closure; /* run by close_fd_locked when fully closed */

  grpc_iomgr_object iomgr_object; /* registration for iomgr bookkeeping */

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
};
130
/* True when GRPC_ENABLE_FORK_SUPPORT=1. */
static bool track_fds_for_fork = false;

/* Head of the global fd-tracking list.
   Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
static grpc_fork_fd_list* fork_fd_list_head = nullptr;
/* Guards fork_fd_list_head and every node's next/prev links. */
static gpr_mu fork_fd_list_mu;
137
/* Begin polling on an fd.
   Registers that the given pollset is interested in this fd - so that if read
   or writability interest changes, the pollset can be kicked to pick up that
   new interest.
   Return value is:
     (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
   i.e. a combination of read_mask and write_mask determined by the fd's current
   interest in said events.
   Polling strategies that do not need to alter their behavior depending on the
   fd's current interest (such as epoll) do not need to call this function.
   MUST NOT be called with a pollset lock taken */
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* rec);
/* Complete polling previously started with fd_begin_poll
   MUST NOT be called with a pollset lock taken
   if got_read or got_write are 1, also does the become_{readable,writable} as
   appropriate. */
static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write);

/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd* fd);

/* Debug builds carry a reason/file/line through the ref-counting helpers so
   grpc_trace_fd_refcount can attribute each change. */
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd* fd);
static void fd_unref(grpc_fd* fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

/* Sentinel values for grpc_fd::read_closure / write_closure.  Any other
   value in those fields is a real parked closure. */
#define CLOSURE_NOT_READY ((grpc_closure*)0)
#define CLOSURE_READY ((grpc_closure*)1)
176
177 /*******************************************************************************
178 * pollset declarations
179 */
180
/* A wakeup fd cached on a pollset so workers can reuse it instead of
   creating a fresh one on every pollset_work call. */
typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd; /* the reusable wakeup fd itself */
  struct grpc_cached_wakeup_fd* next; /* next entry in the pollset's cache */

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;
188
/* One thread executing pollset_work; linked into the pollset's worker list
   rooted at root_worker. */
struct grpc_pollset_worker {
  grpc_cached_wakeup_fd* wakeup_fd; /* fd used to kick this worker awake */
  int reevaluate_polling_on_wakeup; /* forces a re-poll after wakeup (set by
                                       pollset_kick_ext) */
  int kicked_specifically;          /* this worker was targeted by a kick */
  struct grpc_pollset_worker* next;
  struct grpc_pollset_worker* prev;
};
196
struct grpc_pollset {
  gpr_mu mu;                      /* guards all state in this pollset */
  grpc_pollset_worker root_worker; /* sentinel root of the worker list */
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers; /* a kick arrived while no worker was polling;
                                 consumed by the next pollset_work */
  grpc_closure* shutdown_done; /* run once shutdown fully completes */
  int pollset_set_count; /* number of pollset_sets containing this pollset */
  /* all polled fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd* local_wakeup_cache;
};
212
/* Add an fd to a pollset */
static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);

/* Add an fd to every pollset (and child pollset_set) in a pollset_set */
static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);

/* Convert a timespec to milliseconds:
   - very small or negative poll times are clamped to zero to do a
     non-blocking poll (which becomes spin polling)
   - other small values are rounded up to one millisecond
   - longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(grpc_millis deadline);

/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
/* Force the wakee to repoll when awoken */
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
   -- mostly for fd_posix's use. */
static grpc_error* pollset_kick_ext(grpc_pollset* p,
                                    grpc_pollset_worker* specific_worker,
                                    uint32_t flags) GRPC_MUST_USE_RESULT;

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static bool pollset_has_workers(grpc_pollset* pollset);
240
241 /*******************************************************************************
242 * pollset_set definitions
243 */
244
/* A set of pollsets (and nested pollset_sets) plus fds awaiting assignment;
   three parallel dynamic arrays, all guarded by mu. */
struct grpc_pollset_set {
  gpr_mu mu; /* guards all three arrays below */

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset** pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set** pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
};
260
261 /*******************************************************************************
262 * functions to track opened fds. No-ops unless track_fds_for_fork is true.
263 */
264
fork_fd_list_remove_node(grpc_fork_fd_list * node)265 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
266 if (track_fds_for_fork) {
267 gpr_mu_lock(&fork_fd_list_mu);
268 if (fork_fd_list_head == node) {
269 fork_fd_list_head = node->next;
270 }
271 if (node->prev != nullptr) {
272 node->prev->next = node->next;
273 }
274 if (node->next != nullptr) {
275 node->next->prev = node->prev;
276 }
277 gpr_free(node);
278 gpr_mu_unlock(&fork_fd_list_mu);
279 }
280 }
281
fork_fd_list_add_node(grpc_fork_fd_list * node)282 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
283 gpr_mu_lock(&fork_fd_list_mu);
284 node->next = fork_fd_list_head;
285 node->prev = nullptr;
286 if (fork_fd_list_head != nullptr) {
287 fork_fd_list_head->prev = node;
288 }
289 fork_fd_list_head = node;
290 gpr_mu_unlock(&fork_fd_list_mu);
291 }
292
fork_fd_list_add_grpc_fd(grpc_fd * fd)293 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
294 if (track_fds_for_fork) {
295 fd->fork_fd_list =
296 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
297 fd->fork_fd_list->fd = fd;
298 fd->fork_fd_list->cached_wakeup_fd = nullptr;
299 fork_fd_list_add_node(fd->fork_fd_list);
300 }
301 }
302
fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd * fd)303 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
304 if (track_fds_for_fork) {
305 fd->fork_fd_list =
306 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
307 fd->fork_fd_list->cached_wakeup_fd = fd;
308 fd->fork_fd_list->fd = nullptr;
309 fork_fd_list_add_node(fd->fork_fd_list);
310 }
311 }
312
313 /*******************************************************************************
314 * fd_posix.c
315 */
316
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
/* Add n to fd->refst; in debug builds, optionally trace each change via the
   grpc_trace_fd_refcount flag. */
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                   int line) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
  }
#else
/* Release builds drop the reason but keep it syntactically evaluated. */
#define REF_BY(fd, n, reason) \
  do {                        \
    ref_by(fd, n);            \
    (void)(reason);           \
  } while (0)
#define UNREF_BY(fd, n, reason) \
  do {                          \
    unref_by(fd, n);            \
    (void)(reason);             \
  } while (0)
static void ref_by(grpc_fd* fd, int n) {
#endif
  /* the previous count must have been positive: refs cannot resurrect a
     fully-unref'd fd */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
343
#ifndef NDEBUG
/* Subtract n from fd->refst; destroys the fd when the count reaches zero.
   Debug builds optionally trace each change. */
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                     int line) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
  }
#else
static void unref_by(grpc_fd* fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* this was the last reference: tear the fd object down */
    gpr_mu_destroy(&fd->mu);
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    fork_fd_list_remove_node(fd->fork_fd_list);
    /* shutdown_error is only owned once fd_shutdown has run */
    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
    gpr_free(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}
367
368 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
369 // Avoid unused-parameter warning for debug-only parameter
370 (void)track_err;
371 GPR_DEBUG_ASSERT(track_err == false);
372 grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
373 gpr_mu_init(&r->mu);
374 gpr_atm_rel_store(&r->refst, 1);
375 r->shutdown = 0;
376 r->read_closure = CLOSURE_NOT_READY;
377 r->write_closure = CLOSURE_NOT_READY;
378 r->fd = fd;
379 r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
380 &r->inactive_watcher_root;
381 r->read_watcher = r->write_watcher = nullptr;
382 r->on_done_closure = nullptr;
383 r->closed = 0;
384 r->released = 0;
385 gpr_atm_no_barrier_store(&r->pollhup, 0);
386
387 std::string name2 = absl::StrCat(name, " fd=", fd);
388 grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
389 fork_fd_list_add_grpc_fd(r);
390 return r;
391 }
392
393 static bool fd_is_orphaned(grpc_fd* fd) {
394 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
395 }
396
397 static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
398 gpr_mu_lock(&watcher->pollset->mu);
399 GPR_ASSERT(watcher->worker);
400 grpc_error* err = pollset_kick_ext(watcher->pollset, watcher->worker,
401 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
402 gpr_mu_unlock(&watcher->pollset->mu);
403 return err;
404 }
405
406 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
407 if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
408 pollset_kick_locked(fd->inactive_watcher_root.next);
409 } else if (fd->read_watcher) {
410 pollset_kick_locked(fd->read_watcher);
411 } else if (fd->write_watcher) {
412 pollset_kick_locked(fd->write_watcher);
413 }
414 }
415
416 static void wake_all_watchers_locked(grpc_fd* fd) {
417 grpc_fd_watcher* watcher;
418 for (watcher = fd->inactive_watcher_root.next;
419 watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
420 pollset_kick_locked(watcher);
421 }
422 if (fd->read_watcher) {
423 pollset_kick_locked(fd->read_watcher);
424 }
425 if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
426 pollset_kick_locked(fd->write_watcher);
427 }
428 }
429
430 static int has_watchers(grpc_fd* fd) {
431 return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
432 fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
433 }
434
435 static void close_fd_locked(grpc_fd* fd) {
436 fd->closed = 1;
437 if (!fd->released) {
438 close(fd->fd);
439 }
440 grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
441 }
442
443 static int fd_wrapped_fd(grpc_fd* fd) {
444 if (fd->released || fd->closed) {
445 return -1;
446 } else {
447 return fd->fd;
448 }
449 }
450
451 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
452 const char* reason) {
453 fd->on_done_closure = on_done;
454 fd->released = release_fd != nullptr;
455 if (release_fd != nullptr) {
456 *release_fd = fd->fd;
457 fd->released = true;
458 }
459 gpr_mu_lock(&fd->mu);
460 REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
461 if (!has_watchers(fd)) {
462 close_fd_locked(fd);
463 } else {
464 wake_all_watchers_locked(fd);
465 }
466 gpr_mu_unlock(&fd->mu);
467 UNREF_BY(fd, 2, reason); /* drop the reference */
468 }
469
/* increment refcount by two to avoid changing the orphan bit
   (see the refst format comment on struct grpc_fd: bit0 is the active flag,
   so external refs always move the count in units of 2) */
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
#endif
486
487 static grpc_error* fd_shutdown_error(grpc_fd* fd) {
488 if (!fd->shutdown) {
489 return GRPC_ERROR_NONE;
490 } else {
491 return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
492 "FD shutdown", &fd->shutdown_error, 1),
493 GRPC_ERROR_INT_GRPC_STATUS,
494 GRPC_STATUS_UNAVAILABLE);
495 }
496 }
497
498 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
499 grpc_closure* closure) {
500 if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
501 grpc_core::ExecCtx::Run(
502 DEBUG_LOCATION, closure,
503 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
504 GRPC_ERROR_INT_GRPC_STATUS,
505 GRPC_STATUS_UNAVAILABLE));
506 } else if (*st == CLOSURE_NOT_READY) {
507 /* not ready ==> switch to a waiting state by setting the closure */
508 *st = closure;
509 } else if (*st == CLOSURE_READY) {
510 /* already ready ==> queue the closure to run immediately */
511 *st = CLOSURE_NOT_READY;
512 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
513 maybe_wake_one_watcher_locked(fd);
514 } else {
515 /* upcallptr was set to a different closure. This is an error! */
516 gpr_log(GPR_ERROR,
517 "User called a notify_on function with a previous callback still "
518 "pending");
519 abort();
520 }
521 }
522
523 /* returns 1 if state becomes not ready */
524 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
525 if (*st == CLOSURE_READY) {
526 /* duplicate ready ==> ignore */
527 return 0;
528 } else if (*st == CLOSURE_NOT_READY) {
529 /* not ready, and not waiting ==> flag ready */
530 *st = CLOSURE_READY;
531 return 0;
532 } else {
533 /* waiting ==> queue closure */
534 grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
535 *st = CLOSURE_NOT_READY;
536 return 1;
537 }
538 }
539
540 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
541 gpr_mu_lock(&fd->mu);
542 /* only shutdown once */
543 if (!fd->shutdown) {
544 fd->shutdown = 1;
545 fd->shutdown_error = why;
546 /* signal read/write closed to OS so that future operations fail */
547 shutdown(fd->fd, SHUT_RDWR);
548 set_ready_locked(fd, &fd->read_closure);
549 set_ready_locked(fd, &fd->write_closure);
550 } else {
551 GRPC_ERROR_UNREF(why);
552 }
553 gpr_mu_unlock(&fd->mu);
554 }
555
556 static bool fd_is_shutdown(grpc_fd* fd) {
557 gpr_mu_lock(&fd->mu);
558 bool r = fd->shutdown;
559 gpr_mu_unlock(&fd->mu);
560 return r;
561 }
562
563 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
564 gpr_mu_lock(&fd->mu);
565 notify_on_locked(fd, &fd->read_closure, closure);
566 gpr_mu_unlock(&fd->mu);
567 }
568
569 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
570 gpr_mu_lock(&fd->mu);
571 notify_on_locked(fd, &fd->write_closure, closure);
572 gpr_mu_unlock(&fd->mu);
573 }
574
575 static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
576 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
577 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
578 }
579 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
580 }
581
582 static void fd_set_readable(grpc_fd* fd) {
583 gpr_mu_lock(&fd->mu);
584 set_ready_locked(fd, &fd->read_closure);
585 gpr_mu_unlock(&fd->mu);
586 }
587
588 static void fd_set_writable(grpc_fd* fd) {
589 gpr_mu_lock(&fd->mu);
590 set_ready_locked(fd, &fd->write_closure);
591 gpr_mu_unlock(&fd->mu);
592 }
593
594 static void fd_set_error(grpc_fd* /*fd*/) {
595 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
596 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
597 }
598 }
599
/* Register |watcher| as a poller for this fd and compute which poll events
   the caller should request: read_mask if we became the read watcher,
   write_mask if we became the write watcher, 0 if neither (in which case the
   watcher is parked on the inactive list when a worker is supplied).
   Takes a "poll" ref on the fd that fd_end_poll releases. */
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher) {
  uint32_t mask = 0;
  grpc_closure* cur;
  int requested;
  /* keep track of pollers that have requested our events, in case they change
   */
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  /* if we are shutdown, then don't add to the watcher set */
  if (fd->shutdown) {
    /* a NULL watcher->fd makes fd_end_poll a no-op for this watcher */
    watcher->fd = nullptr;
    watcher->pollset = nullptr;
    watcher->worker = nullptr;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  /* if there is nobody polling for read, but we need to, then start doing so */
  cur = fd->read_closure;
  /* a parked closure (anything but CLOSURE_READY) means read is wanted */
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == nullptr && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so
   */
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == nullptr && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0 && worker != nullptr) {
    /* splice onto the tail of the inactive circular list */
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}
650
/* Finish a poll started with fd_begin_poll: unregister |watcher|, deliver
   read/write readiness, and release the "poll" ref.  If interest remains
   unserved (we stopped polling without observing the event) another watcher
   is kicked to take over.  A NULL watcher->fd (shutdown path) is a no-op. */
static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd* fd = watcher->fd;

  if (fd == nullptr) {
    return;
  }

  gpr_mu_lock(&fd->mu);

  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = nullptr;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = nullptr;
  }
  if (!was_polling && watcher->worker != nullptr) {
    /* remove from inactive list (only parked there when a worker was set;
       see fd_begin_poll) */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(fd, &fd->read_closure)) {
      kick = 1;
    }
  }
  if (got_write) {
    if (set_ready_locked(fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  /* if the fd was orphaned while we were its last watcher, close it now */
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}
703
704 /*******************************************************************************
705 * pollset_posix.c
706 */
707
/* Thread-local markers identifying the pollset/worker the current thread is
   polling on; used by pollset_kick_ext to detect self-kicks. */
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
710
711 static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
712 worker->prev->next = worker->next;
713 worker->next->prev = worker->prev;
714 }
715
716 static bool pollset_has_workers(grpc_pollset* p) {
717 return p->root_worker.next != &p->root_worker;
718 }
719
720 static bool pollset_in_pollset_sets(grpc_pollset* p) {
721 return p->pollset_set_count;
722 }
723
724 static bool pollset_has_observers(grpc_pollset* p) {
725 return pollset_has_workers(p) || pollset_in_pollset_sets(p);
726 }
727
728 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
729 if (pollset_has_workers(p)) {
730 grpc_pollset_worker* w = p->root_worker.next;
731 remove_worker(p, w);
732 return w;
733 } else {
734 return nullptr;
735 }
736 }
737
738 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
739 worker->next = &p->root_worker;
740 worker->prev = worker->next->prev;
741 worker->prev->next = worker->next->prev = worker;
742 }
743
744 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
745 worker->prev = &p->root_worker;
746 worker->next = worker->prev->next;
747 worker->prev->next = worker->next->prev = worker;
748 }
749
750 static void kick_append_error(grpc_error** composite, grpc_error* error) {
751 if (error == GRPC_ERROR_NONE) return;
752 if (*composite == GRPC_ERROR_NONE) {
753 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
754 }
755 *composite = grpc_error_add_child(*composite, error);
756 }
757
/* Wake worker(s) of pollset |p|.  specific_worker may be a single worker,
   GRPC_POLLSET_KICK_BROADCAST (wake everyone), or nullptr (wake any one
   worker, avoiding ourselves unless GRPC_POLLSET_CAN_KICK_SELF is set).
   pollset->mu must be held by the caller. */
static grpc_error* pollset_kick_ext(grpc_pollset* p,
                                    grpc_pollset_worker* specific_worker,
                                    uint32_t flags) {
  GPR_TIMER_SCOPE("pollset_kick_ext", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  GRPC_STATS_INC_POLLSET_KICK();

  /* pollset->mu already held */
  if (specific_worker != nullptr) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      /* broadcast: write to every worker's wakeup fd */
      GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
      /* remember the kick in case workers arrive later */
      p->kicked_without_pollers = true;
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      /* target worker runs on another thread: wake it via its wakeup fd */
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      /* target is the calling thread itself: only kick if explicitly allowed */
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    /* anonymous kick: pick some worker, preferring one that isn't us */
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != nullptr) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        /* first candidate is ourselves: rotate and try the next worker */
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          /* still ourselves and self-kick not allowed: give up */
          push_back_worker(p, specific_worker);
          specific_worker = nullptr;
        }
      }
      if (specific_worker != nullptr) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
    } else {
      /* no workers to wake: record the kick for the next pollset_work */
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
  return error;
}
826
827 static grpc_error* pollset_kick(grpc_pollset* p,
828 grpc_pollset_worker* specific_worker) {
829 return pollset_kick_ext(p, specific_worker, 0);
830 }
831
832 /* global state management */
833
834 static grpc_error* pollset_global_init(void) {
835 gpr_tls_init(&g_current_thread_poller);
836 gpr_tls_init(&g_current_thread_worker);
837 return GRPC_ERROR_NONE;
838 }
839
840 static void pollset_global_shutdown(void) {
841 gpr_tls_destroy(&g_current_thread_poller);
842 gpr_tls_destroy(&g_current_thread_worker);
843 }
844
845 /* main interface */
846
847 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
848 gpr_mu_init(&pollset->mu);
849 *mu = &pollset->mu;
850 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
851 pollset->shutting_down = 0;
852 pollset->called_shutdown = 0;
853 pollset->kicked_without_pollers = 0;
854 pollset->local_wakeup_cache = nullptr;
855 pollset->kicked_without_pollers = 0;
856 pollset->fd_count = 0;
857 pollset->fd_capacity = 0;
858 pollset->fds = nullptr;
859 pollset->pollset_set_count = 0;
860 }
861
862 static void pollset_destroy(grpc_pollset* pollset) {
863 GPR_ASSERT(!pollset_has_workers(pollset));
864 while (pollset->local_wakeup_cache) {
865 grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
866 fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
867 grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
868 gpr_free(pollset->local_wakeup_cache);
869 pollset->local_wakeup_cache = next;
870 }
871 gpr_free(pollset->fds);
872 gpr_mu_destroy(&pollset->mu);
873 }
874
875 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
876 gpr_mu_lock(&pollset->mu);
877 size_t i;
878 /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
879 for (i = 0; i < pollset->fd_count; i++) {
880 if (pollset->fds[i] == fd) goto exit;
881 }
882 if (pollset->fd_count == pollset->fd_capacity) {
883 pollset->fd_capacity =
884 GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
885 pollset->fds = static_cast<grpc_fd**>(
886 gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
887 }
888 pollset->fds[pollset->fd_count++] = fd;
889 GRPC_FD_REF(fd, "multipoller");
890 pollset_kick(pollset, nullptr);
891 exit:
892 gpr_mu_unlock(&pollset->mu);
893 }
894
895 static void finish_shutdown(grpc_pollset* pollset) {
896 size_t i;
897 for (i = 0; i < pollset->fd_count; i++) {
898 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
899 }
900 pollset->fd_count = 0;
901 grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
902 GRPC_ERROR_NONE);
903 }
904
905 static void work_combine_error(grpc_error** composite, grpc_error* error) {
906 if (error == GRPC_ERROR_NONE) return;
907 if (*composite == GRPC_ERROR_NONE) {
908 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
909 }
910 *composite = grpc_error_add_child(*composite, error);
911 }
912
/* Do one round of work on |pollset| from the calling thread, blocking in
   poll() for at most |deadline| (an absolute grpc_millis time).  pollset->mu
   must be held on entry; it is dropped around the poll() syscall and held
   again on return.  If |worker_hdl| is non-null it is set to this thread's
   stack-allocated worker for the duration of the call (so kicks can target
   this thread) and cleared before returning.  Returns GRPC_ERROR_NONE or a
   combined error from wakeup-fd / poll() failures. */
static grpc_error* pollset_work(grpc_pollset* pollset,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
  GPR_TIMER_SCOPE("pollset_work", 0);
  /* The worker lives on this stack frame and is published via *worker_hdl. */
  grpc_pollset_worker worker;
  if (worker_hdl) *worker_hdl = &worker;
  grpc_error* error = GRPC_ERROR_NONE;

  /* Avoid malloc for small number of elements. */
  enum { inline_elements = 96 };
  struct pollfd pollfd_space[inline_elements];
  struct grpc_fd_watcher watcher_space[inline_elements];

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;      /* do we currently hold pollset->mu? */
  int queued_work = 0; /* did an ExecCtx flush actually run closures? */
  int keep_polling = 0;
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = nullptr;
  worker.reevaluate_polling_on_wakeup = 0;
  /* Take a wakeup fd from the pollset-local cache, or create a fresh one. */
  if (pollset->local_wakeup_cache != nullptr) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
        gpr_malloc(sizeof(*worker.wakeup_fd)));
    error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
    fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
    if (error != GRPC_ERROR_NONE) {
      /* NOTE(review): worker.wakeup_fd is not freed on this early-return path
         (it was already linked into the fork fd list above, so freeing would
         dangle) -- confirm this leak on failed wakeup-fd init is accepted. */
      GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
      return error;
    }
  }
  worker.kicked_specifically = 0;
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers ||
        deadline <= grpc_core::ExecCtx::Get()->Now()) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

      int timeout;
      int r;
      size_t i, fd_count;
      nfds_t pfd_count;
      grpc_fd_watcher* watchers;
      struct pollfd* pfds;

      timeout = poll_deadline_to_millis_timeout(deadline);

      /* Arrays are sized fd_count + 2 (wakeup fd in slot 0 plus slack);
         use the stack buffers when they are big enough. */
      if (pollset->fd_count + 2 <= inline_elements) {
        pfds = pollfd_space;
        watchers = watcher_space;
      } else {
        /* Allocate one buffer to hold both pfds and watchers arrays */
        const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
        const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
        void* buf = gpr_malloc(pfd_size + watch_size);
        pfds = static_cast<struct pollfd*>(buf);
        watchers = static_cast<grpc_fd_watcher*>(
            (void*)(static_cast<char*>(buf) + pfd_size));
      }

      /* Slot 0 is this worker's wakeup fd; real fds start at index 1.
         watchers[0] is left unused so its indices line up with pfds. */
      fd_count = 0;
      pfd_count = 1;
      pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
      pfds[0].events = POLLIN;
      pfds[0].revents = 0;
      /* Compact pollset->fds in place: drop orphaned fds and fds that
         previously reported POLLHUP; stage the survivors for poll(). */
      for (i = 0; i < pollset->fd_count; i++) {
        if (fd_is_orphaned(pollset->fds[i]) ||
            gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
          GRPC_FD_UNREF(pollset->fds[i], "multipoller");
        } else {
          pollset->fds[fd_count++] = pollset->fds[i];
          watchers[pfd_count].fd = pollset->fds[i];
          GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
          pfds[pfd_count].fd = pollset->fds[i]->fd;
          pfds[pfd_count].revents = 0;
          pfd_count++;
        }
      }
      pollset->fd_count = fd_count;
      gpr_mu_unlock(&pollset->mu);

      /* Ask each fd which events it wants; fd_begin_poll also registers the
         watcher so concurrent fd shutdown can find this worker. */
      for (i = 1; i < pfd_count; i++) {
        grpc_fd* fd = watchers[i].fd;
        pfds[i].events = static_cast<short>(
            fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
        GRPC_FD_UNREF(fd, "multipoller_start");
      }

      /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
         even going into the blocking annotation if possible */
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      GRPC_STATS_INC_SYSCALL_POLL();
      r = grpc_poll_function(pfds, pfd_count, timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;

      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
      }

      if (r < 0) {
        /* poll() failed: EINTR is benign; anything else is recorded. */
        if (errno != EINTR) {
          work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
        }

        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].fd == nullptr) {
            fd_end_poll(&watchers[i], 0, 0);
          } else {
            // Wake up all the file descriptors, if we have an invalid one
            // we can identify it on the next pollset_work()
            fd_end_poll(&watchers[i], 1, 1);
          }
        }
      } else if (r == 0) {
        /* Timed out: no fd became ready. */
        for (i = 1; i < pfd_count; i++) {
          fd_end_poll(&watchers[i], 0, 0);
        }
      } else {
        /* At least one fd is ready; slot 0 is our wakeup fd. */
        if (pfds[0].revents & POLLIN_CHECK) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
          }
          work_combine_error(
              &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
        }
        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].fd == nullptr) {
            fd_end_poll(&watchers[i], 0, 0);
          } else {
            if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
              gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
                      pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
                      (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
            }
            /* This is a mitigation to prevent poll() from spinning on a
            ** POLLHUP https://github.com/grpc/grpc/pull/13665
            */
            if (pfds[i].revents & POLLHUP) {
              gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
            }
            fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
                        pfds[i].revents & POLLOUT_CHECK);
          }
        }
      }

      if (pfds != pollfd_space) {
        /* pfds and watchers are in the same memory block pointed to by pfds */
        gpr_free(pfds);
      }

      locked = 0;
    } else {
      /* Kicked while no pollers were present: consume the kick and skip
         the poll() syscall entirely this iteration. */
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
  /* Finished execution - start cleaning up.
     Note that we may arrive here from outside the enclosing while() loop.
     In that case we won't loop though as we haven't added worker to the
     worker list, which means nobody could ask us to re-evaluate polling). */
  done:
    if (!locked) {
      queued_work |= grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = 0;
      }
      keep_polling = 1;
    }
  }
  gpr_tls_set(&g_current_thread_poller, 0);
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      /* Other workers remain: kick them so they observe the shutdown. */
      pollset_kick(pollset, nullptr);
    } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
      /* We are the last observer: finalize shutdown outside the lock. */
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(pollset);
      grpc_core::ExecCtx::Get()->Flush();
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    }
  }
  if (worker_hdl) *worker_hdl = nullptr;
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1141
1142 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1143 GPR_ASSERT(!pollset->shutting_down);
1144 pollset->shutting_down = 1;
1145 pollset->shutdown_done = closure;
1146 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1147 if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1148 pollset->called_shutdown = 1;
1149 finish_shutdown(pollset);
1150 }
1151 }
1152
1153 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1154 if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1155 if (deadline == 0) return 0;
1156 grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1157 if (n < 0) return 0;
1158 if (n > INT_MAX) return -1;
1159 return static_cast<int>(n);
1160 }
1161
1162 /*******************************************************************************
1163 * pollset_set_posix.c
1164 */
1165
1166 static grpc_pollset_set* pollset_set_create(void) {
1167 grpc_pollset_set* pollset_set =
1168 static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1169 gpr_mu_init(&pollset_set->mu);
1170 return pollset_set;
1171 }
1172
1173 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1174 size_t i;
1175 gpr_mu_destroy(&pollset_set->mu);
1176 for (i = 0; i < pollset_set->fd_count; i++) {
1177 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1178 }
1179 for (i = 0; i < pollset_set->pollset_count; i++) {
1180 grpc_pollset* pollset = pollset_set->pollsets[i];
1181 gpr_mu_lock(&pollset->mu);
1182 pollset->pollset_set_count--;
1183 /* check shutdown */
1184 if (pollset->shutting_down && !pollset->called_shutdown &&
1185 !pollset_has_observers(pollset)) {
1186 pollset->called_shutdown = 1;
1187 gpr_mu_unlock(&pollset->mu);
1188 finish_shutdown(pollset);
1189 } else {
1190 gpr_mu_unlock(&pollset->mu);
1191 }
1192 }
1193 gpr_free(pollset_set->pollsets);
1194 gpr_free(pollset_set->pollset_sets);
1195 gpr_free(pollset_set->fds);
1196 gpr_free(pollset_set);
1197 }
1198
1199 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1200 grpc_pollset* pollset) {
1201 size_t i, j;
1202 gpr_mu_lock(&pollset->mu);
1203 pollset->pollset_set_count++;
1204 gpr_mu_unlock(&pollset->mu);
1205 gpr_mu_lock(&pollset_set->mu);
1206 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1207 pollset_set->pollset_capacity =
1208 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1209 pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1210 pollset_set->pollsets,
1211 pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1212 }
1213 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1214 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1215 if (fd_is_orphaned(pollset_set->fds[i])) {
1216 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1217 } else {
1218 pollset_add_fd(pollset, pollset_set->fds[i]);
1219 pollset_set->fds[j++] = pollset_set->fds[i];
1220 }
1221 }
1222 pollset_set->fd_count = j;
1223 gpr_mu_unlock(&pollset_set->mu);
1224 }
1225
1226 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1227 grpc_pollset* pollset) {
1228 size_t i;
1229 gpr_mu_lock(&pollset_set->mu);
1230 for (i = 0; i < pollset_set->pollset_count; i++) {
1231 if (pollset_set->pollsets[i] == pollset) {
1232 pollset_set->pollset_count--;
1233 GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1234 pollset_set->pollsets[pollset_set->pollset_count]);
1235 break;
1236 }
1237 }
1238 gpr_mu_unlock(&pollset_set->mu);
1239 gpr_mu_lock(&pollset->mu);
1240 pollset->pollset_set_count--;
1241 /* check shutdown */
1242 if (pollset->shutting_down && !pollset->called_shutdown &&
1243 !pollset_has_observers(pollset)) {
1244 pollset->called_shutdown = 1;
1245 gpr_mu_unlock(&pollset->mu);
1246 finish_shutdown(pollset);
1247 } else {
1248 gpr_mu_unlock(&pollset->mu);
1249 }
1250 }
1251
1252 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1253 grpc_pollset_set* item) {
1254 size_t i, j;
1255 gpr_mu_lock(&bag->mu);
1256 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1257 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1258 bag->pollset_sets = static_cast<grpc_pollset_set**>(
1259 gpr_realloc(bag->pollset_sets,
1260 bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1261 }
1262 bag->pollset_sets[bag->pollset_set_count++] = item;
1263 for (i = 0, j = 0; i < bag->fd_count; i++) {
1264 if (fd_is_orphaned(bag->fds[i])) {
1265 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1266 } else {
1267 pollset_set_add_fd(item, bag->fds[i]);
1268 bag->fds[j++] = bag->fds[i];
1269 }
1270 }
1271 bag->fd_count = j;
1272 gpr_mu_unlock(&bag->mu);
1273 }
1274
1275 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1276 grpc_pollset_set* item) {
1277 size_t i;
1278 gpr_mu_lock(&bag->mu);
1279 for (i = 0; i < bag->pollset_set_count; i++) {
1280 if (bag->pollset_sets[i] == item) {
1281 bag->pollset_set_count--;
1282 GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1283 bag->pollset_sets[bag->pollset_set_count]);
1284 break;
1285 }
1286 }
1287 gpr_mu_unlock(&bag->mu);
1288 }
1289
1290 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1291 size_t i;
1292 gpr_mu_lock(&pollset_set->mu);
1293 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1294 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1295 pollset_set->fds = static_cast<grpc_fd**>(
1296 gpr_realloc(pollset_set->fds,
1297 pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1298 }
1299 GRPC_FD_REF(fd, "pollset_set");
1300 pollset_set->fds[pollset_set->fd_count++] = fd;
1301 for (i = 0; i < pollset_set->pollset_count; i++) {
1302 pollset_add_fd(pollset_set->pollsets[i], fd);
1303 }
1304 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1305 pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1306 }
1307 gpr_mu_unlock(&pollset_set->mu);
1308 }
1309
1310 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1311 size_t i;
1312 gpr_mu_lock(&pollset_set->mu);
1313 for (i = 0; i < pollset_set->fd_count; i++) {
1314 if (pollset_set->fds[i] == fd) {
1315 pollset_set->fd_count--;
1316 GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1317 pollset_set->fds[pollset_set->fd_count]);
1318 GRPC_FD_UNREF(fd, "pollset_set");
1319 break;
1320 }
1321 }
1322 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1323 pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1324 }
1325 gpr_mu_unlock(&pollset_set->mu);
1326 }
1327
1328 /*******************************************************************************
1329 * event engine binding
1330 */
1331
/* The poll engine runs no dedicated background poller threads, so the
   calling thread can never be one. */
static bool is_any_background_poller_thread(void) { return false; }
1333
/* Nothing to do: this engine has no background poller to shut down. */
static void shutdown_background_closure(void) {}
1335
/* Always declines: this engine has no background poller that could take
   ownership of the closure. */
static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error* /*error*/) {
  return false;
}
1340
1341 static void shutdown_engine(void) {
1342 pollset_global_shutdown();
1343 if (track_fds_for_fork) {
1344 gpr_mu_destroy(&fork_fd_list_mu);
1345 grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1346 }
1347 }
1348
/* Event-engine vtable for the poll() engine.  NOTE(review): positional
   initializer -- entry order must match the grpc_event_engine_vtable
   declaration (not visible in this file); the meaning of the two leading
   booleans comes from that declaration. */
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),
    false,
    false,

    /* fd operations */
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_set_readable,
    fd_set_writable,
    fd_set_error,
    fd_is_shutdown,

    /* pollset operations */
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    /* pollset_set operations */
    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    /* background poller hooks (unsupported by this engine) */
    is_any_background_poller_thread,
    shutdown_background_closure,
    shutdown_engine,
    add_closure_to_background_poller,
};
1387
1388 /* Called by the child process's post-fork handler to close open fds, including
1389 * worker wakeup fds. This allows gRPC to shutdown in the child process without
1390 * interfering with connections or RPCs ongoing in the parent. */
1391 static void reset_event_manager_on_fork() {
1392 gpr_mu_lock(&fork_fd_list_mu);
1393 while (fork_fd_list_head != nullptr) {
1394 if (fork_fd_list_head->fd != nullptr) {
1395 if (!fork_fd_list_head->fd->closed) {
1396 close(fork_fd_list_head->fd->fd);
1397 }
1398 fork_fd_list_head->fd->fd = -1;
1399 } else {
1400 close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1401 fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1402 close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1403 fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1404 }
1405 fork_fd_list_head = fork_fd_list_head->next;
1406 }
1407 gpr_mu_unlock(&fork_fd_list_mu);
1408 }
1409
1410 const grpc_event_engine_vtable* grpc_init_poll_posix(
1411 bool /*explicit_request*/) {
1412 if (!grpc_has_wakeup_fd()) {
1413 gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
1414 return nullptr;
1415 }
1416 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1417 return nullptr;
1418 }
1419 if (grpc_core::Fork::Enabled()) {
1420 track_fds_for_fork = true;
1421 gpr_mu_init(&fork_fd_list_mu);
1422 grpc_core::Fork::SetResetChildPollingEngineFunc(
1423 reset_event_manager_on_fork);
1424 }
1425 return &vtable;
1426 }
1427
1428 #endif /* GRPC_POSIX_SOCKET_EV_POLL */
1429