1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #include <grpc/support/log.h>
24
25 /* This polling engine is only relevant on linux kernels supporting epoll
26 epoll_create() or epoll_create1() */
27 #ifdef GRPC_LINUX_EPOLL
28 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <poll.h>
35 #include <pthread.h>
36 #include <string.h>
37 #include <sys/epoll.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <grpc/support/alloc.h>
42 #include <grpc/support/cpu.h>
43 #include <grpc/support/string_util.h>
44
45 #include "src/core/lib/debug/stats.h"
46 #include "src/core/lib/gpr/string.h"
47 #include "src/core/lib/gpr/tls.h"
48 #include "src/core/lib/gpr/useful.h"
49 #include "src/core/lib/gprpp/manual_constructor.h"
50 #include "src/core/lib/iomgr/block_annotate.h"
51 #include "src/core/lib/iomgr/ev_posix.h"
52 #include "src/core/lib/iomgr/iomgr_internal.h"
53 #include "src/core/lib/iomgr/lockfree_event.h"
54 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
55 #include "src/core/lib/profiling/timers.h"
56
57 static grpc_wakeup_fd global_wakeup_fd;
58
59 /*******************************************************************************
60 * Singleton epoll set related fields
61 */
62
63 #define MAX_EPOLL_EVENTS 100
64 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
65
66 /* NOTE ON SYNCHRONIZATION:
67 * - Fields in this struct are only modified by the designated poller. Hence
68 * there is no need for any locks to protect the struct.
69 * - num_events and cursor fields have to be of atomic type to provide memory
70 * visibility guarantees only. i.e In case of multiple pollers, the designated
71 * polling thread keeps changing; the thread that wrote these values may be
72 * different from the thread reading the values
73 */
typedef struct epoll_set {
  int epfd; /* fd of the epoll instance; -1 after epoll_set_shutdown() */

  /* The epoll_events after the last call to epoll_wait() */
  struct epoll_event events[MAX_EPOLL_EVENTS];

  /* The number of epoll_events after the last call to epoll_wait() */
  gpr_atm num_events;

  /* Index of the first event in epoll_events that has to be processed. This
   * field is only valid if num_events > 0 */
  gpr_atm cursor;
} epoll_set;
87
88 /* The global singleton epoll set */
89 static epoll_set g_epoll_set;
90
epoll_create_and_cloexec()91 static int epoll_create_and_cloexec() {
92 #ifdef GRPC_LINUX_EPOLL_CREATE1
93 int fd = epoll_create1(EPOLL_CLOEXEC);
94 if (fd < 0) {
95 gpr_log(GPR_ERROR, "epoll_create1 unavailable");
96 }
97 #else
98 int fd = epoll_create(MAX_EPOLL_EVENTS);
99 if (fd < 0) {
100 gpr_log(GPR_ERROR, "epoll_create unavailable");
101 } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
102 gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
103 return -1;
104 }
105 #endif
106 return fd;
107 }
108
109 /* Must be called *only* once */
epoll_set_init()110 static bool epoll_set_init() {
111 g_epoll_set.epfd = epoll_create_and_cloexec();
112 if (g_epoll_set.epfd < 0) {
113 return false;
114 }
115
116 gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
117 gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
118 gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
119 return true;
120 }
121
122 /* epoll_set_init() MUST be called before calling this. */
epoll_set_shutdown()123 static void epoll_set_shutdown() {
124 if (g_epoll_set.epfd >= 0) {
125 close(g_epoll_set.epfd);
126 g_epoll_set.epfd = -1;
127 }
128 }
129
130 /*******************************************************************************
131 * Fd Declarations
132 */
133
134 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
/* Node in the global doubly-linked list of fds, maintained only when
 * GRPC_ENABLE_FORK_SUPPORT=1. Note that next/prev point at grpc_fd objects
 * (not at list nodes); traversal hops through each fd's own fork_fd_list. */
struct grpc_fork_fd_list {
  grpc_fd* fd;   /* owner of this node */
  grpc_fd* next; /* next fd in the global fork list, or nullptr */
  grpc_fd* prev; /* previous fd in the global fork list, or nullptr */
};
140
struct grpc_fd {
  int fd; /* the underlying OS file descriptor */

