1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/surface/server.h"
22 
23 #include <limits.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 
31 #include <utility>
32 
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/channel/channelz.h"
35 #include "src/core/lib/channel/connected_channel.h"
36 #include "src/core/lib/debug/stats.h"
37 #include "src/core/lib/gpr/spinlock.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gprpp/mpscq.h"
40 #include "src/core/lib/iomgr/executor.h"
41 #include "src/core/lib/iomgr/iomgr.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "src/core/lib/surface/api_trace.h"
44 #include "src/core/lib/surface/call.h"
45 #include "src/core/lib/surface/channel.h"
46 #include "src/core/lib/surface/completion_queue.h"
47 #include "src/core/lib/surface/init.h"
48 #include "src/core/lib/transport/metadata.h"
49 #include "src/core/lib/transport/static_metadata.h"
50 
51 grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
52 
53 using grpc_core::LockedMultiProducerSingleConsumerQueue;
54 
55 namespace {
56 
57 void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
58 void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error);
59 
// One bound listener (e.g. a listening port). Listeners form a singly-linked
// list hanging off grpc_server::listeners.
struct listener {
  // Opaque per-listener state handed back to the callbacks below.
  void* arg;
  // Begin accepting connections, using the supplied pollsets.
  void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
                size_t pollset_count);
  // Tear the listener down; \a closure signals when destruction is done.
  void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
  struct listener* next;
  // channelz identifier for the listening socket.
  intptr_t socket_uuid;
  grpc_closure destroy_done;
};
69 
70 enum requested_call_type { BATCH_CALL, REGISTERED_CALL };
71 
72 struct registered_method;
73 
// An application-side request for a call (grpc_server_request_call or its
// registered-method equivalent) waiting to be matched with an incoming RPC.
// Owned by the matcher queues until published; freed by done_request_event.
struct requested_call {
  // Constructor for an unregistered (batch) call request.
  requested_call(void* tag_arg, grpc_completion_queue* call_cq,
                 grpc_call** call_arg, grpc_metadata_array* initial_md,
                 grpc_call_details* details)
      : type(BATCH_CALL),
        tag(tag_arg),
        cq_bound_to_call(call_cq),
        call(call_arg),
        initial_metadata(initial_md) {
    details->reserved = nullptr;
    data.batch.details = details;
  }

  // Constructor for a registered-method call request.
  requested_call(void* tag_arg, grpc_completion_queue* call_cq,
                 grpc_call** call_arg, grpc_metadata_array* initial_md,
                 registered_method* rm, gpr_timespec* deadline,
                 grpc_byte_buffer** optional_payload)
      : type(REGISTERED_CALL),
        tag(tag_arg),
        cq_bound_to_call(call_cq),
        call(call_arg),
        initial_metadata(initial_md) {
    data.registered.method = rm;
    data.registered.deadline = deadline;
    data.registered.optional_payload = optional_payload;
  }

  // Intrusive linkage for the per-CQ MPSC request queues.
  grpc_core::MultiProducerSingleConsumerQueue::Node mpscq_node;
  const requested_call_type type;
  // Completion tag posted when the request is matched (or failed).
  void* const tag;
  // CQ bound to the resulting call for its ops.
  grpc_completion_queue* const cq_bound_to_call;
  // Out-param: receives the matched grpc_call*.
  grpc_call** const call;
  grpc_cq_completion completion;
  // Out-param: receives the call's initial metadata (swapped in publish_call).
  grpc_metadata_array* const initial_metadata;
  // Discriminated by \a type.
  union {
    struct {
      grpc_call_details* details;
    } batch;
    struct {
      registered_method* method;
      gpr_timespec* deadline;
      grpc_byte_buffer** optional_payload;
    } registered;
  } data;
};
119 
// Per-channel hash-table entry mapping a (method, host) pair to a server
// registered_method (see channel_data::registered_methods).
struct channel_registered_method {
  registered_method* server_registered_method;
  uint32_t flags;
  // If false, the entry matches the method regardless of host.
  bool has_host;
  grpc_core::ExternallyManagedSlice method;
  grpc_core::ExternallyManagedSlice host;
};
127 
// Per-channel state for the server top filter.
struct channel_data {
  grpc_server* server;
  grpc_channel* channel;
  // Index of the server CQ associated with this channel.
  size_t cq_idx;
  /* linked list of all channels on a server */
  channel_data* next;
  channel_data* prev;
  // Open-addressed hash table of registered methods for this channel
  // (registered_method_slots entries, probed up to
  // registered_method_max_probes times).
  channel_registered_method* registered_methods;
  uint32_t registered_method_slots;
  uint32_t registered_method_max_probes;
  grpc_closure finish_destroy_channel_closure;
  intptr_t channelz_socket_uuid;
};
141 
// A (tag, cq) pair registered via grpc_server_shutdown_and_notify, to be
// completed once server shutdown finishes.
struct shutdown_tag {
  void* tag;
  grpc_completion_queue* cq;
  grpc_cq_completion completion;
};
147 
// Lifecycle of an incoming call, stored atomically in call_data::state.
enum call_state {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
};
158 
159 struct call_data;
160 
161 grpc_call_error ValidateServerRequest(
162     grpc_completion_queue* cq_for_notification, void* tag,
163     grpc_byte_buffer** optional_payload, registered_method* rm);
164 
165 // RPCs that come in from the transport must be matched against RPC requests
166 // from the application. An incoming request from the application can be matched
167 // to an RPC that has already arrived or can be queued up for later use.
168 // Likewise, an RPC coming in from the transport can either be matched to a
169 // request that already arrived from the application or can be queued up for
170 // later use (marked pending). If there is a match, the request's tag is posted
171 // on the request's notification CQ.
172 //
173 // RequestMatcherInterface is the base class to provide this functionality.
class RequestMatcherInterface {
 public:
  // Virtual destructor: instances are deleted through this interface
  // (e.g. grpc_server::unregistered_request_matcher).
  virtual ~RequestMatcherInterface() {}

  // Unref the calls associated with any incoming RPCs in the pending queue (not
  // yet matched to an application-requested RPC).
  virtual void ZombifyPending() = 0;

  // Mark all application-requested RPCs failed if they have not been matched to
  // an incoming RPC. The error parameter indicates why the RPCs are being
  // failed (always server shutdown in all current implementations).
  virtual void KillRequests(grpc_error* error) = 0;

  // How many request queues are supported by this matcher. This is an abstract
  // concept that essentially maps to gRPC completion queues.
  virtual size_t request_queue_count() const = 0;

  // This function is invoked when the application requests a new RPC whose
  // information is in the call parameter. The request_queue_index marks the
  // queue onto which to place this RPC, and is typically associated with a gRPC
  // CQ. If there are pending RPCs waiting to be matched, publish one (match it
  // and notify the CQ).
  virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
                                              requested_call* call) = 0;

  // This function is invoked on an incoming RPC, represented by the calld
  // object. The RequestMatcher will try to match it against an
  // application-requested RPC if possible or will place it in the pending queue
  // otherwise. To enable some measure of fairness between server CQs, the match
  // is done starting at the start_request_queue_index parameter in a cyclic
  // order rather than always starting at 0.
  virtual void MatchOrQueue(size_t start_request_queue_index,
                            call_data* calld) = 0;

  // Returns the server associated with this request matcher
  virtual grpc_server* server() const = 0;
};
211 
// Per-call state for the server top filter.
struct call_data {
  call_data(grpc_call_element* elem, const grpc_call_element_args& args)
      : call(grpc_call_from_top_element(elem)),
        call_combiner(args.call_combiner) {
    // Both closures route back into the file-scope handlers with this
    // element as their argument.
    GRPC_CLOSURE_INIT(&server_on_recv_initial_metadata,
                      ::server_on_recv_initial_metadata, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
                      ::server_recv_trailing_metadata_ready, elem,
                      grpc_schedule_on_exec_ctx);
  }
  ~call_data() {
    // A PENDING call must be matched or zombified before destruction.
    GPR_ASSERT(state != PENDING);
    GRPC_ERROR_UNREF(recv_initial_metadata_error);
    if (host_set) {
      grpc_slice_unref_internal(host);
    }
    if (path_set) {
      grpc_slice_unref_internal(path);
    }
    grpc_metadata_array_destroy(&initial_metadata);
    grpc_byte_buffer_destroy(payload);
  }

  grpc_call* call;

  // One of call_state, accessed atomically.
  gpr_atm state = NOT_STARTED;

  // The *_set flags record whether the corresponding slice is owned and must
  // be unreffed in the destructor.
  bool path_set = false;
  bool host_set = false;
  grpc_slice path;
  grpc_slice host;
  grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;

  // The server CQ the call was published on (set in publish_call).
  grpc_completion_queue* cq_new = nullptr;

