1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #pragma once
25 
26 #include <bitset>
27 
28 #include "tscore/ink_platform.h"
29 
30 #define USE_EDGE_TRIGGER_EPOLL 1
31 #define USE_EDGE_TRIGGER_KQUEUE 1
32 #define USE_EDGE_TRIGGER_PORT 1
33 
34 #define EVENTIO_NETACCEPT 1
35 #define EVENTIO_READWRITE_VC 2
36 #define EVENTIO_DNS_CONNECTION 3
37 #define EVENTIO_UDP_CONNECTION 4
38 #define EVENTIO_ASYNC_SIGNAL 5
39 
40 #if TS_USE_EPOLL
41 #ifndef EPOLLEXCLUSIVE
42 #define EPOLLEXCLUSIVE 0
43 #endif
44 #ifdef USE_EDGE_TRIGGER_EPOLL
45 #define USE_EDGE_TRIGGER 1
46 #define EVENTIO_READ (EPOLLIN | EPOLLET)
47 #define EVENTIO_WRITE (EPOLLOUT | EPOLLET)
48 #else
49 #define EVENTIO_READ EPOLLIN
50 #define EVENTIO_WRITE EPOLLOUT
51 #endif
52 #define EVENTIO_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP)
53 #endif
54 
55 #if TS_USE_KQUEUE
56 #ifdef USE_EDGE_TRIGGER_KQUEUE
57 #define USE_EDGE_TRIGGER 1
58 #define INK_EV_EDGE_TRIGGER EV_CLEAR
59 #else
60 #define INK_EV_EDGE_TRIGGER 0
61 #endif
62 #define EVENTIO_READ INK_EVP_IN
63 #define EVENTIO_WRITE INK_EVP_OUT
64 #define EVENTIO_ERROR (0x010 | 0x002 | 0x020) // ERR PRI HUP
65 #endif
66 #if TS_USE_PORT
67 #ifdef USE_EDGE_TRIGGER_PORT
68 #define USE_EDGE_TRIGGER 1
69 #endif
70 #define EVENTIO_READ POLLIN
71 #define EVENTIO_WRITE POLLOUT
72 #define EVENTIO_ERROR (POLLERR | POLLPRI | POLLHUP)
73 #endif
74 
75 struct PollDescriptor;
76 typedef PollDescriptor *EventLoop;
77 
78 class NetEvent;
79 class UnixUDPConnection;
80 struct DNSConnection;
81 struct NetAccept;
82 
83 /// Unified API for setting and clearing kernel and epoll events.
/// Unified API for setting and clearing kernel and epoll events.
///
/// Wraps the platform polling interface (epoll / kqueue / event ports,
/// selected via the TS_USE_* macros) behind one object that is registered
/// per fd and handed to the kernel as the user-data pointer.
struct EventIO {
  int fd = -1; ///< file descriptor, often a system port
#if TS_USE_KQUEUE || TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER) || TS_USE_PORT
  int events = 0; ///< a bit mask of enabled events
#endif
  EventLoop event_loop = nullptr; ///< the assigned event loop
  bool syscall         = true;    ///< if false, disable all functionality (for QUIC)
  int type             = 0;       ///< class identifier of union data.
  union {
    Continuation *c;       ///< generic continuation; default member cleared by the constructor
    NetEvent *ne;          ///< active when type == EVENTIO_READWRITE_VC
    DNSConnection *dnscon; ///< active when type == EVENTIO_DNS_CONNECTION
    NetAccept *na;         ///< active when type == EVENTIO_NETACCEPT
    UnixUDPConnection *uc; ///< active when type == EVENTIO_UDP_CONNECTION
  } data; ///< a kind of continuation

  /** The start methods all logically Setup a class to be called
     when a file descriptor is available for read or write.
     The type of the classes vary.  Generally the file descriptor
     is pulled from the class, but there is one option that lets
     the file descriptor be expressed directly.
     @param l the event loop
     @param events a mask of flags (for details `man epoll_ctl`)
     @return int the number of events created, -1 is error
   */
  int start(EventLoop l, DNSConnection *vc, int events);
  int start(EventLoop l, NetAccept *vc, int events);
  int start(EventLoop l, NetEvent *ne, int events);
  int start(EventLoop l, UnixUDPConnection *vc, int events);
  int start(EventLoop l, int fd, NetEvent *ne, int events);
  /// Shared implementation for all start() overloads: records @a fd and
  /// @a l, then registers with the platform polling facility.
  int start_common(EventLoop l, int fd, int events);