  /* Lock-free events delivering read/write/error readiness to closures. */
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  /* Intrusive link used while this wrapper sits on fd_freelist. */
  struct grpc_fd* freelist_next;

  /* Registration with the iomgr object tracker. */
  grpc_iomgr_object iomgr_object;

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
};
155
156 static void fd_global_init(void);
157 static void fd_global_shutdown(void);
158
159 /*******************************************************************************
160 * Pollset Declarations
161 */
162
/* Worker kick state: UNKICKED = parked/waiting; KICKED = told to return;
 * DESIGNATED_POLLER = elected to call epoll_wait(). */
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
164
kick_state_string(kick_state st)165 static const char* kick_state_string(kick_state st) {
166 switch (st) {
167 case UNKICKED:
168 return "UNKICKED";
169 case KICKED:
170 return "KICKED";
171 case DESIGNATED_POLLER:
172 return "DESIGNATED_POLLER";
173 }
174 GPR_UNREACHABLE_CODE(return "UNKNOWN");
175 }
176
struct grpc_pollset_worker {
  kick_state state;          /* mutate only via SET_KICK_STATE */
  int kick_state_mutator;    // which line of code last changed kick state
  bool initialized_cv;       /* has 'cv' been gpr_cv_init()'d yet? */
  grpc_pollset_worker* next; /* links in the pollset's circular worker list */
  grpc_pollset_worker* prev;
  gpr_cv cv;                 /* signalled to wake a parked (non-polling) worker */
  grpc_closure_list schedule_on_end_work; /* closures drained in end_worker() */
};
186
187 #define SET_KICK_STATE(worker, kick_state) \
188 do { \
189 (worker)->state = (kick_state); \
190 (worker)->kick_state_mutator = __LINE__; \
191 } while (false)
192
193 #define MAX_NEIGHBORHOODS 1024
194
/* A group of pollsets sharing one mutex and one intrusive ring of active
 * pollsets. The union pads each neighborhood to a cache line to avoid false
 * sharing between adjacent neighborhoods in the global array. */
typedef struct pollset_neighborhood {
  union {
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;                 /* guards active_root and the ring links */
      grpc_pollset* active_root; /* some pollset in the active ring, or null */
    };
  };
} pollset_neighborhood;
204
struct grpc_pollset {
  gpr_mu mu;                          /* protects the fields below */
  pollset_neighborhood* neighborhood; /* neighborhood currently assigned */
  bool reassigning_neighborhood;      /* a worker is re-picking a neighborhood */
  grpc_pollset_worker* root_worker;   /* circular list of attached workers */
  bool kicked_without_poller;         /* kicked while no worker was attached */

  /* Set to true if the pollset is observed to have no workers available to
     poll */
  bool seen_inactive;
  bool shutting_down;             /* Is the pollset shutting down ? */
  grpc_closure* shutdown_closure; /* Called after shutdown is complete */

  /* Number of workers who are *about-to* attach themselves to the pollset
   * worker list */
  int begin_refs;

  grpc_pollset* next; /* links in the neighborhood's active pollset ring */
  grpc_pollset* prev;
};
225
226 /*******************************************************************************
227 * Pollset-set Declarations
228 */
229
/* With one global epoll set there is nothing for a pollset_set to track, so
 * this is an empty placeholder type. */