  grpc_metadata_batch* recv_initial_metadata = nullptr;
  uint32_t recv_initial_metadata_flags = 0;
  grpc_metadata_array initial_metadata =
      grpc_metadata_array();  // Zero-initialize the C struct.

  // Matcher responsible for pairing this call with an application request.
  RequestMatcherInterface* matcher = nullptr;
  grpc_byte_buffer* payload = nullptr;

  grpc_closure got_initial_metadata;
  grpc_closure server_on_recv_initial_metadata;
  grpc_closure kill_zombie_closure;
  grpc_closure* on_done_recv_initial_metadata;
  grpc_closure recv_trailing_metadata_ready;
  grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
  grpc_closure* original_recv_trailing_metadata_ready;
  grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
  bool seen_recv_trailing_metadata_ready = false;

  grpc_closure publish;

  // Link in a RealRequestMatcher's pending list (guarded by mu_call).
  call_data* pending_next = nullptr;
  grpc_core::CallCombiner* call_combiner;
};
271 
// A method registered ahead of time by the application; empty strings stand
// in for "any method"/"any host".
struct registered_method {
  registered_method(
      const char* method_arg, const char* host_arg,
      grpc_server_register_method_payload_handling payload_handling_arg,
      uint32_t flags_arg)
      : method(method_arg == nullptr ? "" : method_arg),
        host(host_arg == nullptr ? "" : host_arg),
        payload_handling(payload_handling_arg),
        flags(flags_arg) {}

  ~registered_method() = default;

  const std::string method;
  const std::string host;
  const grpc_server_register_method_payload_handling payload_handling;
  const uint32_t flags;
  /* one request matcher per method */
  std::unique_ptr<RequestMatcherInterface> matcher;
  // Link in grpc_server::registered_methods.
  registered_method* next;
};
292 
// A ref-held snapshot of the server's channels, used to broadcast shutdown
// ops without holding the server lock (see channel_broadcaster_init).
struct channel_broadcaster {
  grpc_channel** channels;
  size_t num_channels;
};
297 }  // namespace
298 
struct grpc_server {
  grpc_channel_args* channel_args;

  grpc_resource_user* default_resource_user;

  // Completion queues registered with this server, and the pollsets handed to
  // listener start callbacks.
  grpc_completion_queue** cqs;
  grpc_pollset** pollsets;
  size_t cq_count;
  size_t pollset_count;
  bool started;

  /* The two following mutexes control access to server-state
     mu_global controls access to non-call-related state (e.g., channel state)
     mu_call controls access to call-related state (e.g., the call lists)

     If they are ever required to be nested, you must lock mu_global
     before mu_call. This is currently used in shutdown processing
     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
  gpr_mu mu_global; /* mutex for server and channel state */
  gpr_mu mu_call;   /* mutex for call-specific state */

  /* startup synchronization: flag is protected by mu_global, signals whether
     we are doing the listener start routine or not */
  bool starting;
  gpr_cv starting_cv;

  // TODO(vjpai): Convert from a linked-list head pointer to a std::vector once
  // grpc_server has a real constructor/destructor
  registered_method* registered_methods;
  /** one request matcher for unregistered methods */
  // TODO(vjpai): Convert to a std::unique_ptr once grpc_server has a real
  // constructor and destructor.
  RequestMatcherInterface* unregistered_request_matcher;

  // Set once shutdown begins; read with acquire loads on hot paths.
  gpr_atm shutdown_flag;
  uint8_t shutdown_published;
  // Tags to complete when shutdown finishes.
  size_t num_shutdown_tags;
  shutdown_tag* shutdown_tags;

  // Sentinel head of the doubly-linked ring of channel_data.
  channel_data root_channel_data;

  listener* listeners;
  int listeners_destroyed;
  // Internal refcount; server_delete runs when it hits zero.
  grpc_core::RefCount internal_refcount;

  /** when did we print the last shutdown progress message */
  gpr_timespec last_shutdown_message_time;

  grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
349 
350 #define SERVER_FROM_CALL_ELEM(elem) \
351   (((channel_data*)(elem)->channel_data)->server)
352 
353 namespace {
354 void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
355                   requested_call* rc);
356 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
357                grpc_error* error);
358 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
359    hold mu_call */
360 void maybe_finish_shutdown(grpc_server* server);
361 
kill_zombie(void * elem,grpc_error *)362 void kill_zombie(void* elem, grpc_error* /*error*/) {
363   grpc_call_unref(
364       grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
365 }
366 
367 /*
368  * channel broadcaster
369  */
370 
371 /* assumes server locked */
channel_broadcaster_init(grpc_server * s,channel_broadcaster * cb)372 void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
373   channel_data* c;
374   size_t count = 0;
375   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
376     count++;
377   }
378   cb->num_channels = count;
379   cb->channels = static_cast<grpc_channel**>(
380       gpr_malloc(sizeof(*cb->channels) * cb->num_channels));
381   count = 0;
382   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
383     cb->channels[count++] = c->channel;
384     GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
385   }
386 }
387 
// Heap-allocated state for a shutdown transport op; freed (along with its
// slice) by shutdown_cleanup once the transport finishes the op.
struct shutdown_cleanup_args {
  grpc_closure closure;
  grpc_slice slice;
};
392 
shutdown_cleanup(void * arg,grpc_error *)393 void shutdown_cleanup(void* arg, grpc_error* /*error*/) {
394   struct shutdown_cleanup_args* a =
395       static_cast<struct shutdown_cleanup_args*>(arg);
396   grpc_slice_unref_internal(a->slice);
397   gpr_free(a);
398 }
399 
// Send a shutdown transport op down one channel: optionally a GOAWAY carrying
// GRPC_STATUS_OK, plus a disconnect error. The heap-allocated
// shutdown_cleanup_args is freed by shutdown_cleanup when the op completes.
void send_shutdown(grpc_channel* channel, bool send_goaway,
                   grpc_error* send_disconnect) {
  struct shutdown_cleanup_args* sc =
      static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));
  GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
  grpc_channel_element* elem;

  op->goaway_error =
      send_goaway ? grpc_error_set_int(
                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
                        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
                  : GRPC_ERROR_NONE;
  // set_accept_stream with no callback — presumably stops the transport from
  // accepting new streams; confirm against grpc_transport_op docs.
  op->set_accept_stream = true;
  sc->slice = grpc_slice_from_copied_string("Server shutdown");
  op->disconnect_with_error = send_disconnect;

  // Deliver the op to the bottom of the channel stack.
  elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
  elem->filter->start_transport_op(elem, op);
}
421 
channel_broadcaster_shutdown(channel_broadcaster * cb,bool send_goaway,grpc_error * force_disconnect)422 void channel_broadcaster_shutdown(channel_broadcaster* cb, bool send_goaway,
423                                   grpc_error* force_disconnect) {
424   size_t i;
425 
426   for (i = 0; i < cb->num_channels; i++) {
427     send_shutdown(cb->channels[i], send_goaway,
428                   GRPC_ERROR_REF(force_disconnect));
429     GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
430   }
431   gpr_free(cb->channels);
432   GRPC_ERROR_UNREF(force_disconnect);
433 }
434 
435 /*
436  * request_matcher
437  */
438 
439 // The RealRequestMatcher is an implementation of RequestMatcherInterface that
440 // actually uses all the features of RequestMatcherInterface: expecting the
441 // application to explicitly request RPCs and then matching those to incoming
442 // RPCs, along with a slow path by which incoming RPCs are put on a locked
443 // pending list if they aren't able to be matched to an application request.
class RealRequestMatcher : public RequestMatcherInterface {
 public:
  // One locked MPSC request queue is created per server completion queue.
  explicit RealRequestMatcher(grpc_server* server)
      : server_(server), requests_per_cq_(server->cq_count) {}