  /** Alter the events that will trigger the continuation, for level triggered I/O.
     @param events add with positive mask(+EVENTIO_READ), or remove with negative mask (-EVENTIO_READ)
     @return int the number of events created, -1 is error
   */
  int modify(int events);

  /** Refresh the existing events (i.e. KQUEUE EV_CLEAR), for edge triggered I/O
     @param events mask of events
     @return int the number of events created, -1 is error
   */
  int refresh(int events);

  /// Remove the kernel or epoll event. Returns 0 on success.
  int stop();

  /// Remove the epoll event and close the connection. Returns 0 on success.
  int close();

  /// Default construction: no fd registered, union cleared.
  EventIO() { data.c = nullptr; }
};
136 
137 #include "P_Net.h"
138 #include "P_UnixNetProcessor.h"
139 #include "P_UnixNetVConnection.h"
140 #include "P_NetAccept.h"
141 #include "P_DNSConnection.h"
142 #include "P_UnixUDPConnection.h"
143 #include "P_UnixPollDescriptor.h"
144 #include <limits>
145 
146 class NetEvent;
147 class NetHandler;
148 typedef int (NetHandler::*NetContHandler)(int, void *);
149 typedef unsigned int uint32;
150 
151 extern ink_hrtime last_throttle_warning;
152 extern ink_hrtime last_shedding_warning;
153 extern ink_hrtime emergency_throttle_time;
154 extern int net_connections_throttle;
155 extern bool net_memory_throttle;
156 extern int fds_throttle;
157 extern int fds_limit;
158 extern ink_hrtime last_transient_accept_error;
159 extern int http_accept_port_number;
160 
161 //
162 // Configuration Parameter had to move here to share
163 // between UnixNet and UnixUDPNet or SSLNet modules.
164 // Design notes are in Memo.NetDesign
165 //
166 
167 #define THROTTLE_FD_HEADROOM (128 + 64) // CACHE_DB_FDS + 64
168 
169 #define TRANSIENT_ACCEPT_ERROR_MESSAGE_EVERY HRTIME_HOURS(24)
170 
171 // also the 'throttle connect headroom'
172 #define EMERGENCY_THROTTLE 16
173 #define THROTTLE_AT_ONCE 5
174 #define HYPER_EMERGENCY_THROTTLE 6
175 
176 #define NET_THROTTLE_ACCEPT_HEADROOM 1.1  // 10%
177 #define NET_THROTTLE_CONNECT_HEADROOM 1.0 // 0%
178 #define NET_THROTTLE_MESSAGE_EVERY HRTIME_MINUTES(10)
179 
180 #define PRINT_IP(x) ((uint8_t *)&(x))[0], ((uint8_t *)&(x))[1], ((uint8_t *)&(x))[2], ((uint8_t *)&(x))[3]
181 
182 // function prototype needed for SSLUnixNetVConnection
183 unsigned int net_next_connection_number();
184 
/** Continuation that owns the poll descriptors for a net thread.

    It drives one polling pass per invocation via pollEvent()/do_poll().
*/
struct PollCont : public Continuation {
  NetHandler *net_handler;            ///< associated NetHandler, if any (see the two constructors)
  PollDescriptor *pollDescriptor;     ///< primary poll descriptor for this thread
  PollDescriptor *nextPollDescriptor; ///< secondary poll descriptor -- presumably for a second poll set; confirm at call sites
  int poll_timeout;                   ///< timeout handed to the polling syscall

