/** @file

  Network connection acceptance (NetAccept) implementation.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
*/

#include <tscore/TSSystemState.h>

#include "P_Net.h"

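// ROUNDUP(x, y) rounds x up to the next multiple of y; for example,
// ROUNDUP(1500, 1024) == 2048. It is used below to walk socket buffer
// size requests down in whole-KB steps.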
#ifdef ROUNDUP
#undef ROUNDUP
#endif
#define ROUNDUP(x, y) ((((x) + ((y)-1)) / (y)) * (y))

using NetAcceptHandler = int (NetAccept::*)(int, void *);
int accept_till_done = 1;

// naVec may be accessed from multiple threads at the same time,
// so it is protected by naVecMutex.
Ptr<ProxyMutex> naVecMutex;
std::vector<NetAccept *> naVec;
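// Sleep for @a msec milliseconds by polling with no file descriptors;
// used to back off after transient accept errors.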
static void
safe_delay(int msec)
{
  socketManager.poll(nullptr, 0, msec);
}

//
// General case network connection accept code
//
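// net_accept() accepts all pending connections on @a na's listen socket,
// allocates a NetVConnection for each and dispatches it to a net thread.
// It returns the number of connections accepted, or a negative errno
// value if the accept failed hard. When @a blockable is false the action
// mutex is only try-locked, so the call may return 0 without accepting.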
int
net_accept(NetAccept *na, void *ep, bool blockable)
{
  Event *e = static_cast<Event *>(ep);
  int res = 0;
  int count = 0;
  int loop = accept_till_done;
  UnixNetVConnection *vc = nullptr;
  Connection con;

  if (!blockable) {
    if (!MUTEX_TAKE_TRY_LOCK(na->action_->mutex, e->ethread)) {
      return 0;
    }
  }
  // do-while loop to accept all pending connections
  // (added by YTS Team, yamsat)
  do {
    if ((res = na->server.accept(&con)) < 0) {
      if (res == -EAGAIN || res == -ECONNABORTED || res == -EPIPE) {
        goto Ldone;
      }
      if (na->server.fd != NO_FD && !na->action_->cancelled) {
        if (!blockable) {
          na->action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
        } else {
          SCOPED_MUTEX_LOCK(lock, na->action_->mutex, e->ethread);
          na->action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
        }
      }
      count = res;
      goto Ldone;
    }
    NET_SUM_GLOBAL_DYN_STAT(net_tcp_accept_stat, 1);

    vc = static_cast<UnixNetVConnection *>(na->getNetProcessor()->allocate_vc(e->ethread));
    if (!vc) {
      goto Ldone; // note: @a con will clean up the socket when it goes out of scope.
    }

    ++count;
    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->id = net_next_connection_number();
    vc->con.move(con);
    vc->submit_time = Thread::get_hrtime();
    vc->action_ = *na->action_;
    vc->set_is_transparent(na->opt.f_inbound_transparent);
    vc->set_is_proxy_protocol(na->opt.f_proxy_protocol);
    vc->set_context(NET_VCONNECTION_IN);
    if (na->opt.f_mptcp) {
      vc->set_mptcp_state(); // Try to get the MPTCP state, and update accordingly
    }
#ifdef USE_EDGE_TRIGGER
    // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket.
    if (na->server.http_accept_filter) {
      vc->read.triggered = 1;
    }
#endif
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent);

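    // If the accepting thread already runs the target event type, try to
    // hand the vc to its NetHandler directly; otherwise round-robin to a
    // thread of the configured type and schedule the vc there.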
    EThread *t;
    NetHandler *h;
    if (e->ethread->is_event_type(na->opt.etype)) {
      t = e->ethread;
      h = get_NetHandler(t);
      // Assign NetHandler->mutex to NetVC
      vc->mutex = h->mutex;
      MUTEX_TRY_LOCK(lock, h->mutex, t);
      if (!lock.is_locked()) {
        t->schedule_in(vc, HRTIME_MSECONDS(net_retry_delay));
      } else {
        vc->handleEvent(EVENT_NONE, e);
      }
    } else {
      t = eventProcessor.assign_thread(na->opt.etype);
      h = get_NetHandler(t);
      // Assign NetHandler->mutex to NetVC
      vc->mutex = h->mutex;
      t->schedule_imm(vc);
    }
  } while (loop);

Ldone:
  if (!blockable) {
    MUTEX_UNTAKE_LOCK(na->action_->mutex, e->ethread);
  }
  return count;
}

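// Look up a NetAccept by its index in naVec, holding naVecMutex to guard
// against concurrent access to the vector.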
NetAccept *
getNetAccept(int ID)
{
  SCOPED_MUTEX_LOCK(lock, naVecMutex, this_ethread());
  return naVec.at(ID);
}

//
// Initialize the NetAccept for execution in its own thread.
// This should be done for low latency, high connection rate sockets.
//
void
NetAccept::init_accept_loop()
{
  int i, n;
  char thr_name[MAX_THREAD_NAME_LENGTH];
  size_t stacksize;
  if (do_listen(BLOCKING)) {
    return;
  }
  REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
  SET_CONTINUATION_HANDLER(this, &NetAccept::acceptLoopEvent);

  n = opt.accept_threads;
  // Fill in the accept thread count from configuration if necessary.
  if (n < 0) {
    REC_ReadConfigInteger(n, "proxy.config.accept_threads");
  }

  for (i = 0; i < n; i++) {
    NetAccept *a = (i < n - 1) ? clone() : this;
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", i, ats_ip_port_host_order(&server.accept_addr));
    eventProcessor.spawn_thread(a, thr_name, stacksize);
    Debug("iocore_net_accept_start", "Created accept thread #%d for port %d", i + 1, ats_ip_port_host_order(&server.accept_addr));
  }
}

//
// Initialize the NetAccept for execution in an etype thread.
// This should be done for low connection rate sockets.
// (Management, Cluster, etc.) Also, since it adapts to the
// number of connections arriving, it should be reasonable to
// use it for high connection rates as well.
//
void
NetAccept::init_accept(EThread *t)
{
  if (!t) {
    t = eventProcessor.assign_thread(opt.etype);
  }

  if (!action_->continuation->mutex) {
    action_->continuation->mutex = t->mutex;
    action_->mutex = t->mutex;
  }

  if (do_listen(NON_BLOCKING)) {
    return;
  }

  SET_HANDLER((NetAcceptHandler)&NetAccept::acceptEvent);
  period = -HRTIME_MSECONDS(net_accept_period);
  t->schedule_every(this, period);
}

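//
// Per-thread accept setup: optionally open this thread's own listen
// socket, then register it with the thread's poll descriptor so accepts
// are driven by the event loop instead of a dedicated blocking thread.
//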
int
NetAccept::accept_per_thread(int event, void *ep)
{
  int listen_per_thread = 0;
  REC_ReadConfigInteger(listen_per_thread, "proxy.config.exec_thread.listen");

  if (listen_per_thread == 1) {
    if (do_listen(NON_BLOCKING)) {
      Fatal("[NetAccept::accept_per_thread]: error listening on ports");
      return -1;
    }
  }

  if (accept_fn == net_accept) {
    SET_HANDLER((NetAcceptHandler)&NetAccept::acceptFastEvent);
  } else {
    SET_HANDLER((NetAcceptHandler)&NetAccept::acceptEvent);
  }
  PollDescriptor *pd = get_PollDescriptor(this_ethread());
  if (this->ep.start(pd, this, EVENTIO_READ) < 0) {
    Fatal("[NetAccept::accept_per_thread]: error starting EventIO");
    return -1;
  }
  return 0;
}

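//
// Initialize one NetAccept per net thread of the configured event type.
// If proxy.config.exec_thread.listen is set, each thread opens its own
// listen socket in accept_per_thread(); otherwise a single shared listen
// socket is opened here and polled by every thread.
//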
void
NetAccept::init_accept_per_thread()
{
  int i, n;
  int listen_per_thread = 0;

  ink_assert(opt.etype >= 0);
  REC_ReadConfigInteger(listen_per_thread, "proxy.config.exec_thread.listen");

  if (listen_per_thread == 0) {
    if (do_listen(NON_BLOCKING)) {
      Fatal("[NetAccept::init_accept_per_thread]: error listening on ports");
      return;
    }
  }

  SET_HANDLER((NetAcceptHandler)&NetAccept::accept_per_thread);
  n = eventProcessor.thread_group[opt.etype]._count;

  for (i = 0; i < n; i++) {
    NetAccept *a = (i < n - 1) ? clone() : this;
    EThread *t = eventProcessor.thread_group[opt.etype]._thread[i];
    a->mutex = get_NetHandler(t)->mutex;
    t->schedule_imm(a);
  }
}

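// Cancel the pending accept action (if it has not been cancelled
// already) and close the listen socket.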
void
NetAccept::stop_accept()
{
  if (!action_->cancelled) {
    action_->cancel();
  }
  server.close();
}

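// (Re)establish the listen socket. If a socket already exists, try to
// reconfigure it for listening; on failure, or when there is no socket
// yet, fall back to a full listen(). Returns 0 on success.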
int
NetAccept::do_listen(bool non_blocking)
{
  int res = 0;

  if (server.fd != NO_FD) {
    if ((res = server.setup_fd_for_listen(non_blocking, opt))) {
      Warning("unable to listen on main accept port %d: errno = %d, %s", ntohs(server.accept_addr.port()), errno, strerror(errno));
      goto Lretry;
    }
  } else {
  Lretry:
    if ((res = server.listen(non_blocking, opt))) {
      Warning("unable to listen on port %d: %d %d, %s", ntohs(server.accept_addr.port()), res, errno, strerror(errno));
    }
  }

  return res;
}

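// Blocking accept loop, run on a dedicated accept thread. Transient
// errors back off via safe_delay() and return 0 so the caller retries;
// fatal errors, cancellation, and event-system shutdown return -1 to
// terminate the loop.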
int
NetAccept::do_blocking_accept(EThread *t)
{
  int res = 0;
  int loop = accept_till_done;
  UnixNetVConnection *vc = nullptr;
  Connection con;
  con.sock_type = SOCK_STREAM;

  // do-while loop to accept all pending connections
  // (added by YTS Team, yamsat)
  do {
    if ((res = server.accept(&con)) < 0) {
      int seriousness = accept_error_seriousness(res);
      if (seriousness >= 0) { // not so bad
        if (!seriousness) {   // bad enough to warn about
          check_transient_accept_error(res);
        }
        safe_delay(net_throttle_delay);
        return 0;
      }
      if (!action_->cancelled) {
        SCOPED_MUTEX_LOCK(lock, action_->mutex ? action_->mutex : t->mutex, t);
        action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
        Warning("accept thread received fatal error: errno = %d", errno);
      }
      return -1;
    }
    // check for throttle
    if (!opt.backdoor && check_net_throttle(ACCEPT)) {
      check_throttle_warning(ACCEPT);
      // close the connection as we are in throttle state
      con.close();
      NET_SUM_DYN_STAT(net_connections_throttled_in_stat, 1);
      continue;
    }

    if (TSSystemState::is_event_system_shut_down()) {
      return -1;
    }

    NET_SUM_GLOBAL_DYN_STAT(net_tcp_accept_stat, 1);

    // Pass 'nullptr' to bypass the thread allocator
    vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(nullptr);
    if (unlikely(!vc)) {
      return -1;
    }

    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->id = net_next_connection_number();
    vc->con.move(con);
    vc->submit_time = Thread::get_hrtime();
    vc->action_ = *action_;
    vc->set_is_transparent(opt.f_inbound_transparent);
    vc->set_is_proxy_protocol(opt.f_proxy_protocol);
    vc->options.packet_mark = opt.packet_mark;
    vc->options.packet_tos = opt.packet_tos;
    vc->options.ip_family = opt.ip_family;
    vc->apply_options();
    vc->set_context(NET_VCONNECTION_IN);
    if (opt.f_mptcp) {
      vc->set_mptcp_state(); // Try to get the MPTCP state, and update accordingly
    }
    vc->accept_object = this;
#ifdef USE_EDGE_TRIGGER
    // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket.
    if (server.http_accept_filter) {
      vc->read.triggered = 1;
    }
#endif
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent);

    EThread *localt = eventProcessor.assign_thread(opt.etype);
    NetHandler *h = get_NetHandler(localt);
    // Assign NetHandler->mutex to NetVC
    vc->mutex = h->mutex;
    localt->schedule_imm(vc);
  } while (loop);

  return 1;
}

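//
// Periodic accept handler for event-loop driven accepts. Runs under the
// action (or local) mutex; on a hard accept failure the event is
// cancelled and this NetAccept deletes itself.
//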
int
NetAccept::acceptEvent(int event, void *ep)
{
  (void)event;
  Event *e = static_cast<Event *>(ep);
  // PollDescriptor *pd = get_PollDescriptor(e->ethread);
  Ptr<ProxyMutex> m;

  if (action_->mutex) {
    m = action_->mutex;
  } else {
    m = mutex;
  }

  MUTEX_TRY_LOCK(lock, m, e->ethread);
  if (lock.is_locked()) {
    if (action_->cancelled) {
      e->cancel();
      NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
      delete this;
      return EVENT_DONE;
    }

    int res;
    if ((res = accept_fn(this, e, false)) < 0) {
      NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
      /* INKqa11179 */
      Warning("Accept on port %d failed with error no %d", ats_ip_port_host_order(&server.addr), res);
      Warning("Traffic Server may be unable to accept more network "
              "connections on %d",
              ats_ip_port_host_order(&server.addr));
      e->cancel();
      delete this;
      return EVENT_DONE;
    }
  }
  return EVENT_CONT;
}

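//
// Fast-path accept: call accept4() directly on the event thread, tune
// the socket buffer sizes, and run the new NetVConnection's accept
// handler inline under the NetHandler lock instead of rescheduling it.
//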
int
NetAccept::acceptFastEvent(int event, void *ep)
{
  Event *e = static_cast<Event *>(ep);
  (void)event;
  (void)e;
  int bufsz, res = 0;
  Connection con;
  con.sock_type = SOCK_STREAM;

  UnixNetVConnection *vc = nullptr;
  int loop = accept_till_done;

  do {
    socklen_t sz = sizeof(con.addr);
    int fd = socketManager.accept4(server.fd, &con.addr.sa, &sz, SOCK_NONBLOCK | SOCK_CLOEXEC);
    con.fd = fd;

    if (likely(fd >= 0)) {
      // check for throttle
      if (!opt.backdoor && check_net_throttle(ACCEPT)) {
        // close the connection as we are in throttle state
        con.close();
        NET_SUM_DYN_STAT(net_connections_throttled_in_stat, 1);
        continue;
      }
      Debug("iocore_net", "accepted a new socket: %d", fd);
      NET_SUM_GLOBAL_DYN_STAT(net_tcp_accept_stat, 1);
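      // If the kernel rejects the configured buffer size, retry with
      // successively smaller sizes in 1 KB steps until one is accepted.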
      if (opt.send_bufsize > 0) {
        if (unlikely(socketManager.set_sndbuf_size(fd, opt.send_bufsize))) {
          bufsz = ROUNDUP(opt.send_bufsize, 1024);
          while (bufsz > 0) {
            if (!socketManager.set_sndbuf_size(fd, bufsz)) {
              break;
            }
            bufsz -= 1024;
          }
        }
      }
      if (opt.recv_bufsize > 0) {
        if (unlikely(socketManager.set_rcvbuf_size(fd, opt.recv_bufsize))) {
          bufsz = ROUNDUP(opt.recv_bufsize, 1024);
          while (bufsz > 0) {
            if (!socketManager.set_rcvbuf_size(fd, bufsz)) {
              break;
            }
            bufsz -= 1024;
          }
        }
      }
    } else {
      res = fd;
    }
    // check return value from accept()
    if (res < 0) {
      Debug("iocore_net", "received : %s", strerror(errno));
      res = -errno;
      if (res == -EAGAIN || res == -ECONNABORTED
#if defined(linux)
          || res == -EPIPE
#endif
      ) {
        goto Ldone;
      } else if (accept_error_seriousness(res) >= 0) {
        check_transient_accept_error(res);
        goto Ldone;
      }
      if (!action_->cancelled) {
        action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast<intptr_t>(res));
      }
      goto Lerror;
    }

    vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(e->ethread);
    ink_release_assert(vc);

    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->id = net_next_connection_number();
    vc->con.move(con);
    vc->submit_time = Thread::get_hrtime();
    vc->action_ = *action_;
    vc->set_is_transparent(opt.f_inbound_transparent);
    vc->set_is_proxy_protocol(opt.f_proxy_protocol);
    vc->options.packet_mark = opt.packet_mark;
    vc->options.packet_tos = opt.packet_tos;
    vc->options.ip_family = opt.ip_family;
    vc->apply_options();
    vc->set_context(NET_VCONNECTION_IN);
    if (opt.f_mptcp) {
      vc->set_mptcp_state(); // Try to get the MPTCP state, and update accordingly
    }

#ifdef USE_EDGE_TRIGGER
    // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket.
    if (server.http_accept_filter) {
      vc->read.triggered = 1;
    }
#endif
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent);

    EThread *t = e->ethread;
    NetHandler *h = get_NetHandler(t);
    // Assign NetHandler->mutex to NetVC
    vc->mutex = h->mutex;
    // We must be holding the lock already to do later do_io_read's
    SCOPED_MUTEX_LOCK(lock, vc->mutex, e->ethread);
    vc->handleEvent(EVENT_NONE, nullptr);
    vc = nullptr;
  } while (loop);

Ldone:
  return EVENT_CONT;

Lerror:
  server.close();
  e->cancel();
  NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
  delete this;
  return EVENT_DONE;
}

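// Thread entry point for dedicated accept threads: loop in
// do_blocking_accept() until it reports a fatal error or shutdown.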
int
NetAccept::acceptLoopEvent(int event, Event *e)
{
  (void)event;
  (void)e;
  EThread *t = this_ethread();

  while (do_blocking_accept(t) >= 0) {
    ;
  }

  // Only reached when do_blocking_accept() fails fatally or the system
  // is shutting down.
  NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
  delete this;
  return EVENT_DONE;
}

//
// Accept Event handler
//
NetAccept::NetAccept(const NetProcessor::AcceptOptions &_opt) : Continuation(nullptr), opt(_opt) {}

//
// Stop listening. When the next poll takes place, an error will result.
// THIS ONLY WORKS WITH POLLING STYLE ACCEPTS!
//
void
NetAccept::cancel()
{
  action_->cancel();
  server.close();
}

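// Make a shallow copy of this NetAccept (options, listen socket, action)
// so another thread can run its own accept handler for the same port.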
NetAccept *
NetAccept::clone() const
{
  NetAccept *na = new NetAccept(opt);
  *na = *this;
  return na;
}

NetProcessor *
NetAccept::getNetProcessor() const
{
  return &netProcessor;
}