  // Every request queue must already be drained (matched or killed).
  ~RealRequestMatcher() override {
    for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
      GPR_ASSERT(queue.Pop() == nullptr);
    }
  }

  void ZombifyPending() override {
    // Mark each unmatched incoming call ZOMBIED and schedule kill_zombie to
    // release its final ref.
    while (pending_head_ != nullptr) {
      call_data* calld = pending_head_;
      pending_head_ = calld->pending_next;
      gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
      GRPC_CLOSURE_INIT(
          &calld->kill_zombie_closure, kill_zombie,
          grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
          grpc_schedule_on_exec_ctx);
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
                              GRPC_ERROR_NONE);
    }
  }

  void KillRequests(grpc_error* error) override {
    // Fail every queued application request on every CQ, handing each a ref
    // to the error; the caller's ref is released at the end.
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      requested_call* rc;
      while ((rc = reinterpret_cast<requested_call*>(
                  requests_per_cq_[i].Pop())) != nullptr) {
        fail_call(server_, i, rc, GRPC_ERROR_REF(error));
      }
    }
    GRPC_ERROR_UNREF(error);
  }

  size_t request_queue_count() const override {
    return requests_per_cq_.size();
  }

  void RequestCallWithPossiblePublish(size_t request_queue_index,
                                      requested_call* call) override {
    if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
      /* this was the first queued request: we need to lock and start
         matching calls */
      gpr_mu_lock(&server_->mu_call);
      call_data* calld;
      while ((calld = pending_head_) != nullptr) {
        requested_call* rc = reinterpret_cast<requested_call*>(
            requests_per_cq_[request_queue_index].Pop());
        if (rc == nullptr) break;
        pending_head_ = calld->pending_next;
        // Drop mu_call while publishing; re-acquired before the pending
        // list is touched again at the top of the loop.
        gpr_mu_unlock(&server_->mu_call);
        if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
          // Zombied Call
          GRPC_CLOSURE_INIT(
              &calld->kill_zombie_closure, kill_zombie,
              grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
              grpc_schedule_on_exec_ctx);
          grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
                                  GRPC_ERROR_NONE);
        } else {
          publish_call(server_, calld, request_queue_index, rc);
        }
        gpr_mu_lock(&server_->mu_call);
      }
      gpr_mu_unlock(&server_->mu_call);
    }
  }

  void MatchOrQueue(size_t start_request_queue_index,
                    call_data* calld) override {
    // Fast path: lock-free TryPop on each CQ's request queue, starting at
    // start_request_queue_index for fairness across server CQs.
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
      requested_call* rc =
          reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].TryPop());
      if (rc == nullptr) {
        continue;
      } else {
        GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
        gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
        publish_call(server_, calld, cq_idx, rc);
        return; /* early out */
      }
    }

    /* no cq to take the request found: queue it on the slow list */
    GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
    gpr_mu_lock(&server_->mu_call);

    // We need to ensure that all the queues are empty.  We do this under
    // the server mu_call lock to ensure that if something is added to
    // an empty request queue, it will block until the call is actually
    // added to the pending list.
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
      requested_call* rc =
          reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].Pop());
      if (rc == nullptr) {
        continue;
      } else {
        gpr_mu_unlock(&server_->mu_call);
        GRPC_STATS_INC_SERVER_CQS_CHECKED(i + requests_per_cq_.size());
        gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
        publish_call(server_, calld, cq_idx, rc);
        return; /* early out */
      }
    }

    // Still no request available: append the call to the pending list; it is
    // matched later by RequestCallWithPossiblePublish or zombified.
    gpr_atm_no_barrier_store(&calld->state, PENDING);
    if (pending_head_ == nullptr) {
      pending_tail_ = pending_head_ = calld;
    } else {
      pending_tail_->pending_next = calld;
      pending_tail_ = calld;
    }
    gpr_mu_unlock(&server_->mu_call);
  }

  grpc_server* server() const override { return server_; }

 private:
  grpc_server* const server_;
  // Intrusive FIFO of incoming calls awaiting an application request
  // (guarded by server_->mu_call).
  call_data* pending_head_ = nullptr;
  call_data* pending_tail_ = nullptr;
  std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};
571 
572 // AllocatingRequestMatchers don't allow the application to request an RPC in
573 // advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
574 // will call out to an allocation function passed in at the construction of the
575 // object. These request matchers are designed for the C++ callback API, so they
576 // only support 1 completion queue (passed in at the constructor).
class AllocatingRequestMatcherBase : public RequestMatcherInterface {
 public:
  // \a cq must be one of the server's registered completion queues.
  AllocatingRequestMatcherBase(grpc_server* server, grpc_completion_queue* cq)
      : server_(server), cq_(cq) {
    // Find the server-relative index of this CQ; abort if it isn't one of
    // the server's CQs.
    size_t idx;
    for (idx = 0; idx < server->cq_count; idx++) {
      if (server->cqs[idx] == cq) {
        break;
      }
    }
    GPR_ASSERT(idx < server->cq_count);
    cq_idx_ = idx;
  }

  // Allocating matchers never queue incoming RPCs, so there is nothing to
  // zombify and no application requests to kill.
  void ZombifyPending() override {}