  PollCont(Ptr<ProxyMutex> &m, int pt = net_config_poll_timeout);
  PollCont(Ptr<ProxyMutex> &m, NetHandler *nh, int pt = net_config_poll_timeout);
  ~PollCont() override;
  /// Event handler entry point: performs one poll pass.
  int pollEvent(int, Event *);
  /// Execute the underlying polling call with @a timeout.
  void do_poll(ink_hrtime timeout);
};
197 
198 /**
  NetHandler is the processor of NetEvent for the Net sub-system. The NetHandler
  is the core component of the Net sub-system. Once started, it is responsible
  for polling socket fds and performing the I/O tasks in NetEvent.
202 
203   The NetHandler is executed periodically to perform read/write tasks for
204   NetVConnection. The NetHandler::mainNetEvent() should be viewed as a part of
205   EThread::execute() loop. This is the reason that Net System is a sub-system.
206 
207   By get_NetHandler(this_ethread()), you can get the NetHandler object that
208   runs inside the current EThread and then @c startIO / @c stopIO which
209   assign/release a NetEvent to/from NetHandler. Before you call these functions,
210   holding the mutex of this NetHandler is required.
211 
  The NetVConnection provides a set of do_io functions through which you can
  specify continuations to be called back by its NetHandler. These function
  calls do not block. Instead they return a VIO object and schedule the
  callback to the continuation passed in when I/O events occur.
216 
217   Multi-thread scheduler:
218 
219   The NetHandler should be viewed as multi-threaded schedulers which process
220   NetEvents from their queues. If vc wants to be managed by NetHandler, the vc
221   should be derived from NetEvent. The vc can be made of NetProcessor (allocate_vc)
222   either by directly adding a NetEvent to the queue (NetHandler::startIO), or more
223   conveniently, calling a method service call (NetProcessor::connect_re) which
224   synthesizes the NetEvent and places it in the queue.
225 
226   Callback event codes:
227 
228   These event codes for do_io_read and reenable(read VIO) task:
229     VC_EVENT_READ_READY, VC_EVENT_READ_COMPLETE,
230     VC_EVENT_EOS, VC_EVENT_ERROR
231 
232   These event codes for do_io_write and reenable(write VIO) task:
233     VC_EVENT_WRITE_READY, VC_EVENT_WRITE_COMPLETE
234     VC_EVENT_ERROR
235 
236   There is no event and callback for do_io_shutdown / do_io_close task.
237 
238   NetVConnection allocation policy:
239 
240   VCs are allocated by the NetProcessor and deallocated by NetHandler.
241   A state machine may access the returned, non-recurring NetEvent / VIO until
242   it is closed by do_io_close. For recurring NetEvent, the NetEvent may be
243   accessed until it is closed. Once the NetEvent is closed, it's the
244   NetHandler's responsibility to deallocate it.
245 
246   Before assign to NetHandler or after release from NetHandler, it's the
247   NetEvent's responsibility to deallocate itself.
248 
249  */
250 
251 //
252 // NetHandler
253 //
254 // A NetHandler handles the Network IO operations.  It maintains
255 // lists of operations at multiples of it's periodicity.
256 //
class NetHandler : public Continuation, public EThread::LoopTailHandler
{
  using self_type = NetHandler; ///< Self reference type.
public:
  // @a thread and @a trigger_event are redundant - you can get the former from the latter.
  // If we don't get rid of @a trigger_event we should remove @a thread.
  EThread *thread      = nullptr;
  Event *trigger_event = nullptr;
  QueM(NetEvent, NetState, read, ready_link) read_ready_list;    ///< NetEvents with read I/O ready to process
  QueM(NetEvent, NetState, write, ready_link) write_ready_list;  ///< NetEvents with write I/O ready to process
  Que(NetEvent, open_link) open_list;                            ///< all open NetEvents, checked for timeout by InactivityCop
  DList(NetEvent, cop_link) cop_list;                            ///< working list used by InactivityCop
  ASLLM(NetEvent, NetState, read, enable_link) read_enable_list;   ///< atomic list: reads enabled from other threads
  ASLLM(NetEvent, NetState, write, enable_link) write_enable_list; ///< atomic list: writes enabled from other threads
  Que(NetEvent, keep_alive_queue_link) keep_alive_queue;         ///< idle keep-alive connections, eligible for shedding
  uint32_t keep_alive_queue_size = 0;                            ///< current length of keep_alive_queue
  Que(NetEvent, active_queue_link) active_queue;                 ///< connections with active transactions
  uint32_t active_queue_size = 0;                                ///< current length of active_queue

  /// configuration settings for managing the active and keep-alive queues
  struct Config {
    uint32_t max_connections_in                 = 0;
    uint32_t max_requests_in                    = 0;
    uint32_t inactive_threshold_in              = 0;
    uint32_t transaction_no_activity_timeout_in = 0;
    uint32_t keep_alive_no_activity_timeout_in  = 0;
    uint32_t default_inactivity_timeout         = 0;