struct grpc_pollset_set {
  char unused;
};
233
234 /*******************************************************************************
235 * Common helpers
236 */
237
append_error(grpc_error ** composite,grpc_error * error,const char * desc)238 static bool append_error(grpc_error** composite, grpc_error* error,
239 const char* desc) {
240 if (error == GRPC_ERROR_NONE) return true;
241 if (*composite == GRPC_ERROR_NONE) {
242 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
243 }
244 *composite = grpc_error_add_child(*composite, error);
245 return false;
246 }
247
248 /*******************************************************************************
249 * Fd Definitions
250 */
251
252 /* We need to keep a freelist not because of any concerns of malloc performance
253 * but instead so that implementations with multiple threads in (for example)
254 * epoll_wait deal with the race between pollset removal and incoming poll
255 * notifications.
256 *
257 * The problem is that the poller ultimately holds a reference to this
258 * object, so it is very difficult to know when is safe to free it, at least
259 * without some expensive synchronization.
260 *
261 * If we keep the object freelisted, in the worst case losing this race just
262 * becomes a spurious read notification on a reused fd.
263 */
264
265 /* The alarm system needs to be able to wakeup 'some poller' sometimes
266 * (specifically when a new alarm needs to be triggered earlier than the next
267 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
268 * case occurs. */
269
270 static grpc_fd* fd_freelist = nullptr;
271 static gpr_mu fd_freelist_mu;
272
273 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
274 static grpc_fd* fork_fd_list_head = nullptr;
275 static gpr_mu fork_fd_list_mu;
276
fd_global_init(void)277 static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
278
fd_global_shutdown(void)279 static void fd_global_shutdown(void) {
280 // TODO(guantaol): We don't have a reasonable explanation about this
281 // lock()/unlock() pattern. It can be a valid barrier if there is at most one
282 // pending lock() at this point. Otherwise, there is still a possibility of
283 // use-after-free race. Need to reason about the code and/or clean it up.
284 gpr_mu_lock(&fd_freelist_mu);
285 gpr_mu_unlock(&fd_freelist_mu);
286 while (fd_freelist != nullptr) {
287 grpc_fd* fd = fd_freelist;
288 fd_freelist = fd_freelist->freelist_next;
289 gpr_free(fd);
290 }
291 gpr_mu_destroy(&fd_freelist_mu);
292 }
293
fork_fd_list_add_grpc_fd(grpc_fd * fd)294 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
295 if (grpc_core::Fork::Enabled()) {
296 gpr_mu_lock(&fork_fd_list_mu);
297 fd->fork_fd_list =
298 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
299 fd->fork_fd_list->next = fork_fd_list_head;
300 fd->fork_fd_list->prev = nullptr;
301 if (fork_fd_list_head != nullptr) {
302 fork_fd_list_head->fork_fd_list->prev = fd;
303 }
304 fork_fd_list_head = fd;
305 gpr_mu_unlock(&fork_fd_list_mu);
306 }
307 }
308
fork_fd_list_remove_grpc_fd(grpc_fd * fd)309 static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
310 if (grpc_core::Fork::Enabled()) {
311 gpr_mu_lock(&fork_fd_list_mu);
312 if (fork_fd_list_head == fd) {
313 fork_fd_list_head = fd->fork_fd_list->next;
314 }
315 if (fd->fork_fd_list->prev != nullptr) {
316 fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
317 }
318 if (fd->fork_fd_list->next != nullptr) {
319 fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
320 }
321 gpr_free(fd->fork_fd_list);
322 gpr_mu_unlock(&fork_fd_list_mu);
323 }
324 }
325
fd_create(int fd,const char * name,bool track_err)326 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
327 grpc_fd* new_fd = nullptr;
328
329 gpr_mu_lock(&fd_freelist_mu);
330 if (fd_freelist != nullptr) {
331 new_fd = fd_freelist;
332 fd_freelist = fd_freelist->freelist_next;
333 }
334 gpr_mu_unlock(&fd_freelist_mu);
335
336 if (new_fd == nullptr) {
337 new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
338 new_fd->read_closure.Init();
339 new_fd->write_closure.Init();
340 new_fd->error_closure.Init();
341 }
342 new_fd->fd = fd;
343 new_fd->read_closure->InitEvent();
344 new_fd->write_closure->InitEvent();
345 new_fd->error_closure->InitEvent();
346
347 new_fd->freelist_next = nullptr;
348
349 char* fd_name;
350 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
351 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
352 fork_fd_list_add_grpc_fd(new_fd);
353 #ifndef NDEBUG
354 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
355 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
356 }
357 #endif
358 gpr_free(fd_name);
359
360 struct epoll_event ev;
361 ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
362 /* Use the least significant bit of ev.data.ptr to store track_err. We expect
363 * the addresses to be word aligned. We need to store track_err to avoid
364 * synchronization issues when accessing it after receiving an event.
365 * Accessing fd would be a data race there because the fd might have been
366 * returned to the free list at that point. */
367 ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
368 (track_err ? 1 : 0));
369 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
370 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
371 }
372
373 return new_fd;
374 }
375
/* Return the raw OS file descriptor wrapped by 'fd'. */
static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
377
378 /* if 'releasing_fd' is true, it means that we are going to detach the internal
379 * fd from grpc_fd structure (i.e which means we should not be calling
380 * shutdown() syscall on that fd) */
/* Shut down 'fd', waking pending read/write/error closures with 'why'.
 * If 'releasing_fd' is true the caller is detaching the OS fd from the
 * grpc_fd wrapper, so the socket must not be shutdown() — it is only removed
 * from the epoll set. Consumes one ref on 'why' (UNREF at the end balances
 * the caller's ref; each SetShutdown takes its own REF). */