  void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); }

  size_t request_queue_count() const override { return 0; }

  // The application never pre-requests calls with an allocating matcher.
  void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
                                      requested_call* /*call*/) final {
    GPR_ASSERT(false);
  }

  grpc_server* server() const override { return server_; }

  // Supply the completion queue related to this request matcher
  grpc_completion_queue* cq() const { return cq_; }

  // Supply the completion queue's index relative to the server.
  size_t cq_idx() const { return cq_idx_; }

 private:
  grpc_server* const server_;
  grpc_completion_queue* const cq_;
  size_t cq_idx_;
};
615 
616 // An allocating request matcher for non-registered methods (used for generic
617 // API and unimplemented RPCs).
// An allocating request matcher for non-registered methods (used for generic
// API and unimplemented RPCs).
class AllocatingRequestMatcherBatch : public AllocatingRequestMatcherBase {
 public:
  AllocatingRequestMatcherBatch(
      grpc_server* server, grpc_completion_queue* cq,
      std::function<grpc_core::ServerBatchCallAllocation()> allocator)
      : AllocatingRequestMatcherBase(server, cq),
        allocator_(std::move(allocator)) {}
  // Allocate call structures on demand and publish the call immediately;
  // never queues.
  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    call_data* calld) override {
    grpc_core::ServerBatchCallAllocation call_info = allocator_();
    GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
                                     nullptr, nullptr) == GRPC_CALL_OK);
    // Freed by done_request_event once the completion is consumed.
    requested_call* rc = new requested_call(
        static_cast<void*>(call_info.tag), cq(), call_info.call,
        call_info.initial_metadata, call_info.details);
    gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
    publish_call(server(), calld, cq_idx(), rc);
  }

 private:
  std::function<grpc_core::ServerBatchCallAllocation()> allocator_;
};
640 
641 // An allocating request matcher for registered methods.
// An allocating request matcher for registered methods.
class AllocatingRequestMatcherRegistered : public AllocatingRequestMatcherBase {
 public:
  AllocatingRequestMatcherRegistered(
      grpc_server* server, grpc_completion_queue* cq, registered_method* rm,
      std::function<grpc_core::ServerRegisteredCallAllocation()> allocator)
      : AllocatingRequestMatcherBase(server, cq),
        registered_method_(rm),
        allocator_(std::move(allocator)) {}
  // Allocate call structures on demand and publish the call immediately;
  // never queues.
  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    call_data* calld) override {
    grpc_core::ServerRegisteredCallAllocation call_info = allocator_();
    GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
                                     call_info.optional_payload,
                                     registered_method_) == GRPC_CALL_OK);
    // Freed by done_request_event once the completion is consumed.
    requested_call* rc = new requested_call(
        static_cast<void*>(call_info.tag), cq(), call_info.call,
        call_info.initial_metadata, registered_method_, call_info.deadline,
        call_info.optional_payload);
    gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
    publish_call(server(), calld, cq_idx(), rc);
  }

 private:
  registered_method* const registered_method_;
  std::function<grpc_core::ServerRegisteredCallAllocation()> allocator_;
};
668 
669 /*
670  * server proper
671  */
672 
server_ref(grpc_server * server)673 void server_ref(grpc_server* server) { server->internal_refcount.Ref(); }
674 
// Final teardown of the server, run by server_unref when the last internal
// ref is dropped. Frees all server-owned state.
void server_delete(grpc_server* server) {
  registered_method* rm;
  size_t i;
  server->channelz_server.reset();
  grpc_channel_args_destroy(server->channel_args);
  gpr_mu_destroy(&server->mu_global);
  gpr_mu_destroy(&server->mu_call);
  gpr_cv_destroy(&server->starting_cv);
  // Drain and free the registered-method list.
  while ((rm = server->registered_methods) != nullptr) {
    server->registered_methods = rm->next;
    delete rm;
  }
  delete server->unregistered_request_matcher;
  // Release the "server" ref on each registered CQ.
  for (i = 0; i < server->cq_count; i++) {
    GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
  }
  gpr_free(server->cqs);
  gpr_free(server->pollsets);
  gpr_free(server->shutdown_tags);
  gpr_free(server);
}
696 
server_unref(grpc_server * server)697 void server_unref(grpc_server* server) {
698   if (GPR_UNLIKELY(server->internal_refcount.Unref())) {
699     server_delete(server);
700   }
701 }
702 
is_channel_orphaned(channel_data * chand)703 int is_channel_orphaned(channel_data* chand) { return chand->next == chand; }
704 
orphan_channel(channel_data * chand)705 void orphan_channel(channel_data* chand) {
706   chand->next->prev = chand->prev;
707   chand->prev->next = chand->next;
708   chand->next = chand->prev = chand;
709 }
710 
finish_destroy_channel(void * cd,grpc_error *)711 void finish_destroy_channel(void* cd, grpc_error* /*error*/) {
712   channel_data* chand = static_cast<channel_data*>(cd);
713   grpc_server* server = chand->server;
714   GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
715   server_unref(server);
716 }
717 
// Remove a channel from the server's channel ring and send a transport op to
// tear it down. No-op if the channel is already orphaned. The server ref
// taken here is released in finish_destroy_channel.
void destroy_channel(channel_data* chand) {
  if (is_channel_orphaned(chand)) return;
  GPR_ASSERT(chand->server != nullptr);
  orphan_channel(chand);
  server_ref(chand->server);
  // Removing a channel may allow a pending shutdown to complete.
  maybe_finish_shutdown(chand->server);
  GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
                    finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
    gpr_log(GPR_INFO, "Disconnected client");
  }

  grpc_transport_op* op =
      grpc_make_transport_op(&chand->finish_destroy_channel_closure);
  op->set_accept_stream = true;
  grpc_channel_next_op(grpc_channel_stack_element(
                           grpc_channel_get_channel_stack(chand->channel), 0),
                       op);
}
738 
done_request_event(void * req,grpc_cq_completion *)739 void done_request_event(void* req, grpc_cq_completion* /*c*/) {
740   delete static_cast<requested_call*>(req);
741 }
742 
// Hand a matched incoming call to the application: bind the application's CQ
// to the call, fill in the request's output parameters, and post the
// request's tag on the notification CQ (server->cqs[cq_idx]).
void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
                  requested_call* rc) {
  grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
  grpc_call* call = calld->call;
  *rc->call = call;
  calld->cq_new = server->cqs[cq_idx];
  // Transfer the received initial metadata into the application's array.
  GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
  switch (rc->type) {
    case BATCH_CALL:
      GPR_ASSERT(calld->host_set);
      GPR_ASSERT(calld->path_set);
      // The details struct takes its own refs on the host/method slices.
      rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
      rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
      rc->data.batch.details->deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
      break;
    case REGISTERED_CALL:
      *rc->data.registered.deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      if (rc->data.registered.optional_payload) {
        // Ownership of the received payload moves to the application.
        *rc->data.registered.optional_payload = calld->payload;
        calld->payload = nullptr;
      }
      break;
    default:
      GPR_UNREACHABLE_CODE(return );
  }

  // done_request_event frees rc once the application consumes the completion.
  grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
                 rc, &rc->completion, true);
}
775 
publish_new_rpc(void * arg,grpc_error * error)776 void publish_new_rpc(void* arg, grpc_error* error) {
777   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
778   call_data* calld = static_cast<call_data*>(call_elem->call_data);
779   channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
780   RequestMatcherInterface* rm = calld->matcher;
781   grpc_server* server = rm->server();
782 
783   if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
784     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
785     GRPC_CLOSURE_INIT(
786         &calld->kill_zombie_closure, kill_zombie,
787         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
788         grpc_schedule_on_exec_ctx);
789     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
790                             GRPC_ERROR_REF(error));
791     return;
792   }
793 
794   rm->MatchOrQueue(chand->cq_idx, calld);
795 }
796 
finish_start_new_rpc(grpc_server * server,grpc_call_element * elem,RequestMatcherInterface * rm,grpc_server_register_method_payload_handling payload_handling)797 void finish_start_new_rpc(
798     grpc_server* server, grpc_call_element* elem, RequestMatcherInterface* rm,
799     grpc_server_register_method_payload_handling payload_handling) {
800   call_data* calld = static_cast<call_data*>(elem->call_data);
801 
802   if (gpr_atm_acq_load(&server->shutdown_flag)) {
803     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
804     GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
805                       grpc_schedule_on_exec_ctx);
806     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
807                             GRPC_ERROR_NONE);
808     return;
809   }
810 
811   calld->matcher = rm;
812 
813   switch (payload_handling) {
814     case GRPC_SRM_PAYLOAD_NONE:
815       publish_new_rpc(elem, GRPC_ERROR_NONE);
816       break;
817     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
818       grpc_op op;
819       op.op = GRPC_OP_RECV_MESSAGE;
820       op.flags = 0;
821       op.reserved = nullptr;
822       op.data.recv_message.recv_message = &calld->payload;
823       GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
824                         grpc_schedule_on_exec_ctx);
825       grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
826       break;
827     }
828   }
829 }
830 
start_new_rpc(grpc_call_element * elem)831 void start_new_rpc(grpc_call_element* elem) {
832   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
833   call_data* calld = static_cast<call_data*>(elem->call_data);
834   grpc_server* server = chand->server;
835   uint32_t i;
836   uint32_t hash;
837   channel_registered_method* rm;
838 
839   if (chand->registered_methods && calld->path_set && calld->host_set) {
840     /* TODO(ctiller): unify these two searches */
841     /* check for an exact match with host */
842     hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash_internal(calld->host),
843                               grpc_slice_hash_internal(calld->path));
844     for (i = 0; i <= chand->registered_method_max_probes; i++) {
845       rm = &chand->registered_methods[(hash + i) %
846                                       chand->registered_method_slots];
847       if (rm->server_registered_method == nullptr) break;
848       if (!rm->has_host) continue;
849       if (rm->host != calld->host) continue;
850       if (rm->method != calld->path) continue;
851       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
852           0 == (calld->recv_initial_metadata_flags &
853                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
854         continue;
855       }
856       finish_start_new_rpc(server, elem,
857                            rm->server_registered_method->matcher.get(),
858                            rm->server_registered_method->payload_handling);
859       return;
860     }
861     /* check for a wildcard method definition (no host set) */
862     hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash_internal(calld->path));
863     for (i = 0; i <= chand->registered_method_max_probes; i++) {
864       rm = &chand->registered_methods[(hash + i) %
865                                       chand->registered_method_slots];
866       if (rm->server_registered_method == nullptr) break;
867       if (rm->has_host) continue;
868       if (rm->method != calld->path) continue;
869       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
870           0 == (calld->recv_initial_metadata_flags &
871                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
872         continue;
873       }
874       finish_start_new_rpc(server, elem,
875                            rm->server_registered_method->matcher.get(),
876                            rm->server_registered_method->payload_handling);
877       return;
878     }
879   }
880   finish_start_new_rpc(server, elem, server->unregistered_request_matcher,
881                        GRPC_SRM_PAYLOAD_NONE);
882 }
883 
num_listeners(grpc_server * server)884 int num_listeners(grpc_server* server) {
885   listener* l;
886   int n = 0;
887   for (l = server->listeners; l; l = l->next) {
888     n++;
889   }
890   return n;
891 }
892 
done_shutdown_event(void * server,grpc_cq_completion *)893 void done_shutdown_event(void* server, grpc_cq_completion* /*completion*/) {
894   server_unref(static_cast<grpc_server*>(server));
895 }
896 
num_channels(grpc_server * server)897 int num_channels(grpc_server* server) {
898   channel_data* chand;
899   int n = 0;
900   for (chand = server->root_channel_data.next;
901        chand != &server->root_channel_data; chand = chand->next) {
902     n++;
903   }
904   return n;
905 }
906 
kill_pending_work_locked(grpc_server * server,grpc_error * error)907 void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
908   if (server->started) {
909     server->unregistered_request_matcher->KillRequests(GRPC_ERROR_REF(error));
910     server->unregistered_request_matcher->ZombifyPending();
911     for (registered_method* rm = server->registered_methods; rm;
912          rm = rm->next) {
913       rm->matcher->KillRequests(GRPC_ERROR_REF(error));
914       rm->matcher->ZombifyPending();
915     }
916   }
917   GRPC_ERROR_UNREF(error);
918 }
919 
/* Publishes the server's shutdown-completion events once shutdown has
   been requested and all channels and listeners are gone. Callers in
   this file invoke it with mu_global held; mu_call is acquired
   internally. Safe to call repeatedly: it returns early until the
   server is actually quiescent. */
