1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/surface/completion_queue.h"
21
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <string.h>
25
26 #include <vector>
27
28 #include "absl/strings/str_format.h"
29 #include "absl/strings/str_join.h"
30
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/atm.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/debug/stats.h"
38 #include "src/core/lib/gpr/spinlock.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/gpr/tls.h"
41 #include "src/core/lib/gprpp/atomic.h"
42 #include "src/core/lib/iomgr/executor.h"
43 #include "src/core/lib/iomgr/pollset.h"
44 #include "src/core/lib/iomgr/timer.h"
45 #include "src/core/lib/profiling/timers.h"
46 #include "src/core/lib/surface/api_trace.h"
47 #include "src/core/lib/surface/call.h"
48 #include "src/core/lib/surface/event_string.h"
49
/* Run-time trace flags, toggled via the GRPC_TRACE environment variable.
   DebugOnlyTraceFlag instances compile to constant-false in release builds. */
grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
53
54 namespace {
55
// Specifies a cq thread local cache.
// The first event that occurs on a thread
// with a cq cache will go into that cache, and
// will only be returned on the thread that initialized the cache.
// NOTE: Only one event will ever be cached.
// g_cached_event holds a grpc_cq_completion*, g_cached_cq the owning
// grpc_completion_queue* (both stored as intptr_t in TLS).
GPR_TLS_DECL(g_cached_event);
GPR_TLS_DECL(g_cached_cq);
63
/* A thread blocked in grpc_completion_queue_pluck waiting for a specific
   tag; registered in cq_pluck_data::pluckers so completions can kick the
   right pollset worker. */
struct plucker {
  grpc_pollset_worker** worker;
  void* tag;
};
/* Dispatch table abstracting the pollset implementation used by a cq
   (real pollset vs. the condvar-based non-polling poller below). */
struct cq_poller_vtable {
  /* True if POLLSET_FROM_CQ(cq) yields a usable grpc_pollset. */
  bool can_get_pollset;
  /* True if the pollset may be used to listen for incoming connections. */
  bool can_listen;
  size_t (*size)(void);
  void (*init)(grpc_pollset* pollset, gpr_mu** mu);
  grpc_error* (*kick)(grpc_pollset* pollset,
                      grpc_pollset_worker* specific_worker);
  grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
                      grpc_millis deadline);
  void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
  void (*destroy)(grpc_pollset* pollset);
};
/* A waiter in the non-polling poller: stack-allocated by
   non_polling_poller_work and linked into a circular doubly-linked list. */
typedef struct non_polling_worker {
  gpr_cv cv;
  bool kicked;
  struct non_polling_worker* next;
  struct non_polling_worker* prev;
} non_polling_worker;

/* State for the GRPC_CQ_NON_POLLING poller; overlays the grpc_pollset
   storage reserved after the cq. */
struct non_polling_poller {
  gpr_mu mu;
  /* Set when a kick arrives while no worker is waiting; consumed by the
     next call to non_polling_poller_work. */
  bool kicked_without_poller;
  /* Circular list of currently-waiting workers (nullptr when empty). */
  non_polling_worker* root;
  /* Shutdown closure; run by the last worker to leave after shutdown. */
  grpc_closure* shutdown;
};
/* Storage needed for a non-polling "pollset". */
size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
94
non_polling_poller_init(grpc_pollset * pollset,gpr_mu ** mu)95 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
96 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
97 gpr_mu_init(&npp->mu);
98 *mu = &npp->mu;
99 }
100
non_polling_poller_destroy(grpc_pollset * pollset)101 void non_polling_poller_destroy(grpc_pollset* pollset) {
102 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
103 gpr_mu_destroy(&npp->mu);
104 }
105
/* Blocks the calling thread on a condvar until it is kicked, the poller is
   shut down, or the deadline expires. While waiting, the worker lives in
   the circular doubly-linked list rooted at npp->root.
   NOTE(review): appears to be called with npp->mu held (gpr_cv_wait is
   passed &npp->mu, which it releases while blocked) — confirm against the
   pollset vtable contract. */
grpc_error* non_polling_poller_work(grpc_pollset* pollset,
                                    grpc_pollset_worker** worker,
                                    grpc_millis deadline) {
  non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
  if (npp->shutdown) return GRPC_ERROR_NONE;
  if (npp->kicked_without_poller) {
    /* Consume a kick that arrived before any worker was waiting. */
    npp->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  non_polling_worker w;
  gpr_cv_init(&w.cv);
  if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
  if (npp->root == nullptr) {
    /* First waiter: a one-element circular list. */
    npp->root = w.next = w.prev = &w;
  } else {
    /* Insert w just before the current root. */
    w.next = npp->root;
    w.prev = w.next->prev;
    w.next->prev = w.prev->next = &w;
  }
  w.kicked = false;
  gpr_timespec deadline_ts =
      grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
  /* Wait until shutdown, an explicit kick, or timeout (gpr_cv_wait returns
     non-zero on timeout). */
  while (!npp->shutdown && !w.kicked &&
         !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
    ;
  grpc_core::ExecCtx::Get()->InvalidateNow();
  if (&w == npp->root) {
    npp->root = w.next;
    if (&w == npp->root) {
      /* w was the only waiter; the list is about to become empty. */
      if (npp->shutdown) {
        /* Last worker out after shutdown: schedule the shutdown closure. */
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown, GRPC_ERROR_NONE);
      }
      npp->root = nullptr;
    }
  }
  /* Unlink w from the circular list. */
  w.next->prev = w.prev;
  w.prev->next = w.next;
  gpr_cv_destroy(&w.cv);
  if (worker != nullptr) *worker = nullptr;
  return GRPC_ERROR_NONE;
}
147
non_polling_poller_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)148 grpc_error* non_polling_poller_kick(grpc_pollset* pollset,
149 grpc_pollset_worker* specific_worker) {
150 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
151 if (specific_worker == nullptr)
152 specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
153 if (specific_worker != nullptr) {
154 non_polling_worker* w =
155 reinterpret_cast<non_polling_worker*>(specific_worker);
156 if (!w->kicked) {
157 w->kicked = true;
158 gpr_cv_signal(&w->cv);
159 }
160 } else {
161 p->kicked_without_poller = true;
162 }
163 return GRPC_ERROR_NONE;
164 }
165
/* Records the shutdown closure. If no workers are waiting it is run
   immediately; otherwise every waiter is signalled and the last one to
   leave runs it (see non_polling_poller_work). */
