/** @file

  Accepting incoming network connections (NetAccept).

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include <tscore/TSSystemState.h>

#include "P_Net.h"

#ifdef ROUNDUP
#undef ROUNDUP
#endif
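// Round up @a x to the next multiple of @a y using integer arithmetic,
// e.g. ROUNDUP(1500, 1024) == 2048. Used below to step socket buffer
// sizes in whole KB.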
#define ROUNDUP(x, y) ((((x) + ((y)-1)) / (y)) * (y))

using NetAcceptHandler = int (NetAccept::*)(int, void *);
int accept_till_done   = 1;

// we need to protect naVec since it might be accessed
// in different threads at the same time
Ptr<ProxyMutex> naVecMutex;
std::vector<NetAccept *> naVec;
static void
safe_delay(int msec)
{
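  // Polling zero file descriptors is used here as a portable
  // millisecond-resolution sleep.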
  socketManager.poll(nullptr, 0, msec);
}

//
// General case network connection accept code
//
int
net_accept(NetAccept *na, void *ep, bool blockable)
{
  Event *e               = static_cast<Event *>(ep);
  int res                = 0;
  int count              = 0;
  int loop               = accept_till_done;
  UnixNetVConnection *vc = nullptr;
  Connection con;

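  // Non-blockable callers cannot wait on the action's mutex; if the
  // try-lock fails, give up now and let the next accept event retry.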
  if (!blockable) {
    if (!MUTEX_TAKE_TRY_LOCK(na->action_->mutex, e->ethread)) {
      return 0;
    }
  }

  // do-while for accepting all the connections
  // added by YTS Team, yamsat
  do {
    if ((res = na->server.accept(&con)) < 0) {
      if (res == -EAGAIN || res == -ECONNABORTED || res == -EPIPE) {
        goto Ldone;
      }
      if (na->server.fd != NO_FD && !na->action_->cancelled) {
        if (!blockable) {
          na->action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
        } else {
          SCOPED_MUTEX_LOCK(lock, na->action_->mutex, e->ethread);
          na->action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
        }
      }
      count = res;
      goto Ldone;
    }
    NET_SUM_GLOBAL_DYN_STAT(net_tcp_accept_stat, 1);

    vc = static_cast<UnixNetVConnection *>(na->getNetProcessor()->allocate_vc(e->ethread));
    if (!vc) {
      goto Ldone; // note: @a con will clean up the socket when it goes out of scope.
    }

    ++count;
    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->id = net_next_connection_number();
    vc->con.move(con);
    vc->submit_time = Thread::get_hrtime();
    vc->action_     = *na->action_;
    vc->set_is_transparent(na->opt.f_inbound_transparent);
    vc->set_is_proxy_protocol(na->opt.f_proxy_protocol);
    vc->set_context(NET_VCONNECTION_IN);
    if (na->opt.f_mptcp) {
      vc->set_mptcp_state(); // Try to get the MPTCP state, and update accordingly
    }
#ifdef USE_EDGE_TRIGGER
    // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket.
    if (na->server.http_accept_filter) {
      vc->read.triggered = 1;
    }
#endif
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent);

    EThread *t;
    NetHandler *h;
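    // If the current thread services this event type, deliver the vc inline
    // when the NetHandler lock is available (retrying shortly when it is
    // not); otherwise round-robin the vc to a thread of the right type.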
    if (e->ethread->is_event_type(na->opt.etype)) {
      t = e->ethread;
      h = get_NetHandler(t);
      // Assign NetHandler->mutex to NetVC
      vc->mutex = h->mutex;
      MUTEX_TRY_LOCK(lock, h->mutex, t);
      if (!lock.is_locked()) {
        t->schedule_in(vc, HRTIME_MSECONDS(net_retry_delay));
      } else {
        vc->handleEvent(EVENT_NONE, e);
      }
    } else {
      t = eventProcessor.assign_thread(na->opt.etype);
      h = get_NetHandler(t);
      // Assign NetHandler->mutex to NetVC
      vc->mutex = h->mutex;
      t->schedule_imm(vc);
    }
  } while (loop);

Ldone:
  if (!blockable) {
    MUTEX_UNTAKE_LOCK(na->action_->mutex, e->ethread);
  }
  return count;
}

NetAccept *
getNetAccept(int ID)
{
  SCOPED_MUTEX_LOCK(lock, naVecMutex, this_ethread());
  return naVec.at(ID);
}

//
// Initialize the NetAccept for execution in its own thread.
// This should be done for low latency, high connection rate sockets.
//
void
NetAccept::init_accept_loop()
{
  int i, n;
  char thr_name[MAX_THREAD_NAME_LENGTH];
  size_t stacksize;
  if (do_listen(BLOCKING)) {
    return;
  }
  REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
  SET_CONTINUATION_HANDLER(this, &NetAccept::acceptLoopEvent);

  n = opt.accept_threads;
  // Fill in the accept thread count from configuration if necessary.
  if (n < 0) {
    REC_ReadConfigInteger(n, "proxy.config.accept_threads");
  }

  for (i = 0; i < n; i++) {
    NetAccept *a = (i < n - 1) ? clone() : this;
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", i, ats_ip_port_host_order(&server.accept_addr));
    eventProcessor.spawn_thread(a, thr_name, stacksize);
    Debug("iocore_net_accept_start", "Created accept thread #%d for port %d", i + 1, ats_ip_port_host_order(&server.accept_addr));
  }
}

//
// Initialize the NetAccept for execution in an etype thread.
// This should be done for low connection rate sockets.
// (Management, Cluster, etc.)  Also, since it adapts to the
// number of connections arriving, it should be reasonable to
// use it for high connection rates as well.
//
void
NetAccept::init_accept(EThread *t)
{
  if (!t) {
    t = eventProcessor.assign_thread(opt.etype);
  }

  if (!action_->continuation->mutex) {
    action_->continuation->mutex = t->mutex;
    action_->mutex               = t->mutex;
  }

  if (do_listen(NON_BLOCKING)) {
    return;
  }

  SET_HANDLER((NetAcceptHandler)&NetAccept::acceptEvent);
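  // A negative period is the event system's convention for running the
  // continuation on every event loop iteration (via the negative queue)
  // rather than on a fixed timer, so acceptEvent polls the listen socket
  // each time around the loop.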
  period = -HRTIME_MSECONDS(net_accept_period);
  t->schedule_every(this, period);
}

int
NetAccept::accept_per_thread(int event, void *ep)
{
  int listen_per_thread = 0;
  REC_ReadConfigInteger(listen_per_thread, "proxy.config.exec_thread.listen");

  if (listen_per_thread == 1) {
    if (do_listen(NON_BLOCKING)) {
      Fatal("[NetAccept::accept_per_thread]: error listening on ports");
      return -1;
    }
  }

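  // Use the fast inline accept path only when the default net_accept
  // function is in use; a custom accept_fn must go through the general
  // acceptEvent handler.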
  if (accept_fn == net_accept) {
    SET_HANDLER((NetAcceptHandler)&NetAccept::acceptFastEvent);
  } else {
    SET_HANDLER((NetAcceptHandler)&NetAccept::acceptEvent);
  }
  PollDescriptor *pd = get_PollDescriptor(this_ethread());
  if (this->ep.start(pd, this, EVENTIO_READ) < 0) {
    Fatal("[NetAccept::accept_per_thread]: error starting EventIO");
    return -1;
  }
  return 0;
}

void
NetAccept::init_accept_per_thread()
{
  int i, n;
  int listen_per_thread = 0;

  ink_assert(opt.etype >= 0);
  REC_ReadConfigInteger(listen_per_thread, "proxy.config.exec_thread.listen");

  if (listen_per_thread == 0) {
    if (do_listen(NON_BLOCKING)) {
      Fatal("[NetAccept::init_accept_per_thread]: error listening on ports");
      return;
    }
  }

  SET_HANDLER((NetAcceptHandler)&NetAccept::accept_per_thread);
  n = eventProcessor.thread_group[opt.etype]._count;

  for (i = 0; i < n; i++) {
    NetAccept *a = (i < n - 1) ? clone() : this;
    EThread *t   = eventProcessor.thread_group[opt.etype]._thread[i];
    a->mutex     = get_NetHandler(t)->mutex;
    t->schedule_imm(a);
  }
}

void
NetAccept::stop_accept()
{
  if (!action_->cancelled) {
    action_->cancel();
  }
  server.close();
}

int
NetAccept::do_listen(bool non_blocking)
{
  int res = 0;

  if (server.fd != NO_FD) {
    if ((res = server.setup_fd_for_listen(non_blocking, opt))) {
      Warning("unable to listen on main accept port %d: errno = %d, %s", ntohs(server.accept_addr.port()), errno, strerror(errno));
      goto Lretry;
    }
  } else {
  Lretry:
    if ((res = server.listen(non_blocking, opt))) {
      Warning("unable to listen on port %d: %d %d, %s", ntohs(server.accept_addr.port()), res, errno, strerror(errno));
    }
  }

  return res;
}

int
NetAccept::do_blocking_accept(EThread *t)
{
  int res                = 0;
  int loop               = accept_till_done;
  UnixNetVConnection *vc = nullptr;
  Connection con;
  con.sock_type = SOCK_STREAM;

  // do-while for accepting all the connections
  // added by YTS Team, yamsat
  do {
    if ((res = server.accept(&con)) < 0) {
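      // accept_error_seriousness() triage: negative is fatal, zero is a
      // transient error worth warning about, positive is transient and
      // silently ignorable. Transient errors just delay and retry.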
      int seriousness = accept_error_seriousness(res);
      if (seriousness >= 0) { // not so bad
        if (!seriousness) {   // bad enough to warn about
          check_transient_accept_error(res);
        }
        safe_delay(net_throttle_delay);
        return 0;
      }
      if (!action_->cancelled) {
        SCOPED_MUTEX_LOCK(lock, action_->mutex ? action_->mutex : t->mutex, t);
        action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
        Warning("accept thread received fatal error: errno = %d", errno);
      }
      return -1;
    }
    // check for throttle
    if (!opt.backdoor && check_net_throttle(ACCEPT)) {
      check_throttle_warning(ACCEPT);
      // close the connection as we are in throttle state
      con.close();
      NET_SUM_DYN_STAT(net_connections_throttled_in_stat, 1);
      continue;
    }

    if (TSSystemState::is_event_system_shut_down()) {
      return -1;
    }

    NET_SUM_GLOBAL_DYN_STAT(net_tcp_accept_stat, 1);

    // Pass 'nullptr' to bypass the EThread-local allocator, since this runs
    // on a dedicated accept thread rather than an event thread.
    vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(nullptr);
    if (unlikely(!vc)) {
      return -1;
    }

    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->id = net_next_connection_number();
    vc->con.move(con);
    vc->submit_time = Thread::get_hrtime();
    vc->action_     = *action_;
    vc->set_is_transparent(opt.f_inbound_transparent);
    vc->set_is_proxy_protocol(opt.f_proxy_protocol);
    vc->options.packet_mark = opt.packet_mark;
    vc->options.packet_tos  = opt.packet_tos;
    vc->options.ip_family   = opt.ip_family;
    vc->apply_options();
    vc->set_context(NET_VCONNECTION_IN);
    if (opt.f_mptcp) {
      vc->set_mptcp_state(); // Try to get the MPTCP state, and update accordingly
    }
    vc->accept_object = this;
#ifdef USE_EDGE_TRIGGER
    // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket.
    if (server.http_accept_filter) {
      vc->read.triggered = 1;
    }
#endif
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent);

    EThread *localt = eventProcessor.assign_thread(opt.etype);
    NetHandler *h   = get_NetHandler(localt);
    // Assign NetHandler->mutex to NetVC
    vc->mutex = h->mutex;
    localt->schedule_imm(vc);
  } while (loop);

  return 1;
}

int
NetAccept::acceptEvent(int event, void *ep)
{
  (void)event;
  Event *e = static_cast<Event *>(ep);
  // PollDescriptor *pd = get_PollDescriptor(e->ethread);
  Ptr<ProxyMutex> m;

  if (action_->mutex) {
    m = action_->mutex;
  } else {
    m = mutex;
  }

  MUTEX_TRY_LOCK(lock, m, e->ethread);
  if (lock.is_locked()) {
    if (action_->cancelled) {
      e->cancel();
      NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
      delete this;
      return EVENT_DONE;
    }

    int res;
    if ((res = accept_fn(this, e, false)) < 0) {
      NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
      /* INKqa11179 */
      Warning("Accept on port %d failed with error %d", ats_ip_port_host_order(&server.addr), res);
      Warning("Traffic Server may be unable to accept more network "
              "connections on %d",
              ats_ip_port_host_order(&server.addr));
      e->cancel();
      delete this;
      return EVENT_DONE;
    }
  }
  return EVENT_CONT;
}

int
NetAccept::acceptFastEvent(int event, void *ep)
{
  Event *e = static_cast<Event *>(ep);
  (void)event;
  (void)e;
  int bufsz, res = 0;
  Connection con;
  con.sock_type = SOCK_STREAM;

  UnixNetVConnection *vc = nullptr;
  int loop               = accept_till_done;

  do {
    socklen_t sz = sizeof(con.addr);
    int fd       = socketManager.accept4(server.fd, &con.addr.sa, &sz, SOCK_NONBLOCK | SOCK_CLOEXEC);
    con.fd       = fd;

    if (likely(fd >= 0)) {
      // check for throttle
      if (!opt.backdoor && check_net_throttle(ACCEPT)) {
        // close the connection as we are in throttle state
        con.close();
        NET_SUM_DYN_STAT(net_connections_throttled_in_stat, 1);
        continue;
      }
      Debug("iocore_net", "accepted a new socket: %d", fd);
      NET_SUM_GLOBAL_DYN_STAT(net_tcp_accept_stat, 1);
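      // If the kernel rejects the configured socket buffer size, back off
      // in 1KB steps (starting from the size rounded up via ROUNDUP) until
      // one is accepted or we run out.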
      if (opt.send_bufsize > 0) {
        if (unlikely(socketManager.set_sndbuf_size(fd, opt.send_bufsize))) {
          bufsz = ROUNDUP(opt.send_bufsize, 1024);
          while (bufsz > 0) {
            if (!socketManager.set_sndbuf_size(fd, bufsz)) {
              break;
            }
            bufsz -= 1024;
          }
        }
      }
      if (opt.recv_bufsize > 0) {
        if (unlikely(socketManager.set_rcvbuf_size(fd, opt.recv_bufsize))) {
          bufsz = ROUNDUP(opt.recv_bufsize, 1024);
          while (bufsz > 0) {
            if (!socketManager.set_rcvbuf_size(fd, bufsz)) {
              break;
            }
            bufsz -= 1024;
          }
        }
      }
    } else {
      res = fd;
    }
    // check return value from accept()
    if (res < 0) {
      Debug("iocore_net", "received : %s", strerror(errno));
      res = -errno;
      if (res == -EAGAIN || res == -ECONNABORTED
#if defined(linux)
          || res == -EPIPE
#endif
      ) {
        goto Ldone;
      } else if (accept_error_seriousness(res) >= 0) {
        check_transient_accept_error(res);
        goto Ldone;
      }
      if (!action_->cancelled) {
        action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
      }
      goto Lerror;
    }

    vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(e->ethread);
    ink_release_assert(vc);

    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->id = net_next_connection_number();
    vc->con.move(con);
    vc->submit_time = Thread::get_hrtime();
    vc->action_     = *action_;
    vc->set_is_transparent(opt.f_inbound_transparent);
    vc->set_is_proxy_protocol(opt.f_proxy_protocol);
    vc->options.packet_mark = opt.packet_mark;
    vc->options.packet_tos  = opt.packet_tos;
    vc->options.ip_family   = opt.ip_family;
    vc->apply_options();
    vc->set_context(NET_VCONNECTION_IN);
    if (opt.f_mptcp) {
      vc->set_mptcp_state(); // Try to get the MPTCP state, and update accordingly
    }

#ifdef USE_EDGE_TRIGGER
    // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket.
    if (server.http_accept_filter) {
      vc->read.triggered = 1;
    }
#endif
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent);

    EThread *t    = e->ethread;
    NetHandler *h = get_NetHandler(t);
    // Assign NetHandler->mutex to NetVC
    vc->mutex = h->mutex;
    // We must be holding the lock already to do later do_io_read's
    SCOPED_MUTEX_LOCK(lock, vc->mutex, e->ethread);
    vc->handleEvent(EVENT_NONE, nullptr);
    vc = nullptr;
  } while (loop);

Ldone:
  return EVENT_CONT;

Lerror:
  server.close();
  e->cancel();
  NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
  delete this;
  return EVENT_DONE;
}

int
NetAccept::acceptLoopEvent(int event, Event *e)
{
  (void)event;
  (void)e;
  EThread *t = this_ethread();

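  // Run blocking accepts forever on this dedicated thread;
  // do_blocking_accept() returns a negative value only on a fatal error,
  // cancellation, or event system shutdown.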
  while (do_blocking_accept(t) >= 0) {
    ;
  }

  // Don't think this ever happens ...
  NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
  delete this;
  return EVENT_DONE;
}

//
// Accept Event handler
//
//

NetAccept::NetAccept(const NetProcessor::AcceptOptions &_opt) : Continuation(nullptr), opt(_opt) {}

//
// Stop listening.  When the next poll takes place, an error will result.
// THIS ONLY WORKS WITH POLLING STYLE ACCEPTS!
//
void
NetAccept::cancel()
{
  action_->cancel();
  server.close();
}

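//
// Create a copy of this NetAccept for another accept thread. The assignment
// is a shallow copy, so the clone shares the listening fd and options with
// the original.
//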
NetAccept *
NetAccept::clone() const
{
  NetAccept *na;
  na  = new NetAccept(opt);
  *na = *this;
  return na;
}

NetProcessor *
NetAccept::getNetProcessor() const
{
  return &netProcessor;
}