static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
                                 bool releasing_fd) {
  /* SetShutdown returns true only the first time, so the body below runs at
     most once per fd even if fd_shutdown() is called repeatedly. */
  if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
    if (!releasing_fd) {
      shutdown(fd->fd, SHUT_RDWR);
    } else {
      /* we need a dummy event for earlier linux versions. */
      epoll_event dummy_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &dummy_event) !=
          0) {
        gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
      }
    }
    fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}
399
400 /* Might be called multiple times */
/* Public shutdown entry point. Might be called multiple times; only the
 * first call has any effect (see fd_shutdown_internal). */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
  fd_shutdown_internal(fd, why, /*releasing_fd=*/false);
}
404
fd_orphan(grpc_fd * fd,grpc_closure * on_done,int * release_fd,const char * reason)405 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
406 const char* reason) {
407 grpc_error* error = GRPC_ERROR_NONE;
408 bool is_release_fd = (release_fd != nullptr);
409
410 if (!fd->read_closure->IsShutdown()) {
411 fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
412 is_release_fd);
413 }
414
415 /* If release_fd is not NULL, we should be relinquishing control of the file
416 descriptor fd->fd (but we still own the grpc_fd structure). */
417 if (is_release_fd) {
418 *release_fd = fd->fd;
419 } else {
420 close(fd->fd);
421 }
422
423 GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
424
425 grpc_iomgr_unregister_object(&fd->iomgr_object);
426 fork_fd_list_remove_grpc_fd(fd);
427 fd->read_closure->DestroyEvent();
428 fd->write_closure->DestroyEvent();
429 fd->error_closure->DestroyEvent();
430
431 gpr_mu_lock(&fd_freelist_mu);
432 fd->freelist_next = fd_freelist;
433 fd_freelist = fd;
434 gpr_mu_unlock(&fd_freelist_mu);
435 }
436
/* True once the fd has been shut down (read side is the canonical flag). */
static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

/* Arrange for 'closure' to run when the fd becomes readable. */
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

/* Arrange for 'closure' to run when the fd becomes writable. */
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

/* Arrange for 'closure' to run when the fd reports an error. */
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

/* Mark the fd readable/writable/errored, firing any pending closure. */
static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
458
459 /*******************************************************************************
460 * Pollset Definitions
461 */
462
463 GPR_TLS_DECL(g_current_thread_pollset);
464 GPR_TLS_DECL(g_current_thread_worker);
465
466 /* The designated poller */
467 static gpr_atm g_active_poller;
468
469 static pollset_neighborhood* g_neighborhoods;
470 static size_t g_num_neighborhoods;
471
472 /* Return true if first in list */
worker_insert(grpc_pollset * pollset,grpc_pollset_worker * worker)473 static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
474 if (pollset->root_worker == nullptr) {
475 pollset->root_worker = worker;
476 worker->next = worker->prev = worker;
477 return true;
478 } else {
479 worker->next = pollset->root_worker;
480 worker->prev = worker->next->prev;
481 worker->next->prev = worker;
482 worker->prev->next = worker;
483 return false;
484 }
485 }
486
487 /* Return true if last in list */
488 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
489
worker_remove(grpc_pollset * pollset,grpc_pollset_worker * worker)490 static worker_remove_result worker_remove(grpc_pollset* pollset,
491 grpc_pollset_worker* worker) {
492 if (worker == pollset->root_worker) {
493 if (worker == worker->next) {
494 pollset->root_worker = nullptr;
495 return EMPTIED;
496 } else {
497 pollset->root_worker = worker->next;
498 worker->prev->next = worker->next;
499 worker->next->prev = worker->prev;
500 return NEW_ROOT;
501 }
502 } else {
503 worker->prev->next = worker->next;
504 worker->next->prev = worker->prev;
505 return REMOVED;
506 }
507 }
508
choose_neighborhood(void)509 static size_t choose_neighborhood(void) {
510 return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
511 }
512
/* One-time pollset subsystem setup: thread-local keys, the global wakeup fd
 * (which is also registered with the epoll set so pollers can be woken), and
 * the per-CPU neighborhood array. Returns the first error encountered. */