void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
  GPR_ASSERT(closure != nullptr);
  p->shutdown = closure;
  if (p->root == nullptr) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
  } else {
    /* Wake every waiting worker; each will observe p->shutdown. */
    non_polling_worker* w = p->root;
    do {
      gpr_cv_signal(&w->cv);
      w = w->next;
    } while (w != p->root);
  }
}
180
/* Poller vtables indexed by grpc_cq_polling_type. DEFAULT and NON_LISTENING
   use the real pollset implementation (NON_LISTENING merely reports
   can_listen=false); NON_POLLING uses the condvar-based poller above and
   exposes no pollset to callers. */
const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
    /* GRPC_CQ_DEFAULT_POLLING */
    {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    /* GRPC_CQ_NON_LISTENING */
    {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    /* GRPC_CQ_NON_POLLING */
    {false, false, non_polling_poller_size, non_polling_poller_init,
     non_polling_poller_kick, non_polling_poller_work,
     non_polling_poller_shutdown, non_polling_poller_destroy},
};
193
194 } // namespace
195
/* Per-completion-type dispatch table for a completion queue. In g_cq_vtable
   below, NEXT queues implement only 'next', PLUCK queues only 'pluck', and
   CALLBACK queues neither. */
struct cq_vtable {
  grpc_cq_completion_type cq_completion_type;
  /* Size of the per-type data struct allocated immediately after the
     grpc_completion_queue (see DATA_FROM_CQ). */
  size_t data_size;
  void (*init)(void* data,
               grpc_experimental_completion_queue_functor* shutdown_callback);
  void (*shutdown)(grpc_completion_queue* cq);
  void (*destroy)(void* data);
  bool (*begin_op)(grpc_completion_queue* cq, void* tag);
  void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
                 void (*done)(void* done_arg, grpc_cq_completion* storage),
                 void* done_arg, grpc_cq_completion* storage, bool internal);
  grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
                     void* reserved);
  grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
                      gpr_timespec deadline, void* reserved);
};
212
213 namespace {
214
/* Queue that holds the cq_completion_events. Internally uses
 * MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
 * queue). It uses a queue_lock to support multiple consumers.
 * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
class CqEventQueue {
 public:
  CqEventQueue() = default;
  ~CqEventQueue() = default;

  /* Note: The counter is not incremented/decremented atomically with push/pop.
   * The count is only eventually consistent */
  intptr_t num_items() const {
    return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED);
  }

  /* Enqueues c. Returns true if the queue's lazy counter was zero before
     the push (i.e. c is "first in"); defined below. */
  bool Push(grpc_cq_completion* c);
  /* Dequeues one completion, or nullptr if empty or another consumer holds
     the spinlock; defined below. */
  grpc_cq_completion* Pop();

 private:
  /* Spinlock to serialize consumers i.e pop() operations */
  gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;

  grpc_core::MultiProducerSingleConsumerQueue queue_;

  /* A lazy counter of number of items in the queue. This is NOT atomically
     incremented/decremented along with push/pop operations and hence is only
     eventually consistent */
  grpc_core::Atomic<intptr_t> num_queue_items_{0};
};
244
/* Per-cq state for completion queues of type GRPC_CQ_NEXT. */
struct cq_next_data {
  ~cq_next_data() {
    GPR_ASSERT(queue.num_items() == 0);
#ifndef NDEBUG
    if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
  CqEventQueue queue;

  /** Counter of how many things have ever been queued on this completion queue
      useful for avoiding locks to check the queue */
  grpc_core::Atomic<intptr_t> things_queued_ever{0};

  /** Number of outstanding events (+1 if not shut down)
      Initial count is dropped by grpc_completion_queue_shutdown */
  grpc_core::Atomic<intptr_t> pending_events{1};

  /** 0 initially. 1 once we initiated shutdown */
  bool shutdown_called = false;
};
269
/* Per-cq state for completion queues of type GRPC_CQ_PLUCK. Completed
   events form an intrusive singly-linked list threaded through
   grpc_cq_completion::next; the low bit of each 'next' carries the
   success flag (see cq_end_op_for_pluck). */
struct cq_pluck_data {
  cq_pluck_data() {
    /* Empty list: the sentinel head points back at itself. */
    completed_tail = &completed_head;
    completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
  }

  ~cq_pluck_data() {
    GPR_ASSERT(completed_head.next ==
               reinterpret_cast<uintptr_t>(&completed_head));
#ifndef NDEBUG
    if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
  grpc_cq_completion completed_head;
  grpc_cq_completion* completed_tail;

  /** Number of pending events (+1 if we're not shutdown).
      Initial count is dropped by grpc_completion_queue_shutdown. */
  grpc_core::Atomic<intptr_t> pending_events{1};

  /** Counter of how many things have ever been queued on this completion queue
      useful for avoiding locks to check the queue */
  grpc_core::Atomic<intptr_t> things_queued_ever{0};

  /** 0 initially. 1 once we completed shutting */
  /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
   * (pending_events == 0). So consider removing this in future and use
   * pending_events */
  grpc_core::Atomic<bool> shutdown{false};

  /** 0 initially. 1 once we initiated shutdown */
  bool shutdown_called = false;

  /* Threads currently blocked in pluck, waiting for specific tags. */
  int num_pluckers = 0;
  plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};
310
/* Per-cq state for completion queues of type GRPC_CQ_CALLBACK. */
struct cq_callback_data {
  explicit cq_callback_data(
      grpc_experimental_completion_queue_functor* shutdown_callback)
      : shutdown_callback(shutdown_callback) {}

