/** @file

  NetHandler, PollCont and InactivityCop: per-thread network I/O event
  processing for the Unix net subsystem.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_Net.h"

using namespace std::literals;

ink_hrtime last_throttle_warning;
ink_hrtime last_shedding_warning;
int net_connections_throttle;
bool net_memory_throttle = false;
int fds_throttle;
int fds_limit = 8000;
ink_hrtime last_transient_accept_error;

NetHandler::Config NetHandler::global_config;
std::bitset<std::numeric_limits<unsigned int>::digits> NetHandler::active_thread_types;
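// 0x3 sets bits 0 and 1 of the bitset. Judging from configure_per_thread_values()
// below, these should be the indices of max_connections_in and max_requests_in, the
// two config items that are divided across threads; the authoritative item order is
// Config::operator[] in the header.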
const std::bitset<NetHandler::CONFIG_ITEM_COUNT> NetHandler::config_value_affects_per_thread_value{0x3};

extern "C" void fd_reify(struct ev_loop *);

// INKqa10496
// An InactivityCop runs on each thread once every second; it walks the thread's
// list of NetEvents and fires the timeout callbacks on any that have expired.
class InactivityCop : public Continuation
{
public:
  explicit InactivityCop(Ptr<ProxyMutex> &m) : Continuation(m.get()) { SET_HANDLER(&InactivityCop::check_inactivity); }
  int
  check_inactivity(int event, Event *e)
  {
    (void)event;
    ink_hrtime now = Thread::get_hrtime();
    NetHandler &nh = *get_NetHandler(this_ethread());

    Debug("inactivity_cop_check", "Checking inactivity on Thread-ID #%d", this_ethread()->id);
    // cop_list still holds the NetEvents that were not triggered between InactivityCop runs.
    // Use pop() so that any close performed by a callback is handled safely.
    while (NetEvent *ne = nh.cop_list.pop()) {
      // If we cannot get the lock, don't stop; just keep cleaning.
      MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread());
      if (!lock.is_locked()) {
        NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat);
        continue;
      }

      if (ne->closed) {
        nh.free_netevent(ne);
        continue;
      }

      // Set a default inactivity timeout if one is not set already.
      // VC_EVENT_INACTIVITY_TIMEOUT is only triggered if a read or write I/O
      // operation was set up by do_io_read() or do_io_write().
      if (ne->next_inactivity_timeout_at == 0 && nh.config.default_inactivity_timeout > 0 &&
          (ne->read.enabled || ne->write.enabled)) {
        Debug("inactivity_cop", "vc: %p inactivity timeout not set, setting a default of %d", ne,
              nh.config.default_inactivity_timeout);
        ne->set_default_inactivity_timeout(HRTIME_SECONDS(nh.config.default_inactivity_timeout));
        NET_INCREMENT_DYN_STAT(default_inactivity_timeout_applied_stat);
      }

      if (ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at < now) {
        if (ne->is_default_inactivity_timeout()) {
          // Track the connections that timed out due to the default inactivity timeout.
          NET_INCREMENT_DYN_STAT(default_inactivity_timeout_count_stat);
        }
        if (nh.keep_alive_queue.in(ne)) {
          // Only update the stats if the connection is in keep-alive; there can be other inactivity timeouts.
          ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND;
          NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
          NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
        }
        Debug("inactivity_cop_verbose", "ne: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, ne,
              ink_hrtime_to_sec(now), ne->next_inactivity_timeout_at, ne->inactivity_timeout_in);
        ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, e);
      } else if (ne->next_activity_timeout_at && ne->next_activity_timeout_at < now) {
        Debug("inactivity_cop_verbose", "active ne: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, ne,
              ink_hrtime_to_sec(now), ne->next_activity_timeout_at, ne->active_timeout_in);
        ne->callback(VC_EVENT_ACTIVE_TIMEOUT, e);
      }
    }
    // The cop_list is now empty; reload it from open_list.
    forl_LL(NetEvent, ne, nh.open_list)
    {
      if (ne->get_thread() == this_ethread()) {
        nh.cop_list.push(ne);
      }
    }
    // NetHandler removes a NetEvent from cop_list when it is triggered.
    // As the NetHandler runs, the number of NetEvents in cop_list decreases;
    // the NetHandler runs at most 100 times between InactivityCop runs, so we
    // do not have to check as many NetEvents as open_list holds.

    // Clean up the active and keep-alive queues periodically.
    nh.manage_active_queue(nullptr, true); // close any connections over the active timeout
    nh.manage_keep_alive_queue();

    return 0;
  }
};

PollCont::PollCont(Ptr<ProxyMutex> &m, int pt)
  : Continuation(m.get()), net_handler(nullptr), nextPollDescriptor(nullptr), poll_timeout(pt)
{
  pollDescriptor = new PollDescriptor();
  SET_HANDLER(&PollCont::pollEvent);
}

PollCont::PollCont(Ptr<ProxyMutex> &m, NetHandler *nh, int pt)
  : Continuation(m.get()), net_handler(nh), nextPollDescriptor(nullptr), poll_timeout(pt)
{
  pollDescriptor = new PollDescriptor();
  SET_HANDLER(&PollCont::pollEvent);
}

PollCont::~PollCont()
{
  delete pollDescriptor;
  if (nextPollDescriptor != nullptr) {
    delete nextPollDescriptor;
  }
}

//
// PollCont continuation handler: performs the epoll_wait() (or the platform
// equivalent) and stores the resulting events in ePoll_Triggered_Events.
//
int
PollCont::pollEvent(int, Event *)
{
  this->do_poll(-1);
  return EVENT_CONT;
}

void
PollCont::do_poll(ink_hrtime timeout)
{
  if (likely(net_handler)) {
    /* Check whether there are connections on the ready or enable lists (read or write) that need processing. [ebalsa] */
    if (likely(!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() ||
               !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) {
      NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d", net_handler->read_ready_list.empty(),
               net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(),
               net_handler->write_enable_list.empty());
      poll_timeout = 0; // poll returns immediately -- we already have triggered work to process
    } else if (timeout >= 0) {
      poll_timeout = ink_hrtime_to_msec(timeout);
    } else {
      poll_timeout = net_config_poll_timeout;
    }
  }
  // Wait for fds to trigger, or don't wait at all if poll_timeout is 0.
#if TS_USE_EPOLL
  pollDescriptor->result =
    epoll_wait(pollDescriptor->epoll_fd, pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
  NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, results: %d", pollDescriptor->epoll_fd,
           poll_timeout, pollDescriptor->result);
#elif TS_USE_KQUEUE
  struct timespec tv;
  tv.tv_sec  = poll_timeout / 1000; // poll_timeout is in milliseconds
  tv.tv_nsec = 1000000 * (poll_timeout % 1000);
  pollDescriptor->result =
    kevent(pollDescriptor->kqueue_fd, nullptr, 0, pollDescriptor->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv);
  NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] kqueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd,
           poll_timeout, pollDescriptor->result);
#elif TS_USE_PORT
  int retval;
  timespec_t ptimeout;
  ptimeout.tv_sec  = poll_timeout / 1000;
  ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
  unsigned nget = 1;
  if ((retval = port_getn(pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events, POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) <
      0) {
    pollDescriptor->result = 0;
    switch (errno) {
    case EINTR:
    case EAGAIN:
    case ETIME:
      if (nget > 0) {
        pollDescriptor->result = (int)nget;
      }
      break;
    default:
      ink_assert(!"unhandled port_getn() case:");
      break;
    }
  } else {
    pollDescriptor->result = (int)nget;
  }
  NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)", retval,
           retval < 0 ? strerror(errno) : "ok", pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events,
           POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pollDescriptor->result);
#else
#error port me
#endif
}

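// Drain the wakeup notification posted by NetHandler::signalActivity() so the
// eventfd / pipe does not remain readable.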
static void
net_signal_hook_callback(EThread *thread)
{
#if HAVE_EVENTFD
  uint64_t counter;
  ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
  /* Nothing to drain or do */
#else
  char dummy[1024];
  ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
#endif
}

void
initialize_thread_for_net(EThread *thread)
{
  NetHandler *nh = get_NetHandler(thread);

  new (reinterpret_cast<ink_dummy_for_new *>(nh)) NetHandler();
  new (reinterpret_cast<ink_dummy_for_new *>(get_PollCont(thread))) PollCont(thread->mutex, nh);
  nh->mutex  = new_ProxyMutex();
  nh->thread = thread;

  PollCont *pc       = get_PollCont(thread);
  PollDescriptor *pd = pc->pollDescriptor;

  InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
  int cop_freq                 = 1;

  REC_ReadConfigInteger(cop_freq, "proxy.config.net.inactivity_check_frequency");
  memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config));
  nh->configure_per_thread_values();
  thread->schedule_every(inactivityCop, HRTIME_SECONDS(cop_freq));

  thread->set_tail_handler(nh);
  thread->ep = static_cast<EventIO *>(ats_malloc(sizeof(EventIO)));
  new (thread->ep) EventIO();
  thread->ep->type = EVENTIO_ASYNC_SIGNAL;
#if HAVE_EVENTFD
  thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ);
#else
  thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ);
#endif
}

// NetHandler method definitions

NetHandler::NetHandler() : Continuation(nullptr)
{
  SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
}

int
NetHandler::update_nethandler_config(const char *str, RecDataT, RecData data, void *)
{
  uint32_t *updated_member = nullptr; // direct pointer to the config member to update.
  std::string_view name{str};

  if (name == "proxy.config.net.max_connections_in"sv) {
    updated_member = &NetHandler::global_config.max_connections_in;
    Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.max_requests_in"sv) {
    updated_member = &NetHandler::global_config.max_requests_in;
    Debug("net_queue", "proxy.config.net.max_requests_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.inactive_threshold_in"sv) {
    updated_member = &NetHandler::global_config.inactive_threshold_in;
    Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.transaction_no_activity_timeout_in"sv) {
    updated_member = &NetHandler::global_config.transaction_no_activity_timeout_in;
    Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.keep_alive_no_activity_timeout_in"sv) {
    updated_member = &NetHandler::global_config.keep_alive_no_activity_timeout_in;
    Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.default_inactivity_timeout"sv) {
    updated_member = &NetHandler::global_config.default_inactivity_timeout;
    Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
  }

  if (updated_member) {
    *updated_member = data.rec_int; // do the actual update.
    // Portable form of the update: the member's index in Config, converted to
    // void * so it can be passed as an event cookie.
    void *idx = reinterpret_cast<void *>(static_cast<intptr_t>(updated_member - &global_config[0]));
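    // The receiving side is mainNetEvent(): on TS_EVENT_MGMT_UPDATE it converts the
    // cookie back to an index and copies global_config[idx] into its thread-local config.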
    // Signal the NetHandler instances, passing the index of the updated config value.
    for (int i = 0; i < eventProcessor.n_thread_groups; ++i) {
      if (!active_thread_types[i]) {
        continue;
      }
      for (EThread **tp = eventProcessor.thread_group[i]._thread,
                   **limit = eventProcessor.thread_group[i]._thread + eventProcessor.thread_group[i]._count;
           tp < limit; ++tp) {
        NetHandler *nh = get_NetHandler(*tp);
        if (nh) {
          nh->thread->schedule_imm(nh, TS_EVENT_MGMT_UPDATE, idx);
        }
      }
    }
  }

  return REC_ERR_OKAY;
}

void
NetHandler::init_for_process()
{
  // Read configuration values and set up callbacks for when they change.
  REC_ReadConfigInt32(global_config.max_connections_in, "proxy.config.net.max_connections_in");
  REC_ReadConfigInt32(global_config.max_requests_in, "proxy.config.net.max_requests_in");
  REC_ReadConfigInt32(global_config.inactive_threshold_in, "proxy.config.net.inactive_threshold_in");
  REC_ReadConfigInt32(global_config.transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
  REC_ReadConfigInt32(global_config.keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
  REC_ReadConfigInt32(global_config.default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");

  RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.max_requests_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.inactive_threshold_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, nullptr);

  Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", global_config.max_connections_in);
  Debug("net_queue", "proxy.config.net.max_requests_in updated to %d", global_config.max_requests_in);
  Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %d", global_config.inactive_threshold_in);
  Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d",
        global_config.transaction_no_activity_timeout_in);
  Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d",
        global_config.keep_alive_no_activity_timeout_in);
  Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", global_config.default_inactivity_timeout);
}

//
// Function used to release a NetEvent and free it.
//
void
NetHandler::free_netevent(NetEvent *ne)
{
  EThread *t = this->thread;

  ink_assert(t == this_ethread());
  ink_release_assert(ne->get_thread() == t);
  ink_release_assert(ne->nh == this);

  // Release ne from the InactivityCop.
  stopCop(ne);
  // Release ne from the NetHandler.
  stopIO(ne);
  // Clear and deallocate ne.
  ne->free(t);
}

//
// Move VCs enabled on a different thread to the ready lists.
//
void
NetHandler::process_enabled_list()
{
  NetEvent *ne = nullptr;

  SListM(NetEvent, NetState, read, enable_link) rq(read_enable_list.popall());
  while ((ne = rq.pop())) {
    ne->ep.modify(EVENTIO_READ);
    ne->ep.refresh(EVENTIO_READ);
    ne->read.in_enabled_list = 0;
    if ((ne->read.enabled && ne->read.triggered) || ne->closed) {
      read_ready_list.in_or_enqueue(ne);
    }
  }

  SListM(NetEvent, NetState, write, enable_link) wq(write_enable_list.popall());
  while ((ne = wq.pop())) {
    ne->ep.modify(EVENTIO_WRITE);
    ne->ep.refresh(EVENTIO_WRITE);
    ne->write.in_enabled_list = 0;
    if ((ne->write.enabled && ne->write.triggered) || ne->closed) {
      write_ready_list.in_or_enqueue(ne);
    }
  }
}

//
// Walk through the ready lists and process the NetEvents on them.
//
void
NetHandler::process_ready_list()
{
  NetEvent *ne = nullptr;

#if defined(USE_EDGE_TRIGGER)
  while ((ne = read_ready_list.dequeue())) {
    // Initialize the thread-local continuation flags
    set_cont_flags(ne->get_control_flags());
    if (ne->closed) {
      free_netevent(ne);
    } else if (ne->read.enabled && ne->read.triggered) {
      ne->net_read_io(this, this->thread);
    } else if (!ne->read.enabled) {
      read_ready_list.remove(ne);
#if defined(solaris)
      if (ne->read.triggered && ne->write.enabled) {
        ne->ep.modify(-EVENTIO_READ);
        ne->ep.refresh(EVENTIO_WRITE);
        ne->writeReschedule(this);
      }
#endif
    }
  }
  while ((ne = write_ready_list.dequeue())) {
    set_cont_flags(ne->get_control_flags());
    if (ne->closed) {
      free_netevent(ne);
    } else if (ne->write.enabled && ne->write.triggered) {
      ne->net_write_io(this, this->thread);
    } else if (!ne->write.enabled) {
      write_ready_list.remove(ne);
#if defined(solaris)
      if (ne->write.triggered && ne->read.enabled) {
        ne->ep.modify(-EVENTIO_WRITE);
        ne->ep.refresh(EVENTIO_READ);
        ne->readReschedule(this);
      }
#endif
    }
  }
#else  /* !USE_EDGE_TRIGGER */
  while ((ne = read_ready_list.dequeue())) {
    set_cont_flags(ne->get_control_flags());
    if (ne->closed)
      free_netevent(ne);
    else if (ne->read.enabled && ne->read.triggered)
      ne->net_read_io(this, this->thread);
    else if (!ne->read.enabled)
      ne->ep.modify(-EVENTIO_READ);
  }
  while ((ne = write_ready_list.dequeue())) {
    set_cont_flags(ne->get_control_flags());
    if (ne->closed)
      free_netevent(ne);
    else if (ne->write.enabled && ne->write.triggered)
      write_to_net(this, ne, this->thread);
    else if (!ne->write.enabled)
      ne->ep.modify(-EVENTIO_WRITE);
  }
#endif /* !USE_EDGE_TRIGGER */
}

//
// The main event handler for NetHandler.
//
int
NetHandler::mainNetEvent(int event, Event *e)
{
  if (TS_EVENT_MGMT_UPDATE == event) {
    intptr_t idx = reinterpret_cast<intptr_t>(e->cookie);
    // Copy to the same offset in the instance struct.
    config[idx] = global_config[idx];
    if (config_value_affects_per_thread_value[idx]) {
      this->configure_per_thread_values();
    }
    return EVENT_CONT;
  } else {
    ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
    return this->waitForActivity(-1);
  }
}

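// Wait for activity on this thread's descriptors and dispatch it: first flush the
// cross-thread enable lists, then poll (via PollCont), then translate the poll
// results onto the read/write ready lists and run them.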
int
NetHandler::waitForActivity(ink_hrtime timeout)
{
  EventIO *epd = nullptr;

  NET_INCREMENT_DYN_STAT(net_handler_run_stat);
  SCOPED_MUTEX_LOCK(lock, mutex, this->thread);

  process_enabled_list();

  // Poll for events via PollCont.
  PollCont *p = get_PollCont(this->thread);
  p->do_poll(timeout);

  // Get and process the polling result.
  PollDescriptor *pd = get_PollDescriptor(this->thread);
  NetEvent *ne = nullptr;
  for (int x = 0; x < pd->result; x++) {
    epd = static_cast<EventIO *> get_ev_data(pd, x);
    if (epd->type == EVENTIO_READWRITE_VC) {
      ne = epd->data.ne;
      // Remove the triggered NetEvent from cop_list: it cannot time out before the next InactivityCop run.
      if (cop_list.in(ne)) {
        cop_list.remove(ne);
      }
      int flags = get_ev_events(pd, x);
      if (flags & (EVENTIO_ERROR)) {
        ne->set_error_from_socket();
      }
      if (flags & (EVENTIO_READ)) {
        ne->read.triggered = 1;
        if (!read_ready_list.in(ne)) {
          read_ready_list.enqueue(ne);
        }
      }
      if (flags & (EVENTIO_WRITE)) {
        ne->write.triggered = 1;
        if (!write_ready_list.in(ne)) {
          write_ready_list.enqueue(ne);
        }
      } else if (!(flags & (EVENTIO_READ))) {
        Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", flags);
        // In practice we sometimes see EPOLLERR and EPOLLHUP through here;
        // anything else would be surprising.
        ink_assert((flags & ~(EVENTIO_ERROR)) == 0);
        ne->write.triggered = 1;
        if (!write_ready_list.in(ne)) {
          write_ready_list.enqueue(ne);
        }
      }
    } else if (epd->type == EVENTIO_DNS_CONNECTION) {
      if (epd->data.dnscon != nullptr) {
        epd->data.dnscon->trigger(); // Make sure the DNSHandler for this con knows we triggered
#if defined(USE_EDGE_TRIGGER)
        epd->refresh(EVENTIO_READ);
#endif
      }
    } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
      net_signal_hook_callback(this->thread);
    } else if (epd->type == EVENTIO_NETACCEPT) {
      this->thread->schedule_imm(epd->data.c);
    }
    ev_next_event(pd, x);
  }

  pd->result = 0;

  process_ready_list();

  return EVENT_CONT;
}

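// Wake the thread's poll loop. This is the counterpart of net_signal_hook_callback():
// write a token to the eventfd / pipe (or post the EventIO directly with event ports)
// so a blocked do_poll() returns.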
void
NetHandler::signalActivity()
{
#if HAVE_EVENTFD
  uint64_t counter = 1;
  ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
  PollDescriptor *pd = get_PollDescriptor(thread);
  ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
#else
  char dummy = 1;
  ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
#endif
}

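// Return true if there is room in the active queue (or it has no limit); otherwise
// walk the queue, close connections that are past a timeout, and report whether that
// freed up room. With ignore_queue_size == true the walk always runs, which is how
// the InactivityCop uses it for periodic cleanup.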
bool
NetHandler::manage_active_queue(NetEvent *enabling_ne, bool ignore_queue_size = false)
{
  const int total_connections_in = active_queue_size + keep_alive_queue_size;
  Debug("v_net_queue",
        "max_connections_per_thread_in: %d max_requests_per_thread_in: %d total_connections_in: %d "
        "active_queue_size: %d keep_alive_queue_size: %d",
        max_connections_per_thread_in, max_requests_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);

  if (!max_requests_per_thread_in) {
    // The active queue has no maximum.
    return true;
  }

  if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) {
    return true;
  }

  ink_hrtime now = Thread::get_hrtime();

  // Loop over the connections and try to close the timed-out ones.
  NetEvent *ne = active_queue.head;
  NetEvent *ne_next = nullptr;
  int closed = 0;
  int handle_event = 0;
  int total_idle_time = 0;
  int total_idle_count = 0;
  for (; ne != nullptr; ne = ne_next) {
    ne_next = ne->active_queue_link.next;
    // Closing the current ne at this point seems dangerous; let the InactivityCop deal with it.
    if (ne == enabling_ne) {
      continue;
    }
    if ((ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at <= now) ||
        (ne->next_activity_timeout_at && ne->next_activity_timeout_at <= now)) {
      _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count);
    }
    if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) {
      return true;
    }
  }

  if (max_requests_per_thread_in > active_queue_size) {
    return true;
  }

  return false; // Failed to make room in the queue; all connections are active.
}

void
NetHandler::configure_per_thread_values()
{
  // Figure out the number of ET_NET threads and divide the configured limits among them.
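  // For example (illustrative numbers): with proxy.config.net.max_connections_in = 30000
  // and 10 net threads, each thread allows 30000 / 10 = 3000 connections (integer division).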
  int threads                   = eventProcessor.thread_group[ET_NET]._count;
  max_connections_per_thread_in = config.max_connections_in / threads;
  max_requests_per_thread_in    = config.max_requests_in / threads;
  Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, threads);
  Debug("net_queue", "max_requests_per_thread_in updated to %d threads: %d", max_requests_per_thread_in, threads);
}

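// If the total connection count exceeds the per-thread limit, shrink the keep-alive
// queue from its head. Since add_to_keep_alive_queue() re-enqueues at the tail, the
// head holds the least recently used connections, so the idlest ones close first.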
void
NetHandler::manage_keep_alive_queue()
{
  uint32_t total_connections_in = active_queue_size + keep_alive_queue_size;
  ink_hrtime now                = Thread::get_hrtime();

  Debug("v_net_queue", "max_connections_per_thread_in: %d total_connections_in: %d active_queue_size: %d keep_alive_queue_size: %d",
        max_connections_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);

  if (!max_connections_per_thread_in || total_connections_in <= max_connections_per_thread_in) {
    return;
  }

  // Loop over the keep-alive connections and try to close them.
  NetEvent *ne_next = nullptr;
  int closed = 0;
  int handle_event = 0;
  int total_idle_time = 0;
  int total_idle_count = 0;
  for (NetEvent *ne = keep_alive_queue.head; ne != nullptr; ne = ne_next) {
    ne_next = ne->keep_alive_queue_link.next;
    _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count);

    total_connections_in = active_queue_size + keep_alive_queue_size;
    if (total_connections_in <= max_connections_per_thread_in) {
      break;
    }
  }

  if (total_idle_count > 0) {
    Debug("net_queue", "max cons: %d active: %d idle: %d already closed: %d, close event: %d mean idle: %d",
          max_connections_per_thread_in, total_connections_in, keep_alive_queue_size, closed, handle_event,
          total_idle_time / total_idle_count);
  }
}

void
NetHandler::_close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count)
{
  if (ne->get_thread() != this_ethread()) {
    return;
  }
  MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread());
  if (!lock.is_locked()) {
    return;
  }
  ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND;
  if (diff > 0) {
    total_idle_time += diff;
    ++total_idle_count;
    NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
    NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
  }
  Debug("net_queue", "closing connection NetEvent=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64 " diff: %" PRId64, ne,
        keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(ne->next_inactivity_timeout_at),
        ink_hrtime_to_sec(ne->inactivity_timeout_in), diff);
  if (ne->closed) {
    free_netevent(ne);
    ++closed;
  } else {
    ne->next_inactivity_timeout_at = now;
    // Create a dummy event to deliver the timeout callback on this thread.
    Event event;
    event.ethread = this_ethread();
    if (ne->inactivity_timeout_in && ne->next_inactivity_timeout_at <= now) {
      if (ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, &event) == EVENT_DONE) {
        ++handle_event;
      }
    } else if (ne->active_timeout_in && ne->next_activity_timeout_at <= now) {
      if (ne->callback(VC_EVENT_ACTIVE_TIMEOUT, &event) == EVENT_DONE) {
        ++handle_event;
      }
    }
  }
}

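// The queue bookkeeping below maintains the invariant that a NetEvent sits in at most
// one of the active and keep-alive queues: adding to one first removes from the other.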
void
NetHandler::add_to_keep_alive_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  ink_assert(mutex->thread_holding == this_ethread());

  if (keep_alive_queue.in(ne)) {
    // Already in the keep-alive queue; move it to the tail.
    keep_alive_queue.remove(ne);
  } else {
    // In the active queue or no queue; new to this queue.
    remove_from_active_queue(ne);
    ++keep_alive_queue_size;
  }
  keep_alive_queue.enqueue(ne);

  // If the keep-alive queue is over size, close connections.
  manage_keep_alive_queue();
}

void
NetHandler::remove_from_keep_alive_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  ink_assert(mutex->thread_holding == this_ethread());

  if (keep_alive_queue.in(ne)) {
    keep_alive_queue.remove(ne);
    --keep_alive_queue_size;
  }
}

bool
NetHandler::add_to_active_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d",
        max_connections_per_thread_in, active_queue_size, keep_alive_queue_size);
  ink_assert(mutex->thread_holding == this_ethread());

  bool active_queue_full = false;

  // If the active queue is over size, close inactive connections to make room.
  if (manage_active_queue(ne) == false) {
    active_queue_full = true;
  }

  if (active_queue.in(ne)) {
    // Already in the active queue; move it to the tail.
    active_queue.remove(ne);
  } else {
    if (active_queue_full) {
      // There is no room left in the queue.
      NET_SUM_DYN_STAT(net_requests_max_throttled_in_stat, 1);
      return false;
    }
    // In the keep-alive queue or no queue; new to this queue.
    remove_from_keep_alive_queue(ne);
    ++active_queue_size;
  }
  active_queue.enqueue(ne);

  return true;
}

void
NetHandler::remove_from_active_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  ink_assert(mutex->thread_holding == this_ethread());

  if (active_queue.in(ne)) {
    active_queue.remove(ne);
    --active_queue_size;
  }
}