    /** Return the address of the first value in this struct.

        Doing updates is much easier if we treat this config struct as an array.
        Making it a method means the knowledge of which member is the first one
        is localized to this struct, not scattered about.
     */
    uint32_t &
    operator[](int n)
    {
      return *(&max_connections_in + n);
    }
  };
  /** Static global config, set and updated per process.

      This is updated asynchronously and then events are sent to the NetHandler instances per thread
      to copy to the per thread config at a convenient time. Because these are updated independently
      from the command line, the update events just copy a single value from the global to the
      local. This mechanism relies on members being identical types.
  */
  static Config global_config;
  Config config; ///< Per thread copy of the @c global_config
  // Active and keep alive queue values that depend on other configuration values.
  // These are never updated directly, they are computed from other config values.
  uint32_t max_connections_per_thread_in = 0;
  uint32_t max_requests_per_thread_in    = 0;
  /// Number of configuration items in @c Config.
  static constexpr int CONFIG_ITEM_COUNT = sizeof(Config) / sizeof(uint32_t);
  /// Which members of @c Config the per thread values depend on.
  /// If one of these is updated, the per thread values must also be updated.
  static const std::bitset<CONFIG_ITEM_COUNT> config_value_affects_per_thread_value;
  /// Set of thread types in which nethandlers are active.
  /// This enables signaling the correct instances when the configuration is updated.
  /// Event type threads that use @c NetHandler must set the corresponding bit.
  static std::bitset<std::numeric_limits<unsigned int>::digits> active_thread_types;

  /// Main event handler: one pass of net I/O processing for this thread.
  int mainNetEvent(int event, Event *data);
  /// LoopTailHandler hook: block in the polling syscall up to @a timeout.
  int waitForActivity(ink_hrtime timeout) override;
  /// Drain the atomic enable lists into the ready lists.
  void process_enabled_list();
  /// Service the read/write ready lists.
  void process_ready_list();
  /// Shed idle keep-alive connections when over the configured limits.
  void manage_keep_alive_queue();
  bool manage_active_queue(NetEvent *ne, bool ignore_queue_size);
  void add_to_keep_alive_queue(NetEvent *ne);
  void remove_from_keep_alive_queue(NetEvent *ne);
  bool add_to_active_queue(NetEvent *ne);
  void remove_from_active_queue(NetEvent *ne);

  /// Per process initialization logic.
  static void init_for_process();
  /// Update configuration values that are per thread and depend on other configuration values.
  void configure_per_thread_values();

  /**
    Start to handle read & write events on a NetEvent.
    Initializes the socket fd of ne for the polling system.
    Only to be called when holding the mutex of this NetHandler.

    @param ne NetEvent to be managed by this NetHandler.
    @return 0 on success, ne->nh set to this NetHandler.
            -ERRNO on failure.
   */
  int startIO(NetEvent *ne);
  /**
    Stop handling read & write events on a NetEvent.
    Removes the socket fd of ne from the polling system.
    Only to be called when holding the mutex of this NetHandler, and stopCop(ne) must be called first.

    @param ne NetEvent to be released.
    @return ne->nh set to nullptr.
   */
  void stopIO(NetEvent *ne);

  /**
    Start handling active timeout and inactivity timeout on a NetEvent.
    Puts the ne into open_list. All NetEvents in the open_list are checked for timeout by InactivityCop.
    Only to be called when holding the mutex of this NetHandler, and startIO(ne) must be called first.

    @param ne NetEvent to be managed by InactivityCop
   */
  void startCop(NetEvent *ne);
  /**
    Stop handling active timeout and inactivity on a NetEvent.
    Removes the ne from open_list and cop_list.
    Also removes the ne from keep_alive_queue and active_queue if its context is IN.
    Only to be called when holding the mutex of this NetHandler.

    @param ne NetEvent to be released.
   */
  void stopCop(NetEvent *ne);

  // Signal the epoll_wait to terminate.
  void signalActivity() override;

  /**
    Release a ne and free it.

    @param ne NetEvent to be detached.
   */
  void free_netevent(NetEvent *ne);