  ~cq_callback_data() {
#ifndef NDEBUG
    if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /** No actual completed events queue, unlike other types */

  /** Number of pending events (+1 if we're not shutdown).
      Initial count is dropped by grpc_completion_queue_shutdown. */
  grpc_core::Atomic<intptr_t> pending_events{1};

  /** Counter of how many things have ever been queued on this completion queue
      useful for avoiding locks to check the queue */
  grpc_core::Atomic<intptr_t> things_queued_ever{0};

  /** 0 initially. 1 once we initiated shutdown */
  bool shutdown_called = false;

  /** A callback that gets invoked when the CQ completes shutdown */
  grpc_experimental_completion_queue_functor* shutdown_callback;
};
340
341 } // namespace
342
/* Completion queue structure. The per-type data (cq_next_data /
   cq_pluck_data / cq_callback_data) and the pollset are allocated in the
   same gpr_zalloc block, immediately after this struct (see DATA_FROM_CQ
   and POLLSET_FROM_CQ). */
struct grpc_completion_queue {
  /** Once owning_refs drops to zero, we will destroy the cq */
  grpc_core::RefCount owning_refs;

  /* Owned by the poller (points into its storage); initialized by
     poller_vtable->init. */
  gpr_mu* mu;

  const cq_vtable* vtable;
  const cq_poller_vtable* poller_vtable;

#ifndef NDEBUG
  /* Debug-only registry of tags passed to grpc_cq_begin_op but not yet
     completed; checked by cq_check_tag. */
  void** outstanding_tags;
  size_t outstanding_tag_count;
  size_t outstanding_tag_capacity;
#endif

  grpc_closure pollset_shutdown_done;
  int num_polls;
};
362
/* Forward declarations: one implementation of each vtable slot per
   completion type (next / pluck / callback). */
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
static void cq_shutdown_callback(grpc_completion_queue* cq);

static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);

// A cq_end_op function is called when an operation on a given CQ with
// a given tag has completed. The storage argument is a reference to the
// space reserved for this completion as it is placed into the corresponding
// queue. The done argument is a callback that will be invoked when it is
// safe to free up that storage. The storage MUST NOT be freed until the
// done callback is invoked.
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error* error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error* error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error* error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved);

static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved);

// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
static void cq_init_next(
    void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(
    void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_callback(
    void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
412
/* Completion queue vtables based on the completion-type; indexed by
   grpc_cq_completion_type. The 'next'/'pluck' slots are nullptr for types
   that do not support them. */
static const cq_vtable g_cq_vtable[] = {
    /* GRPC_CQ_NEXT */
    {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
     cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
     nullptr},
    /* GRPC_CQ_PLUCK */
    {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
     cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
     cq_pluck},
    /* GRPC_CQ_CALLBACK */
    {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
     cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
     cq_end_op_for_callback, nullptr, nullptr},
};
428
/* Memory layout accessors: the per-type data struct sits immediately after
   the grpc_completion_queue, followed by the poller storage (see the single
   gpr_zalloc in grpc_completion_queue_create_internal). */
#define DATA_FROM_CQ(cq) ((void*)(cq + 1))
#define POLLSET_FROM_CQ(cq) \
  ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))

grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");

/* Logs each event returned to the application when api tracing is enabled;
   timeout events are logged only if queue_pluck tracing is also enabled. */
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)     \
  do {                                                   \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) &&       \
        (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
         (event)->type != GRPC_QUEUE_TIMEOUT)) {         \
      gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq,      \
              grpc_event_string(event).c_str());         \
    }                                                    \
  } while (0)

static void on_pollset_shutdown_done(void* cq, grpc_error* error);
446
/* Initializes the TLS slots backing the per-thread completion-event cache.
   NOTE(review): presumably invoked once during global gRPC initialization —
   confirm with the caller. */