void maybe_finish_shutdown(grpc_server* server) {
  size_t i;
  /* Nothing to do before the shutdown flag is set, or after the
     shutdown tags have already been published. */
  if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
    return;
  }

  gpr_mu_lock(&server->mu_call);
  kill_pending_work_locked(
      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
  gpr_mu_unlock(&server->mu_call);

  /* Still waiting for channels and/or listeners to be destroyed: log a
     progress message at most once per second and try again later. */
  if (server->root_channel_data.next != &server->root_channel_data ||
      server->listeners_destroyed < num_listeners(server)) {
    if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
                                  server->last_shutdown_message_time),
                     gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
      server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
      gpr_log(GPR_DEBUG,
              "Waiting for %d channels and %d/%d listeners to be destroyed"
              " before shutting down server",
              num_channels(server),
              num_listeners(server) - server->listeners_destroyed,
              num_listeners(server));
    }
    return;
  }
  /* Quiescent: post every shutdown tag to its completion queue. Each
     event holds a server ref, released in done_shutdown_event. */
  server->shutdown_published = 1;
  for (i = 0; i < server->num_shutdown_tags; i++) {
    server_ref(server);
    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
                   GRPC_ERROR_NONE, done_shutdown_event, server,
                   &server->shutdown_tags[i].completion);
  }
}
954 
/* Interception point for the initial-metadata-received callback.
   Extracts :path and :authority from the incoming metadata (removing
   them from the batch so the application never sees them), captures any
   transport-supplied deadline, and then resumes the original callback.
   When :path/:authority are missing, the error is wrapped and a ref is
   stashed in recv_initial_metadata_error for use by
   server_recv_trailing_metadata_ready. */
void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_millis op_deadline;

  if (error == GRPC_ERROR_NONE) {
    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.authority !=
                     nullptr);
    /* Take refs on the path/host slices before stripping the elements
       from the batch. */
    calld->path = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
    calld->host = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
    calld->path_set = true;
    calld->host_set = true;
    grpc_metadata_batch_remove(calld->recv_initial_metadata, GRPC_BATCH_PATH);
    grpc_metadata_batch_remove(calld->recv_initial_metadata,
                               GRPC_BATCH_AUTHORITY);
  } else {
    /* Own a ref so the Closure::Run at the bottom can consume it. */
    GRPC_ERROR_REF(error);
  }
  op_deadline = calld->recv_initial_metadata->deadline;
  if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
    calld->deadline = op_deadline;
  }
  if (calld->host_set && calld->path_set) {
    /* do nothing */
  } else {
    /* Pass the error reference to calld->recv_initial_metadata_error */
    grpc_error* src_error = error;
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "Missing :authority or :path", &src_error, 1);
    GRPC_ERROR_UNREF(src_error);
    calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
  }
  grpc_closure* closure = calld->on_done_recv_initial_metadata;
  /* Clearing this signals server_recv_trailing_metadata_ready that
     initial metadata has been processed. */
  calld->on_done_recv_initial_metadata = nullptr;
  /* If the trailing-metadata callback arrived first it deferred itself;
     release it now via the call combiner. */
  if (calld->seen_recv_trailing_metadata_ready) {
    GRPC_CALL_COMBINER_START(calld->call_combiner,
                             &calld->recv_trailing_metadata_ready,
                             calld->recv_trailing_metadata_error,
                             "continue server_recv_trailing_metadata_ready");
  }
  grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
}
1000 
/* Interception point for the trailing-metadata-received callback. If
   initial metadata has not finished processing yet, defers itself (via
   the call combiner) until server_on_recv_initial_metadata re-schedules
   it; otherwise forwards to the original callback with any
   initial-metadata error attached as a child. */
void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->on_done_recv_initial_metadata != nullptr) {
    /* Initial metadata still in flight: remember the error, mark
       ourselves seen, and yield the call combiner. */
    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
    calld->seen_recv_trailing_metadata_ready = true;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                      server_recv_trailing_metadata_ready, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "deferring server_recv_trailing_metadata_ready "
                            "until after server_on_recv_initial_metadata");
    return;
  }
  error =
      grpc_error_add_child(GRPC_ERROR_REF(error),
                           GRPC_ERROR_REF(calld->recv_initial_metadata_error));
  grpc_core::Closure::Run(DEBUG_LOCATION,
                          calld->original_recv_trailing_metadata_ready, error);
}
1021 
server_mutate_op(grpc_call_element * elem,grpc_transport_stream_op_batch * op)1022 void server_mutate_op(grpc_call_element* elem,
1023                       grpc_transport_stream_op_batch* op) {
1024   call_data* calld = static_cast<call_data*>(elem->call_data);
1025 
1026   if (op->recv_initial_metadata) {
1027     GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
1028     calld->recv_initial_metadata =
1029         op->payload->recv_initial_metadata.recv_initial_metadata;
1030     calld->on_done_recv_initial_metadata =
1031         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
1032     op->payload->recv_initial_metadata.recv_initial_metadata_ready =
1033         &calld->server_on_recv_initial_metadata;
1034     op->payload->recv_initial_metadata.recv_flags =
1035         &calld->recv_initial_metadata_flags;
1036   }
1037   if (op->recv_trailing_metadata) {
1038     calld->original_recv_trailing_metadata_ready =
1039         op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1040     op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1041         &calld->recv_trailing_metadata_ready;
1042   }
1043 }
1044 
/* Call-stack entry point for the server filter: rewrites the batch to
   intercept the metadata callbacks, then forwards it down the stack. */
void server_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  server_mutate_op(elem, op);
  grpc_call_next_op(elem, op);
}
1050 
got_initial_metadata(void * ptr,grpc_error * error)1051 void got_initial_metadata(void* ptr, grpc_error* error) {
1052   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
1053   call_data* calld = static_cast<call_data*>(elem->call_data);
1054   if (error == GRPC_ERROR_NONE) {
1055     start_new_rpc(elem);
1056   } else {
1057     if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
1058       GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
1059                         grpc_schedule_on_exec_ctx);
1060       grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
1061                               GRPC_ERROR_NONE);
1062     } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
1063       /* zombied call will be destroyed when it's removed from the pending
1064          queue... later */
1065     }
1066   }
1067 }
1068 
accept_stream(void * cd,grpc_transport *,const void * transport_server_data)1069 void accept_stream(void* cd, grpc_transport* /*transport*/,
1070                    const void* transport_server_data) {
1071   channel_data* chand = static_cast<channel_data*>(cd);
1072   /* create a call */
1073   grpc_call_create_args args;
1074   args.channel = chand->channel;
1075   args.server = chand->server;
1076   args.parent = nullptr;
1077   args.propagation_mask = 0;
1078   args.cq = nullptr;
1079   args.pollset_set_alternative = nullptr;
1080   args.server_transport_data = transport_server_data;
1081   args.add_initial_metadata = nullptr;
1082   args.add_initial_metadata_count = 0;
1083   args.send_deadline = GRPC_MILLIS_INF_FUTURE;
1084   grpc_call* call;
1085   grpc_error* error = grpc_call_create(&args, &call);
1086   grpc_call_element* elem =
1087       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1088   if (error != GRPC_ERROR_NONE) {
1089     got_initial_metadata(elem, error);
1090     GRPC_ERROR_UNREF(error);
1091     return;
1092   }
1093   call_data* calld = static_cast<call_data*>(elem->call_data);
1094   grpc_op op;
1095   op.op = GRPC_OP_RECV_INITIAL_METADATA;
1096   op.flags = 0;
1097   op.reserved = nullptr;
1098   op.data.recv_initial_metadata.recv_initial_metadata =
1099       &calld->initial_metadata;
1100   GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
1101                     grpc_schedule_on_exec_ctx);
1102   grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
1103 }
1104 
server_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)1105 grpc_error* server_init_call_elem(grpc_call_element* elem,
1106                                   const grpc_call_element_args* args) {
1107   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1108   server_ref(chand->server);
1109   new (elem->call_data) call_data(elem, *args);
1110   return GRPC_ERROR_NONE;
1111 }
1112 
server_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)1113 void server_destroy_call_elem(grpc_call_element* elem,
1114                               const grpc_call_final_info* /*final_info*/,
1115                               grpc_closure* /*ignored*/) {
1116   call_data* calld = static_cast<call_data*>(elem->call_data);
1117   calld->~call_data();
1118   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1119   server_unref(chand->server);
1120 }
1121 
server_init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)1122 grpc_error* server_init_channel_elem(grpc_channel_element* elem,
1123                                      grpc_channel_element_args* args) {
1124   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1125   GPR_ASSERT(args->is_first);
1126   GPR_ASSERT(!args->is_last);
1127   chand->server = nullptr;
1128   chand->channel = nullptr;
1129   chand->next = chand->prev = chand;
1130   chand->registered_methods = nullptr;
1131   return GRPC_ERROR_NONE;
1132 }
1133 
/* Per-channel filter teardown: releases the registered-method lookup
   table, removes the channelz child socket, unlinks the channel from
   the server's channel list under mu_global (possibly completing
   shutdown), and drops the channel's server ref. */
