1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/server.h"
22
23 #include <limits.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30
31 #include <utility>
32
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/channel/channelz.h"
35 #include "src/core/lib/channel/connected_channel.h"
36 #include "src/core/lib/debug/stats.h"
37 #include "src/core/lib/gpr/spinlock.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gprpp/mpscq.h"
40 #include "src/core/lib/iomgr/executor.h"
41 #include "src/core/lib/iomgr/iomgr.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "src/core/lib/surface/api_trace.h"
44 #include "src/core/lib/surface/call.h"
45 #include "src/core/lib/surface/channel.h"
46 #include "src/core/lib/surface/completion_queue.h"
47 #include "src/core/lib/surface/init.h"
48 #include "src/core/lib/transport/metadata.h"
49 #include "src/core/lib/transport/static_metadata.h"
50
51 grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
52
53 using grpc_core::LockedMultiProducerSingleConsumerQueue;
54
55 namespace {
56
57 void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
58 void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error);
59
60 struct listener {
61 void* arg;
62 void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
63 size_t pollset_count);
64 void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
65 struct listener* next;
66 intptr_t socket_uuid;
67 grpc_closure destroy_done;
68 };
69
70 enum requested_call_type { BATCH_CALL, REGISTERED_CALL };
71
72 struct registered_method;
73
74 struct requested_call {
requested_call__anoncbe2fe5a0111::requested_call75 requested_call(void* tag_arg, grpc_completion_queue* call_cq,
76 grpc_call** call_arg, grpc_metadata_array* initial_md,
77 grpc_call_details* details)
78 : type(BATCH_CALL),
79 tag(tag_arg),
80 cq_bound_to_call(call_cq),
81 call(call_arg),
82 initial_metadata(initial_md) {
83 details->reserved = nullptr;
84 data.batch.details = details;
85 }
86
requested_call__anoncbe2fe5a0111::requested_call87 requested_call(void* tag_arg, grpc_completion_queue* call_cq,
88 grpc_call** call_arg, grpc_metadata_array* initial_md,
89 registered_method* rm, gpr_timespec* deadline,
90 grpc_byte_buffer** optional_payload)
91 : type(REGISTERED_CALL),
92 tag(tag_arg),
93 cq_bound_to_call(call_cq),
94 call(call_arg),
95 initial_metadata(initial_md) {
96 data.registered.method = rm;
97 data.registered.deadline = deadline;
98 data.registered.optional_payload = optional_payload;
99 }
100
101 grpc_core::MultiProducerSingleConsumerQueue::Node mpscq_node;
102 const requested_call_type type;
103 void* const tag;
104 grpc_completion_queue* const cq_bound_to_call;
105 grpc_call** const call;
106 grpc_cq_completion completion;
107 grpc_metadata_array* const initial_metadata;
108 union {
109 struct {
110 grpc_call_details* details;
111 } batch;
112 struct {
113 registered_method* method;
114 gpr_timespec* deadline;
115 grpc_byte_buffer** optional_payload;
116 } registered;
117 } data;
118 };
119
120 struct channel_registered_method {
121 registered_method* server_registered_method;
122 uint32_t flags;
123 bool has_host;
124 grpc_core::ExternallyManagedSlice method;
125 grpc_core::ExternallyManagedSlice host;
126 };
127
128 struct channel_data {
129 grpc_server* server;
130 grpc_channel* channel;
131 size_t cq_idx;
132 /* linked list of all channels on a server */
133 channel_data* next;
134 channel_data* prev;
135 channel_registered_method* registered_methods;
136 uint32_t registered_method_slots;
137 uint32_t registered_method_max_probes;
138 grpc_closure finish_destroy_channel_closure;
139 intptr_t channelz_socket_uuid;
140 };
141
142 struct shutdown_tag {
143 void* tag;
144 grpc_completion_queue* cq;
145 grpc_cq_completion completion;
146 };
147
148 enum call_state {
149 /* waiting for metadata */
150 NOT_STARTED,
151 /* initial metadata read, not flow controlled in yet */
152 PENDING,
153 /* flow controlled in, on completion queue */
154 ACTIVATED,
155 /* cancelled before being queued */
156 ZOMBIED
157 };
158
159 struct call_data;
160
161 grpc_call_error ValidateServerRequest(
162 grpc_completion_queue* cq_for_notification, void* tag,
163 grpc_byte_buffer** optional_payload, registered_method* rm);
164
165 // RPCs that come in from the transport must be matched against RPC requests
166 // from the application. An incoming request from the application can be matched
167 // to an RPC that has already arrived or can be queued up for later use.
168 // Likewise, an RPC coming in from the transport can either be matched to a
169 // request that already arrived from the application or can be queued up for
170 // later use (marked pending). If there is a match, the request's tag is posted
171 // on the request's notification CQ.
172 //
173 // RequestMatcherInterface is the base class to provide this functionality.
174 class RequestMatcherInterface {
175 public:
~RequestMatcherInterface()176 virtual ~RequestMatcherInterface() {}
177
178 // Unref the calls associated with any incoming RPCs in the pending queue (not
179 // yet matched to an application-requested RPC).
180 virtual void ZombifyPending() = 0;
181
182 // Mark all application-requested RPCs failed if they have not been matched to
183 // an incoming RPC. The error parameter indicates why the RPCs are being
184 // failed (always server shutdown in all current implementations).
185 virtual void KillRequests(grpc_error* error) = 0;
186
187 // How many request queues are supported by this matcher. This is an abstract
188 // concept that essentially maps to gRPC completion queues.
189 virtual size_t request_queue_count() const = 0;
190
191 // This function is invoked when the application requests a new RPC whose
192 // information is in the call parameter. The request_queue_index marks the
193 // queue onto which to place this RPC, and is typically associated with a gRPC
194 // CQ. If there are pending RPCs waiting to be matched, publish one (match it
195 // and notify the CQ).
196 virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
197 requested_call* call) = 0;
198
199 // This function is invoked on an incoming RPC, represented by the calld
200 // object. The RequestMatcher will try to match it against an
201 // application-requested RPC if possible or will place it in the pending queue
202 // otherwise. To enable some measure of fairness between server CQs, the match
203 // is done starting at the start_request_queue_index parameter in a cyclic
204 // order rather than always starting at 0.
205 virtual void MatchOrQueue(size_t start_request_queue_index,
206 call_data* calld) = 0;
207
208 // Returns the server associated with this request matcher
209 virtual grpc_server* server() const = 0;
210 };
211
212 struct call_data {
call_data__anoncbe2fe5a0111::call_data213 call_data(grpc_call_element* elem, const grpc_call_element_args& args)
214 : call(grpc_call_from_top_element(elem)),
215 call_combiner(args.call_combiner) {
216 GRPC_CLOSURE_INIT(&server_on_recv_initial_metadata,
217 ::server_on_recv_initial_metadata, elem,
218 grpc_schedule_on_exec_ctx);
219 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
220 ::server_recv_trailing_metadata_ready, elem,
221 grpc_schedule_on_exec_ctx);
222 }
~call_data__anoncbe2fe5a0111::call_data223 ~call_data() {
224 GPR_ASSERT(state != PENDING);
225 GRPC_ERROR_UNREF(recv_initial_metadata_error);
226 if (host_set) {
227 grpc_slice_unref_internal(host);
228 }
229 if (path_set) {
230 grpc_slice_unref_internal(path);
231 }
232 grpc_metadata_array_destroy(&initial_metadata);
233 grpc_byte_buffer_destroy(payload);
234 }
235
236 grpc_call* call;
237
238 gpr_atm state = NOT_STARTED;
239
240 bool path_set = false;
241 bool host_set = false;
242 grpc_slice path;
243 grpc_slice host;
244 grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
245
246 grpc_completion_queue* cq_new = nullptr;
247
248 grpc_metadata_batch* recv_initial_metadata = nullptr;
249 uint32_t recv_initial_metadata_flags = 0;
250 grpc_metadata_array initial_metadata =
251 grpc_metadata_array(); // Zero-initialize the C struct.
252
253 RequestMatcherInterface* matcher = nullptr;
254 grpc_byte_buffer* payload = nullptr;
255
256 grpc_closure got_initial_metadata;
257 grpc_closure server_on_recv_initial_metadata;
258 grpc_closure kill_zombie_closure;
259 grpc_closure* on_done_recv_initial_metadata;
260 grpc_closure recv_trailing_metadata_ready;
261 grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
262 grpc_closure* original_recv_trailing_metadata_ready;
263 grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
264 bool seen_recv_trailing_metadata_ready = false;
265
266 grpc_closure publish;
267
268 call_data* pending_next = nullptr;
269 grpc_core::CallCombiner* call_combiner;
270 };
271
272 struct registered_method {
registered_method__anoncbe2fe5a0111::registered_method273 registered_method(
274 const char* method_arg, const char* host_arg,
275 grpc_server_register_method_payload_handling payload_handling_arg,
276 uint32_t flags_arg)
277 : method(method_arg == nullptr ? "" : method_arg),
278 host(host_arg == nullptr ? "" : host_arg),
279 payload_handling(payload_handling_arg),
280 flags(flags_arg) {}
281
282 ~registered_method() = default;
283
284 const std::string method;
285 const std::string host;
286 const grpc_server_register_method_payload_handling payload_handling;
287 const uint32_t flags;
288 /* one request matcher per method */
289 std::unique_ptr<RequestMatcherInterface> matcher;
290 registered_method* next;
291 };
292
293 struct channel_broadcaster {
294 grpc_channel** channels;
295 size_t num_channels;
296 };
297 } // namespace
298
299 struct grpc_server {
300 grpc_channel_args* channel_args;
301
302 grpc_resource_user* default_resource_user;
303
304 grpc_completion_queue** cqs;
305 grpc_pollset** pollsets;
306 size_t cq_count;
307 size_t pollset_count;
308 bool started;
309
310 /* The two following mutexes control access to server-state
311 mu_global controls access to non-call-related state (e.g., channel state)
312 mu_call controls access to call-related state (e.g., the call lists)
313
314 If they are ever required to be nested, you must lock mu_global
315 before mu_call. This is currently used in shutdown processing
316 (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
317 gpr_mu mu_global; /* mutex for server and channel state */
318 gpr_mu mu_call; /* mutex for call-specific state */
319
320 /* startup synchronization: flag is protected by mu_global, signals whether
321 we are doing the listener start routine or not */
322 bool starting;
323 gpr_cv starting_cv;
324
325 // TODO(vjpai): Convert from a linked-list head pointer to a std::vector once
326 // grpc_server has a real constructor/destructor
327 registered_method* registered_methods;
328 /** one request matcher for unregistered methods */
329 // TODO(vjpai): Convert to a std::unique_ptr once grpc_server has a real
330 // constructor and destructor.
331 RequestMatcherInterface* unregistered_request_matcher;
332
333 gpr_atm shutdown_flag;
334 uint8_t shutdown_published;
335 size_t num_shutdown_tags;
336 shutdown_tag* shutdown_tags;
337
338 channel_data root_channel_data;
339
340 listener* listeners;
341 int listeners_destroyed;
342 grpc_core::RefCount internal_refcount;
343
344 /** when did we print the last shutdown progress message */
345 gpr_timespec last_shutdown_message_time;
346
347 grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
348 };
349
350 #define SERVER_FROM_CALL_ELEM(elem) \
351 (((channel_data*)(elem)->channel_data)->server)
352
353 namespace {
354 void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
355 requested_call* rc);
356 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
357 grpc_error* error);
358 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
359 hold mu_call */
360 void maybe_finish_shutdown(grpc_server* server);
361
kill_zombie(void * elem,grpc_error *)362 void kill_zombie(void* elem, grpc_error* /*error*/) {
363 grpc_call_unref(
364 grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
365 }
366
367 /*
368 * channel broadcaster
369 */
370
371 /* assumes server locked */
channel_broadcaster_init(grpc_server * s,channel_broadcaster * cb)372 void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
373 channel_data* c;
374 size_t count = 0;
375 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
376 count++;
377 }
378 cb->num_channels = count;
379 cb->channels = static_cast<grpc_channel**>(
380 gpr_malloc(sizeof(*cb->channels) * cb->num_channels));
381 count = 0;
382 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
383 cb->channels[count++] = c->channel;
384 GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
385 }
386 }
387
388 struct shutdown_cleanup_args {
389 grpc_closure closure;
390 grpc_slice slice;
391 };
392
shutdown_cleanup(void * arg,grpc_error *)393 void shutdown_cleanup(void* arg, grpc_error* /*error*/) {
394 struct shutdown_cleanup_args* a =
395 static_cast<struct shutdown_cleanup_args*>(arg);
396 grpc_slice_unref_internal(a->slice);
397 gpr_free(a);
398 }
399
send_shutdown(grpc_channel * channel,bool send_goaway,grpc_error * send_disconnect)400 void send_shutdown(grpc_channel* channel, bool send_goaway,
401 grpc_error* send_disconnect) {
402 struct shutdown_cleanup_args* sc =
403 static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));
404 GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
405 grpc_schedule_on_exec_ctx);
406 grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
407 grpc_channel_element* elem;
408
409 op->goaway_error =
410 send_goaway ? grpc_error_set_int(
411 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
412 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
413 : GRPC_ERROR_NONE;
414 op->set_accept_stream = true;
415 sc->slice = grpc_slice_from_copied_string("Server shutdown");
416 op->disconnect_with_error = send_disconnect;
417
418 elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
419 elem->filter->start_transport_op(elem, op);
420 }
421
channel_broadcaster_shutdown(channel_broadcaster * cb,bool send_goaway,grpc_error * force_disconnect)422 void channel_broadcaster_shutdown(channel_broadcaster* cb, bool send_goaway,
423 grpc_error* force_disconnect) {
424 size_t i;
425
426 for (i = 0; i < cb->num_channels; i++) {
427 send_shutdown(cb->channels[i], send_goaway,
428 GRPC_ERROR_REF(force_disconnect));
429 GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
430 }
431 gpr_free(cb->channels);
432 GRPC_ERROR_UNREF(force_disconnect);
433 }
434
435 /*
436 * request_matcher
437 */
438
439 // The RealRequestMatcher is an implementation of RequestMatcherInterface that
440 // actually uses all the features of RequestMatcherInterface: expecting the
441 // application to explicitly request RPCs and then matching those to incoming
442 // RPCs, along with a slow path by which incoming RPCs are put on a locked
443 // pending list if they aren't able to be matched to an application request.
444 class RealRequestMatcher : public RequestMatcherInterface {
445 public:
RealRequestMatcher(grpc_server * server)446 explicit RealRequestMatcher(grpc_server* server)
447 : server_(server), requests_per_cq_(server->cq_count) {}
448
~RealRequestMatcher()449 ~RealRequestMatcher() override {
450 for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
451 GPR_ASSERT(queue.Pop() == nullptr);
452 }
453 }
454
ZombifyPending()455 void ZombifyPending() override {
456 while (pending_head_ != nullptr) {
457 call_data* calld = pending_head_;
458 pending_head_ = calld->pending_next;
459 gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
460 GRPC_CLOSURE_INIT(
461 &calld->kill_zombie_closure, kill_zombie,
462 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
463 grpc_schedule_on_exec_ctx);
464 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
465 GRPC_ERROR_NONE);
466 }
467 }
468
KillRequests(grpc_error * error)469 void KillRequests(grpc_error* error) override {
470 for (size_t i = 0; i < requests_per_cq_.size(); i++) {
471 requested_call* rc;
472 while ((rc = reinterpret_cast<requested_call*>(
473 requests_per_cq_[i].Pop())) != nullptr) {
474 fail_call(server_, i, rc, GRPC_ERROR_REF(error));
475 }
476 }
477 GRPC_ERROR_UNREF(error);
478 }
479
request_queue_count() const480 size_t request_queue_count() const override {
481 return requests_per_cq_.size();
482 }
483
RequestCallWithPossiblePublish(size_t request_queue_index,requested_call * call)484 void RequestCallWithPossiblePublish(size_t request_queue_index,
485 requested_call* call) override {
486 if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
487 /* this was the first queued request: we need to lock and start
488 matching calls */
489 gpr_mu_lock(&server_->mu_call);
490 call_data* calld;
491 while ((calld = pending_head_) != nullptr) {
492 requested_call* rc = reinterpret_cast<requested_call*>(
493 requests_per_cq_[request_queue_index].Pop());
494 if (rc == nullptr) break;
495 pending_head_ = calld->pending_next;
496 gpr_mu_unlock(&server_->mu_call);
497 if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
498 // Zombied Call
499 GRPC_CLOSURE_INIT(
500 &calld->kill_zombie_closure, kill_zombie,
501 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
502 grpc_schedule_on_exec_ctx);
503 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
504 GRPC_ERROR_NONE);
505 } else {
506 publish_call(server_, calld, request_queue_index, rc);
507 }
508 gpr_mu_lock(&server_->mu_call);
509 }
510 gpr_mu_unlock(&server_->mu_call);
511 }
512 }
513
MatchOrQueue(size_t start_request_queue_index,call_data * calld)514 void MatchOrQueue(size_t start_request_queue_index,
515 call_data* calld) override {
516 for (size_t i = 0; i < requests_per_cq_.size(); i++) {
517 size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
518 requested_call* rc =
519 reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].TryPop());
520 if (rc == nullptr) {
521 continue;
522 } else {
523 GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
524 gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
525 publish_call(server_, calld, cq_idx, rc);
526 return; /* early out */
527 }
528 }
529
530 /* no cq to take the request found: queue it on the slow list */
531 GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
532 gpr_mu_lock(&server_->mu_call);
533
534 // We need to ensure that all the queues are empty. We do this under
535 // the server mu_call lock to ensure that if something is added to
536 // an empty request queue, it will block until the call is actually
537 // added to the pending list.
538 for (size_t i = 0; i < requests_per_cq_.size(); i++) {
539 size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
540 requested_call* rc =
541 reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].Pop());
542 if (rc == nullptr) {
543 continue;
544 } else {
545 gpr_mu_unlock(&server_->mu_call);
546 GRPC_STATS_INC_SERVER_CQS_CHECKED(i + requests_per_cq_.size());
547 gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
548 publish_call(server_, calld, cq_idx, rc);
549 return; /* early out */
550 }
551 }
552
553 gpr_atm_no_barrier_store(&calld->state, PENDING);
554 if (pending_head_ == nullptr) {
555 pending_tail_ = pending_head_ = calld;
556 } else {
557 pending_tail_->pending_next = calld;
558 pending_tail_ = calld;
559 }
560 gpr_mu_unlock(&server_->mu_call);
561 }
562
server() const563 grpc_server* server() const override { return server_; }
564
565 private:
566 grpc_server* const server_;
567 call_data* pending_head_ = nullptr;
568 call_data* pending_tail_ = nullptr;
569 std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
570 };
571
572 // AllocatingRequestMatchers don't allow the application to request an RPC in
573 // advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
574 // will call out to an allocation function passed in at the construction of the
575 // object. These request matchers are designed for the C++ callback API, so they
576 // only support 1 completion queue (passed in at the constructor).
577 class AllocatingRequestMatcherBase : public RequestMatcherInterface {
578 public:
AllocatingRequestMatcherBase(grpc_server * server,grpc_completion_queue * cq)579 AllocatingRequestMatcherBase(grpc_server* server, grpc_completion_queue* cq)
580 : server_(server), cq_(cq) {
581 size_t idx;
582 for (idx = 0; idx < server->cq_count; idx++) {
583 if (server->cqs[idx] == cq) {
584 break;
585 }
586 }
587 GPR_ASSERT(idx < server->cq_count);
588 cq_idx_ = idx;
589 }
590
ZombifyPending()591 void ZombifyPending() override {}
592
KillRequests(grpc_error * error)593 void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); }
594
request_queue_count() const595 size_t request_queue_count() const override { return 0; }
596
RequestCallWithPossiblePublish(size_t,requested_call *)597 void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
598 requested_call* /*call*/) final {
599 GPR_ASSERT(false);
600 }
601
server() const602 grpc_server* server() const override { return server_; }
603
604 // Supply the completion queue related to this request matcher
cq() const605 grpc_completion_queue* cq() const { return cq_; }
606
607 // Supply the completion queue's index relative to the server.
cq_idx() const608 size_t cq_idx() const { return cq_idx_; }
609
610 private:
611 grpc_server* const server_;
612 grpc_completion_queue* const cq_;
613 size_t cq_idx_;
614 };
615
616 // An allocating request matcher for non-registered methods (used for generic
617 // API and unimplemented RPCs).
618 class AllocatingRequestMatcherBatch : public AllocatingRequestMatcherBase {
619 public:
AllocatingRequestMatcherBatch(grpc_server * server,grpc_completion_queue * cq,std::function<grpc_core::ServerBatchCallAllocation ()> allocator)620 AllocatingRequestMatcherBatch(
621 grpc_server* server, grpc_completion_queue* cq,
622 std::function<grpc_core::ServerBatchCallAllocation()> allocator)
623 : AllocatingRequestMatcherBase(server, cq),
624 allocator_(std::move(allocator)) {}
MatchOrQueue(size_t,call_data * calld)625 void MatchOrQueue(size_t /*start_request_queue_index*/,
626 call_data* calld) override {
627 grpc_core::ServerBatchCallAllocation call_info = allocator_();
628 GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
629 nullptr, nullptr) == GRPC_CALL_OK);
630 requested_call* rc = new requested_call(
631 static_cast<void*>(call_info.tag), cq(), call_info.call,
632 call_info.initial_metadata, call_info.details);
633 gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
634 publish_call(server(), calld, cq_idx(), rc);
635 }
636
637 private:
638 std::function<grpc_core::ServerBatchCallAllocation()> allocator_;
639 };
640
641 // An allocating request matcher for registered methods.
642 class AllocatingRequestMatcherRegistered : public AllocatingRequestMatcherBase {
643 public:
AllocatingRequestMatcherRegistered(grpc_server * server,grpc_completion_queue * cq,registered_method * rm,std::function<grpc_core::ServerRegisteredCallAllocation ()> allocator)644 AllocatingRequestMatcherRegistered(
645 grpc_server* server, grpc_completion_queue* cq, registered_method* rm,
646 std::function<grpc_core::ServerRegisteredCallAllocation()> allocator)
647 : AllocatingRequestMatcherBase(server, cq),
648 registered_method_(rm),
649 allocator_(std::move(allocator)) {}
MatchOrQueue(size_t,call_data * calld)650 void MatchOrQueue(size_t /*start_request_queue_index*/,
651 call_data* calld) override {
652 grpc_core::ServerRegisteredCallAllocation call_info = allocator_();
653 GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
654 call_info.optional_payload,
655 registered_method_) == GRPC_CALL_OK);
656 requested_call* rc = new requested_call(
657 static_cast<void*>(call_info.tag), cq(), call_info.call,
658 call_info.initial_metadata, registered_method_, call_info.deadline,
659 call_info.optional_payload);
660 gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
661 publish_call(server(), calld, cq_idx(), rc);
662 }
663
664 private:
665 registered_method* const registered_method_;
666 std::function<grpc_core::ServerRegisteredCallAllocation()> allocator_;
667 };
668
669 /*
670 * server proper
671 */
672
server_ref(grpc_server * server)673 void server_ref(grpc_server* server) { server->internal_refcount.Ref(); }
674
server_delete(grpc_server * server)675 void server_delete(grpc_server* server) {
676 registered_method* rm;
677 size_t i;
678 server->channelz_server.reset();
679 grpc_channel_args_destroy(server->channel_args);
680 gpr_mu_destroy(&server->mu_global);
681 gpr_mu_destroy(&server->mu_call);
682 gpr_cv_destroy(&server->starting_cv);
683 while ((rm = server->registered_methods) != nullptr) {
684 server->registered_methods = rm->next;
685 delete rm;
686 }
687 delete server->unregistered_request_matcher;
688 for (i = 0; i < server->cq_count; i++) {
689 GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
690 }
691 gpr_free(server->cqs);
692 gpr_free(server->pollsets);
693 gpr_free(server->shutdown_tags);
694 gpr_free(server);
695 }
696
server_unref(grpc_server * server)697 void server_unref(grpc_server* server) {
698 if (GPR_UNLIKELY(server->internal_refcount.Unref())) {
699 server_delete(server);
700 }
701 }
702
is_channel_orphaned(channel_data * chand)703 int is_channel_orphaned(channel_data* chand) { return chand->next == chand; }
704
orphan_channel(channel_data * chand)705 void orphan_channel(channel_data* chand) {
706 chand->next->prev = chand->prev;
707 chand->prev->next = chand->next;
708 chand->next = chand->prev = chand;
709 }
710
finish_destroy_channel(void * cd,grpc_error *)711 void finish_destroy_channel(void* cd, grpc_error* /*error*/) {
712 channel_data* chand = static_cast<channel_data*>(cd);
713 grpc_server* server = chand->server;
714 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
715 server_unref(server);
716 }
717
destroy_channel(channel_data * chand)718 void destroy_channel(channel_data* chand) {
719 if (is_channel_orphaned(chand)) return;
720 GPR_ASSERT(chand->server != nullptr);
721 orphan_channel(chand);
722 server_ref(chand->server);
723 maybe_finish_shutdown(chand->server);
724 GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
725 finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
726
727 if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
728 gpr_log(GPR_INFO, "Disconnected client");
729 }
730
731 grpc_transport_op* op =
732 grpc_make_transport_op(&chand->finish_destroy_channel_closure);
733 op->set_accept_stream = true;
734 grpc_channel_next_op(grpc_channel_stack_element(
735 grpc_channel_get_channel_stack(chand->channel), 0),
736 op);
737 }
738
done_request_event(void * req,grpc_cq_completion *)739 void done_request_event(void* req, grpc_cq_completion* /*c*/) {
740 delete static_cast<requested_call*>(req);
741 }
742
publish_call(grpc_server * server,call_data * calld,size_t cq_idx,requested_call * rc)743 void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
744 requested_call* rc) {
745 grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
746 grpc_call* call = calld->call;
747 *rc->call = call;
748 calld->cq_new = server->cqs[cq_idx];
749 GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
750 switch (rc->type) {
751 case BATCH_CALL:
752 GPR_ASSERT(calld->host_set);
753 GPR_ASSERT(calld->path_set);
754 rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
755 rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
756 rc->data.batch.details->deadline =
757 grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
758 rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
759 break;
760 case REGISTERED_CALL:
761 *rc->data.registered.deadline =
762 grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
763 if (rc->data.registered.optional_payload) {
764 *rc->data.registered.optional_payload = calld->payload;
765 calld->payload = nullptr;
766 }
767 break;
768 default:
769 GPR_UNREACHABLE_CODE(return );
770 }
771
772 grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
773 rc, &rc->completion, true);
774 }
775
publish_new_rpc(void * arg,grpc_error * error)776 void publish_new_rpc(void* arg, grpc_error* error) {
777 grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
778 call_data* calld = static_cast<call_data*>(call_elem->call_data);
779 channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
780 RequestMatcherInterface* rm = calld->matcher;
781 grpc_server* server = rm->server();
782
783 if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
784 gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
785 GRPC_CLOSURE_INIT(
786 &calld->kill_zombie_closure, kill_zombie,
787 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
788 grpc_schedule_on_exec_ctx);
789 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
790 GRPC_ERROR_REF(error));
791 return;
792 }
793
794 rm->MatchOrQueue(chand->cq_idx, calld);
795 }
796
finish_start_new_rpc(grpc_server * server,grpc_call_element * elem,RequestMatcherInterface * rm,grpc_server_register_method_payload_handling payload_handling)797 void finish_start_new_rpc(
798 grpc_server* server, grpc_call_element* elem, RequestMatcherInterface* rm,
799 grpc_server_register_method_payload_handling payload_handling) {
800 call_data* calld = static_cast<call_data*>(elem->call_data);
801
802 if (gpr_atm_acq_load(&server->shutdown_flag)) {
803 gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
804 GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
805 grpc_schedule_on_exec_ctx);
806 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
807 GRPC_ERROR_NONE);
808 return;
809 }
810
811 calld->matcher = rm;
812
813 switch (payload_handling) {
814 case GRPC_SRM_PAYLOAD_NONE:
815 publish_new_rpc(elem, GRPC_ERROR_NONE);
816 break;
817 case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
818 grpc_op op;
819 op.op = GRPC_OP_RECV_MESSAGE;
820 op.flags = 0;
821 op.reserved = nullptr;
822 op.data.recv_message.recv_message = &calld->payload;
823 GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
824 grpc_schedule_on_exec_ctx);
825 grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
826 break;
827 }
828 }
829 }
830
start_new_rpc(grpc_call_element * elem)831 void start_new_rpc(grpc_call_element* elem) {
832 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
833 call_data* calld = static_cast<call_data*>(elem->call_data);
834 grpc_server* server = chand->server;
835 uint32_t i;
836 uint32_t hash;
837 channel_registered_method* rm;
838
839 if (chand->registered_methods && calld->path_set && calld->host_set) {
840 /* TODO(ctiller): unify these two searches */
841 /* check for an exact match with host */
842 hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash_internal(calld->host),
843 grpc_slice_hash_internal(calld->path));
844 for (i = 0; i <= chand->registered_method_max_probes; i++) {
845 rm = &chand->registered_methods[(hash + i) %
846 chand->registered_method_slots];
847 if (rm->server_registered_method == nullptr) break;
848 if (!rm->has_host) continue;
849 if (rm->host != calld->host) continue;
850 if (rm->method != calld->path) continue;
851 if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
852 0 == (calld->recv_initial_metadata_flags &
853 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
854 continue;
855 }
856 finish_start_new_rpc(server, elem,
857 rm->server_registered_method->matcher.get(),
858 rm->server_registered_method->payload_handling);
859 return;
860 }
861 /* check for a wildcard method definition (no host set) */
862 hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash_internal(calld->path));
863 for (i = 0; i <= chand->registered_method_max_probes; i++) {
864 rm = &chand->registered_methods[(hash + i) %
865 chand->registered_method_slots];
866 if (rm->server_registered_method == nullptr) break;
867 if (rm->has_host) continue;
868 if (rm->method != calld->path) continue;
869 if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
870 0 == (calld->recv_initial_metadata_flags &
871 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
872 continue;
873 }
874 finish_start_new_rpc(server, elem,
875 rm->server_registered_method->matcher.get(),
876 rm->server_registered_method->payload_handling);
877 return;
878 }
879 }
880 finish_start_new_rpc(server, elem, server->unregistered_request_matcher,
881 GRPC_SRM_PAYLOAD_NONE);
882 }
883
num_listeners(grpc_server * server)884 int num_listeners(grpc_server* server) {
885 listener* l;
886 int n = 0;
887 for (l = server->listeners; l; l = l->next) {
888 n++;
889 }
890 return n;
891 }
892
done_shutdown_event(void * server,grpc_cq_completion *)893 void done_shutdown_event(void* server, grpc_cq_completion* /*completion*/) {
894 server_unref(static_cast<grpc_server*>(server));
895 }
896
num_channels(grpc_server * server)897 int num_channels(grpc_server* server) {
898 channel_data* chand;
899 int n = 0;
900 for (chand = server->root_channel_data.next;
901 chand != &server->root_channel_data; chand = chand->next) {
902 n++;
903 }
904 return n;
905 }
906
kill_pending_work_locked(grpc_server * server,grpc_error * error)907 void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
908 if (server->started) {
909 server->unregistered_request_matcher->KillRequests(GRPC_ERROR_REF(error));
910 server->unregistered_request_matcher->ZombifyPending();
911 for (registered_method* rm = server->registered_methods; rm;
912 rm = rm->next) {
913 rm->matcher->KillRequests(GRPC_ERROR_REF(error));
914 rm->matcher->ZombifyPending();
915 }
916 }
917 GRPC_ERROR_UNREF(error);
918 }
919
maybe_finish_shutdown(grpc_server * server)920 void maybe_finish_shutdown(grpc_server* server) {
921 size_t i;
922 if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
923 return;
924 }
925
926 gpr_mu_lock(&server->mu_call);
927 kill_pending_work_locked(
928 server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
929 gpr_mu_unlock(&server->mu_call);
930
931 if (server->root_channel_data.next != &server->root_channel_data ||
932 server->listeners_destroyed < num_listeners(server)) {
933 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
934 server->last_shutdown_message_time),
935 gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
936 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
937 gpr_log(GPR_DEBUG,
938 "Waiting for %d channels and %d/%d listeners to be destroyed"
939 " before shutting down server",
940 num_channels(server),
941 num_listeners(server) - server->listeners_destroyed,
942 num_listeners(server));
943 }
944 return;
945 }
946 server->shutdown_published = 1;
947 for (i = 0; i < server->num_shutdown_tags; i++) {
948 server_ref(server);
949 grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
950 GRPC_ERROR_NONE, done_shutdown_event, server,
951 &server->shutdown_tags[i].completion);
952 }
953 }
954
server_on_recv_initial_metadata(void * ptr,grpc_error * error)955 void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
956 grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
957 call_data* calld = static_cast<call_data*>(elem->call_data);
958 grpc_millis op_deadline;
959
960 if (error == GRPC_ERROR_NONE) {
961 GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
962 GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.authority !=
963 nullptr);
964 calld->path = grpc_slice_ref_internal(
965 GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
966 calld->host = grpc_slice_ref_internal(
967 GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
968 calld->path_set = true;
969 calld->host_set = true;
970 grpc_metadata_batch_remove(calld->recv_initial_metadata, GRPC_BATCH_PATH);
971 grpc_metadata_batch_remove(calld->recv_initial_metadata,
972 GRPC_BATCH_AUTHORITY);
973 } else {
974 GRPC_ERROR_REF(error);
975 }
976 op_deadline = calld->recv_initial_metadata->deadline;
977 if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
978 calld->deadline = op_deadline;
979 }
980 if (calld->host_set && calld->path_set) {
981 /* do nothing */
982 } else {
983 /* Pass the error reference to calld->recv_initial_metadata_error */
984 grpc_error* src_error = error;
985 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
986 "Missing :authority or :path", &src_error, 1);
987 GRPC_ERROR_UNREF(src_error);
988 calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
989 }
990 grpc_closure* closure = calld->on_done_recv_initial_metadata;
991 calld->on_done_recv_initial_metadata = nullptr;
992 if (calld->seen_recv_trailing_metadata_ready) {
993 GRPC_CALL_COMBINER_START(calld->call_combiner,
994 &calld->recv_trailing_metadata_ready,
995 calld->recv_trailing_metadata_error,
996 "continue server_recv_trailing_metadata_ready");
997 }
998 grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
999 }
1000
server_recv_trailing_metadata_ready(void * user_data,grpc_error * error)1001 void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
1002 grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
1003 call_data* calld = static_cast<call_data*>(elem->call_data);
1004 if (calld->on_done_recv_initial_metadata != nullptr) {
1005 calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
1006 calld->seen_recv_trailing_metadata_ready = true;
1007 GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
1008 server_recv_trailing_metadata_ready, elem,
1009 grpc_schedule_on_exec_ctx);
1010 GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1011 "deferring server_recv_trailing_metadata_ready "
1012 "until after server_on_recv_initial_metadata");
1013 return;
1014 }
1015 error =
1016 grpc_error_add_child(GRPC_ERROR_REF(error),
1017 GRPC_ERROR_REF(calld->recv_initial_metadata_error));
1018 grpc_core::Closure::Run(DEBUG_LOCATION,
1019 calld->original_recv_trailing_metadata_ready, error);
1020 }
1021
server_mutate_op(grpc_call_element * elem,grpc_transport_stream_op_batch * op)1022 void server_mutate_op(grpc_call_element* elem,
1023 grpc_transport_stream_op_batch* op) {
1024 call_data* calld = static_cast<call_data*>(elem->call_data);
1025
1026 if (op->recv_initial_metadata) {
1027 GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
1028 calld->recv_initial_metadata =
1029 op->payload->recv_initial_metadata.recv_initial_metadata;
1030 calld->on_done_recv_initial_metadata =
1031 op->payload->recv_initial_metadata.recv_initial_metadata_ready;
1032 op->payload->recv_initial_metadata.recv_initial_metadata_ready =
1033 &calld->server_on_recv_initial_metadata;
1034 op->payload->recv_initial_metadata.recv_flags =
1035 &calld->recv_initial_metadata_flags;
1036 }
1037 if (op->recv_trailing_metadata) {
1038 calld->original_recv_trailing_metadata_ready =
1039 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1040 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1041 &calld->recv_trailing_metadata_ready;
1042 }
1043 }
1044
server_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)1045 void server_start_transport_stream_op_batch(
1046 grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
1047 server_mutate_op(elem, op);
1048 grpc_call_next_op(elem, op);
1049 }
1050
got_initial_metadata(void * ptr,grpc_error * error)1051 void got_initial_metadata(void* ptr, grpc_error* error) {
1052 grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
1053 call_data* calld = static_cast<call_data*>(elem->call_data);
1054 if (error == GRPC_ERROR_NONE) {
1055 start_new_rpc(elem);
1056 } else {
1057 if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
1058 GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
1059 grpc_schedule_on_exec_ctx);
1060 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
1061 GRPC_ERROR_NONE);
1062 } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
1063 /* zombied call will be destroyed when it's removed from the pending
1064 queue... later */
1065 }
1066 }
1067 }
1068
accept_stream(void * cd,grpc_transport *,const void * transport_server_data)1069 void accept_stream(void* cd, grpc_transport* /*transport*/,
1070 const void* transport_server_data) {
1071 channel_data* chand = static_cast<channel_data*>(cd);
1072 /* create a call */
1073 grpc_call_create_args args;
1074 args.channel = chand->channel;
1075 args.server = chand->server;
1076 args.parent = nullptr;
1077 args.propagation_mask = 0;
1078 args.cq = nullptr;
1079 args.pollset_set_alternative = nullptr;
1080 args.server_transport_data = transport_server_data;
1081 args.add_initial_metadata = nullptr;
1082 args.add_initial_metadata_count = 0;
1083 args.send_deadline = GRPC_MILLIS_INF_FUTURE;
1084 grpc_call* call;
1085 grpc_error* error = grpc_call_create(&args, &call);
1086 grpc_call_element* elem =
1087 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1088 if (error != GRPC_ERROR_NONE) {
1089 got_initial_metadata(elem, error);
1090 GRPC_ERROR_UNREF(error);
1091 return;
1092 }
1093 call_data* calld = static_cast<call_data*>(elem->call_data);
1094 grpc_op op;
1095 op.op = GRPC_OP_RECV_INITIAL_METADATA;
1096 op.flags = 0;
1097 op.reserved = nullptr;
1098 op.data.recv_initial_metadata.recv_initial_metadata =
1099 &calld->initial_metadata;
1100 GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
1101 grpc_schedule_on_exec_ctx);
1102 grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
1103 }
1104
server_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)1105 grpc_error* server_init_call_elem(grpc_call_element* elem,
1106 const grpc_call_element_args* args) {
1107 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1108 server_ref(chand->server);
1109 new (elem->call_data) call_data(elem, *args);
1110 return GRPC_ERROR_NONE;
1111 }
1112
server_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)1113 void server_destroy_call_elem(grpc_call_element* elem,
1114 const grpc_call_final_info* /*final_info*/,
1115 grpc_closure* /*ignored*/) {
1116 call_data* calld = static_cast<call_data*>(elem->call_data);
1117 calld->~call_data();
1118 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1119 server_unref(chand->server);
1120 }
1121
server_init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)1122 grpc_error* server_init_channel_elem(grpc_channel_element* elem,
1123 grpc_channel_element_args* args) {
1124 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1125 GPR_ASSERT(args->is_first);
1126 GPR_ASSERT(!args->is_last);
1127 chand->server = nullptr;
1128 chand->channel = nullptr;
1129 chand->next = chand->prev = chand;
1130 chand->registered_methods = nullptr;
1131 return GRPC_ERROR_NONE;
1132 }
1133
server_destroy_channel_elem(grpc_channel_element * elem)1134 void server_destroy_channel_elem(grpc_channel_element* elem) {
1135 size_t i;
1136 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1137 if (chand->registered_methods) {
1138 for (i = 0; i < chand->registered_method_slots; i++) {
1139 grpc_slice_unref_internal(chand->registered_methods[i].method);
1140 GPR_DEBUG_ASSERT(chand->registered_methods[i].method.refcount ==
1141 &grpc_core::kNoopRefcount ||
1142 chand->registered_methods[i].method.refcount == nullptr);
1143 if (chand->registered_methods[i].has_host) {
1144 grpc_slice_unref_internal(chand->registered_methods[i].host);
1145 GPR_DEBUG_ASSERT(chand->registered_methods[i].host.refcount ==
1146 &grpc_core::kNoopRefcount ||
1147 chand->registered_methods[i].host.refcount == nullptr);
1148 }
1149 }
1150 gpr_free(chand->registered_methods);
1151 }
1152 if (chand->server) {
1153 if (chand->server->channelz_server != nullptr &&
1154 chand->channelz_socket_uuid != 0) {
1155 chand->server->channelz_server->RemoveChildSocket(
1156 chand->channelz_socket_uuid);
1157 }
1158 gpr_mu_lock(&chand->server->mu_global);
1159 chand->next->prev = chand->prev;
1160 chand->prev->next = chand->next;
1161 chand->next = chand->prev = chand;
1162 maybe_finish_shutdown(chand->server);
1163 gpr_mu_unlock(&chand->server->mu_global);
1164 server_unref(chand->server);
1165 }
1166 }
1167
register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1168 void register_completion_queue(grpc_server* server, grpc_completion_queue* cq,
1169 void* reserved) {
1170 size_t i, n;
1171 GPR_ASSERT(!reserved);
1172 for (i = 0; i < server->cq_count; i++) {
1173 if (server->cqs[i] == cq) return;
1174 }
1175
1176 GRPC_CQ_INTERNAL_REF(cq, "server");
1177 n = server->cq_count++;
1178 server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(
1179 server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));
1180 server->cqs[n] = cq;
1181 }
1182
streq(const std::string & a,const char * b)1183 bool streq(const std::string& a, const char* b) {
1184 return (a.empty() && b == nullptr) ||
1185 ((b != nullptr) && !strcmp(a.c_str(), b));
1186 }
1187
1188 class ConnectivityWatcher
1189 : public grpc_core::AsyncConnectivityStateWatcherInterface {
1190 public:
ConnectivityWatcher(channel_data * chand)1191 explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
1192 GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity");
1193 }
1194
~ConnectivityWatcher()1195 ~ConnectivityWatcher() {
1196 GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity");
1197 }
1198
1199 private:
OnConnectivityStateChange(grpc_connectivity_state new_state)1200 void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
1201 // Don't do anything until we are being shut down.
1202 if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
1203 // Shut down channel.
1204 grpc_server* server = chand_->server;
1205 gpr_mu_lock(&server->mu_global);
1206 destroy_channel(chand_);
1207 gpr_mu_unlock(&server->mu_global);
1208 }
1209
1210 channel_data* chand_;
1211 };
1212
done_published_shutdown(void * done_arg,grpc_cq_completion * storage)1213 void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
1214 (void)done_arg;
1215 gpr_free(storage);
1216 }
1217
listener_destroy_done(void * s,grpc_error *)1218 void listener_destroy_done(void* s, grpc_error* /*error*/) {
1219 grpc_server* server = static_cast<grpc_server*>(s);
1220 gpr_mu_lock(&server->mu_global);
1221 server->listeners_destroyed++;
1222 maybe_finish_shutdown(server);
1223 gpr_mu_unlock(&server->mu_global);
1224 }
1225
queue_call_request(grpc_server * server,size_t cq_idx,requested_call * rc)1226 grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
1227 requested_call* rc) {
1228 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1229 fail_call(server, cq_idx, rc,
1230 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1231 return GRPC_CALL_OK;
1232 }
1233 RequestMatcherInterface* rm;
1234 switch (rc->type) {
1235 case BATCH_CALL:
1236 rm = server->unregistered_request_matcher;
1237 break;
1238 case REGISTERED_CALL:
1239 rm = rc->data.registered.method->matcher.get();
1240 break;
1241 }
1242 rm->RequestCallWithPossiblePublish(cq_idx, rc);
1243 return GRPC_CALL_OK;
1244 }
1245
fail_call(grpc_server * server,size_t cq_idx,requested_call * rc,grpc_error * error)1246 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
1247 grpc_error* error) {
1248 *rc->call = nullptr;
1249 rc->initial_metadata->count = 0;
1250 GPR_ASSERT(error != GRPC_ERROR_NONE);
1251
1252 grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
1253 &rc->completion);
1254 }
1255 } // namespace
1256
1257 namespace grpc_core {
1258
SetServerRegisteredMethodAllocator(grpc_server * server,grpc_completion_queue * cq,void * method_tag,std::function<ServerRegisteredCallAllocation ()> allocator)1259 void SetServerRegisteredMethodAllocator(
1260 grpc_server* server, grpc_completion_queue* cq, void* method_tag,
1261 std::function<ServerRegisteredCallAllocation()> allocator) {
1262 registered_method* rm = static_cast<registered_method*>(method_tag);
1263 rm->matcher.reset(new AllocatingRequestMatcherRegistered(
1264 server, cq, rm, std::move(allocator)));
1265 }
1266
SetServerBatchMethodAllocator(grpc_server * server,grpc_completion_queue * cq,std::function<ServerBatchCallAllocation ()> allocator)1267 void SetServerBatchMethodAllocator(
1268 grpc_server* server, grpc_completion_queue* cq,
1269 std::function<ServerBatchCallAllocation()> allocator) {
1270 GPR_DEBUG_ASSERT(server->unregistered_request_matcher == nullptr);
1271 server->unregistered_request_matcher =
1272 new AllocatingRequestMatcherBatch(server, cq, std::move(allocator));
1273 }
1274
1275 }; // namespace grpc_core
1276
1277 const grpc_channel_filter grpc_server_top_filter = {
1278 server_start_transport_stream_op_batch,
1279 grpc_channel_next_op,
1280 sizeof(call_data),
1281 server_init_call_elem,
1282 grpc_call_stack_ignore_set_pollset_or_pollset_set,
1283 server_destroy_call_elem,
1284 sizeof(channel_data),
1285 server_init_channel_elem,
1286 server_destroy_channel_elem,
1287 grpc_channel_next_get_info,
1288 "server",
1289 };
1290
grpc_server_register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1291 void grpc_server_register_completion_queue(grpc_server* server,
1292 grpc_completion_queue* cq,
1293 void* reserved) {
1294 GRPC_API_TRACE(
1295 "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
1296 (server, cq, reserved));
1297
1298 auto cq_type = grpc_get_cq_completion_type(cq);
1299 if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
1300 gpr_log(GPR_INFO,
1301 "Completion queue of type %d is being registered as a "
1302 "server-completion-queue",
1303 static_cast<int>(cq_type));
1304 /* Ideally we should log an error and abort but ruby-wrapped-language API
1305 calls grpc_completion_queue_pluck() on server completion queues */
1306 }
1307
1308 register_completion_queue(server, cq, reserved);
1309 }
1310
grpc_server_create(const grpc_channel_args * args,void * reserved)1311 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
1312 grpc_core::ExecCtx exec_ctx;
1313 GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
1314
1315 grpc_server* server =
1316 static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));
1317
1318 gpr_mu_init(&server->mu_global);
1319 gpr_mu_init(&server->mu_call);
1320 gpr_cv_init(&server->starting_cv);
1321
1322 /* decremented by grpc_server_destroy */
1323 new (&server->internal_refcount) grpc_core::RefCount();
1324 server->root_channel_data.next = server->root_channel_data.prev =
1325 &server->root_channel_data;
1326
1327 server->channel_args = grpc_channel_args_copy(args);
1328
1329 const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
1330 if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) {
1331 arg = grpc_channel_args_find(
1332 args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
1333 size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(
1334 arg,
1335 {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
1336 server->channelz_server =
1337 grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
1338 server, channel_tracer_max_memory);
1339 server->channelz_server->AddTraceEvent(
1340 grpc_core::channelz::ChannelTrace::Severity::Info,
1341 grpc_slice_from_static_string("Server created"));
1342 }
1343
1344 if (args != nullptr) {
1345 grpc_resource_quota* resource_quota =
1346 grpc_resource_quota_from_channel_args(args, false /* create */);
1347 if (resource_quota != nullptr) {
1348 server->default_resource_user =
1349 grpc_resource_user_create(resource_quota, "default");
1350 }
1351 }
1352
1353 return server;
1354 }
1355
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1356 void* grpc_server_register_method(
1357 grpc_server* server, const char* method, const char* host,
1358 grpc_server_register_method_payload_handling payload_handling,
1359 uint32_t flags) {
1360 registered_method* m;
1361 GRPC_API_TRACE(
1362 "grpc_server_register_method(server=%p, method=%s, host=%s, "
1363 "flags=0x%08x)",
1364 4, (server, method, host, flags));
1365 if (!method) {
1366 gpr_log(GPR_ERROR,
1367 "grpc_server_register_method method string cannot be NULL");
1368 return nullptr;
1369 }
1370 for (m = server->registered_methods; m; m = m->next) {
1371 if (streq(m->method, method) && streq(m->host, host)) {
1372 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
1373 host ? host : "*");
1374 return nullptr;
1375 }
1376 }
1377 if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
1378 gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
1379 flags);
1380 return nullptr;
1381 }
1382 m = new registered_method(method, host, payload_handling, flags);
1383 m->next = server->registered_methods;
1384 server->registered_methods = m;
1385 return m;
1386 }
1387
grpc_server_start(grpc_server * server)1388 void grpc_server_start(grpc_server* server) {
1389 size_t i;
1390 grpc_core::ExecCtx exec_ctx;
1391
1392 GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
1393
1394 server->started = true;
1395 server->pollset_count = 0;
1396 server->pollsets = static_cast<grpc_pollset**>(
1397 gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));
1398 for (i = 0; i < server->cq_count; i++) {
1399 if (grpc_cq_can_listen(server->cqs[i])) {
1400 server->pollsets[server->pollset_count++] =
1401 grpc_cq_pollset(server->cqs[i]);
1402 }
1403 }
1404 if (server->unregistered_request_matcher == nullptr) {
1405 server->unregistered_request_matcher = new RealRequestMatcher(server);
1406 }
1407 for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
1408 if (rm->matcher == nullptr) {
1409 rm->matcher.reset(new RealRequestMatcher(server));
1410 }
1411 }
1412
1413 gpr_mu_lock(&server->mu_global);
1414 server->starting = true;
1415 gpr_mu_unlock(&server->mu_global);
1416
1417 for (listener* l = server->listeners; l; l = l->next) {
1418 l->start(server, l->arg, server->pollsets, server->pollset_count);
1419 }
1420
1421 gpr_mu_lock(&server->mu_global);
1422 server->starting = false;
1423 gpr_cv_signal(&server->starting_cv);
1424 gpr_mu_unlock(&server->mu_global);
1425 }
1426
grpc_server_get_pollsets(grpc_server * server,grpc_pollset *** pollsets,size_t * pollset_count)1427 void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
1428 size_t* pollset_count) {
1429 *pollset_count = server->pollset_count;
1430 *pollsets = server->pollsets;
1431 }
1432
grpc_server_setup_transport(grpc_server * s,grpc_transport * transport,grpc_pollset * accepting_pollset,const grpc_channel_args * args,const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> & socket_node,grpc_resource_user * resource_user)1433 void grpc_server_setup_transport(
1434 grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
1435 const grpc_channel_args* args,
1436 const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
1437 socket_node,
1438 grpc_resource_user* resource_user) {
1439 size_t num_registered_methods;
1440 size_t alloc;
1441 registered_method* rm;
1442 channel_registered_method* crm;
1443 grpc_channel* channel;
1444 channel_data* chand;
1445 uint32_t hash;
1446 size_t slots;
1447 uint32_t probes;
1448 uint32_t max_probes = 0;
1449 grpc_transport_op* op = nullptr;
1450
1451 channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
1452 resource_user);
1453 chand = static_cast<channel_data*>(
1454 grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
1455 ->channel_data);
1456 chand->server = s;
1457 server_ref(s);
1458 chand->channel = channel;
1459 if (socket_node != nullptr) {
1460 chand->channelz_socket_uuid = socket_node->uuid();
1461 s->channelz_server->AddChildSocket(socket_node);
1462 } else {
1463 chand->channelz_socket_uuid = 0;
1464 }
1465
1466 size_t cq_idx;
1467 for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
1468 if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
1469 }
1470 if (cq_idx == s->cq_count) {
1471 /* completion queue not found: pick a random one to publish new calls to */
1472 cq_idx = static_cast<size_t>(rand()) % s->cq_count;
1473 }
1474 chand->cq_idx = cq_idx;
1475
1476 num_registered_methods = 0;
1477 for (rm = s->registered_methods; rm; rm = rm->next) {
1478 num_registered_methods++;
1479 }
1480 /* build a lookup table phrased in terms of mdstr's in this channels context
1481 to quickly find registered methods */
1482 if (num_registered_methods > 0) {
1483 slots = 2 * num_registered_methods;
1484 alloc = sizeof(channel_registered_method) * slots;
1485 chand->registered_methods =
1486 static_cast<channel_registered_method*>(gpr_zalloc(alloc));
1487 for (rm = s->registered_methods; rm; rm = rm->next) {
1488 grpc_core::ExternallyManagedSlice host;
1489 grpc_core::ExternallyManagedSlice method(rm->method.c_str());
1490 const bool has_host = !rm->host.empty();
1491 if (has_host) {
1492 host = grpc_core::ExternallyManagedSlice(rm->host.c_str());
1493 }
1494 hash = GRPC_MDSTR_KV_HASH(has_host ? host.Hash() : 0, method.Hash());
1495 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
1496 .server_registered_method != nullptr;
1497 probes++)
1498 ;
1499 if (probes > max_probes) max_probes = probes;
1500 crm = &chand->registered_methods[(hash + probes) % slots];
1501 crm->server_registered_method = rm;
1502 crm->flags = rm->flags;
1503 crm->has_host = has_host;
1504 if (has_host) {
1505 crm->host = host;
1506 }
1507 crm->method = method;
1508 }
1509 GPR_ASSERT(slots <= UINT32_MAX);
1510 chand->registered_method_slots = static_cast<uint32_t>(slots);
1511 chand->registered_method_max_probes = max_probes;
1512 }
1513
1514 gpr_mu_lock(&s->mu_global);
1515 chand->next = &s->root_channel_data;
1516 chand->prev = chand->next->prev;
1517 chand->next->prev = chand->prev->next = chand;
1518 gpr_mu_unlock(&s->mu_global);
1519
1520 op = grpc_make_transport_op(nullptr);
1521 op->set_accept_stream = true;
1522 op->set_accept_stream_fn = accept_stream;
1523 op->set_accept_stream_user_data = chand;
1524 op->start_connectivity_watch.reset(new ConnectivityWatcher(chand));
1525 if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
1526 op->disconnect_with_error =
1527 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
1528 }
1529 grpc_transport_perform_op(transport, op);
1530 }
1531
1532 /*
1533 - Kills all pending requests-for-incoming-RPC-calls (i.e the requests made via
1534 grpc_server_request_call and grpc_server_request_registered call will now be
1535 cancelled). See 'kill_pending_work_locked()'
1536
1537 - Shuts down the listeners (i.e the server will no longer listen on the port
1538 for new incoming channels).
1539
1540 - Iterates through all channels on the server and sends shutdown msg (see
1541 'channel_broadcaster_shutdown()' for details) to the clients via the
1542 transport layer. The transport layer then guarantees the following:
1543 -- Sends shutdown to the client (for eg: HTTP2 transport sends GOAWAY)
1544 -- If the server has outstanding calls that are in the process, the
1545 connection is NOT closed until the server is done with all those calls
1546 -- Once, there are no more calls in progress, the channel is closed
1547 */
grpc_server_shutdown_and_notify(grpc_server * server,grpc_completion_queue * cq,void * tag)1548 void grpc_server_shutdown_and_notify(grpc_server* server,
1549 grpc_completion_queue* cq, void* tag) {
1550 listener* l;
1551 shutdown_tag* sdt;
1552 channel_broadcaster broadcaster;
1553 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1554 grpc_core::ExecCtx exec_ctx;
1555
1556 GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
1557 (server, cq, tag));
1558
1559 /* wait for startup to be finished: locks mu_global */
1560 gpr_mu_lock(&server->mu_global);
1561 while (server->starting) {
1562 gpr_cv_wait(&server->starting_cv, &server->mu_global,
1563 gpr_inf_future(GPR_CLOCK_MONOTONIC));
1564 }
1565
1566 /* stay locked, and gather up some stuff to do */
1567 GPR_ASSERT(grpc_cq_begin_op(cq, tag));
1568 if (server->shutdown_published) {
1569 grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
1570 static_cast<grpc_cq_completion*>(
1571 gpr_malloc(sizeof(grpc_cq_completion))));
1572 gpr_mu_unlock(&server->mu_global);
1573 return;
1574 }
1575 server->shutdown_tags = static_cast<shutdown_tag*>(
1576 gpr_realloc(server->shutdown_tags,
1577 sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));
1578 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
1579 sdt->tag = tag;
1580 sdt->cq = cq;
1581 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1582 gpr_mu_unlock(&server->mu_global);
1583 return;
1584 }
1585
1586 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
1587
1588 channel_broadcaster_init(server, &broadcaster);
1589
1590 gpr_atm_rel_store(&server->shutdown_flag, 1);
1591
1592 /* collect all unregistered then registered calls */
1593 gpr_mu_lock(&server->mu_call);
1594 kill_pending_work_locked(
1595 server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1596 gpr_mu_unlock(&server->mu_call);
1597
1598 maybe_finish_shutdown(server);
1599 gpr_mu_unlock(&server->mu_global);
1600
1601 /* Shutdown listeners */
1602 for (l = server->listeners; l; l = l->next) {
1603 GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
1604 grpc_schedule_on_exec_ctx);
1605 l->destroy(server, l->arg, &l->destroy_done);
1606 if (server->channelz_server != nullptr && l->socket_uuid != 0) {
1607 server->channelz_server->RemoveChildListenSocket(l->socket_uuid);
1608 }
1609 }
1610
1611 channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
1612 GRPC_ERROR_NONE);
1613
1614 if (server->default_resource_user != nullptr) {
1615 grpc_resource_quota_unref(
1616 grpc_resource_user_quota(server->default_resource_user));
1617 grpc_resource_user_shutdown(server->default_resource_user);
1618 grpc_resource_user_unref(server->default_resource_user);
1619 }
1620 }
1621
grpc_server_cancel_all_calls(grpc_server * server)1622 void grpc_server_cancel_all_calls(grpc_server* server) {
1623 channel_broadcaster broadcaster;
1624 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1625 grpc_core::ExecCtx exec_ctx;
1626
1627 GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1628
1629 gpr_mu_lock(&server->mu_global);
1630 channel_broadcaster_init(server, &broadcaster);
1631 gpr_mu_unlock(&server->mu_global);
1632
1633 channel_broadcaster_shutdown(
1634 &broadcaster, false /* send_goaway */,
1635 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
1636 }
1637
grpc_server_destroy(grpc_server * server)1638 void grpc_server_destroy(grpc_server* server) {
1639 listener* l;
1640 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1641 grpc_core::ExecCtx exec_ctx;
1642
1643 GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1644
1645 gpr_mu_lock(&server->mu_global);
1646 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1647 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
1648
1649 while (server->listeners) {
1650 l = server->listeners;
1651 server->listeners = l->next;
1652 gpr_free(l);
1653 }
1654
1655 gpr_mu_unlock(&server->mu_global);
1656
1657 server_unref(server);
1658 }
1659
grpc_server_add_listener(grpc_server * server,void * listener_arg,void (* start)(grpc_server * server,void * arg,grpc_pollset ** pollsets,size_t pollset_count),void (* destroy)(grpc_server * server,void * arg,grpc_closure * on_done),grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> node)1660 void grpc_server_add_listener(
1661 grpc_server* server, void* listener_arg,
1662 void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
1663 size_t pollset_count),
1664 void (*destroy)(grpc_server* server, void* arg, grpc_closure* on_done),
1665 grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> node) {
1666 listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
1667 l->arg = listener_arg;
1668 l->start = start;
1669 l->destroy = destroy;
1670 l->socket_uuid = 0;
1671 if (node != nullptr) {
1672 l->socket_uuid = node->uuid();
1673 if (server->channelz_server != nullptr) {
1674 server->channelz_server->AddChildListenSocket(std::move(node));
1675 }
1676 }
1677 l->next = server->listeners;
1678 server->listeners = l;
1679 }
1680
1681 namespace {
ValidateServerRequest(grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,registered_method * rm)1682 grpc_call_error ValidateServerRequest(
1683 grpc_completion_queue* cq_for_notification, void* tag,
1684 grpc_byte_buffer** optional_payload, registered_method* rm) {
1685 if ((rm == nullptr && optional_payload != nullptr) ||
1686 ((rm != nullptr) && ((optional_payload == nullptr) !=
1687 (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
1688 return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
1689 }
1690 if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1691 return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1692 }
1693 return GRPC_CALL_OK;
1694 }
ValidateServerRequestAndCq(size_t * cq_idx,grpc_server * server,grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,registered_method * rm)1695 grpc_call_error ValidateServerRequestAndCq(
1696 size_t* cq_idx, grpc_server* server,
1697 grpc_completion_queue* cq_for_notification, void* tag,
1698 grpc_byte_buffer** optional_payload, registered_method* rm) {
1699 size_t idx;
1700 for (idx = 0; idx < server->cq_count; idx++) {
1701 if (server->cqs[idx] == cq_for_notification) {
1702 break;
1703 }
1704 }
1705 if (idx == server->cq_count) {
1706 return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1707 }
1708 grpc_call_error error =
1709 ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
1710 if (error != GRPC_CALL_OK) {
1711 return error;
1712 }
1713
1714 *cq_idx = idx;
1715 return GRPC_CALL_OK;
1716 }
1717 } // namespace
1718
grpc_server_request_call(grpc_server * server,grpc_call ** call,grpc_call_details * details,grpc_metadata_array * initial_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1719 grpc_call_error grpc_server_request_call(
1720 grpc_server* server, grpc_call** call, grpc_call_details* details,
1721 grpc_metadata_array* initial_metadata,
1722 grpc_completion_queue* cq_bound_to_call,
1723 grpc_completion_queue* cq_for_notification, void* tag) {
1724 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1725 grpc_core::ExecCtx exec_ctx;
1726 GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1727 GRPC_API_TRACE(
1728 "grpc_server_request_call("
1729 "server=%p, call=%p, details=%p, initial_metadata=%p, "
1730 "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1731 7,
1732 (server, call, details, initial_metadata, cq_bound_to_call,
1733 cq_for_notification, tag));
1734
1735 size_t cq_idx;
1736 grpc_call_error error = ValidateServerRequestAndCq(
1737 &cq_idx, server, cq_for_notification, tag, nullptr, nullptr);
1738 if (error != GRPC_CALL_OK) {
1739 return error;
1740 }
1741
1742 requested_call* rc = new requested_call(tag, cq_bound_to_call, call,
1743 initial_metadata, details);
1744 return queue_call_request(server, cq_idx, rc);
1745 }
1746
grpc_server_request_registered_call(grpc_server * server,void * rmp,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * initial_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1747 grpc_call_error grpc_server_request_registered_call(
1748 grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
1749 grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
1750 grpc_completion_queue* cq_bound_to_call,
1751 grpc_completion_queue* cq_for_notification, void* tag) {
1752 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1753 grpc_core::ExecCtx exec_ctx;
1754 GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1755 registered_method* rm = static_cast<registered_method*>(rmp);
1756 GRPC_API_TRACE(
1757 "grpc_server_request_registered_call("
1758 "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1759 "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1760 "tag=%p)",
1761 9,
1762 (server, rmp, call, deadline, initial_metadata, optional_payload,
1763 cq_bound_to_call, cq_for_notification, tag));
1764
1765 size_t cq_idx;
1766 grpc_call_error error = ValidateServerRequestAndCq(
1767 &cq_idx, server, cq_for_notification, tag, optional_payload, rm);
1768 if (error != GRPC_CALL_OK) {
1769 return error;
1770 }
1771
1772 requested_call* rc =
1773 new requested_call(tag, cq_bound_to_call, call, initial_metadata, rm,
1774 deadline, optional_payload);
1775 return queue_call_request(server, cq_idx, rc);
1776 }
1777
grpc_server_get_channel_args(grpc_server * server)1778 const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
1779 return server->channel_args;
1780 }
1781
grpc_server_get_default_resource_user(grpc_server * server)1782 grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
1783 return server->default_resource_user;
1784 }
1785
grpc_server_has_open_connections(grpc_server * server)1786 int grpc_server_has_open_connections(grpc_server* server) {
1787 int r;
1788 gpr_mu_lock(&server->mu_global);
1789 r = server->root_channel_data.next != &server->root_channel_data;
1790 gpr_mu_unlock(&server->mu_global);
1791 return r;
1792 }
1793
grpc_server_get_channelz_node(grpc_server * server)1794 grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
1795 grpc_server* server) {
1796 if (server == nullptr) {
1797 return nullptr;
1798 }
1799 return server->channelz_server.get();
1800 }
1801