void grpc_cq_global_init() {
  gpr_tls_init(&g_cached_event);
  gpr_tls_init(&g_cached_cq);
}
451
grpc_completion_queue_thread_local_cache_init(grpc_completion_queue * cq)452 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
453 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
454 gpr_tls_set(&g_cached_event, (intptr_t)0);
455 gpr_tls_set(&g_cached_cq, (intptr_t)cq);
456 }
457 }
458
/* If the calling thread's cache holds an event belonging to cq, delivers it:
   writes the event's tag/success into *tag/*ok, releases the completion
   storage via its done callback, and drops one pending-event ref (possibly
   finishing shutdown). Always clears the thread-local cache. Returns 1 if
   an event was flushed, 0 otherwise. */
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
                                                   void** tag, int* ok) {
  grpc_cq_completion* storage =
      (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
  int ret = 0;
  if (storage != nullptr &&
      (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
    *tag = storage->tag;
    grpc_core::ExecCtx exec_ctx;
    /* Low bit of 'next' carries the success flag (set in cq_end_op_for_next). */
    *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
    storage->done(storage->done_arg, storage);
    ret = 1;
    cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
      /* We dropped the last pending event: complete the shutdown. */
      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
      gpr_mu_lock(cq->mu);
      cq_finish_shutdown_next(cq);
      gpr_mu_unlock(cq->mu);
      GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    }
  }
  gpr_tls_set(&g_cached_event, (intptr_t)0);
  gpr_tls_set(&g_cached_cq, (intptr_t)0);

  return ret;
}
485
Push(grpc_cq_completion * c)486 bool CqEventQueue::Push(grpc_cq_completion* c) {
487 queue_.Push(
488 reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
489 return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0;
490 }
491
/* Dequeues a single completion, or returns nullptr. Consumers are
   serialized with a try-lock: if another consumer holds the spinlock we
   return nullptr immediately rather than blocking. */
grpc_cq_completion* CqEventQueue::Pop() {
  grpc_cq_completion* c = nullptr;

  if (gpr_spinlock_trylock(&queue_lock_)) {
    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();

    bool is_empty = false;
    c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
    gpr_spinlock_unlock(&queue_lock_);

    /* Pop returned nothing even though the queue was not empty: a transient
       MPSC race; record it for stats. */
    if (c == nullptr && !is_empty) {
      GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
    }
  } else {
    GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
  }

  if (c) {
    num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED);
  }

  return c;
}
515
/* Allocates and initializes a completion queue of the given completion and
   polling types. The cq struct, the per-type data, and the poller storage
   are carved out of a single zeroed allocation; the vtables are selected by
   indexing the static tables with the type enums. */
grpc_completion_queue* grpc_completion_queue_create_internal(
    grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
    grpc_experimental_completion_queue_functor* shutdown_callback) {
  GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);

  grpc_completion_queue* cq;

  GRPC_API_TRACE(
      "grpc_completion_queue_create_internal(completion_type=%d, "
      "polling_type=%d)",
      2, (completion_type, polling_type));

  const cq_vtable* vtable = &g_cq_vtable[completion_type];
  const cq_poller_vtable* poller_vtable =
      &g_poller_vtable_by_poller_type[polling_type];

  grpc_core::ExecCtx exec_ctx;
  GRPC_STATS_INC_CQS_CREATED();

  /* Single allocation: [grpc_completion_queue][type data][poller storage]. */
  cq = static_cast<grpc_completion_queue*>(
      gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
                 poller_vtable->size()));

  cq->vtable = vtable;
  cq->poller_vtable = poller_vtable;

  /* One for destroy(), one for pollset_shutdown */
  new (&cq->owning_refs) grpc_core::RefCount(2);

  /* Poller init also publishes its mutex as the cq's lock (cq->mu). */
  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
  vtable->init(DATA_FROM_CQ(cq), shutdown_callback);

  GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
                    grpc_schedule_on_exec_ctx);
  return cq;
}
552
/* Placement-constructs the GRPC_CQ_NEXT per-cq data in the storage reserved
   after the cq struct. NEXT queues do not use the shutdown callback. */
static void cq_init_next(
    void* data,
    grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
  new (data) cq_next_data();
}
558
cq_destroy_next(void * data)559 static void cq_destroy_next(void* data) {
560 cq_next_data* cqd = static_cast<cq_next_data*>(data);
561 cqd->~cq_next_data();
562 }
563
/* Placement-constructs the GRPC_CQ_PLUCK per-cq data in the storage
   reserved after the cq struct. PLUCK queues do not use the shutdown
   callback. */
static void cq_init_pluck(
    void* data,
    grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
  new (data) cq_pluck_data();
}
569
cq_destroy_pluck(void * data)570 static void cq_destroy_pluck(void* data) {
571 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
572 cqd->~cq_pluck_data();
573 }
574
/* Placement-constructs the GRPC_CQ_CALLBACK per-cq data, retaining the
   functor to invoke once the queue finishes shutting down. */
static void cq_init_callback(
    void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
  new (data) cq_callback_data(shutdown_callback);
}
579
cq_destroy_callback(void * data)580 static void cq_destroy_callback(void* data) {
581 cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
582 cqd->~cq_callback_data();
583 }
584
/* Returns the completion type (NEXT / PLUCK / CALLBACK) cq was created with. */
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
  return cq->vtable->cq_completion_type;
}
588
grpc_get_cq_poll_num(grpc_completion_queue * cq)589 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
590 int cur_num_polls;
591 gpr_mu_lock(cq->mu);
592 cur_num_polls = cq->num_polls;
593 gpr_mu_unlock(cq->mu);
594 return cur_num_polls;
595 }
596
/* Takes an internal ref on cq. Debug builds thread the caller's
   file/line/reason through for refcount tracing; release builds share the
   same tail with empty debug info. */
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
                          const char* file, int line) {
  grpc_core::DebugLocation debug_location(file, line);
#else
void grpc_cq_internal_ref(grpc_completion_queue* cq) {
  grpc_core::DebugLocation debug_location;
  const char* reason = nullptr;
#endif
  cq->owning_refs.Ref(debug_location, reason);
}
608
609 static void on_pollset_shutdown_done(void* arg, grpc_error* /*error*/) {
610 grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
611 GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
612 }
613
/* Drops an internal ref on cq; when the last ref goes away, tears down the
   per-type data and poller in place and frees the single allocation that
   backs all of them. Debug builds carry file/line/reason for tracing. */
#ifndef NDEBUG
void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
                            const char* file, int line) {
  grpc_core::DebugLocation debug_location(file, line);
#else
void grpc_cq_internal_unref(grpc_completion_queue* cq) {
  grpc_core::DebugLocation debug_location;
  const char* reason = nullptr;
#endif
  if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
    cq->vtable->destroy(DATA_FROM_CQ(cq));
    cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
    gpr_free(cq->outstanding_tags);
#endif
    gpr_free(cq);
  }
}
632
#ifndef NDEBUG
/* Debug-only: asserts that tag was previously registered via
   grpc_cq_begin_op, and removes it from the outstanding-tag array
   (swap-with-last keeps removal O(1); order is not preserved). */