void server_destroy_channel_elem(grpc_channel_element* elem) {
  size_t i;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->registered_methods) {
    for (i = 0; i < chand->registered_method_slots; i++) {
      grpc_slice_unref_internal(chand->registered_methods[i].method);
      /* The method/host slices are externally managed, so the unref
         above must not have freed any backing storage. */
      GPR_DEBUG_ASSERT(chand->registered_methods[i].method.refcount ==
                           &grpc_core::kNoopRefcount ||
                       chand->registered_methods[i].method.refcount == nullptr);
      if (chand->registered_methods[i].has_host) {
        grpc_slice_unref_internal(chand->registered_methods[i].host);
        GPR_DEBUG_ASSERT(chand->registered_methods[i].host.refcount ==
                             &grpc_core::kNoopRefcount ||
                         chand->registered_methods[i].host.refcount == nullptr);
      }
    }
    gpr_free(chand->registered_methods);
  }
  if (chand->server) {
    if (chand->server->channelz_server != nullptr &&
        chand->channelz_socket_uuid != 0) {
      chand->server->channelz_server->RemoveChildSocket(
          chand->channelz_socket_uuid);
    }
    gpr_mu_lock(&chand->server->mu_global);
    /* Unlink from the doubly-linked channel list; this channel may be
       the last thing blocking server shutdown. */
    chand->next->prev = chand->prev;
    chand->prev->next = chand->next;
    chand->next = chand->prev = chand;
    maybe_finish_shutdown(chand->server);
    gpr_mu_unlock(&chand->server->mu_global);
    server_unref(chand->server);
  }
}
1167 
register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1168 void register_completion_queue(grpc_server* server, grpc_completion_queue* cq,
1169                                void* reserved) {
1170   size_t i, n;
1171   GPR_ASSERT(!reserved);
1172   for (i = 0; i < server->cq_count; i++) {
1173     if (server->cqs[i] == cq) return;
1174   }
1175 
1176   GRPC_CQ_INTERNAL_REF(cq, "server");
1177   n = server->cq_count++;
1178   server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(
1179       server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));
1180   server->cqs[n] = cq;
1181 }
1182 
// Compares a std::string against a possibly-null C string: a null |b|
// matches only an empty |a|; otherwise the C-string contents must be
// equal.
bool streq(const std::string& a, const char* b) {
  if (b == nullptr) return a.empty();
  return strcmp(a.c_str(), b) == 0;
}
1187 
/* Watches a server channel's connectivity state and destroys the
   channel (under mu_global) when the transport reports shutdown.
   Holds a ref on the channel for the watcher's lifetime. */
class ConnectivityWatcher
    : public grpc_core::AsyncConnectivityStateWatcherInterface {
 public:
  explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
    GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity");
  }

  ~ConnectivityWatcher() {
    GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity");
  }

 private:
  void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
    // Don't do anything until we are being shut down.
    if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
    // Shut down channel.
    grpc_server* server = chand_->server;
    gpr_mu_lock(&server->mu_global);
    destroy_channel(chand_);
    gpr_mu_unlock(&server->mu_global);
  }

  channel_data* chand_;  // not owned; its channel is ref'd above
};
1212 
done_published_shutdown(void * done_arg,grpc_cq_completion * storage)1213 void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
1214   (void)done_arg;
1215   gpr_free(storage);
1216 }
1217 
listener_destroy_done(void * s,grpc_error *)1218 void listener_destroy_done(void* s, grpc_error* /*error*/) {
1219   grpc_server* server = static_cast<grpc_server*>(s);
1220   gpr_mu_lock(&server->mu_global);
1221   server->listeners_destroyed++;
1222   maybe_finish_shutdown(server);
1223   gpr_mu_unlock(&server->mu_global);
1224 }
1225 
queue_call_request(grpc_server * server,size_t cq_idx,requested_call * rc)1226 grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
1227                                    requested_call* rc) {
1228   if (gpr_atm_acq_load(&server->shutdown_flag)) {
1229     fail_call(server, cq_idx, rc,
1230               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1231     return GRPC_CALL_OK;
1232   }
1233   RequestMatcherInterface* rm;
1234   switch (rc->type) {
1235     case BATCH_CALL:
1236       rm = server->unregistered_request_matcher;
1237       break;
1238     case REGISTERED_CALL:
1239       rm = rc->data.registered.method->matcher.get();
1240       break;
1241   }
1242   rm->RequestCallWithPossiblePublish(cq_idx, rc);
1243   return GRPC_CALL_OK;
1244 }
1245 
fail_call(grpc_server * server,size_t cq_idx,requested_call * rc,grpc_error * error)1246 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
1247                grpc_error* error) {
1248   *rc->call = nullptr;
1249   rc->initial_metadata->count = 0;
1250   GPR_ASSERT(error != GRPC_ERROR_NONE);
1251 
1252   grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
1253                  &rc->completion);
1254 }
1255 }  // namespace
1256 
1257 namespace grpc_core {
1258 
SetServerRegisteredMethodAllocator(grpc_server * server,grpc_completion_queue * cq,void * method_tag,std::function<ServerRegisteredCallAllocation ()> allocator)1259 void SetServerRegisteredMethodAllocator(
1260     grpc_server* server, grpc_completion_queue* cq, void* method_tag,
1261     std::function<ServerRegisteredCallAllocation()> allocator) {
1262   registered_method* rm = static_cast<registered_method*>(method_tag);
1263   rm->matcher.reset(new AllocatingRequestMatcherRegistered(
1264       server, cq, rm, std::move(allocator)));
1265 }
1266 
SetServerBatchMethodAllocator(grpc_server * server,grpc_completion_queue * cq,std::function<ServerBatchCallAllocation ()> allocator)1267 void SetServerBatchMethodAllocator(
1268     grpc_server* server, grpc_completion_queue* cq,
1269     std::function<ServerBatchCallAllocation()> allocator) {
1270   GPR_DEBUG_ASSERT(server->unregistered_request_matcher == nullptr);
1271   server->unregistered_request_matcher =
1272       new AllocatingRequestMatcherBatch(server, cq, std::move(allocator));
1273 }
1274 
1275 };  // namespace grpc_core
1276 
/* The channel filter installed at the top of every server channel
   stack. Its callbacks (defined above) intercept incoming-call metadata
   and manage per-call and per-channel server state. */