static grpc_error* pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  /* data.ptr == &global_wakeup_fd lets process_epoll_events() tell wakeups
     apart from ordinary fd events. */
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}
535
pollset_global_shutdown(void)536 static void pollset_global_shutdown(void) {
537 gpr_tls_destroy(&g_current_thread_pollset);
538 gpr_tls_destroy(&g_current_thread_worker);
539 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
540 for (size_t i = 0; i < g_num_neighborhoods; i++) {
541 gpr_mu_destroy(&g_neighborhoods[i].mu);
542 }
543 gpr_free(g_neighborhoods);
544 }
545
/* Initialize 'pollset' and return its mutex through '*mu'. New pollsets
 * start seen_inactive and are assigned a CPU-local neighborhood. */
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = nullptr;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = nullptr;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = nullptr;
}
559
/* Destroy 'pollset'. If it is still linked into a neighborhood's active ring
 * it is unlinked first; the neighborhood lock must be taken before the
 * pollset lock, and the acquisition is retried if the pollset migrated to a
 * different neighborhood while both locks were dropped. */
static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        /* The pollset moved while we were unlocked: chase the new
           neighborhood and try again. */
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      /* Unlink from the neighborhood's circular active list. */
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}
587
/* Kick every worker attached to 'pollset': UNKICKED cv-waiters get their
 * condition variable signalled, the DESIGNATED_POLLER is woken through the
 * global wakeup fd, and already-KICKED workers are left alone. Returns any
 * error from writing to the wakeup fd. */
static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
  GPR_TIMER_SCOPE("pollset_kick_all", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      GRPC_STATS_INC_POLLSET_KICK();
      switch (worker->state) {
        case KICKED:
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          /* the poller is blocked in epoll_wait(); poke the wakeup fd */
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  return error;
}
621
pollset_maybe_finish_shutdown(grpc_pollset * pollset)622 static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
623 if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
624 pollset->begin_refs == 0) {
625 GPR_TIMER_MARK("pollset_finish_shutdown", 0);
626 GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
627 pollset->shutdown_closure = nullptr;
628 }
629 }
630
/* Begin shutting down the pollset; 'closure' runs after all workers leave
 * (see pollset_maybe_finish_shutdown). Must be called at most once. */
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_TIMER_SCOPE("pollset_shutdown", 0);
  GPR_ASSERT(pollset->shutdown_closure == nullptr);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}