static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
  int found = 0;
  if (lock_cq) {
    gpr_mu_lock(cq->mu);
  }

  for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
    if (cq->outstanding_tags[i] == tag) {
      cq->outstanding_tag_count--;
      GPR_SWAP(void*, cq->outstanding_tags[i],
               cq->outstanding_tags[cq->outstanding_tag_count]);
      found = 1;
      break;
    }
  }

  if (lock_cq) {
    gpr_mu_unlock(cq->mu);
  }

  GPR_ASSERT(found);
}
#else
/* Release builds: tag tracking disabled, no-op. */
static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
                         bool /*lock_cq*/) {}
#endif
660
661 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
662 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
663 return cqd->pending_events.IncrementIfNonzero();
664 }
665
666 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
667 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
668 return cqd->pending_events.IncrementIfNonzero();
669 }
670
671 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
672 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
673 return cqd->pending_events.IncrementIfNonzero();
674 }
675
/* Begins an operation on cq that must later be completed with
   grpc_cq_end_op using the same tag. Returns false if the cq has already
   been shut down (in which case the op must not proceed). */
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
  /* Debug builds track every outstanding tag so cq_check_tag can verify it
     at completion time; the array grows geometrically. */
  gpr_mu_lock(cq->mu);
  if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
    cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
    cq->outstanding_tags = static_cast<void**>(gpr_realloc(
        cq->outstanding_tags,
        sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
  }
  cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
  gpr_mu_unlock(cq->mu);
#endif
  return cq->vtable->begin_op(cq, tag);
}
690
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
 * completion
 * type of GRPC_CQ_NEXT). The completion is either stashed in the calling
 * thread's TLS cache (if this thread cached for this cq and the slot is
 * empty) or pushed onto the lock-free event queue; in both cases one
 * pending-event ref is eventually dropped, and the last drop completes
 * shutdown. */
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error* error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {
  GPR_TIMER_SCOPE("cq_end_op_for_next", 0);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
       error != GRPC_ERROR_NONE)) {
    const char* errmsg = grpc_error_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg, done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
        error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    }
  }
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
  int is_success = (error == GRPC_ERROR_NONE);

  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  /* Success flag is carried in the low bit of 'next'. */
  storage->next = static_cast<uintptr_t>(is_success);

  cq_check_tag(cq, tag, true); /* Used in debug builds only */

  if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
      (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
    /* Fast path: park the event in this thread's cache; it will be
       delivered by grpc_completion_queue_thread_local_cache_flush. */
    gpr_tls_set(&g_cached_event, (intptr_t)storage);
  } else {
    /* Add the completion to the queue */
    bool is_first = cqd->queue.Push(storage);
    cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
    /* Since we do not hold the cq lock here, it is important to do an 'acquire'
       load here (instead of a 'no_barrier' load) to match with the release
       store
       (done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next
       */
    if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) {
      /* Only kick if this is the first item queued */
      if (is_first) {
        gpr_mu_lock(cq->mu);
        grpc_error* kick_error =
            cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
        gpr_mu_unlock(cq->mu);

        if (kick_error != GRPC_ERROR_NONE) {
          const char* msg = grpc_error_string(kick_error);
          gpr_log(GPR_ERROR, "Kick failed: %s", msg);
          GRPC_ERROR_UNREF(kick_error);
        }
      }
      if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) ==
          1) {
        /* We dropped the last pending event: complete the shutdown. */
        GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
        gpr_mu_lock(cq->mu);
        cq_finish_shutdown_next(cq);
        gpr_mu_unlock(cq->mu);
        GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
      }
    } else {
      /* pending_events was already 1 (shutdown initiated and this is the
         final event): finish the shutdown ourselves. */
      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
      cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE);
      gpr_mu_lock(cq->mu);
      cq_finish_shutdown_next(cq);
      gpr_mu_unlock(cq->mu);
      GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    }
  }

  GRPC_ERROR_UNREF(error);
}
769
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
 * completion
 * type of GRPC_CQ_PLUCK). Appends the completion to the cq's intrusive list
 * under the cq lock and kicks the plucker (if any) that is waiting for this
 * specific tag. */
static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error* error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {
  GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);

  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  int is_success = (error == GRPC_ERROR_NONE);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
       error != GRPC_ERROR_NONE)) {
    const char* errmsg = grpc_error_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg, done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
        error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    }
  }

  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  /* 'next' is a tagged pointer: list link in the high bits, success flag in
     the low bit. New nodes initially point back at the sentinel head. */
  storage->next =
      ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success));

  gpr_mu_lock(cq->mu);
  cq_check_tag(cq, tag, false); /* Used in debug builds only */

  /* Add to the list of completions */
  cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
  /* Splice storage in at the tail, preserving the tail's success bit. */
  cqd->completed_tail->next =
      ((uintptr_t)storage) | (1u & cqd->completed_tail->next);
  cqd->completed_tail = storage;

  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    /* Last pending event: complete the shutdown (lock released after). */
    cq_finish_shutdown_pluck(cq);
    gpr_mu_unlock(cq->mu);
  } else {
    /* Prefer kicking the specific worker plucking for this tag, if any. */
    grpc_pollset_worker* pluck_worker = nullptr;
    for (int i = 0; i < cqd->num_pluckers; i++) {
      if (cqd->pluckers[i].tag == tag) {
        pluck_worker = *cqd->pluckers[i].worker;
        break;
      }
    }

    grpc_error* kick_error =
        cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);

    gpr_mu_unlock(cq->mu);

    if (kick_error != GRPC_ERROR_NONE) {
      const char* msg = grpc_error_string(kick_error);
      gpr_log(GPR_ERROR, "Kick failed: %s", msg);

      GRPC_ERROR_UNREF(kick_error);
    }
  }

  GRPC_ERROR_UNREF(error);
}
838
839 static void functor_callback(void* arg, grpc_error* error) {
840 auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
841 functor->functor_run(functor, error == GRPC_ERROR_NONE);
842 }
843
/* Complete an event on a completion queue of type GRPC_CQ_CALLBACK. The tag
   is itself the functor to run; the completion storage is released
   immediately and the functor is dispatched either to the thread-local
   ApplicationCallbackExecCtx or to the Executor. */
