1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
20 #ifndef _GNU_SOURCE
21 #define _GNU_SOURCE
22 #endif
23 
24 #include <grpc/support/port_platform.h>
25 
26 #include "src/core/lib/iomgr/port.h"
27 
28 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
29 
30 #include <errno.h>
31 #include <fcntl.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <string.h>
35 #include <sys/socket.h>
36 #include <sys/stat.h>
37 #include <sys/types.h>
38 #include <unistd.h>
39 
40 #include <string>
41 
42 #include "absl/strings/str_cat.h"
43 #include "absl/strings/str_format.h"
44 
45 #include <grpc/support/alloc.h>
46 #include <grpc/support/log.h>
47 #include <grpc/support/sync.h>
48 #include <grpc/support/time.h>
49 
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/gpr/string.h"
53 #include "src/core/lib/gprpp/memory.h"
54 #include "src/core/lib/iomgr/exec_ctx.h"
55 #include "src/core/lib/iomgr/resolve_address.h"
56 #include "src/core/lib/iomgr/sockaddr.h"
57 #include "src/core/lib/iomgr/socket_utils_posix.h"
58 #include "src/core/lib/iomgr/tcp_posix.h"
59 #include "src/core/lib/iomgr/tcp_server.h"
60 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
61 #include "src/core/lib/iomgr/unix_sockets_posix.h"
62 #include "src/core/lib/resource_quota/api.h"
63 
tcp_server_create(grpc_closure * shutdown_complete,const grpc_channel_args * args,grpc_tcp_server ** server)64 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
65                                            const grpc_channel_args* args,
66                                            grpc_tcp_server** server) {
67   grpc_tcp_server* s = new grpc_tcp_server;
68   s->so_reuseport = grpc_is_socket_reuse_port_supported();
69   s->expand_wildcard_addrs = false;
70   for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
71     if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
72       if (args->args[i].type == GRPC_ARG_INTEGER) {
73         s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
74                           (args->args[i].value.integer != 0);
75       } else {
76         gpr_free(s);
77         return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
78                                                     " must be an integer");
79       }
80     } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
81       if (args->args[i].type == GRPC_ARG_INTEGER) {
82         s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
83       } else {
84         gpr_free(s);
85         return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
86             GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
87       }
88     }
89   }
90   gpr_ref_init(&s->refs, 1);
91   gpr_mu_init(&s->mu);
92   s->active_ports = 0;
93   s->destroyed_ports = 0;
94   s->shutdown = false;
95   s->shutdown_starting.head = nullptr;
96   s->shutdown_starting.tail = nullptr;
97   s->shutdown_complete = shutdown_complete;
98   s->on_accept_cb = nullptr;
99   s->on_accept_cb_arg = nullptr;
100   s->head = nullptr;
101   s->tail = nullptr;
102   s->nports = 0;
103   s->channel_args = grpc_channel_args_copy(args);
104   s->fd_handler = nullptr;
105   s->memory_quota =
106       grpc_core::ResourceQuotaFromChannelArgs(args)->memory_quota();
107   gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
108   *server = s;
109   return GRPC_ERROR_NONE;
110 }
111 
finish_shutdown(grpc_tcp_server * s)112 static void finish_shutdown(grpc_tcp_server* s) {
113   gpr_mu_lock(&s->mu);
114   GPR_ASSERT(s->shutdown);
115   gpr_mu_unlock(&s->mu);
116   if (s->shutdown_complete != nullptr) {
117     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
118                             GRPC_ERROR_NONE);
119   }
120   gpr_mu_destroy(&s->mu);
121   while (s->head) {
122     grpc_tcp_listener* sp = s->head;
123     s->head = sp->next;
124     gpr_free(sp);
125   }
126   grpc_channel_args_destroy(s->channel_args);
127   delete s->fd_handler;
128   delete s;
129 }
130 
destroyed_port(void * server,grpc_error_handle)131 static void destroyed_port(void* server, grpc_error_handle /*error*/) {
132   grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
133   gpr_mu_lock(&s->mu);
134   s->destroyed_ports++;
135   if (s->destroyed_ports == s->nports) {
136     gpr_mu_unlock(&s->mu);
137     finish_shutdown(s);
138   } else {
139     GPR_ASSERT(s->destroyed_ports < s->nports);
140     gpr_mu_unlock(&s->mu);
141   }
142 }
143 
144 /* called when all listening endpoints have been shutdown, so no further
145    events will be received on them - at this point it's safe to destroy
146    things */
deactivated_all_ports(grpc_tcp_server * s)147 static void deactivated_all_ports(grpc_tcp_server* s) {
148   /* delete ALL the things */
149   gpr_mu_lock(&s->mu);
150 
151   GPR_ASSERT(s->shutdown);
152 
153   if (s->head) {
154     grpc_tcp_listener* sp;
155     for (sp = s->head; sp; sp = sp->next) {
156       grpc_unlink_if_unix_domain_socket(&sp->addr);
157       GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
158                         grpc_schedule_on_exec_ctx);
159       grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
160                      "tcp_listener_shutdown");
161     }
162     gpr_mu_unlock(&s->mu);
163   } else {
164     gpr_mu_unlock(&s->mu);
165     finish_shutdown(s);
166   }
167 }
168 
tcp_server_destroy(grpc_tcp_server * s)169 static void tcp_server_destroy(grpc_tcp_server* s) {
170   gpr_mu_lock(&s->mu);
171   GPR_ASSERT(!s->shutdown);
172   s->shutdown = true;
173   /* shutdown all fd's */
174   if (s->active_ports) {
175     grpc_tcp_listener* sp;
176     for (sp = s->head; sp; sp = sp->next) {
177       grpc_fd_shutdown(
178           sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
179     }
180     gpr_mu_unlock(&s->mu);
181   } else {
182     gpr_mu_unlock(&s->mu);
183     deactivated_all_ports(s);
184   }
185 }
186 
187 /* event manager callback when reads are ready */
on_read(void * arg,grpc_error_handle err)188 static void on_read(void* arg, grpc_error_handle err) {
189   grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
190   grpc_pollset* read_notifier_pollset;
191   if (err != GRPC_ERROR_NONE) {
192     goto error;
193   }
194 
195   /* loop until accept4 returns EAGAIN, and then re-arm notification */
196   for (;;) {
197     grpc_resolved_address addr;
198     memset(&addr, 0, sizeof(addr));
199     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
200     /* Note: If we ever decide to return this address to the user, remember to
201        strip off the ::ffff:0.0.0.0/96 prefix first. */
202     int fd = grpc_accept4(sp->fd, &addr, 1, 1);
203     if (fd < 0) {
204       switch (errno) {
205         case EINTR:
206           continue;
207         case EAGAIN:
208           grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
209           return;
210         default:
211           gpr_mu_lock(&sp->server->mu);
212           if (!sp->server->shutdown_listeners) {
213             gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
214           } else {
215             /* if we have shutdown listeners, accept4 could fail, and we
216                needn't notify users */
217           }
218           gpr_mu_unlock(&sp->server->mu);
219           goto error;
220       }
221     }
222 
223     if (sp->server->memory_quota->IsMemoryPressureHigh()) {
224       gpr_log(GPR_INFO, "Drop incoming connection: high memory pressure");
225       close(fd);
226       continue;
227     }
228 
229     /* For UNIX sockets, the accept call might not fill up the member sun_path
230      * of sockaddr_un, so explicitly call getsockname to get it. */
231     if (grpc_is_unix_socket(&addr)) {
232       memset(&addr, 0, sizeof(addr));
233       addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
234       if (getsockname(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
235                       &(addr.len)) < 0) {
236         gpr_log(GPR_ERROR, "Failed getsockname: %s", strerror(errno));
237         close(fd);
238         goto error;
239       }
240     }
241 
242     (void)grpc_set_socket_no_sigpipe_if_possible(fd);
243 
244     err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_CONNECTION_USAGE,
245                                             sp->server->channel_args);
246     if (err != GRPC_ERROR_NONE) {
247       goto error;
248     }
249 
250     std::string addr_str = grpc_sockaddr_to_uri(&addr);
251     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
252       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s",
253               addr_str.c_str());
254     }
255 
256     std::string name = absl::StrCat("tcp-server-connection:", addr_str);
257     grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
258 
259     read_notifier_pollset = (*(sp->server->pollsets))
260         [static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
261              &sp->server->next_pollset_to_assign, 1)) %
262          sp->server->pollsets->size()];
263 
264     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
265 
266     // Create acceptor.
267     grpc_tcp_server_acceptor* acceptor =
268         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
269     acceptor->from_server = sp->server;
270     acceptor->port_index = sp->port_index;
271     acceptor->fd_index = sp->fd_index;
272     acceptor->external_connection = false;
273     sp->server->on_accept_cb(
274         sp->server->on_accept_cb_arg,
275         grpc_tcp_create(fdobj, sp->server->channel_args, addr_str),
276         read_notifier_pollset, acceptor);
277   }
278 
279   GPR_UNREACHABLE_CODE(return );
280 
281 error:
282   gpr_mu_lock(&sp->server->mu);
283   if (0 == --sp->server->active_ports && sp->server->shutdown) {
284     gpr_mu_unlock(&sp->server->mu);
285     deactivated_all_ports(sp->server);
286   } else {
287     gpr_mu_unlock(&sp->server->mu);
288   }
289 }
290 
291 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
add_wildcard_addrs_to_server(grpc_tcp_server * s,unsigned port_index,int requested_port,int * out_port)292 static grpc_error_handle add_wildcard_addrs_to_server(grpc_tcp_server* s,
293                                                       unsigned port_index,
294                                                       int requested_port,
295                                                       int* out_port) {
296   grpc_resolved_address wild4;
297   grpc_resolved_address wild6;
298   unsigned fd_index = 0;
299   grpc_dualstack_mode dsmode;
300   grpc_tcp_listener* sp = nullptr;
301   grpc_tcp_listener* sp2 = nullptr;
302   grpc_error_handle v6_err = GRPC_ERROR_NONE;
303   grpc_error_handle v4_err = GRPC_ERROR_NONE;
304   *out_port = -1;
305 
306   if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
307     return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
308                                                out_port);
309   }
310 
311   grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
312   /* Try listening on IPv6 first. */
313   if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
314                                          &dsmode, &sp)) == GRPC_ERROR_NONE) {
315     ++fd_index;
316     requested_port = *out_port = sp->port;
317     if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
318       return GRPC_ERROR_NONE;
319     }
320   }
321   /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
322   grpc_sockaddr_set_port(&wild4, requested_port);
323   if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
324                                          &dsmode, &sp2)) == GRPC_ERROR_NONE) {
325     *out_port = sp2->port;
326     if (sp != nullptr) {
327       sp2->is_sibling = 1;
328       sp->sibling = sp2;
329     }
330   }
331   if (*out_port > 0) {
332     if (v6_err != GRPC_ERROR_NONE) {
333       gpr_log(GPR_INFO,
334               "Failed to add :: listener, "
335               "the environment may not support IPv6: %s",
336               grpc_error_std_string(v6_err).c_str());
337       GRPC_ERROR_UNREF(v6_err);
338     }
339     if (v4_err != GRPC_ERROR_NONE) {
340       gpr_log(GPR_INFO,
341               "Failed to add 0.0.0.0 listener, "
342               "the environment may not support IPv4: %s",
343               grpc_error_std_string(v4_err).c_str());
344       GRPC_ERROR_UNREF(v4_err);
345     }
346     return GRPC_ERROR_NONE;
347   } else {
348     grpc_error_handle root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
349         "Failed to add any wildcard listeners");
350     GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE);
351     root_err = grpc_error_add_child(root_err, v6_err);
352     root_err = grpc_error_add_child(root_err, v4_err);
353     return root_err;
354   }
355 }
356 
clone_port(grpc_tcp_listener * listener,unsigned count)357 static grpc_error_handle clone_port(grpc_tcp_listener* listener,
358                                     unsigned count) {
359   grpc_tcp_listener* sp = nullptr;
360   std::string addr_str;
361   grpc_error_handle err;
362 
363   for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
364     l->fd_index += count;
365   }
366 
367   for (unsigned i = 0; i < count; i++) {
368     int fd = -1;
369     int port = -1;
370     grpc_dualstack_mode dsmode;
371     err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
372                                        &fd);
373     if (err != GRPC_ERROR_NONE) return err;
374     err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
375                                          true, &port);
376     if (err != GRPC_ERROR_NONE) return err;
377     listener->server->nports++;
378     addr_str = grpc_sockaddr_to_string(&listener->addr, true);
379     sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
380     sp->next = listener->next;
381     listener->next = sp;
382     /* sp (the new listener) is a sibling of 'listener' (the original
383        listener). */
384     sp->is_sibling = 1;
385     sp->sibling = listener->sibling;
386     listener->sibling = sp;
387     sp->server = listener->server;
388     sp->fd = fd;
389     sp->emfd = grpc_fd_create(
390         fd,
391         absl::StrFormat("tcp-server-listener:%s/clone-%d", addr_str.c_str(), i)
392             .c_str(),
393         true);
394     memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
395     sp->port = port;
396     sp->port_index = listener->port_index;
397     sp->fd_index = listener->fd_index + count - i;
398     GPR_ASSERT(sp->emfd);
399     while (listener->server->tail->next != nullptr) {
400       listener->server->tail = listener->server->tail->next;
401     }
402   }
403 
404   return GRPC_ERROR_NONE;
405 }
406 
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * out_port)407 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
408                                              const grpc_resolved_address* addr,
409                                              int* out_port) {
410   GPR_ASSERT(addr->len <= GRPC_MAX_SOCKADDR_SIZE);
411   grpc_tcp_listener* sp;
412   grpc_resolved_address sockname_temp;
413   grpc_resolved_address addr6_v4mapped;
414   int requested_port = grpc_sockaddr_get_port(addr);
415   unsigned port_index = 0;
416   grpc_dualstack_mode dsmode;
417   grpc_error_handle err;
418   *out_port = -1;
419   if (s->tail != nullptr) {
420     port_index = s->tail->port_index + 1;
421   }
422   grpc_unlink_if_unix_domain_socket(addr);
423 
424   /* Check if this is a wildcard port, and if so, try to keep the port the same
425      as some previously created listener. */
426   if (requested_port == 0) {
427     for (sp = s->head; sp; sp = sp->next) {
428       sockname_temp.len =
429           static_cast<socklen_t>(sizeof(struct sockaddr_storage));
430       if (0 ==
431           getsockname(sp->fd,
432                       reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
433                       &sockname_temp.len)) {
434         int used_port = grpc_sockaddr_get_port(&sockname_temp);
435         if (used_port > 0) {
436           memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
437           grpc_sockaddr_set_port(&sockname_temp, used_port);
438           requested_port = used_port;
439           addr = &sockname_temp;
440           break;
441         }
442       }
443     }
444   }
445   if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
446     return add_wildcard_addrs_to_server(s, port_index, requested_port,
447                                         out_port);
448   }
449   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
450     addr = &addr6_v4mapped;
451   }
452   if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
453       GRPC_ERROR_NONE) {
454     *out_port = sp->port;
455   }
456   return err;
457 }
458 
459 /* Return listener at port_index or NULL. Should only be called with s->mu
460    locked. */
get_port_index(grpc_tcp_server * s,unsigned port_index)461 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
462                                          unsigned port_index) {
463   unsigned num_ports = 0;
464   grpc_tcp_listener* sp;
465   for (sp = s->head; sp; sp = sp->next) {
466     if (!sp->is_sibling) {
467       if (++num_ports > port_index) {
468         return sp;
469       }
470     }
471   }
472   return nullptr;
473 }
474 
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)475 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
476   unsigned num_fds = 0;
477   gpr_mu_lock(&s->mu);
478   grpc_tcp_listener* sp = get_port_index(s, port_index);
479   for (; sp; sp = sp->sibling) {
480     ++num_fds;
481   }
482   gpr_mu_unlock(&s->mu);
483   return num_fds;
484 }
485 
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)486 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
487                               unsigned fd_index) {
488   gpr_mu_lock(&s->mu);
489   grpc_tcp_listener* sp = get_port_index(s, port_index);
490   for (; sp; sp = sp->sibling, --fd_index) {
491     if (fd_index == 0) {
492       gpr_mu_unlock(&s->mu);
493       return sp->fd;
494     }
495   }
496   gpr_mu_unlock(&s->mu);
497   return -1;
498 }
499 
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > * pollsets,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg)500 static void tcp_server_start(grpc_tcp_server* s,
501                              const std::vector<grpc_pollset*>* pollsets,
502                              grpc_tcp_server_cb on_accept_cb,
503                              void* on_accept_cb_arg) {
504   size_t i;
505   grpc_tcp_listener* sp;
506   GPR_ASSERT(on_accept_cb);
507   gpr_mu_lock(&s->mu);
508   GPR_ASSERT(!s->on_accept_cb);
509   GPR_ASSERT(s->active_ports == 0);
510   s->on_accept_cb = on_accept_cb;
511   s->on_accept_cb_arg = on_accept_cb_arg;
512   s->pollsets = pollsets;
513   sp = s->head;
514   while (sp != nullptr) {
515     if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
516         pollsets->size() > 1) {
517       GPR_ASSERT(GRPC_LOG_IF_ERROR(
518           "clone_port", clone_port(sp, (unsigned)(pollsets->size() - 1))));
519       for (i = 0; i < pollsets->size(); i++) {
520         grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
521         GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
522                           grpc_schedule_on_exec_ctx);
523         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
524         s->active_ports++;
525         sp = sp->next;
526       }
527     } else {
528       for (i = 0; i < pollsets->size(); i++) {
529         grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
530       }
531       GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
532                         grpc_schedule_on_exec_ctx);
533       grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
534       s->active_ports++;
535       sp = sp->next;
536     }
537   }
538   gpr_mu_unlock(&s->mu);
539 }
540 
tcp_server_ref(grpc_tcp_server * s)541 grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
542   gpr_ref_non_zero(&s->refs);
543   return s;
544 }
545 
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)546 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
547                                              grpc_closure* shutdown_starting) {
548   gpr_mu_lock(&s->mu);
549   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
550                            GRPC_ERROR_NONE);
551   gpr_mu_unlock(&s->mu);
552 }
553 
tcp_server_unref(grpc_tcp_server * s)554 static void tcp_server_unref(grpc_tcp_server* s) {
555   if (gpr_unref(&s->refs)) {
556     grpc_tcp_server_shutdown_listeners(s);
557     gpr_mu_lock(&s->mu);
558     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
559     gpr_mu_unlock(&s->mu);
560     tcp_server_destroy(s);
561   }
562 }
563 
tcp_server_shutdown_listeners(grpc_tcp_server * s)564 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
565   gpr_mu_lock(&s->mu);
566   s->shutdown_listeners = true;
567   /* shutdown all fd's */
568   if (s->active_ports) {
569     grpc_tcp_listener* sp;
570     for (sp = s->head; sp; sp = sp->next) {
571       grpc_fd_shutdown(sp->emfd,
572                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
573     }
574   }
575   gpr_mu_unlock(&s->mu);
576 }
577 
578 namespace {
579 class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
580  public:
ExternalConnectionHandler(grpc_tcp_server * s)581   explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
582 
583   // TODO(yangg) resolve duplicate code with on_read
Handle(int listener_fd,int fd,grpc_byte_buffer * buf)584   void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
585     grpc_pollset* read_notifier_pollset;
586     grpc_resolved_address addr;
587     memset(&addr, 0, sizeof(addr));
588     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
589     grpc_core::ExecCtx exec_ctx;
590 
591     if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
592                     &(addr.len)) < 0) {
593       gpr_log(GPR_ERROR, "Failed getpeername: %s", strerror(errno));
594       close(fd);
595       return;
596     }
597     (void)grpc_set_socket_no_sigpipe_if_possible(fd);
598     std::string addr_str = grpc_sockaddr_to_uri(&addr);
599     if (grpc_tcp_trace.enabled()) {
600       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
601               addr_str.c_str());
602     }
603     std::string name = absl::StrCat("tcp-server-connection:", addr_str);
604     grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
605     read_notifier_pollset =
606         (*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
607                               &s_->next_pollset_to_assign, 1)) %
608                           s_->pollsets->size()];
609     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
610     grpc_tcp_server_acceptor* acceptor =
611         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
612     acceptor->from_server = s_;
613     acceptor->port_index = -1;
614     acceptor->fd_index = -1;
615     acceptor->external_connection = true;
616     acceptor->listener_fd = listener_fd;
617     acceptor->pending_data = buf;
618     s_->on_accept_cb(s_->on_accept_cb_arg,
619                      grpc_tcp_create(fdobj, s_->channel_args, addr_str),
620                      read_notifier_pollset, acceptor);
621   }
622 
623  private:
624   grpc_tcp_server* s_;
625 };
626 }  // namespace
627 
tcp_server_create_fd_handler(grpc_tcp_server * s)628 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
629     grpc_tcp_server* s) {
630   s->fd_handler = new ExternalConnectionHandler(s);
631   return s->fd_handler;
632 }
633 
634 grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
635     tcp_server_create,        tcp_server_start,
636     tcp_server_add_port,      tcp_server_create_fd_handler,
637     tcp_server_port_fd_count, tcp_server_port_fd,
638     tcp_server_ref,           tcp_server_shutdown_starting_add,
639     tcp_server_unref,         tcp_server_shutdown_listeners};
640 
641 #endif /* GRPC_POSIX_SOCKET_TCP_SERVER */
642