640
poll_deadline_to_millis_timeout(grpc_millis millis)641 static int poll_deadline_to_millis_timeout(grpc_millis millis) {
642 if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
643 grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
644 if (delta > INT_MAX) {
645 return INT_MAX;
646 } else if (delta < 0) {
647 return 0;
648 } else {
649 return static_cast<int>(delta);
650 }
651 }
652
653 /* Process the epoll events found by do_epoll_wait() function.
654 - g_epoll_set.cursor points to the index of the first event to be processed
655 - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
656 updates the g_epoll_set.cursor
657
658 NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
659 called by g_active_poller thread. So there is no need for synchronization
660 when accessing fields in g_epoll_set */
process_epoll_events(grpc_pollset * pollset)661 static grpc_error* process_epoll_events(grpc_pollset* pollset) {
662 GPR_TIMER_SCOPE("process_epoll_events", 0);
663
664 static const char* err_desc = "process_events";
665 grpc_error* error = GRPC_ERROR_NONE;
666 long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
667 long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
668 for (int idx = 0;
669 (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
670 idx++) {
671 long c = cursor++;
672 struct epoll_event* ev = &g_epoll_set.events[c];
673 void* data_ptr = ev->data.ptr;
674
675 if (data_ptr == &global_wakeup_fd) {
676 append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
677 err_desc);
678 } else {
679 grpc_fd* fd = reinterpret_cast<grpc_fd*>(
680 reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
681 bool track_err =
682 reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
683 bool cancel = (ev->events & EPOLLHUP) != 0;
684 bool error = (ev->events & EPOLLERR) != 0;
685 bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
686 bool write_ev = (ev->events & EPOLLOUT) != 0;
687 bool err_fallback = error && !track_err;
688
689 if (error && !err_fallback) {
690 fd_has_errors(fd);
691 }
692
693 if (read_ev || cancel || err_fallback) {
694 fd_become_readable(fd);
695 }
696
697 if (write_ev || cancel || err_fallback) {
698 fd_become_writable(fd);
699 }
700 }
701 }
702 gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
703 return error;
704 }
705
706 /* Do epoll_wait and store the events in g_epoll_set.events field. This does not
707 "process" any of the events yet; that is done in process_epoll_events().
708 *See process_epoll_events() function for more details.
709
710 NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
711 (i.e the designated poller thread) will be calling this function. So there is
712 no need for any synchronization when accesing fields in g_epoll_set */
/* Do epoll_wait and store the events in g_epoll_set.events field. This does not
   "process" any of the events yet; that is done in process_epoll_events().
   *See process_epoll_events() function for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e the designated poller thread) will be calling this function. So there is
   no need for any synchronization when accesing fields in g_epoll_set */
static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
  GPR_TIMER_SCOPE("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  /* A non-zero timeout means we may block, so bracket the wait with the
     blocking-region annotations. */
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL();
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR); /* retry on signal interruption */
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
  }

  /* Publish the batch for process_epoll_events(); release stores pair with
     the acquire loads there. */
  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return GRPC_ERROR_NONE;
}
743
/* Attach 'worker' to 'pollset' and decide whether it should poll.
 * Called with pollset->mu held (the lock is temporarily dropped while
 * re-locking the neighborhood and while cv-waiting). Returns true iff the
 * worker ended up as the DESIGNATED_POLLER and the pollset is not shutting
 * down — i.e. the caller should go do epoll_wait(). */
static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_millis deadline) {
  GPR_TIMER_SCOPE("begin_worker", 0);
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    /* Lock order is neighborhood first, then pollset. */
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        /* Neighborhood changed while unlocked: retry with the new one. */
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to
         neighborhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          /* Splice the pollset into the neighborhood's active ring. */
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    /* Not the designated poller: park on the cv until kicked, the pollset
       shuts down, or the deadline passes. */
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
           received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release pollset lock in this function at a couple of places:
   *   1. Briefly when assigning pollset to a neighborhood
   *   2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
   * case; especially when the worker is the DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}
866
/* Scan the neighborhood's active pollsets looking for a worker that can take
 * over as the designated poller (via CAS on g_active_poller). Pollsets found
 * with no eligible worker are marked inactive and unlinked from the ring.
 * Returns true iff a (new or existing) designated poller was found. */
static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      /* Walk this pollset's circular worker list. */
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                /* wake it so it can start polling */
                GPR_TIMER_MARK("signal worker", 0);
                GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      /* No usable worker here: retire this pollset from the active ring and
         keep scanning. */
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
      }
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}
930
/* Tears down \a worker after a pollset_work() iteration. If this worker was
   the designated poller, polling duty is handed off: preferably to an
   UNKICKED peer on the same pollset, otherwise by scanning all neighborhoods
   for an available worker. Closures queued during polling are moved onto the
   exec_ctx and flushed (with pollset->mu released) before returning.
   Requires: pollset->mu held; may temporarily release and re-acquire it. */