static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error* error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal) {
  GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);

  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
       error != GRPC_ERROR_NONE)) {
    const char* errmsg = grpc_error_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg, done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
        error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    }
  }

  // The callback-based CQ isn't really a queue at all and thus has no need
  // for reserved storage. Invoke the done callback right away to release it.
  done(done_arg, storage);

  cq_check_tag(cq, tag, true); /* Used in debug builds only */

  cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    /* Last pending event: run the user's shutdown callback. */
    cq_finish_shutdown_callback(cq);
  }

  // If possible, schedule the callback onto an existing thread-local
  // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
  // 1. The callback is internally-generated and there is an ACEC available
  // 2. The callback is marked inlineable and there is an ACEC available
  // 3. We are already running in a background poller thread (which always has
  //    an ACEC available at the base of the stack).
  auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
  if (((internal || functor->inlineable) &&
       grpc_core::ApplicationCallbackExecCtx::Available()) ||
      grpc_iomgr_is_any_background_poller_thread()) {
    grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
                                                   (error == GRPC_ERROR_NONE));
    GRPC_ERROR_UNREF(error);
    return;
  }

  // Schedule the callback on a closure if not internal or triggered
  // from a background poller thread.
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
}
899
/* Queue a GRPC_OP_COMPLETE event on \a cq, dispatching to the implementation
   for the cq's completion type (next / pluck / callback). \a done is invoked
   when \a storage may be released; the vtable implementation takes ownership
   of \a error. */
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
                    void (*done)(void* done_arg, grpc_cq_completion* storage),
                    void* done_arg, grpc_cq_completion* storage,
                    bool internal) {
  cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
}
906
/* State shared between a next/pluck call and the CheckReadyToFinish() hook of
   its ExecCtx (ExecCtxNext / ExecCtxPluck below). */
struct cq_is_finished_arg {
  gpr_atm last_seen_things_queued_ever; /* snapshot of things_queued_ever */
  grpc_completion_queue* cq;
  grpc_millis deadline;
  grpc_cq_completion* stolen_completion; /* set when the hook pops an event */
  void* tag; /* for pluck */
  bool first_loop; /* true until the first poll iteration completes */
};
915 class ExecCtxNext : public grpc_core::ExecCtx {
916 public:
917 ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
918
919 bool CheckReadyToFinish() override {
920 cq_is_finished_arg* a =
921 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
922 grpc_completion_queue* cq = a->cq;
923 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
924 GPR_ASSERT(a->stolen_completion == nullptr);
925
926 intptr_t current_last_seen_things_queued_ever =
927 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
928
929 if (current_last_seen_things_queued_ever !=
930 a->last_seen_things_queued_ever) {
931 a->last_seen_things_queued_ever =
932 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
933
934 /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
935 * might return NULL in some cases even if the queue is not empty; but
936 * that
937 * is ok and doesn't affect correctness. Might effect the tail latencies a
938 * bit) */
939 a->stolen_completion = cqd->queue.Pop();
940 if (a->stolen_completion != nullptr) {
941 return true;
942 }
943 }
944 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
945 }
946
947 private:
948 void* check_ready_to_finish_arg_;
949 };
950
951 #ifndef NDEBUG
952 static void dump_pending_tags(grpc_completion_queue* cq) {
953 if (!GRPC_TRACE_FLAG_ENABLED(grpc_trace_pending_tags)) return;
954 std::vector<std::string> parts;
955 parts.push_back("PENDING TAGS:");
956 gpr_mu_lock(cq->mu);
957 for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
958 parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
959 }
960 gpr_mu_unlock(cq->mu);
961 gpr_log(GPR_DEBUG, "%s", absl::StrJoin(parts, "").c_str());
962 }
963 #else
964 static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
965 #endif
966
/* Poll a GRPC_CQ_NEXT completion queue for the next event, blocking up to
   \a deadline. Returns GRPC_OP_COMPLETE with the event, GRPC_QUEUE_TIMEOUT,
   or GRPC_QUEUE_SHUTDOWN once the cq is shut down and drained. */
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved) {
  GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);

  grpc_event ret;
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  GRPC_API_TRACE(
      "grpc_completion_queue_next("
      "cq=%p, "
      "deadline=gpr_timespec { tv_sec: %" PRId64
      ", tv_nsec: %d, clock_type: %d }, "
      "reserved=%p)",
      5,
      (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
       reserved));
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  /* Hold a ref for the duration of the call so the cq cannot be destroyed
     out from under us. */
  GRPC_CQ_INTERNAL_REF(cq, "next");

  grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
  cq_is_finished_arg is_finished_arg = {
      cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
      cq,
      deadline_millis,
      nullptr,
      nullptr,
      true};
  ExecCtxNext exec_ctx(&is_finished_arg);
  for (;;) {
    grpc_millis iteration_deadline = deadline_millis;

    /* An event may have been "stolen" by ExecCtxNext::CheckReadyToFinish()
       while flushing between polls; return it first. */
    if (is_finished_arg.stolen_completion != nullptr) {
      grpc_cq_completion* c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u; /* success bit is stored in next's low bit */
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }

    grpc_cq_completion* c = cqd->queue.Pop();

    if (c != nullptr) {
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    } else {
      /* If c == NULL it means either the queue is empty OR it is in a
         transient inconsistent state. If it is the latter, we should do a
         0-timeout poll so that the thread comes back quickly from poll to
         make a second attempt at popping. Not doing this can potentially
         deadlock this thread forever (if the deadline is infinity). */
      if (cqd->queue.num_items() > 0) {
        iteration_deadline = 0;
      }
    }

    if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) {
      /* Before returning, check if the queue has any items left over (since
         MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
         even if the queue is not empty). If so, keep retrying but do not
         return GRPC_QUEUE_SHUTDOWN */
      if (cqd->queue.num_items() > 0) {
        /* Go to the beginning of the loop. No point doing a poll because
           (cq->shutdown == true) is only possible when there is no pending
           work (i.e cq->pending_events == 0) and any outstanding completion
           events should have already been queued on this cq */
        continue;
      }

      ret.type = GRPC_QUEUE_SHUTDOWN;
      ret.success = 0;
      break;
    }

    /* Deadline check is skipped on the first iteration so we always poll at
       least once. */
    if (!is_finished_arg.first_loop &&
        grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }

    /* The main polling work happens in grpc_pollset_work */
    gpr_mu_lock(cq->mu);
    cq->num_polls++;
    grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
                                              iteration_deadline);
    gpr_mu_unlock(cq->mu);

    if (err != GRPC_ERROR_NONE) {
      const char* msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);

      GRPC_ERROR_UNREF(err);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
  }

  /* If there are still queued items and pending events, kick the poller so
     some other waiter gets a chance to pick them up. */
  if (cqd->queue.num_items() > 0 &&
      cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) {
    gpr_mu_lock(cq->mu);
    cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
    gpr_mu_unlock(cq->mu);
  }

  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "next");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}