const grpc_channel_filter grpc_server_top_filter = {
    server_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(call_data),
    server_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    server_destroy_call_elem,
    sizeof(channel_data),
    server_init_channel_elem,
    server_destroy_channel_elem,
    grpc_channel_next_get_info,
    "server",
};
1290 
grpc_server_register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1291 void grpc_server_register_completion_queue(grpc_server* server,
1292                                            grpc_completion_queue* cq,
1293                                            void* reserved) {
1294   GRPC_API_TRACE(
1295       "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
1296       (server, cq, reserved));
1297 
1298   auto cq_type = grpc_get_cq_completion_type(cq);
1299   if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
1300     gpr_log(GPR_INFO,
1301             "Completion queue of type %d is being registered as a "
1302             "server-completion-queue",
1303             static_cast<int>(cq_type));
1304     /* Ideally we should log an error and abort but ruby-wrapped-language API
1305        calls grpc_completion_queue_pluck() on server completion queues */
1306   }
1307 
1308   register_completion_queue(server, cq, reserved);
1309 }
1310 
/* Public API: allocates and initializes a new, unstarted server.
   Sets up the locks, internal refcount, empty channel list, channelz
   node (when enabled by channel args), and an optional default resource
   user taken from the args' resource quota. */
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));

  grpc_server* server =
      static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));

  gpr_mu_init(&server->mu_global);
  gpr_mu_init(&server->mu_call);
  gpr_cv_init(&server->starting_cv);

  /* decremented by grpc_server_destroy */
  new (&server->internal_refcount) grpc_core::RefCount();
  /* Empty circular channel list: the root sentinel links to itself. */
  server->root_channel_data.next = server->root_channel_data.prev =
      &server->root_channel_data;

  server->channel_args = grpc_channel_args_copy(args);

  /* Create a channelz node for this server if channelz is enabled. */
  const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
  if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) {
    arg = grpc_channel_args_find(
        args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
    size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(
        arg,
        {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
    server->channelz_server =
        grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
            server, channel_tracer_max_memory);
    server->channelz_server->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Server created"));
  }

  /* Pick up an existing resource quota from the channel args, if any
     (create=false: do not make one on demand). */
  if (args != nullptr) {
    grpc_resource_quota* resource_quota =
        grpc_resource_quota_from_channel_args(args, false /* create */);
    if (resource_quota != nullptr) {
      server->default_resource_user =
          grpc_resource_user_create(resource_quota, "default");
    }
  }

  return server;
}
1355 
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1356 void* grpc_server_register_method(
1357     grpc_server* server, const char* method, const char* host,
1358     grpc_server_register_method_payload_handling payload_handling,
1359     uint32_t flags) {
1360   registered_method* m;
1361   GRPC_API_TRACE(
1362       "grpc_server_register_method(server=%p, method=%s, host=%s, "
1363       "flags=0x%08x)",
1364       4, (server, method, host, flags));
1365   if (!method) {
1366     gpr_log(GPR_ERROR,
1367             "grpc_server_register_method method string cannot be NULL");
1368     return nullptr;
1369   }
1370   for (m = server->registered_methods; m; m = m->next) {
1371     if (streq(m->method, method) && streq(m->host, host)) {
1372       gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
1373               host ? host : "*");
1374       return nullptr;
1375     }
1376   }
1377   if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
1378     gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
1379             flags);
1380     return nullptr;
1381   }
1382   m = new registered_method(method, host, payload_handling, flags);
1383   m->next = server->registered_methods;
1384   server->registered_methods = m;
1385   return m;
1386 }
1387 
grpc_server_start(grpc_server * server)1388 void grpc_server_start(grpc_server* server) {
1389   size_t i;
1390   grpc_core::ExecCtx exec_ctx;
1391 
1392   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
1393 
1394   server->started = true;
1395   server->pollset_count = 0;
1396   server->pollsets = static_cast<grpc_pollset**>(
1397       gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));
1398   for (i = 0; i < server->cq_count; i++) {
1399     if (grpc_cq_can_listen(server->cqs[i])) {
1400       server->pollsets[server->pollset_count++] =
1401           grpc_cq_pollset(server->cqs[i]);
1402     }
1403   }
1404   if (server->unregistered_request_matcher == nullptr) {
1405     server->unregistered_request_matcher = new RealRequestMatcher(server);
1406   }
1407   for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
1408     if (rm->matcher == nullptr) {
1409       rm->matcher.reset(new RealRequestMatcher(server));
1410     }
1411   }
1412 
1413   gpr_mu_lock(&server->mu_global);
1414   server->starting = true;
1415   gpr_mu_unlock(&server->mu_global);
1416 
1417   for (listener* l = server->listeners; l; l = l->next) {
1418     l->start(server, l->arg, server->pollsets, server->pollset_count);
1419   }
1420 
1421   gpr_mu_lock(&server->mu_global);
1422   server->starting = false;
1423   gpr_cv_signal(&server->starting_cv);
1424   gpr_mu_unlock(&server->mu_global);
1425 }
1426 
grpc_server_get_pollsets(grpc_server * server,grpc_pollset *** pollsets,size_t * pollset_count)1427 void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
1428                               size_t* pollset_count) {
1429   *pollset_count = server->pollset_count;
1430   *pollsets = server->pollsets;
1431 }
1432 
// Wires a newly accepted transport into the server: creates a server channel
// on top of it, picks the completion queue new calls will be published to,
// builds a per-channel registered-method lookup table, links the channel into
// the server's channel list, and finally tells the transport to start
// accepting streams (or to disconnect, if the server is already shutting
// down).
void grpc_server_setup_transport(
    grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
    const grpc_channel_args* args,
    const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
        socket_node,
    grpc_resource_user* resource_user) {
  size_t num_registered_methods;
  size_t alloc;
  registered_method* rm;
  channel_registered_method* crm;
  grpc_channel* channel;
  channel_data* chand;
  uint32_t hash;
  size_t slots;
  uint32_t probes;
  uint32_t max_probes = 0;
  grpc_transport_op* op = nullptr;

  channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
                                resource_user);
  // The server filter sits at element 0 of the channel stack; grab its
  // channel_data so we can initialize it below.
  chand = static_cast<channel_data*>(
      grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
          ->channel_data);
  chand->server = s;
  server_ref(s);  // channel holds a ref on the server; dropped on teardown
  chand->channel = channel;
  if (socket_node != nullptr) {
    // Track the owning socket's channelz uuid so it can be unlinked later.
    chand->channelz_socket_uuid = socket_node->uuid();
    s->channelz_server->AddChildSocket(socket_node);
  } else {
    chand->channelz_socket_uuid = 0;
  }

  // Prefer the completion queue whose pollset matches the accepting pollset,
  // to keep call processing on the poller that accepted the connection.
  size_t cq_idx;
  for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
    if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
  }
  if (cq_idx == s->cq_count) {
    /* completion queue not found: pick a random one to publish new calls to */
    cq_idx = static_cast<size_t>(rand()) % s->cq_count;
  }
  chand->cq_idx = cq_idx;

  num_registered_methods = 0;
  for (rm = s->registered_methods; rm; rm = rm->next) {
    num_registered_methods++;
  }
  /* build a lookup table phrased in terms of mdstr's in this channels context
     to quickly find registered methods */
  if (num_registered_methods > 0) {
    // Open-addressed hash table with linear probing; 2x the entry count
    // keeps the load factor at or below 0.5.
    slots = 2 * num_registered_methods;
    alloc = sizeof(channel_registered_method) * slots;
    chand->registered_methods =
        static_cast<channel_registered_method*>(gpr_zalloc(alloc));
    for (rm = s->registered_methods; rm; rm = rm->next) {
      grpc_core::ExternallyManagedSlice host;
      grpc_core::ExternallyManagedSlice method(rm->method.c_str());
      const bool has_host = !rm->host.empty();
      if (has_host) {
        host = grpc_core::ExternallyManagedSlice(rm->host.c_str());
      }
      // Key is the (host, method) pair; host hash contributes 0 when absent.
      hash = GRPC_MDSTR_KV_HASH(has_host ? host.Hash() : 0, method.Hash());
      // Linear-probe to the first empty slot (zalloc guarantees empty slots
      // have a null server_registered_method).
      for (probes = 0; chand->registered_methods[(hash + probes) % slots]
                           .server_registered_method != nullptr;
           probes++)
        ;
      if (probes > max_probes) max_probes = probes;
      crm = &chand->registered_methods[(hash + probes) % slots];
      crm->server_registered_method = rm;
      crm->flags = rm->flags;
      crm->has_host = has_host;
      if (has_host) {
        crm->host = host;
      }
      crm->method = method;
    }
    GPR_ASSERT(slots <= UINT32_MAX);
    chand->registered_method_slots = static_cast<uint32_t>(slots);
    // Lookups only need to probe at most max_probes slots before giving up.
    chand->registered_method_max_probes = max_probes;
  }

  // Splice the channel into the server's doubly-linked channel list
  // (rooted at root_channel_data), under the global lock.
  gpr_mu_lock(&s->mu_global);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu_global);

  // Hand the transport its accept-stream callback and a connectivity
  // watcher; if shutdown already started, ask it to disconnect immediately.
  op = grpc_make_transport_op(nullptr);
  op->set_accept_stream = true;
  op->set_accept_stream_fn = accept_stream;
  op->set_accept_stream_user_data = chand;
  op->start_connectivity_watch.reset(new ConnectivityWatcher(chand));
  if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
    op->disconnect_with_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
  }
  grpc_transport_perform_op(transport, op);
}
1531 
1532 /*
1533   - Kills all pending requests-for-incoming-RPC-calls (i.e the requests made via
1534     grpc_server_request_call and grpc_server_request_registered call will now be
1535     cancelled). See 'kill_pending_work_locked()'
1536 
1537   - Shuts down the listeners (i.e the server will no longer listen on the port
1538     for new incoming channels).
1539 
1540   - Iterates through all channels on the server and sends shutdown msg (see
1541     'channel_broadcaster_shutdown()' for details) to the clients via the
1542     transport layer. The transport layer then guarantees the following:
1543      -- Sends shutdown to the client (for eg: HTTP2 transport sends GOAWAY)
1544      -- If the server has outstanding calls that are in the process, the
1545         connection is NOT closed until the server is done with all those calls
1546      -- Once, there are no more calls in progress, the channel is closed
1547  */
void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {
  listener* l;
  shutdown_tag* sdt;
  channel_broadcaster broadcaster;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
                 (server, cq, tag));

  /* wait for startup to be finished: locks mu_global */
  gpr_mu_lock(&server->mu_global);
  while (server->starting) {
    gpr_cv_wait(&server->starting_cv, &server->mu_global,
                gpr_inf_future(GPR_CLOCK_MONOTONIC));
  }

  /* stay locked, and gather up some stuff to do */
  GPR_ASSERT(grpc_cq_begin_op(cq, tag));
  if (server->shutdown_published) {
    // Shutdown already completed: notify this tag immediately rather than
    // queuing it with the shutdown tags.
    grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
                   static_cast<grpc_cq_completion*>(
                       gpr_malloc(sizeof(grpc_cq_completion))));
    gpr_mu_unlock(&server->mu_global);
    return;
  }
  // Record (cq, tag) so it is signalled when shutdown finishes.
  server->shutdown_tags = static_cast<shutdown_tag*>(
      gpr_realloc(server->shutdown_tags,
                  sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));
  sdt = &server->shutdown_tags[server->num_shutdown_tags++];
  sdt->tag = tag;
  sdt->cq = cq;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    // Another caller already started shutdown; our tag is registered, the
    // in-flight shutdown will complete it.
    gpr_mu_unlock(&server->mu_global);
    return;
  }

  server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);

  // Snapshot all live channels while still holding mu_global; the actual
  // broadcast happens after the locks are dropped.
  channel_broadcaster_init(server, &broadcaster);

  // Release-store pairs with the acquire-loads of shutdown_flag elsewhere
  // (e.g. grpc_server_setup_transport).
  gpr_atm_rel_store(&server->shutdown_flag, 1);

  /* collect all unregistered then registered calls */
  gpr_mu_lock(&server->mu_call);
  kill_pending_work_locked(
      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
  gpr_mu_unlock(&server->mu_call);

  maybe_finish_shutdown(server);
  gpr_mu_unlock(&server->mu_global);

  /* Shutdown listeners */
  for (l = server->listeners; l; l = l->next) {
    GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
                      grpc_schedule_on_exec_ctx);
    l->destroy(server, l->arg, &l->destroy_done);
    if (server->channelz_server != nullptr && l->socket_uuid != 0) {
      server->channelz_server->RemoveChildListenSocket(l->socket_uuid);
    }
  }

  // Send GOAWAY to every channel snapshotted above; transports keep serving
  // in-flight calls until they drain (see comment preceding this function).
  channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
                               GRPC_ERROR_NONE);

  if (server->default_resource_user != nullptr) {
    // Drop the quota ref taken at server creation and begin tearing down
    // the default resource user.
    grpc_resource_quota_unref(
        grpc_resource_user_quota(server->default_resource_user));
    grpc_resource_user_shutdown(server->default_resource_user);
    grpc_resource_user_unref(server->default_resource_user);
  }
}
1621 
// Cancels every call currently in progress on the server by broadcasting a
// cancellation error to all live channels. Unlike shutdown, no GOAWAY is
// sent, so the channels themselves stay usable.
void grpc_server_cancel_all_calls(grpc_server* server) {
  channel_broadcaster broadcaster;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));

  // Snapshot the channel list under the global lock, then broadcast after
  // releasing it so the transport work happens lock-free.
  gpr_mu_lock(&server->mu_global);
  channel_broadcaster_init(server, &broadcaster);
  gpr_mu_unlock(&server->mu_global);

  channel_broadcaster_shutdown(
      &broadcaster, false /* send_goaway */,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
}
1637 
grpc_server_destroy(grpc_server * server)1638 void grpc_server_destroy(grpc_server* server) {
1639   listener* l;
1640   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1641   grpc_core::ExecCtx exec_ctx;
1642 
1643   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1644 
1645   gpr_mu_lock(&server->mu_global);
1646   GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1647   GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
1648 
1649   while (server->listeners) {
1650     l = server->listeners;
1651     server->listeners = l->next;
1652     gpr_free(l);
1653   }
1654 
1655   gpr_mu_unlock(&server->mu_global);
1656 
1657   server_unref(server);
1658 }
1659 
grpc_server_add_listener(grpc_server * server,void * listener_arg,void (* start)(grpc_server * server,void * arg,grpc_pollset ** pollsets,size_t pollset_count),void (* destroy)(grpc_server * server,void * arg,grpc_closure * on_done),grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> node)1660 void grpc_server_add_listener(
1661     grpc_server* server, void* listener_arg,
1662     void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
1663                   size_t pollset_count),
1664     void (*destroy)(grpc_server* server, void* arg, grpc_closure* on_done),
1665     grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> node) {
1666   listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
1667   l->arg = listener_arg;
1668   l->start = start;
1669   l->destroy = destroy;
1670   l->socket_uuid = 0;
1671   if (node != nullptr) {
1672     l->socket_uuid = node->uuid();
1673     if (server->channelz_server != nullptr) {
1674       server->channelz_server->AddChildListenSocket(std::move(node));
1675     }
1676   }
1677   l->next = server->listeners;
1678   server->listeners = l;
1679 }
1680 
1681 namespace {
ValidateServerRequest(grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,registered_method * rm)1682 grpc_call_error ValidateServerRequest(
1683     grpc_completion_queue* cq_for_notification, void* tag,
1684     grpc_byte_buffer** optional_payload, registered_method* rm) {
1685   if ((rm == nullptr && optional_payload != nullptr) ||
1686       ((rm != nullptr) && ((optional_payload == nullptr) !=
1687                            (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
1688     return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
1689   }
1690   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1691     return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1692   }
1693   return GRPC_CALL_OK;
1694 }
ValidateServerRequestAndCq(size_t * cq_idx,grpc_server * server,grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,registered_method * rm)1695 grpc_call_error ValidateServerRequestAndCq(
1696     size_t* cq_idx, grpc_server* server,
1697     grpc_completion_queue* cq_for_notification, void* tag,
1698     grpc_byte_buffer** optional_payload, registered_method* rm) {
1699   size_t idx;
1700   for (idx = 0; idx < server->cq_count; idx++) {
1701     if (server->cqs[idx] == cq_for_notification) {
1702       break;
1703     }
1704   }
1705   if (idx == server->cq_count) {
1706     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1707   }
1708   grpc_call_error error =
1709       ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
1710   if (error != GRPC_CALL_OK) {
1711     return error;
1712   }
1713 
1714   *cq_idx = idx;
1715   return GRPC_CALL_OK;
1716 }
1717 }  // namespace
1718 
grpc_server_request_call(grpc_server * server,grpc_call ** call,grpc_call_details * details,grpc_metadata_array * initial_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1719 grpc_call_error grpc_server_request_call(
1720     grpc_server* server, grpc_call** call, grpc_call_details* details,
1721     grpc_metadata_array* initial_metadata,
1722     grpc_completion_queue* cq_bound_to_call,
1723     grpc_completion_queue* cq_for_notification, void* tag) {
1724   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1725   grpc_core::ExecCtx exec_ctx;
1726   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1727   GRPC_API_TRACE(
1728       "grpc_server_request_call("
1729       "server=%p, call=%p, details=%p, initial_metadata=%p, "
1730       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1731       7,
1732       (server, call, details, initial_metadata, cq_bound_to_call,
1733        cq_for_notification, tag));
1734 
1735   size_t cq_idx;
1736   grpc_call_error error = ValidateServerRequestAndCq(
1737       &cq_idx, server, cq_for_notification, tag, nullptr, nullptr);
1738   if (error != GRPC_CALL_OK) {
1739     return error;
1740   }
1741 
1742   requested_call* rc = new requested_call(tag, cq_bound_to_call, call,
1743                                           initial_metadata, details);
1744   return queue_call_request(server, cq_idx, rc);
1745 }
1746 
grpc_server_request_registered_call(grpc_server * server,void * rmp,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * initial_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1747 grpc_call_error grpc_server_request_registered_call(
1748     grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
1749     grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
1750     grpc_completion_queue* cq_bound_to_call,
1751     grpc_completion_queue* cq_for_notification, void* tag) {
1752   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1753   grpc_core::ExecCtx exec_ctx;
1754   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1755   registered_method* rm = static_cast<registered_method*>(rmp);
1756   GRPC_API_TRACE(
1757       "grpc_server_request_registered_call("
1758       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1759       "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1760       "tag=%p)",
1761       9,
1762       (server, rmp, call, deadline, initial_metadata, optional_payload,
1763        cq_bound_to_call, cq_for_notification, tag));
1764 
1765   size_t cq_idx;
1766   grpc_call_error error = ValidateServerRequestAndCq(
1767       &cq_idx, server, cq_for_notification, tag, optional_payload, rm);
1768   if (error != GRPC_CALL_OK) {
1769     return error;
1770   }
1771 
1772   requested_call* rc =
1773       new requested_call(tag, cq_bound_to_call, call, initial_metadata, rm,
1774                          deadline, optional_payload);
1775   return queue_call_request(server, cq_idx, rc);
1776 }
1777 
// Returns the channel args the server was created with. The server retains
// ownership; callers must not free or mutate the result.
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
  return server->channel_args;
}
1781 
// Returns the server's default resource user (may be null). Ownership is
// retained by the server; see grpc_server_shutdown_and_notify for teardown.
grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
  return server->default_resource_user;
}
1785 
grpc_server_has_open_connections(grpc_server * server)1786 int grpc_server_has_open_connections(grpc_server* server) {
1787   int r;
1788   gpr_mu_lock(&server->mu_global);
1789   r = server->root_channel_data.next != &server->root_channel_data;
1790   gpr_mu_unlock(&server->mu_global);
1791   return r;
1792 }
1793 
grpc_server_get_channelz_node(grpc_server * server)1794 grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
1795     grpc_server* server) {
1796   if (server == nullptr) {
1797     return nullptr;
1798   }
1799   return server->channelz_server.get();
1800 }
1801