static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  GPR_TIMER_SCOPE("end_worker", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    /* We were the designated poller: a successor must be chosen. */
    if (worker->next != worker && worker->next->state == UNKICKED) {
      /* Fast path: promote the UNKICKED peer on this pollset. */
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      /* Slow path: no local successor. Clear the active poller and search
         the neighborhoods — starting with our own — for a worker. */
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      /* First pass: trylock only, so we don't block on a busy neighborhood
         while other idle ones might be available. */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      /* Second pass: block on the neighborhoods the trylock pass skipped. */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    /* Not the designated poller, but closures were queued: run them now
       without holding the pollset lock. */
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}
1004
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error* pollset_work(grpc_pollset* ps,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
  GPR_TIMER_SCOPE("pollset_work", 0);
  grpc_pollset_worker worker;
  grpc_error* error = GRPC_ERROR_NONE;
  static const char* err_desc = "pollset_work";
  /* A kick that arrived while no worker was present is consumed here as an
     immediate (empty) poll. */
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }

  /* begin_worker() returns true only if this worker became the
     DESIGNATED_POLLER; all other workers skip straight to end_worker(). */
  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally do
       polling. However, if there are unprocessed events left from a previous
       call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
       process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e handling epoll events) across multiple
       threads

       process_epoll_events() returns very quickly: It just queues the work on
       exec_ctx but does not execute it (the actual exectution or more
       accurately grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
       AFTER selecting a designated poller). So we are not waiting long periods
       without a designated poller */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      /* All previously fetched events consumed: fetch a new batch. */
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    gpr_tls_set(&g_current_thread_worker, 0);
  } else {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  }
  end_worker(ps, &worker, worker_hdl);

  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}
1059
/* Wakes up a worker on \a pollset so it can notice new work or shutdown.
   If \a specific_worker is nullptr, a suitable worker is chosen based on the
   pollset's root/next workers and whether the caller is already on a worker
   thread; otherwise that exact worker is kicked. Wake-ups happen either via
   the worker's condvar (sleeping workers) or via global_wakeup_fd (the
   worker currently blocked in epoll_wait). Returns GRPC_ERROR_NONE or the
   error from grpc_wakeup_fd_wakeup(). Requires: pollset->mu held. */
