/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_Net.h"

using namespace std::literals;
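// Global throttling state shared across the net threads: warning timestamps,
// connection/memory throttle settings, and file-descriptor limits.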
ink_hrtime last_throttle_warning;
ink_hrtime last_shedding_warning;
int net_connections_throttle;
bool net_memory_throttle = false;
int fds_throttle;
int fds_limit = 8000;
ink_hrtime last_transient_accept_error;

NetHandler::Config NetHandler::global_config;
std::bitset<std::numeric_limits<unsigned int>::digits> NetHandler::active_thread_types;
const std::bitset<NetHandler::CONFIG_ITEM_COUNT> NetHandler::config_value_affects_per_thread_value{0x3};

extern "C" void fd_reify(struct ev_loop *);

// INKqa10496
// One InactivityCop runs on each thread once every second. It walks the
// list of NetEvents and fires the appropriate timeout callbacks.
class InactivityCop : public Continuation
{
public:
  explicit InactivityCop(Ptr<ProxyMutex> &m) : Continuation(m.get()) { SET_HANDLER(&InactivityCop::check_inactivity); }
  int
  check_inactivity(int event, Event *e)
  {
    (void)event;
    ink_hrtime now = Thread::get_hrtime();
    NetHandler &nh = *get_NetHandler(this_ethread());

    Debug("inactivity_cop_check", "Checking inactivity on Thread-ID #%d", this_ethread()->id);
    // Process the NetEvents left in cop_list, i.e. those that were not
    // triggered between InactivityCop runs.
    // Use pop() to catch any closes caused by callbacks.
    while (NetEvent *ne = nh.cop_list.pop()) {
      // If we cannot get the lock, don't stop; just keep cleaning.
      MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread());
      if (!lock.is_locked()) {
        NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat);
        continue;
      }

      if (ne->closed) {
        nh.free_netevent(ne);
        continue;
      }

      // Set a default inactivity timeout if one is not set.
      // VC_EVENT_INACTIVITY_TIMEOUT is only triggered if a read or write I/O
      // operation was set up by do_io_read() or do_io_write().
      if (ne->next_inactivity_timeout_at == 0 && nh.config.default_inactivity_timeout > 0 &&
          (ne->read.enabled || ne->write.enabled)) {
        Debug("inactivity_cop", "vc: %p inactivity timeout not set, setting a default of %d", ne,
              nh.config.default_inactivity_timeout);
        ne->set_default_inactivity_timeout(HRTIME_SECONDS(nh.config.default_inactivity_timeout));
        NET_INCREMENT_DYN_STAT(default_inactivity_timeout_applied_stat);
      }

      if (ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at < now) {
        if (ne->is_default_inactivity_timeout()) {
          // track the connections that timed out due to the default inactivity timeout
          NET_INCREMENT_DYN_STAT(default_inactivity_timeout_count_stat);
        }
        if (nh.keep_alive_queue.in(ne)) {
          // only stat if the connection is in keep-alive; there can be other inactivity timeouts
          ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND;
          NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
          NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
        }
        Debug("inactivity_cop_verbose", "ne: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, ne,
              ink_hrtime_to_sec(now), ne->next_inactivity_timeout_at, ne->inactivity_timeout_in);
        ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, e);
      } else if (ne->next_activity_timeout_at && ne->next_activity_timeout_at < now) {
        Debug("inactivity_cop_verbose", "active ne: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, ne,
              ink_hrtime_to_sec(now), ne->next_activity_timeout_at, ne->active_timeout_in);
        ne->callback(VC_EVENT_ACTIVE_TIMEOUT, e);
      }
    }
    // The cop_list is empty now.
    // Reload the cop_list from the open_list.
    forl_LL(NetEvent, ne, nh.open_list)
    {
      if (ne->get_thread() == this_ethread()) {
        nh.cop_list.push(ne);
      }
    }
    // NetHandler removes a NetEvent from the cop_list when it is triggered.
    // As the NetHandler runs, the number of NetEvents in the cop_list decreases;
    // the NetHandler runs at most 100 times between InactivityCop runs, so the
    // cop usually has far fewer NetEvents to check than are in the open_list.

    // Cleanup the active and keep-alive queues periodically
    nh.manage_active_queue(nullptr, true); // close any connections over the active timeout
    nh.manage_keep_alive_queue();

    return 0;
  }
};
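// PollCont owns the per-thread PollDescriptor and performs the actual
// polling call (epoll_wait / kevent / port_getn, depending on the platform).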
PollCont::PollCont(Ptr<ProxyMutex> &m, int pt)
  : Continuation(m.get()), net_handler(nullptr), nextPollDescriptor(nullptr), poll_timeout(pt)
{
  pollDescriptor = new PollDescriptor();
  SET_HANDLER(&PollCont::pollEvent);
}

PollCont::PollCont(Ptr<ProxyMutex> &m, NetHandler *nh, int pt)
  : Continuation(m.get()), net_handler(nh), nextPollDescriptor(nullptr), poll_timeout(pt)
{
  pollDescriptor = new PollDescriptor();
  SET_HANDLER(&PollCont::pollEvent);
}

PollCont::~PollCont()
{
  delete pollDescriptor;
  if (nextPollDescriptor != nullptr) {
    delete nextPollDescriptor;
  }
}

//
// PollCont continuation which does the epoll_wait
// and stores the resultant events in ePoll_Triggered_Events
//
int
PollCont::pollEvent(int, Event *)
{
  this->do_poll(-1);
  return EVENT_CONT;
}

void
PollCont::do_poll(ink_hrtime timeout)
{
  if (likely(net_handler)) {
    /* checking to see whether there are connections on the ready_queue (either read or write) that need processing [ebalsa] */
    if (likely(!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() ||
               !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) {
      NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d", net_handler->read_ready_list.empty(),
               net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(),
               net_handler->write_enable_list.empty());
      poll_timeout = 0; // poll returns immediately -- we have triggered stuff to process right now
    } else if (timeout >= 0) {
      poll_timeout = ink_hrtime_to_msec(timeout);
    } else {
      poll_timeout = net_config_poll_timeout;
    }
  }
// wait for fds to trigger, or don't wait if timeout is 0
#if TS_USE_EPOLL
  pollDescriptor->result =
    epoll_wait(pollDescriptor->epoll_fd, pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
  NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, results: %d", pollDescriptor->epoll_fd,
           poll_timeout, pollDescriptor->result);
#elif TS_USE_KQUEUE
  struct timespec tv;
  tv.tv_sec  = poll_timeout / 1000;
  tv.tv_nsec = 1000000 * (poll_timeout % 1000);
  pollDescriptor->result =
    kevent(pollDescriptor->kqueue_fd, nullptr, 0, pollDescriptor->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv);
  NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] kqueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd,
           poll_timeout, pollDescriptor->result);
#elif TS_USE_PORT
  int retval;
  timespec_t ptimeout;
  ptimeout.tv_sec  = poll_timeout / 1000;
  ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
  unsigned nget    = 1;
  if ((retval = port_getn(pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events, POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) <
      0) {
    pollDescriptor->result = 0;
    switch (errno) {
    case EINTR:
    case EAGAIN:
    case ETIME:
      if (nget > 0) {
        pollDescriptor->result = (int)nget;
      }
      break;
    default:
      ink_assert(!"unhandled port_getn() case:");
      break;
    }
  } else {
    pollDescriptor->result = (int)nget;
  }
  NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)", retval,
           retval < 0 ? strerror(errno) : "ok", pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events,
           POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pollDescriptor->result);
#else
#error port me
#endif
}
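// Drain the wakeup signal sent by NetHandler::signalActivity() so the
// eventfd/pipe does not remain readable.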
static void
net_signal_hook_callback(EThread *thread)
{
#if HAVE_EVENTFD
  uint64_t counter;
  ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
/* Nothing to drain or do */
#else
  char dummy[1024];
  ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
#endif
}
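// Set up an EThread for network I/O: construct its NetHandler and PollCont in
// place, schedule the per-thread InactivityCop, and register the thread's
// async-signal fd with the poll descriptor.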
void
initialize_thread_for_net(EThread *thread)
{
  NetHandler *nh = get_NetHandler(thread);

  new (reinterpret_cast<ink_dummy_for_new *>(nh)) NetHandler();
  new (reinterpret_cast<ink_dummy_for_new *>(get_PollCont(thread))) PollCont(thread->mutex, nh);
  nh->mutex  = new_ProxyMutex();
  nh->thread = thread;

  PollCont *pc       = get_PollCont(thread);
  PollDescriptor *pd = pc->pollDescriptor;

  InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
  int cop_freq                 = 1;

  REC_ReadConfigInteger(cop_freq, "proxy.config.net.inactivity_check_frequency");
  memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config));
  nh->configure_per_thread_values();
  thread->schedule_every(inactivityCop, HRTIME_SECONDS(cop_freq));

  thread->set_tail_handler(nh);
  thread->ep = static_cast<EventIO *>(ats_malloc(sizeof(EventIO)));
  new (thread->ep) EventIO();
  thread->ep->type = EVENTIO_ASYNC_SIGNAL;
#if HAVE_EVENTFD
  thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ);
#else
  thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ);
#endif
}

// NetHandler method definitions

NetHandler::NetHandler() : Continuation(nullptr)
{
  SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
}
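// Config-update callback: map the record name to the matching global_config
// member, store the new value, and signal the NetHandler on every active net
// thread with the index of the changed item.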
int
NetHandler::update_nethandler_config(const char *str, RecDataT, RecData data, void *)
{
  uint32_t *updated_member = nullptr; // direct pointer to config member for update.
  std::string_view name{str};

  if (name == "proxy.config.net.max_connections_in"sv) {
    updated_member = &NetHandler::global_config.max_connections_in;
    Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.max_requests_in"sv) {
    updated_member = &NetHandler::global_config.max_requests_in;
    Debug("net_queue", "proxy.config.net.max_requests_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.inactive_threshold_in"sv) {
    updated_member = &NetHandler::global_config.inactive_threshold_in;
    Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.transaction_no_activity_timeout_in"sv) {
    updated_member = &NetHandler::global_config.transaction_no_activity_timeout_in;
    Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.keep_alive_no_activity_timeout_in"sv) {
    updated_member = &NetHandler::global_config.keep_alive_no_activity_timeout_in;
    Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
  } else if (name == "proxy.config.net.default_inactivity_timeout"sv) {
    updated_member = &NetHandler::global_config.default_inactivity_timeout;
    Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
  }

  if (updated_member) {
    *updated_member = data.rec_int; // do the actual update.
    // portable form of the update, an index converted to <void*> so it can be passed as an event cookie.
    void *idx = reinterpret_cast<void *>(static_cast<intptr_t>(updated_member - &global_config[0]));
    // Signal the NetHandler instances, passing the index of the updated config value.
    for (int i = 0; i < eventProcessor.n_thread_groups; ++i) {
      if (!active_thread_types[i]) {
        continue;
      }
      for (EThread **tp    = eventProcessor.thread_group[i]._thread,
                   **limit = eventProcessor.thread_group[i]._thread + eventProcessor.thread_group[i]._count;
           tp < limit; ++tp) {
        NetHandler *nh = get_NetHandler(*tp);
        if (nh) {
          nh->thread->schedule_imm(nh, TS_EVENT_MGMT_UPDATE, idx);
        }
      }
    }
  }

  return REC_ERR_OKAY;
}

void
NetHandler::init_for_process()
{
  // read configuration values and set up callbacks for when they change
  REC_ReadConfigInt32(global_config.max_connections_in, "proxy.config.net.max_connections_in");
  REC_ReadConfigInt32(global_config.max_requests_in, "proxy.config.net.max_requests_in");
  REC_ReadConfigInt32(global_config.inactive_threshold_in, "proxy.config.net.inactive_threshold_in");
  REC_ReadConfigInt32(global_config.transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
  REC_ReadConfigInt32(global_config.keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
  REC_ReadConfigInt32(global_config.default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");

  RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.max_requests_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.inactive_threshold_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, nullptr);
  RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, nullptr);

  Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", global_config.max_connections_in);
  Debug("net_queue", "proxy.config.net.max_requests_in updated to %d", global_config.max_requests_in);
  Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %d", global_config.inactive_threshold_in);
  Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d",
        global_config.transaction_no_activity_timeout_in);
  Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d",
        global_config.keep_alive_no_activity_timeout_in);
  Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", global_config.default_inactivity_timeout);
}
//
// Release a NetEvent and free it.
//
void
NetHandler::free_netevent(NetEvent *ne)
{
  EThread *t = this->thread;

  ink_assert(t == this_ethread());
  ink_release_assert(ne->get_thread() == t);
  ink_release_assert(ne->nh == this);

  // Release ne from InactivityCop
  stopCop(ne);
  // Release ne from NetHandler
  stopIO(ne);
  // Clear and deallocate ne
  ne->free(t);
}

//
// Move VCs enabled on a different thread to the ready list
//
void
NetHandler::process_enabled_list()
{
  NetEvent *ne = nullptr;

  SListM(NetEvent, NetState, read, enable_link) rq(read_enable_list.popall());
  while ((ne = rq.pop())) {
    ne->ep.modify(EVENTIO_READ);
    ne->ep.refresh(EVENTIO_READ);
    ne->read.in_enabled_list = 0;
    if ((ne->read.enabled && ne->read.triggered) || ne->closed) {
      read_ready_list.in_or_enqueue(ne);
    }
  }

  SListM(NetEvent, NetState, write, enable_link) wq(write_enable_list.popall());
  while ((ne = wq.pop())) {
    ne->ep.modify(EVENTIO_WRITE);
    ne->ep.refresh(EVENTIO_WRITE);
    ne->write.in_enabled_list = 0;
    if ((ne->write.enabled && ne->write.triggered) || ne->closed) {
      write_ready_list.in_or_enqueue(ne);
    }
  }
}

//
// Walk through the ready list
//
void
NetHandler::process_ready_list()
{
  NetEvent *ne = nullptr;

#if defined(USE_EDGE_TRIGGER)
  while ((ne = read_ready_list.dequeue())) {
    // Initialize the thread-local continuation flags
    set_cont_flags(ne->get_control_flags());
    if (ne->closed) {
      free_netevent(ne);
    } else if (ne->read.enabled && ne->read.triggered) {
      ne->net_read_io(this, this->thread);
    } else if (!ne->read.enabled) {
      read_ready_list.remove(ne);
#if defined(solaris)
      if (ne->read.triggered && ne->write.enabled) {
        ne->ep.modify(-EVENTIO_READ);
        ne->ep.refresh(EVENTIO_WRITE);
        ne->writeReschedule(this);
      }
#endif
    }
  }
  while ((ne = write_ready_list.dequeue())) {
    set_cont_flags(ne->get_control_flags());
    if (ne->closed) {
      free_netevent(ne);
    } else if (ne->write.enabled && ne->write.triggered) {
      ne->net_write_io(this, this->thread);
    } else if (!ne->write.enabled) {
      write_ready_list.remove(ne);
#if defined(solaris)
      if (ne->write.triggered && ne->read.enabled) {
        ne->ep.modify(-EVENTIO_WRITE);
        ne->ep.refresh(EVENTIO_READ);
        ne->readReschedule(this);
      }
#endif
    }
  }
#else  /* !USE_EDGE_TRIGGER */
  while ((ne = read_ready_list.dequeue())) {
    set_cont_flags(ne->get_control_flags());
    if (ne->closed)
      free_netevent(ne);
    else if (ne->read.enabled && ne->read.triggered)
      ne->net_read_io(this, this->thread);
    else if (!ne->read.enabled)
      ne->ep.modify(-EVENTIO_READ);
  }
  while ((ne = write_ready_list.dequeue())) {
    set_cont_flags(ne->get_control_flags());
    if (ne->closed)
      free_netevent(ne);
    else if (ne->write.enabled && ne->write.triggered)
      write_to_net(this, ne, this->thread);
    else if (!ne->write.enabled)
      ne->ep.modify(-EVENTIO_WRITE);
  }
#endif /* !USE_EDGE_TRIGGER */
}

//
// The main event handler for NetHandler
//
int
NetHandler::mainNetEvent(int event, Event *e)
{
  if (TS_EVENT_MGMT_UPDATE == event) {
    intptr_t idx = reinterpret_cast<intptr_t>(e->cookie);
    // Copy to the same offset in the instance struct.
    config[idx] = global_config[idx];
    if (config_value_affects_per_thread_value[idx]) {
      this->configure_per_thread_values();
    }
    return EVENT_CONT;
  } else {
    ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
    return this->waitForActivity(-1);
  }
}
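// One iteration of the net event loop: drain the enable lists, poll for I/O,
// translate the poll results into triggered read/write state, then run the
// ready lists.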
int
NetHandler::waitForActivity(ink_hrtime timeout)
{
  EventIO *epd = nullptr;

  NET_INCREMENT_DYN_STAT(net_handler_run_stat);
  SCOPED_MUTEX_LOCK(lock, mutex, this->thread);

  process_enabled_list();

  // Polling event by PollCont
  PollCont *p = get_PollCont(this->thread);
  p->do_poll(timeout);

  // Get & Process polling result
  PollDescriptor *pd = get_PollDescriptor(this->thread);
  NetEvent *ne       = nullptr;
  for (int x = 0; x < pd->result; x++) {
    epd = static_cast<EventIO *>(get_ev_data(pd, x));
    if (epd->type == EVENTIO_READWRITE_VC) {
      ne = epd->data.ne;
      // Remove the triggered NetEvent from cop_list because it won't time out
      // before the next InactivityCop run.
      if (cop_list.in(ne)) {
        cop_list.remove(ne);
      }
      int flags = get_ev_events(pd, x);
      if (flags & (EVENTIO_ERROR)) {
        ne->set_error_from_socket();
      }
      if (flags & (EVENTIO_READ)) {
        ne->read.triggered = 1;
        if (!read_ready_list.in(ne)) {
          read_ready_list.enqueue(ne);
        }
      }
      if (flags & (EVENTIO_WRITE)) {
        ne->write.triggered = 1;
        if (!write_ready_list.in(ne)) {
          write_ready_list.enqueue(ne);
        }
      } else if (!(flags & (EVENTIO_READ))) {
        Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", flags);
        // In practice we sometimes see EPOLLERR and EPOLLHUP through here.
        // Anything else would be surprising.
        ink_assert((flags & ~(EVENTIO_ERROR)) == 0);
        ne->write.triggered = 1;
        if (!write_ready_list.in(ne)) {
          write_ready_list.enqueue(ne);
        }
      }
    } else if (epd->type == EVENTIO_DNS_CONNECTION) {
      if (epd->data.dnscon != nullptr) {
        epd->data.dnscon->trigger(); // Make sure the DNSHandler for this con knows we triggered
#if defined(USE_EDGE_TRIGGER)
        epd->refresh(EVENTIO_READ);
#endif
      }
    } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
      net_signal_hook_callback(this->thread);
    } else if (epd->type == EVENTIO_NETACCEPT) {
      this->thread->schedule_imm(epd->data.c);
    }
    ev_next_event(pd, x);
  }

  pd->result = 0;

  process_ready_list();

  return EVENT_CONT;
}
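// Wake up this handler's thread if it is blocked in do_poll() by making the
// thread's async-signal fd readable (or sending a port event).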
void
NetHandler::signalActivity()
{
#if HAVE_EVENTFD
  uint64_t counter = 1;
  ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
  PollDescriptor *pd = get_PollDescriptor(thread);
  ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
#else
  char dummy = 1;
  ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
#endif
}
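// Try to make room in the active queue by closing connections that are past
// their timeouts; `enabling_ne` itself is left alone. Returns false if the
// queue is still full afterwards.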
bool
NetHandler::manage_active_queue(NetEvent *enabling_ne, bool ignore_queue_size)
{
  const int total_connections_in = active_queue_size + keep_alive_queue_size;
  Debug("v_net_queue",
        "max_connections_per_thread_in: %d max_requests_per_thread_in: %d total_connections_in: %d "
        "active_queue_size: %d keep_alive_queue_size: %d",
        max_connections_per_thread_in, max_requests_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);

  if (!max_requests_per_thread_in) {
    // active queue has no max
    return true;
  }

  if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) {
    return true;
  }

  ink_hrtime now = Thread::get_hrtime();

  // loop over the non-active connections and try to close them
  NetEvent *ne         = active_queue.head;
  NetEvent *ne_next    = nullptr;
  int closed           = 0;
  int handle_event     = 0;
  int total_idle_time  = 0;
  int total_idle_count = 0;
  for (; ne != nullptr; ne = ne_next) {
    ne_next = ne->active_queue_link.next;
    // It seems dangerous to close the current ne at this point;
    // let the InactivityCop deal with it.
    if (ne == enabling_ne) {
      continue;
    }
    if ((ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at <= now) ||
        (ne->next_activity_timeout_at && ne->next_activity_timeout_at <= now)) {
      _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count);
    }
    if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) {
      return true;
    }
  }

  if (max_requests_per_thread_in > active_queue_size) {
    return true;
  }

  return false; // failed to make room in the queue, all connections are active
}

void
NetHandler::configure_per_thread_values()
{
  // figure out the number of threads and calculate the number of connections per thread
  int threads                   = eventProcessor.thread_group[ET_NET]._count;
  max_connections_per_thread_in = config.max_connections_in / threads;
  max_requests_per_thread_in    = config.max_requests_in / threads;
  Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, threads);
  Debug("net_queue", "max_requests_per_thread_in updated to %d threads: %d", max_requests_per_thread_in, threads);
}
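// If the total connection count exceeds the per-thread limit, close
// keep-alive connections, starting from the head of the queue, until the
// thread is back under the limit.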
void
NetHandler::manage_keep_alive_queue()
{
  uint32_t total_connections_in = active_queue_size + keep_alive_queue_size;
  ink_hrtime now                = Thread::get_hrtime();

  Debug("v_net_queue", "max_connections_per_thread_in: %d total_connections_in: %d active_queue_size: %d keep_alive_queue_size: %d",
        max_connections_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);

  if (!max_connections_per_thread_in || total_connections_in <= max_connections_per_thread_in) {
    return;
  }

  // loop over the non-active connections and try to close them
  NetEvent *ne_next    = nullptr;
  int closed           = 0;
  int handle_event     = 0;
  int total_idle_time  = 0;
  int total_idle_count = 0;
  for (NetEvent *ne = keep_alive_queue.head; ne != nullptr; ne = ne_next) {
    ne_next = ne->keep_alive_queue_link.next;
    _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count);

    total_connections_in = active_queue_size + keep_alive_queue_size;
    if (total_connections_in <= max_connections_per_thread_in) {
      break;
    }
  }

  if (total_idle_count > 0) {
    Debug("net_queue", "max cons: %d active: %d idle: %d already closed: %d, close event: %d mean idle: %d",
          max_connections_per_thread_in, total_connections_in, keep_alive_queue_size, closed, handle_event,
          total_idle_time / total_idle_count);
  }
}
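// Close a single NetEvent from the active or keep-alive queue: free it if it
// is already closed, otherwise force its inactivity deadline to `now` and
// deliver the appropriate timeout callback through a dummy event.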
void
NetHandler::_close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count)
{
  if (ne->get_thread() != this_ethread()) {
    return;
  }
  MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread());
  if (!lock.is_locked()) {
    return;
  }
  ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND;
  if (diff > 0) {
    total_idle_time += diff;
    ++total_idle_count;
    NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
    NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
  }
  Debug("net_queue", "closing connection NetEvent=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64 " diff: %" PRId64, ne,
        keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(ne->next_inactivity_timeout_at),
        ink_hrtime_to_sec(ne->inactivity_timeout_in), diff);
  if (ne->closed) {
    free_netevent(ne);
    ++closed;
  } else {
    ne->next_inactivity_timeout_at = now;
    // create a dummy event
    Event event;
    event.ethread = this_ethread();
    if (ne->inactivity_timeout_in && ne->next_inactivity_timeout_at <= now) {
      if (ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, &event) == EVENT_DONE) {
        ++handle_event;
      }
    } else if (ne->active_timeout_in && ne->next_activity_timeout_at <= now) {
      if (ne->callback(VC_EVENT_ACTIVE_TIMEOUT, &event) == EVENT_DONE) {
        ++handle_event;
      }
    }
  }
}
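// Add `ne` to the keep-alive queue (re-enqueueing it if already present) and
// trim the queue if the connection count is now over the limit.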
void
NetHandler::add_to_keep_alive_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  ink_assert(mutex->thread_holding == this_ethread());

  if (keep_alive_queue.in(ne)) {
    // already in the keep-alive queue; remove it so the enqueue below refreshes its position
    keep_alive_queue.remove(ne);
  } else {
    // in the active queue or no queue; new to this queue
    remove_from_active_queue(ne);
    ++keep_alive_queue_size;
  }
  keep_alive_queue.enqueue(ne);

  // if the keep-alive queue is over size, close connections
  manage_keep_alive_queue();
}

void
NetHandler::remove_from_keep_alive_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  ink_assert(mutex->thread_holding == this_ethread());

  if (keep_alive_queue.in(ne)) {
    keep_alive_queue.remove(ne);
    --keep_alive_queue_size;
  }
}
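// Add `ne` to the active queue. Returns false if the queue is full and no
// room could be made by closing timed-out connections.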
bool
NetHandler::add_to_active_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d",
        max_connections_per_thread_in, active_queue_size, keep_alive_queue_size);
  ink_assert(mutex->thread_holding == this_ethread());

  bool active_queue_full = false;

  // if the active queue is over size, close inactive connections
  if (manage_active_queue(ne) == false) {
    active_queue_full = true;
  }

  if (active_queue.in(ne)) {
    // already in the active queue; remove it so the enqueue below refreshes its position
    active_queue.remove(ne);
  } else {
    if (active_queue_full) {
      // there is no room left in the queue
      NET_SUM_DYN_STAT(net_requests_max_throttled_in_stat, 1);
      return false;
    }
    // in the keep-alive queue or no queue; new to this queue
    remove_from_keep_alive_queue(ne);
    ++active_queue_size;
  }
  active_queue.enqueue(ne);

  return true;
}

void
NetHandler::remove_from_active_queue(NetEvent *ne)
{
  Debug("net_queue", "NetEvent: %p", ne);
  ink_assert(mutex->thread_holding == this_ethread());

  if (active_queue.in(ne)) {
    active_queue.remove(ne);
    --active_queue_size;
  }
}