  NetHandler();

private:
  /// Close one NetEvent during queue management, updating the bookkeeping counters.
  void _close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count);

  /// Static method used as the callback for runtime configuration updates.
  static int update_nethandler_config(const char *name, RecDataT, RecData data, void *);
};
392 
393 static inline NetHandler *
get_NetHandler(EThread * t)394 get_NetHandler(EThread *t)
395 {
396   return (NetHandler *)ETHREAD_GET_PTR(t, unix_netProcessor.netHandler_offset);
397 }
398 static inline PollCont *
get_PollCont(EThread * t)399 get_PollCont(EThread *t)
400 {
401   return (PollCont *)ETHREAD_GET_PTR(t, unix_netProcessor.pollCont_offset);
402 }
403 static inline PollDescriptor *
get_PollDescriptor(EThread * t)404 get_PollDescriptor(EThread *t)
405 {
406   PollCont *p = get_PollCont(t);
407   return p->pollDescriptor;
408 }
409 
/// Which throttle headroom applies when computing connection limits.
enum ThrottleType {
  ACCEPT,  ///< accept-side throttling (uses NET_THROTTLE_ACCEPT_HEADROOM)
  CONNECT, ///< connect-side throttling (uses NET_THROTTLE_CONNECT_HEADROOM)
};
414 
415 TS_INLINE int
net_connections_to_throttle(ThrottleType t)416 net_connections_to_throttle(ThrottleType t)
417 {
418   double headroom = t == ACCEPT ? NET_THROTTLE_ACCEPT_HEADROOM : NET_THROTTLE_CONNECT_HEADROOM;
419   int64_t sval    = 0;
420 
421   NET_READ_GLOBAL_DYN_SUM(net_connections_currently_open_stat, sval);
422   int currently_open = (int)sval;
423   // deal with race if we got to multiple net threads
424   if (currently_open < 0)
425     currently_open = 0;
426   return (int)(currently_open * headroom);
427 }
428 
429 TS_INLINE void
check_shedding_warning()430 check_shedding_warning()
431 {
432   ink_hrtime t = Thread::get_hrtime();
433   if (t - last_shedding_warning > NET_THROTTLE_MESSAGE_EVERY) {
434     last_shedding_warning = t;
435     RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR, "number of connections reaching shedding limit");
436   }
437 }
438 
439 TS_INLINE bool
check_net_throttle(ThrottleType t)440 check_net_throttle(ThrottleType t)
441 {
442   int connections = net_connections_to_throttle(t);
443 
444   if (net_connections_throttle != 0 && connections >= net_connections_throttle)
445     return true;
446 
447   return false;
448 }
449 
450 TS_INLINE void
check_throttle_warning(ThrottleType type)451 check_throttle_warning(ThrottleType type)
452 {
453   ink_hrtime t = Thread::get_hrtime();
454   if (t - last_throttle_warning > NET_THROTTLE_MESSAGE_EVERY) {
455     last_throttle_warning = t;
456     int connections       = net_connections_to_throttle(type);
457     RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR,
458                      "too many connections, throttling.  connection_type=%s, current_connections=%d, net_connections_throttle=%d",
459                      type == ACCEPT ? "ACCEPT" : "CONNECT", connections, net_connections_throttle);
460   }
461 }
462 
463 TS_INLINE int
change_net_connections_throttle(const char * token,RecDataT data_type,RecData value,void * data)464 change_net_connections_throttle(const char *token, RecDataT data_type, RecData value, void *data)
465 {
466   (void)token;
467   (void)data_type;
468   (void)value;
469   (void)data;
470   int throttle = fds_limit - THROTTLE_FD_HEADROOM;
471   if (fds_throttle == 0) {
472     net_connections_throttle = fds_throttle;
473   } else if (fds_throttle < 0) {
474     net_connections_throttle = throttle;
475   } else {
476     net_connections_throttle = fds_throttle;
477     if (net_connections_throttle > throttle)
478       net_connections_throttle = throttle;
479   }
480   return 0;
481 }
482 
// Classify the severity of a (negated) errno returned by an accept attempt.
// 1  - transient
// 0  - report as warning
// -1 - fatal
TS_INLINE int
accept_error_seriousness(int res)
{
  switch (res) {
  // Transient: the peer went away or the call would block; retrying is fine.
  case -EAGAIN:
  case -ECONNABORTED:
  case -ECONNRESET: // for Linux
  case -EPIPE:      // also for Linux
    return 1;
  // Resource exhaustion: the connection throttle should have prevented these.
  case -EMFILE:
  case -ENOMEM:
#if defined(ENOSR) && !defined(freebsd)
  case -ENOSR:
#endif
    ink_assert(!"throttling misconfigured: set too high");
#ifdef ENOBUFS
  // fallthrough
  case -ENOBUFS:
#endif
#ifdef ENFILE
  case -ENFILE:
#endif
    return 0;
  case -EINTR:
    ink_assert(!"should be handled at a lower level");
    return 0;
  // Anything else means the listening socket itself is broken: fatal.
#if defined(EPROTO) && !defined(freebsd)
  case -EPROTO:
#endif
  case -EOPNOTSUPP:
  case -ENOTSOCK:
  case -ENODEV:
  case -EBADF:
  default:
    return -1;
  }
}
523 
524 TS_INLINE void
check_transient_accept_error(int res)525 check_transient_accept_error(int res)
526 {
527   ink_hrtime t = Thread::get_hrtime();
528   if (!last_transient_accept_error || t - last_transient_accept_error > TRANSIENT_ACCEPT_ERROR_MESSAGE_EVERY) {
529     last_transient_accept_error = t;
530     Warning("accept thread received transient error: errno = %d", -res);
531 #if defined(linux)
532     if (res == -ENOBUFS || res == -ENFILE)
533       Warning("errno : %d consider a memory upgrade", -res);
534 #endif
535   }
536 }
537 
538 /** Disable reading on the NetEvent @a ne.
539      @param nh Nethandler that owns @a ne.
540      @param ne The @c NetEvent to modify.
541 
542      - If write is already disable, also disable the inactivity timeout.
543      - clear read enabled flag.
544      - Remove the @c epoll READ flag.
545      - Take @a ne out of the read ready list.
546 */
547 static inline void
read_disable(NetHandler * nh,NetEvent * ne)548 read_disable(NetHandler *nh, NetEvent *ne)
549 {
550   if (!ne->write.enabled) {
551     // Clear the next scheduled inactivity time, but don't clear inactivity_timeout_in,
552     // so the current timeout is used when the NetEvent is reenabled and not the default inactivity timeout
553     ne->next_inactivity_timeout_at = 0;
554     Debug("socket", "read_disable updating inactivity_at %" PRId64 ", NetEvent=%p", ne->next_inactivity_timeout_at, ne);
555   }
556   ne->read.enabled = 0;
557   nh->read_ready_list.remove(ne);
558   ne->ep.modify(-EVENTIO_READ);
559 }
560 
561 /** Disable writing on the NetEvent @a ne.
562      @param nh Nethandler that owns @a ne.
563      @param ne The @c NetEvent to modify.
564 
565      - If read is already disable, also disable the inactivity timeout.
566      - clear write enabled flag.
567      - Remove the @c epoll WRITE flag.
568      - Take @a ne out of the write ready list.
569 */
570 static inline void
write_disable(NetHandler * nh,NetEvent * ne)571 write_disable(NetHandler *nh, NetEvent *ne)
572 {
573   if (!ne->read.enabled) {
574     // Clear the next scheduled inactivity time, but don't clear inactivity_timeout_in,
575     // so the current timeout is used when the NetEvent is reenabled and not the default inactivity timeout
576     ne->next_inactivity_timeout_at = 0;
577     Debug("socket", "write_disable updating inactivity_at %" PRId64 ", NetEvent=%p", ne->next_inactivity_timeout_at, ne);
578   }
579   ne->write.enabled = 0;
580   nh->write_ready_list.remove(ne);
581   ne->ep.modify(-EVENTIO_WRITE);
582 }
583 
584 TS_INLINE int
start(EventLoop l,DNSConnection * vc,int events)585 EventIO::start(EventLoop l, DNSConnection *vc, int events)
586 {
587   type        = EVENTIO_DNS_CONNECTION;
588   data.dnscon = vc;
589   return start_common(l, vc->fd, events);
590 }
591 TS_INLINE int
start(EventLoop l,NetAccept * vc,int events)592 EventIO::start(EventLoop l, NetAccept *vc, int events)
593 {
594   type    = EVENTIO_NETACCEPT;
595   data.na = vc;
596   return start_common(l, vc->server.fd, events);
597 }
598 TS_INLINE int
start(EventLoop l,NetEvent * ne,int events)599 EventIO::start(EventLoop l, NetEvent *ne, int events)
600 {
601   type    = EVENTIO_READWRITE_VC;
602   data.ne = ne;
603   return start_common(l, ne->get_fd(), events);
604 }
605 
606 TS_INLINE int
start(EventLoop l,UnixUDPConnection * vc,int events)607 EventIO::start(EventLoop l, UnixUDPConnection *vc, int events)
608 {
609   type    = EVENTIO_UDP_CONNECTION;
610   data.uc = vc;
611   return start_common(l, vc->fd, events);
612 }
613 
TS_INLINE int
EventIO::close()
{
  // No-op mode (e.g. QUIC): nothing was registered with the kernel.
  if (!this->syscall) {
    return 0;
  }

  // Deregister from the polling facility, then close the underlying object
  // according to the union tag.
  stop();
  switch (type) {
  // NOTE(review): UDP and async-signal types are not handled here; an
  // unexpected type asserts and then deliberately falls into the DNS case.
  default:
    ink_assert(!"case");
  // fallthrough
  case EVENTIO_DNS_CONNECTION:
    return data.dnscon->close();
    break;
  case EVENTIO_NETACCEPT:
    return data.na->server.close();
    break;
  case EVENTIO_READWRITE_VC:
    return data.ne->close();
    break;
  }
  return -1;
}
638 
TS_INLINE int
EventIO::start(EventLoop l, int afd, NetEvent *ne, int e)
{
  // Explicit-fd variant: registers @a afd instead of ne->get_fd().
  // NOTE(review): unlike the other start() overloads this does not set
  // this->type -- presumably callers set it themselves or do not rely on
  // the union tag; confirm before depending on type after this call.
  data.ne = ne;
  return start_common(l, afd, e);
}
645 
TS_INLINE int
EventIO::start_common(EventLoop l, int afd, int e)
{
  // No-op mode (e.g. QUIC): report success without touching the kernel.
  if (!this->syscall) {
    return 0;
  }

  fd         = afd;
  event_loop = l;
#if TS_USE_EPOLL
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  // EPOLLEXCLUSIVE is defined as 0 at the top of this file when the
  // platform's headers lack it, so this OR is always safe.
  ev.events   = e | EPOLLEXCLUSIVE;
  ev.data.ptr = this;
#ifndef USE_EDGE_TRIGGER
  // Level-triggered mode tracks the enabled mask so modify() can diff it.
  events = e;
#endif
  return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
#endif
#if TS_USE_KQUEUE
  events = e;
  struct kevent ev[2];
  int n = 0;
  // kqueue uses separate filters for read and write readiness.
  if (e & EVENTIO_READ)
    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (e & EVENTIO_WRITE)
    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  return kevent(l->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
#endif
#if TS_USE_PORT
  events     = e;
  int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
  Debug("iocore_eventio", "[EventIO::start] e(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, events, retval,
        retval < 0 ? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
  return retval;
#endif
}
683 
TS_INLINE int
EventIO::modify(int e)
{
  // No-op mode (e.g. QUIC).
  if (!this->syscall) {
    return 0;
  }

  ink_assert(event_loop);
#if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  // A negative mask removes those bits; a positive mask adds them.
  int new_events = events, old_events = events;
  if (e < 0)
    new_events &= ~(-e);
  else
    new_events |= e;
  events      = new_events;
  ev.events   = new_events;
  ev.data.ptr = this;
  // Choose DEL/ADD/MOD depending on whether the mask becomes, was, or stays non-empty.
  if (!new_events)
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
  else if (!old_events)
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
  else
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_MOD, fd, &ev);
#endif
#if TS_USE_KQUEUE && !defined(USE_EDGE_TRIGGER)
  int n = 0;
  struct kevent ev[2];
  int ee = events;
  // kqueue: removals are EV_DELETE per filter, additions are EV_ADD per filter.
  if (e < 0) {
    ee &= ~(-e);
    if ((-e) & EVENTIO_READ)
      EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
    if ((-e) & EVENTIO_WRITE)
      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
  } else {
    ee |= e;
    if (e & EVENTIO_READ)
      EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
    if (e & EVENTIO_WRITE)
      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  }
  events = ee;
  if (n)
    return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
  else
    return 0;
#endif
#if TS_USE_PORT
  int n  = 0;
  int ne = e;
  // Event ports: compute the new mask and re-associate only when it changes.
  if (e < 0) {
    if (((-e) & events)) {
      ne = ~(-e) & events;
      if ((-e) & EVENTIO_READ)
        n++;
      if ((-e) & EVENTIO_WRITE)
        n++;
    }
  } else {
    if (!(e & events)) {
      ne = events | e;
      if (e & EVENTIO_READ)
        n++;
      if (e & EVENTIO_WRITE)
        n++;
    }
  }
  if (n && ne && event_loop) {
    events     = ne;
    int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
    Debug("iocore_eventio", "[EventIO::modify] e(%d), ne(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, ne, events,
          retval, retval < 0 ? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
    return retval;
  }
  return 0;
#else
  (void)e; // ATS_UNUSED
  return 0;
#endif
}
766 
TS_INLINE int
EventIO::refresh(int e)
{
  // No-op mode (e.g. QUIC).
  if (!this->syscall) {
    return 0;
  }

  ink_assert(event_loop);
#if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER)
  // Re-arm only the filters that are both requested and currently enabled;
  // EV_ADD on an existing filter refreshes it (clearing EV_CLEAR state).
  e = e & events;
  struct kevent ev[2];
  int n = 0;
  if (e & EVENTIO_READ)
    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (e & EVENTIO_WRITE)
    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (n)
    return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
  else
    return 0;
#endif
#if TS_USE_PORT
  // Event ports dissociate the fd after delivering an event, so it must be
  // re-associated to keep receiving notifications.
  int n  = 0;
  int ne = e;
  if ((e & events)) {
    ne = events | e;
    if (e & EVENTIO_READ)
      n++;
    if (e & EVENTIO_WRITE)
      n++;
    if (n && ne && event_loop) {
      events     = ne;
      int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
      Debug("iocore_eventio", "[EventIO::refresh] e(%d), ne(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, ne, events,
            retval, retval < 0 ? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
      return retval;
    }
  }
  return 0;
#else
  (void)e; // ATS_UNUSED
  return 0;
#endif
}
811 
TS_INLINE int
EventIO::stop()
{
  // No-op mode (e.g. QUIC).
  if (!this->syscall) {
    return 0;
  }
  // Only deregister if we are currently attached to an event loop; clearing
  // event_loop makes stop() idempotent.
  if (event_loop) {
    int retval = 0;
#if TS_USE_EPOLL
    struct epoll_event ev;
    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    retval    = epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
#endif
#if TS_USE_PORT
    retval = port_dissociate(event_loop->port_fd, PORT_SOURCE_FD, fd);
    Debug("iocore_eventio", "[EventIO::stop] %d[%s]=port_dissociate(%d,%d,%d)", retval, retval < 0 ? strerror(errno) : "ok",
          event_loop->port_fd, PORT_SOURCE_FD, fd);
#endif
    event_loop = nullptr;
    return retval;
  }
  return 0;
}
836 
837 TS_INLINE int
startIO(NetEvent * ne)838 NetHandler::startIO(NetEvent *ne)
839 {
840   ink_assert(this->mutex->thread_holding == this_ethread());
841   ink_assert(ne->get_thread() == this_ethread());
842   int res = 0;
843 
844   PollDescriptor *pd = get_PollDescriptor(this->thread);
845   if (ne->ep.start(pd, ne, EVENTIO_READ | EVENTIO_WRITE) < 0) {
846     res = errno;
847     // EEXIST should be ok, though it should have been cleared before we got back here
848     if (errno != EEXIST) {
849       Debug("iocore_net", "NetHandler::startIO : failed on EventIO::start, errno = [%d](%s)", errno, strerror(errno));
850       return -res;
851     }
852   }
853 
854   if (ne->read.triggered == 1) {
855     read_ready_list.enqueue(ne);
856   }
857   ne->nh = this;
858   return res;
859 }
860 
861 TS_INLINE void
stopIO(NetEvent * ne)862 NetHandler::stopIO(NetEvent *ne)
863 {
864   ink_release_assert(ne->nh == this);
865 
866   ne->ep.stop();
867 
868   read_ready_list.remove(ne);
869   write_ready_list.remove(ne);
870   if (ne->read.in_enabled_list) {
871     read_enable_list.remove(ne);
872     ne->read.in_enabled_list = 0;
873   }
874   if (ne->write.in_enabled_list) {
875     write_enable_list.remove(ne);
876     ne->write.in_enabled_list = 0;
877   }
878 
879   ne->nh = nullptr;
880 }
881 
882 TS_INLINE void
startCop(NetEvent * ne)883 NetHandler::startCop(NetEvent *ne)
884 {
885   ink_assert(this->mutex->thread_holding == this_ethread());
886   ink_release_assert(ne->nh == this);
887   ink_assert(!open_list.in(ne));
888 
889   open_list.enqueue(ne);
890 }
891 
892 TS_INLINE void
stopCop(NetEvent * ne)893 NetHandler::stopCop(NetEvent *ne)
894 {
895   ink_release_assert(ne->nh == this);
896 
897   open_list.remove(ne);
898   cop_list.remove(ne);
899   remove_from_keep_alive_queue(ne);
900   remove_from_active_queue(ne);
901 }
902