static grpc_error* pollset_kick(grpc_pollset* pollset,
                                grpc_pollset_worker* specific_worker) {
  GPR_TIMER_SCOPE("pollset_kick", 0);
  GRPC_STATS_INC_POLLSET_KICK();
  grpc_error* ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    /* Assemble a one-line snapshot of the kick request for tracing. */
    gpr_strvec log;
    gpr_strvec_init(&log);
    char* tmp;
    gpr_asprintf(&tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
                 specific_worker, (void*)gpr_tls_get(&g_current_thread_pollset),
                 (void*)gpr_tls_get(&g_current_thread_worker),
                 pollset->root_worker);
    gpr_strvec_add(&log, tmp);
    if (pollset->root_worker != nullptr) {
      gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
                   kick_state_string(pollset->root_worker->state),
                   kick_state_string(pollset->root_worker->next->state));
      gpr_strvec_add(&log, tmp);
    }
    if (specific_worker != nullptr) {
      gpr_asprintf(&tmp, " worker_kick_state=%s",
                   kick_state_string(specific_worker->state));
      gpr_strvec_add(&log, tmp);
    }
    tmp = gpr_strvec_flatten(&log, nullptr);
    gpr_strvec_destroy(&log);
    gpr_log(GPR_DEBUG, "%s", tmp);
    gpr_free(tmp);
  }

  if (specific_worker == nullptr) {
    /* No target given: pick one, unless we're already on this pollset. */
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        /* No workers present: remember the kick so the next pollset_work()
           returns immediately. */
        GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
        pollset->kicked_without_poller = true;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        /* Already kicked: nothing more to do. */
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker ==
                     next_worker &&  // only try and wake up a poller if
                                     // there is no next worker
                 root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(
                                    &g_active_poller)) {
        /* Sole worker and it is the active poller: interrupt epoll_wait()
           via the wakeup fd. */
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        /* Sleeping worker: wake it via its condvar. */
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          /* Kick the non-polling root; wake it if it's sleeping on its cv. */
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(
                GPR_INFO,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          /* The next worker is the one in epoll_wait(): use the wakeup fd. */
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      /* Caller is already a worker on this pollset: marking is unnecessary —
         it will notice the work when it returns from polling. */
      GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  /* Targeted kick of a specific worker. */
  if (specific_worker->state == KICKED) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. specific worker already kicked");
    }
    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    /* Kicking ourselves: just mark the state, no wake-up needed. */
    GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             (grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) {
    /* Target is blocked in epoll_wait(): interrupt via the wakeup fd. */
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    /* Target is sleeping on its condvar: signal it. */
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    /* Target is running (not waiting): marking the state suffices. */
    GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  return ret_err;
}
1221
/* No-op: this engine uses a single global epoll set, so per-pollset fd
   tracking is unnecessary. */
static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {}
1223
1224 /*******************************************************************************
1225 * Pollset-set Definitions
1226 */
1227
pollset_set_create(void)1228 static grpc_pollset_set* pollset_set_create(void) {
1229 return (grpc_pollset_set*)(static_cast<intptr_t>(0xdeafbeef));
1230 }
1231
/* No-op: pollset-sets have no state to release in this engine. */
static void pollset_set_destroy(grpc_pollset_set* pss) {}
1233
/* No-op: fds join the single global epoll set directly. */
static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1235
/* No-op: nothing was tracked by pollset_set_add_fd. */
static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1237
/* No-op: pollset membership is irrelevant with a single global epoll set. */
static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1239
/* No-op: nothing was tracked by pollset_set_add_pollset. */
static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1241
/* No-op: pollset-set nesting is irrelevant with a single global epoll set. */
static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {}
1244
/* No-op: nothing was tracked by pollset_set_add_pollset_set. */
static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {}
1247
1248 /*******************************************************************************
1249 * Event engine binding
1250 */
1251
/* This engine runs no background poller threads, so the answer is always
   false. */
static bool is_any_background_poller_thread(void) { return false; }
1253
/* No-op: there is no background poller to shut down in this engine. */
static void shutdown_background_closure(void) {}
1255
/* Always declines: with no background poller, the closure cannot be taken
   here and the caller must schedule it by other means. */
static bool add_closure_to_background_poller(grpc_closure* closure,
                                             grpc_error* error) {
  return false;
}
1260
/* Tears down the engine's global state: fd bookkeeping first, then pollset
   globals, then the epoll set itself; finally releases fork-support state if
   fork handling was enabled at init. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_destroy(&fork_fd_list_mu);
    /* Unregister the fork handler installed by grpc_init_epoll1_linux(). */
    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
  }
}
1270
/* Event-engine dispatch table wiring this file's implementations into the
   generic ev_posix layer. Entries are positional; order must match
   grpc_event_engine_vtable (declared in ev_posix.h). */
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset), /* pollset_size */
    true,  /* NOTE(review): presumably can_track_err — confirm field order in
              ev_posix.h */
    false, /* NOTE(review): presumably run_in_background — confirm field order
              in ev_posix.h */

    /* fd operations */
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    /* pollset operations */
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    /* pollset_set operations (no-ops in this engine) */
    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    /* background poller / engine lifecycle */
    is_any_background_poller_thread,
    shutdown_background_closure,
    shutdown_engine,
    add_closure_to_background_poller,
};
1309
1310 /* Called by the child process's post-fork handler to close open fds, including
1311 * the global epoll fd. This allows gRPC to shutdown in the child process
1312 * without interfering with connections or RPCs ongoing in the parent. */
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  /* Close every fd tracked for fork support; mark each closed so stale
     references observe -1 rather than a reused descriptor. */
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  /* Tear down and re-create the engine's global state in the child. */
  shutdown_engine();
  grpc_init_epoll1_linux(true);
}
1324
1325 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1326 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
1327 * support is available */
/* Initializes the epoll1 engine: verifies wakeup-fd support, creates the
   global epoll set, and sets up fd and pollset global state. Returns the
   engine vtable on success, or nullptr when epoll or a required facility is
   unavailable (partially-initialized state is rolled back). Also registers
   the post-fork reset handler when fork support is enabled. */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
    return nullptr;
  }

  /* Creates the epoll fd; fails if the kernel lacks epoll support. */
  if (!epoll_set_init()) {
    return nullptr;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    /* Roll back the steps that already succeeded. */
    fd_global_shutdown();
    epoll_set_shutdown();
    return nullptr;
  }

  if (grpc_core::Fork::Enabled()) {
    gpr_mu_init(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(
        reset_event_manager_on_fork);
  }
  return &vtable;
}
1353
1354 #else /* defined(GRPC_LINUX_EPOLL) */
1355 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1356 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
1357 /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1358 * NULL */
/* Fallback when GRPC_LINUX_EPOLL is not defined: epoll is unavailable on
   this platform, so no engine can be provided. */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
  return nullptr;
}
1362 #endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
1363 #endif /* !defined(GRPC_LINUX_EPOLL) */
1364