1090
/* Finishes the completion queue shutdown. This means that there are no more
   completion events / tags expected from the completion queue
   - Must be called under completion queue lock
   - Must be called only once in completion queue's lifetime
   - grpc_completion_queue_shutdown() MUST have been called before calling
   this function */
static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  GPR_ASSERT(cqd->shutdown_called);
  GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0);

  /* Shutting down the pollset eventually signals pollset_shutdown_done,
     which releases the cq's pollset ref. */
  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
1105
/* Begin shutdown of a GRPC_CQ_NEXT cq. Idempotent: a second call returns
   without effect. Drops the "shutdown" event from pending_events; if that was
   the last pending event, completes the shutdown immediately. */
static void cq_shutdown_next(grpc_completion_queue* cq) {
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
   * Pollset shutdown decrements the cq ref count which can potentially destroy
   * the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    /* Already shutting down: nothing to do. */
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    return;
  }
  cqd->shutdown_called = true;
  /* Doing acq/release FetchSub here to match with
   * cq_begin_op_for_next and cq_end_op_for_next functions which read/write
   * on this counter without necessarily holding a lock on cq */
  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    cq_finish_shutdown_next(cq);
  }
  gpr_mu_unlock(cq->mu);
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
1132
/* Public API: poll \a cq for the next completed event, blocking up to
   \a deadline. Dispatches to the vtable for the cq's completion type. */
grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
                                      gpr_timespec deadline, void* reserved) {
  return cq->vtable->next(cq, deadline, reserved);
}
1137
1138 static int add_plucker(grpc_completion_queue* cq, void* tag,
1139 grpc_pollset_worker** worker) {
1140 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1141 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1142 return 0;
1143 }
1144 cqd->pluckers[cqd->num_pluckers].tag = tag;
1145 cqd->pluckers[cqd->num_pluckers].worker = worker;
1146 cqd->num_pluckers++;
1147 return 1;
1148 }
1149
1150 static void del_plucker(grpc_completion_queue* cq, void* tag,
1151 grpc_pollset_worker** worker) {
1152 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1153 for (int i = 0; i < cqd->num_pluckers; i++) {
1154 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1155 cqd->num_pluckers--;
1156 GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1157 return;
1158 }
1159 }
1160 GPR_UNREACHABLE_CODE(return );
1161 }
1162
1163 class ExecCtxPluck : public grpc_core::ExecCtx {
1164 public:
1165 ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
1166
1167 bool CheckReadyToFinish() override {
1168 cq_is_finished_arg* a =
1169 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1170 grpc_completion_queue* cq = a->cq;
1171 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1172
1173 GPR_ASSERT(a->stolen_completion == nullptr);
1174 gpr_atm current_last_seen_things_queued_ever =
1175 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1176 if (current_last_seen_things_queued_ever !=
1177 a->last_seen_things_queued_ever) {
1178 gpr_mu_lock(cq->mu);
1179 a->last_seen_things_queued_ever =
1180 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1181 grpc_cq_completion* c;
1182 grpc_cq_completion* prev = &cqd->completed_head;
1183 while ((c = (grpc_cq_completion*)(prev->next &
1184 ~static_cast<uintptr_t>(1))) !=
1185 &cqd->completed_head) {
1186 if (c->tag == a->tag) {
1187 prev->next = (prev->next & static_cast<uintptr_t>(1)) |
1188 (c->next & ~static_cast<uintptr_t>(1));
1189 if (c == cqd->completed_tail) {
1190 cqd->completed_tail = prev;
1191 }
1192 gpr_mu_unlock(cq->mu);
1193 a->stolen_completion = c;
1194 return true;
1195 }
1196 prev = c;
1197 }
1198 gpr_mu_unlock(cq->mu);
1199 }
1200 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
1201 }
1202
1203 private:
1204 void* check_ready_to_finish_arg_;
1205 };
1206
/* Block up to \a deadline waiting for the completion with the given \a tag on
   a GRPC_CQ_PLUCK cq. Returns GRPC_OP_COMPLETE with the event,
   GRPC_QUEUE_TIMEOUT, or GRPC_QUEUE_SHUTDOWN. */
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved) {
  GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);

  grpc_event ret;
  grpc_cq_completion* c;
  grpc_cq_completion* prev;
  grpc_pollset_worker* worker = nullptr;
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace)) {
    GRPC_API_TRACE(
        "grpc_completion_queue_pluck("
        "cq=%p, tag=%p, "
        "deadline=gpr_timespec { tv_sec: %" PRId64
        ", tv_nsec: %d, clock_type: %d }, "
        "reserved=%p)",
        6,
        (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
         reserved));
  }
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  /* Hold a ref for the duration of the call so the cq cannot be destroyed
     out from under us. */
  GRPC_CQ_INTERNAL_REF(cq, "pluck");
  gpr_mu_lock(cq->mu);
  grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
  cq_is_finished_arg is_finished_arg = {
      cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
      cq,
      deadline_millis,
      nullptr,
      tag,
      true};
  ExecCtxPluck exec_ctx(&is_finished_arg);
  /* Loop invariant: cq->mu is held at the top of each iteration. */
  for (;;) {
    /* The event may have been "stolen" by ExecCtxPluck::CheckReadyToFinish()
       while flushing between polls; return it first. */
    if (is_finished_arg.stolen_completion != nullptr) {
      gpr_mu_unlock(cq->mu);
      c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u; /* success bit lives in next's low bit */
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }
    /* Scan the completed list for our tag (low bit of `next` is the success
       flag, so mask it off to recover the pointer). */
    prev = &cqd->completed_head;
    while (
        (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) !=
        &cqd->completed_head) {
      if (c->tag == tag) {
        /* Unlink c, preserving prev's success bit. */
        prev->next = (prev->next & static_cast<uintptr_t>(1)) |
                     (c->next & ~static_cast<uintptr_t>(1));
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
        gpr_mu_unlock(cq->mu);
        ret.type = GRPC_OP_COMPLETE;
        ret.success = c->next & 1u;
        ret.tag = c->tag;
        c->done(c->done_arg, c);
        goto done;
      }
      prev = c;
    }
    if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) {
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_SHUTDOWN;
      ret.success = 0;
      break;
    }
    if (!add_plucker(cq, tag, &worker)) {
      gpr_log(GPR_DEBUG,
              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
              "is %d",
              GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
      gpr_mu_unlock(cq->mu);
      /* TODO(ctiller): should we use a different result here */
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    /* Deadline check is skipped on the first iteration so we always poll at
       least once. */
    if (!is_finished_arg.first_loop &&
        grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    cq->num_polls++;
    grpc_error* err =
        cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
    if (err != GRPC_ERROR_NONE) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      const char* msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);

      GRPC_ERROR_UNREF(err);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
    del_plucker(cq, tag, &worker);
  }
done:
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "pluck");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}
1326
/* Public API: wait up to \a deadline for the completion carrying \a tag.
   Dispatches to the vtable for the cq's completion type. */
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
                                       gpr_timespec deadline, void* reserved) {
  return cq->vtable->pluck(cq, tag, deadline, reserved);
}
1331
/* Finish shutdown of a GRPC_CQ_PLUCK cq: marks it shut down and shuts down
   the backing pollset. Must be called exactly once, with cq->mu held, after
   shutdown_called has been set and all pending events have drained. */
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  GPR_ASSERT(cqd->shutdown_called);
  GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED));
  cqd->shutdown.Store(1, grpc_core::MemoryOrder::RELAXED);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
1341
/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
 * merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_completion_queue* cq) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
   * Pollset shutdown decrements the cq ref count which can potentially destroy
   * the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    /* Already shutting down: nothing to do. */
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
    return;
  }
  cqd->shutdown_called = true;
  /* Drop the "shutdown" event; if it was the last pending event, finish the
     shutdown immediately. */
  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    cq_finish_shutdown_pluck(cq);
  }
  gpr_mu_unlock(cq->mu);
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}
1367
/* Finish shutdown of a GRPC_CQ_CALLBACK cq: shuts down the backing pollset
   and schedules the user's shutdown callback. */
static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
  auto* callback = cqd->shutdown_callback;

  GPR_ASSERT(cqd->shutdown_called);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
  /* On a background poller thread an ApplicationCallbackExecCtx is available,
     so the callback can be enqueued directly. */
  if (grpc_iomgr_is_any_background_poller_thread()) {
    grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
    return;
  }

  // Otherwise schedule the shutdown callback on the Executor via a closure.
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
      GRPC_ERROR_NONE);
}
1386
/* Begin shutdown of a GRPC_CQ_CALLBACK cq. Idempotent: a second call returns
   without effect. Unlike the next/pluck variants, the finish step runs
   outside cq->mu (cq_finish_shutdown_callback may invoke user code). */
static void cq_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
   * Pollset shutdown decrements the cq ref count which can potentially destroy
   * the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    /* Already shutting down: nothing to do. */
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
    return;
  }
  cqd->shutdown_called = true;
  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    gpr_mu_unlock(cq->mu);
    cq_finish_shutdown_callback(cq);
  } else {
    gpr_mu_unlock(cq->mu);
  }
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
1412
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
  GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
  /* The ApplicationCallbackExecCtx must be constructed before the ExecCtx so
     any callbacks enqueued during shutdown can run when both flush on scope
     exit. */
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
  cq->vtable->shutdown(cq);
}
1422
/* Public API: destroy \a cq. Shutdown is initiated first (a no-op if the
   caller already shut the cq down), then the "destroy" ref is released; the
   cq is actually freed once all internal refs are gone. */
void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
  GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
  GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
  grpc_completion_queue_shutdown(cq);

  grpc_core::ExecCtx exec_ctx;
  GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
}
1431
1432 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1433 return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1434 }
1435
/* Returns the poller vtable's can_listen flag for \a cq. */
bool grpc_cq_can_listen(grpc_completion_queue* cq) {
  return cq->poller_vtable->can_listen;
}
1439