/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
*/

#pragma once

#include <bitset>

#include "tscore/ink_platform.h"

#define USE_EDGE_TRIGGER_EPOLL 1
#define USE_EDGE_TRIGGER_KQUEUE 1
#define USE_EDGE_TRIGGER_PORT 1

#define EVENTIO_NETACCEPT 1
#define EVENTIO_READWRITE_VC 2
#define EVENTIO_DNS_CONNECTION 3
#define EVENTIO_UDP_CONNECTION 4
#define EVENTIO_ASYNC_SIGNAL 5

#if TS_USE_EPOLL
#ifndef EPOLLEXCLUSIVE
#define EPOLLEXCLUSIVE 0
#endif
#ifdef USE_EDGE_TRIGGER_EPOLL
#define USE_EDGE_TRIGGER 1
#define EVENTIO_READ (EPOLLIN | EPOLLET)
#define EVENTIO_WRITE (EPOLLOUT | EPOLLET)
#else
#define EVENTIO_READ EPOLLIN
#define EVENTIO_WRITE EPOLLOUT
#endif
#define EVENTIO_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP)
#endif

#if TS_USE_KQUEUE
#ifdef USE_EDGE_TRIGGER_KQUEUE
#define USE_EDGE_TRIGGER 1
#define INK_EV_EDGE_TRIGGER EV_CLEAR
#else
#define INK_EV_EDGE_TRIGGER 0
#endif
#define EVENTIO_READ INK_EVP_IN
#define EVENTIO_WRITE INK_EVP_OUT
#define EVENTIO_ERROR (0x010 | 0x002 | 0x020) // ERR PRI HUP
#endif
#if TS_USE_PORT
#ifdef USE_EDGE_TRIGGER_PORT
#define USE_EDGE_TRIGGER 1
#endif
#define EVENTIO_READ POLLIN
#define EVENTIO_WRITE POLLOUT
#define EVENTIO_ERROR (POLLERR | POLLPRI | POLLHUP)
#endif

struct PollDescriptor;
typedef PollDescriptor *EventLoop;

class NetEvent;
class UnixUDPConnection;
struct DNSConnection;
struct NetAccept;

/// Unified API for setting and clearing kernel and epoll events.
struct EventIO {
  int fd = -1; ///< file descriptor, often a system port
#if TS_USE_KQUEUE || (TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)) || TS_USE_PORT
  int events = 0; ///< a bit mask of enabled events
#endif
  EventLoop event_loop = nullptr; ///< the assigned event loop
  bool syscall         = true;    ///< if false, disable all functionality (for QUIC)
  int type             = 0;       ///< class identifier of union data.
  union {
    Continuation *c;
    NetEvent *ne;
    DNSConnection *dnscon;
    NetAccept *na;
    UnixUDPConnection *uc;
  } data; ///< a kind of continuation

  /** The start methods all logically set up a class to be called back
      when a file descriptor is available for read or write.
      The types of the classes vary. Generally the file descriptor
      is pulled from the class, but one overload lets the file
      descriptor be passed in directly.
      @param l the event loop
      @param events a mask of flags (for details, see `man epoll_ctl`)
      @return the number of events created, or -1 on error
   */
  int start(EventLoop l, DNSConnection *vc, int events);
  int start(EventLoop l, NetAccept *vc, int events);
  int start(EventLoop l, NetEvent *ne, int events);
  int start(EventLoop l, UnixUDPConnection *vc, int events);
  int start(EventLoop l, int fd, NetEvent *ne, int events);
  int start_common(EventLoop l, int fd, int events);

  /** Alter the events that will trigger the continuation, for level-triggered I/O.
      @param events add with a positive mask (+EVENTIO_READ), or remove with a negative mask (-EVENTIO_READ)
      @return the number of events created, or -1 on error
   */
  int modify(int events);

  /** Refresh the existing events (i.e. kqueue EV_CLEAR), for edge-triggered I/O.
      @param events mask of events
      @return the number of events created, or -1 on error
   */
  int refresh(int events);

  /// Remove the kernel or epoll event. Returns 0 on success.
  int stop();

  /// Remove the epoll event and close the connection. Returns 0 on success.
  int close();

  EventIO() { data.c = nullptr; }
};
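
/* A minimal usage sketch for EventIO (illustrative only; NetHandler::startIO
   later in this file shows the real pattern). The start/modify/refresh/stop
   methods map onto the epoll/kqueue/port primitives selected above:

   @code
   EventIO ep;
   // register ne's fd on this thread's poll loop for read and write
   if (ep.start(get_PollDescriptor(this_ethread()), ne, EVENTIO_READ | EVENTIO_WRITE) < 0) {
     // inspect errno
   }
   ep.modify(-EVENTIO_READ); // level-triggered builds: drop read interest
   ep.refresh(EVENTIO_READ); // edge-triggered kqueue/port: re-arm read
   ep.stop();                // deregister from the kernel; close() also closes the fd
   @endcode
*/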

#include "P_Net.h"
#include "P_UnixNetProcessor.h"
#include "P_UnixNetVConnection.h"
#include "P_NetAccept.h"
#include "P_DNSConnection.h"
#include "P_UnixUDPConnection.h"
#include "P_UnixPollDescriptor.h"
#include <limits>

class NetEvent;
class NetHandler;
typedef int (NetHandler::*NetContHandler)(int, void *);
typedef unsigned int uint32;

extern ink_hrtime last_throttle_warning;
extern ink_hrtime last_shedding_warning;
extern ink_hrtime emergency_throttle_time;
extern int net_connections_throttle;
extern bool net_memory_throttle;
extern int fds_throttle;
extern int fds_limit;
extern ink_hrtime last_transient_accept_error;
extern int http_accept_port_number;

//
// Configuration parameters had to move here to be shared
// between the UnixNet and UnixUDPNet or SSLNet modules.
// Design notes are in Memo.NetDesign
//

#define THROTTLE_FD_HEADROOM (128 + 64) // CACHE_DB_FDS + 64

#define TRANSIENT_ACCEPT_ERROR_MESSAGE_EVERY HRTIME_HOURS(24)

// also the 'throttle connect headroom'
#define EMERGENCY_THROTTLE 16
#define THROTTLE_AT_ONCE 5
#define HYPER_EMERGENCY_THROTTLE 6

#define NET_THROTTLE_ACCEPT_HEADROOM 1.1  // 10%
#define NET_THROTTLE_CONNECT_HEADROOM 1.0 // 0%
#define NET_THROTTLE_MESSAGE_EVERY HRTIME_MINUTES(10)

#define PRINT_IP(x) ((uint8_t *)&(x))[0], ((uint8_t *)&(x))[1], ((uint8_t *)&(x))[2], ((uint8_t *)&(x))[3]
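
// PRINT_IP expands to the four octets of an IPv4 address in memory order, so it
// pairs with a "%d.%d.%d.%d" format string, e.g. (an illustrative use):
//   Debug("net", "accepted from %d.%d.%d.%d", PRINT_IP(sa.sin_addr.s_addr));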

// function prototype needed for SSLUnixNetVConnection
unsigned int net_next_connection_number();

struct PollCont : public Continuation {
  NetHandler *net_handler;
  PollDescriptor *pollDescriptor;
  PollDescriptor *nextPollDescriptor;
  int poll_timeout;

  PollCont(Ptr<ProxyMutex> &m, int pt = net_config_poll_timeout);
  PollCont(Ptr<ProxyMutex> &m, NetHandler *nh, int pt = net_config_poll_timeout);
  ~PollCont() override;
  int pollEvent(int, Event *);
  void do_poll(ink_hrtime timeout);
};

/**
  NetHandler is the processor of NetEvent for the Net sub-system. The NetHandler
  is the core component of the Net sub-system. Once started, it is responsible
  for polling socket fds and performing the I/O tasks in NetEvent.

  The NetHandler is executed periodically to perform read/write tasks for
  NetVConnection. NetHandler::mainNetEvent() should be viewed as a part of the
  EThread::execute() loop. This is the reason that the Net System is a sub-system.

  Via get_NetHandler(this_ethread()), you can get the NetHandler object that
  runs inside the current EThread, and then call @c startIO / @c stopIO to
  assign/release a NetEvent to/from that NetHandler. You must hold the mutex
  of this NetHandler before calling these functions.

  The NetVConnection provides a set of do_io functions through which you can
  specify continuations to be called back by its NetHandler. These function
  calls do not block. Instead, they return a VIO object and schedule a
  callback to the continuation passed in when I/O events occur.

  Multi-thread scheduler:

  The NetHandler should be viewed as a multi-threaded scheduler that processes
  NetEvents from its queues. If a vc wants to be managed by a NetHandler, the vc
  should be derived from NetEvent. The vc can be created by NetProcessor (allocate_vc)
  and handed to a NetHandler either by directly adding the NetEvent to the queue
  (NetHandler::startIO), or, more conveniently, by calling a service method
  (NetProcessor::connect_re) which synthesizes the NetEvent and places it in the queue.

  Callback event codes:

  These are the event codes for the do_io_read and reenable(read VIO) tasks:
  VC_EVENT_READ_READY, VC_EVENT_READ_COMPLETE,
  VC_EVENT_EOS, VC_EVENT_ERROR

  These are the event codes for the do_io_write and reenable(write VIO) tasks:
  VC_EVENT_WRITE_READY, VC_EVENT_WRITE_COMPLETE,
  VC_EVENT_ERROR

  There is no event or callback for the do_io_shutdown / do_io_close tasks.

  NetVConnection allocation policy:

  VCs are allocated by the NetProcessor and deallocated by the NetHandler.
  A state machine may access the returned, non-recurring NetEvent / VIO until
  it is closed by do_io_close. For a recurring NetEvent, the NetEvent may be
  accessed until it is closed. Once the NetEvent is closed, it is the
  NetHandler's responsibility to deallocate it.

  Before being assigned to a NetHandler, or after release from one, it is the
  NetEvent's responsibility to deallocate itself.

*/
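
/* A sketch of the lifecycle described above (illustrative only; error handling
   elided). The NetHandler mutex must be held across each call:

   @code
   NetHandler *nh = get_NetHandler(this_ethread());
   SCOPED_MUTEX_LOCK(lock, nh->mutex, this_ethread());
   if (nh->startIO(ne) == 0) { // register ne's fd with the poll loop
     nh->startCop(ne);         // let InactivityCop check ne for timeouts
   }
   // ... later, tear down in reverse: stopCop, then stopIO (or free_netevent,
   // which releases and frees ne in one step)
   nh->stopCop(ne);
   nh->stopIO(ne);
   @endcode
*/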

//
// NetHandler
//
// A NetHandler handles the Network IO operations. It maintains
// lists of operations at multiples of its periodicity.
//
class NetHandler : public Continuation, public EThread::LoopTailHandler
{
  using self_type = NetHandler; ///< Self reference type.
public:
  // @a thread and @a trigger_event are redundant - you can get the former from the latter.
  // If we don't get rid of @a trigger_event we should remove @a thread.
  EThread *thread      = nullptr;
  Event *trigger_event = nullptr;
  QueM(NetEvent, NetState, read, ready_link) read_ready_list;
  QueM(NetEvent, NetState, write, ready_link) write_ready_list;
  Que(NetEvent, open_link) open_list;
  DList(NetEvent, cop_link) cop_list;
  ASLLM(NetEvent, NetState, read, enable_link) read_enable_list;
  ASLLM(NetEvent, NetState, write, enable_link) write_enable_list;
  Que(NetEvent, keep_alive_queue_link) keep_alive_queue;
  uint32_t keep_alive_queue_size = 0;
  Que(NetEvent, active_queue_link) active_queue;
  uint32_t active_queue_size = 0;

  /// configuration settings for managing the active and keep-alive queues
  struct Config {
    uint32_t max_connections_in                 = 0;
    uint32_t max_requests_in                    = 0;
    uint32_t inactive_threshold_in              = 0;
    uint32_t transaction_no_activity_timeout_in = 0;
    uint32_t keep_alive_no_activity_timeout_in  = 0;
    uint32_t default_inactivity_timeout         = 0;

    /** Return the address of the first value in this struct.

        Doing updates is much easier if we treat this config struct as an array.
        Making it a method means the knowledge of which member is the first one
        is localized to this struct, not scattered about.
     */
    uint32_t &
    operator[](int n)
    {
      return *(&max_connections_in + n);
    }
  };
  /** Static global config, set and updated per process.

      This is updated asynchronously and then events are sent to the NetHandler instances per thread
      to copy to the per thread config at a convenient time. Because these are updated independently
      from the command line, the update events just copy a single value from the global to the
      local. This mechanism relies on members being identical types.
   */
  static Config global_config;
  Config config; ///< Per thread copy of the @c global_config
  // Active and keep alive queue values that depend on other configuration values.
  // These are never updated directly, they are computed from other config values.
  uint32_t max_connections_per_thread_in = 0;
  uint32_t max_requests_per_thread_in    = 0;
  /// Number of configuration items in @c Config.
  static constexpr int CONFIG_ITEM_COUNT = sizeof(Config) / sizeof(uint32_t);
  /// Which members of @c Config the per thread values depend on.
  /// If one of these is updated, the per thread values must also be updated.
  static const std::bitset<CONFIG_ITEM_COUNT> config_value_affects_per_thread_value;
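
  /* Sketch of how one updated item might propagate from the global config to a
     per thread copy via operator[] (illustrative only; the actual update events
     are wired up elsewhere):

     @code
     void copy_config_item(NetHandler *nh, int n) {
       nh->config[n] = NetHandler::global_config[n]; // copy a single value
       if (NetHandler::config_value_affects_per_thread_value[n])
         nh->configure_per_thread_values();          // recompute derived values
     }
     @endcode
  */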
  /// Set of thread types in which nethandlers are active.
  /// This enables signaling the correct instances when the configuration is updated.
  /// Event type threads that use @c NetHandler must set the corresponding bit.
  static std::bitset<std::numeric_limits<unsigned int>::digits> active_thread_types;

  int mainNetEvent(int event, Event *data);
  int waitForActivity(ink_hrtime timeout) override;
  void process_enabled_list();
  void process_ready_list();
  void manage_keep_alive_queue();
  bool manage_active_queue(NetEvent *ne, bool ignore_queue_size);
  void add_to_keep_alive_queue(NetEvent *ne);
  void remove_from_keep_alive_queue(NetEvent *ne);
  bool add_to_active_queue(NetEvent *ne);
  void remove_from_active_queue(NetEvent *ne);

  /// Per process initialization logic.
  static void init_for_process();
  /// Update configuration values that are per thread and depend on other configuration values.
  void configure_per_thread_values();

  /**
    Start handling read and write events on a NetEvent.
    Initializes the socket fd of ne for the polling system.
    Must only be called while holding the mutex of this NetHandler.

    @param ne NetEvent to be managed by this NetHandler.
    @return 0 on success, with ne->nh set to this NetHandler;
            -ERRNO on failure.
   */
  int startIO(NetEvent *ne);
  /**
    Stop handling read and write events on a NetEvent.
    Removes the socket fd of ne from the polling system.
    Must only be called while holding the mutex of this NetHandler, and stopCop(ne) must be called first.

    @param ne NetEvent to be released. On return, ne->nh is set to nullptr.
   */
  void stopIO(NetEvent *ne);

  /**
    Start handling active timeout and inactivity timeout on a NetEvent.
    Puts the ne into open_list. All NetEvents in the open_list are checked for timeout by InactivityCop.
    Must only be called while holding the mutex of this NetHandler, and startIO(ne) must be called first.

    @param ne NetEvent to be managed by InactivityCop
   */
  void startCop(NetEvent *ne);
  /**
    Stop handling active timeout and inactivity timeout on a NetEvent.
    Removes the ne from open_list and cop_list.
    Also removes the ne from keep_alive_queue and active_queue if its context is IN.
    Must only be called while holding the mutex of this NetHandler.

    @param ne NetEvent to be released.
   */
  void stopCop(NetEvent *ne);

  // Signal the epoll_wait to terminate.
  void signalActivity() override;

  /**
    Release a ne and free it.

    @param ne NetEvent to be detached.
   */
  void free_netevent(NetEvent *ne);

  NetHandler();

private:
  void _close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count);

  /// Static method used as the callback for runtime configuration updates.
  static int update_nethandler_config(const char *name, RecDataT, RecData data, void *);
};

static inline NetHandler *
get_NetHandler(EThread *t)
{
  return (NetHandler *)ETHREAD_GET_PTR(t, unix_netProcessor.netHandler_offset);
}
static inline PollCont *
get_PollCont(EThread *t)
{
  return (PollCont *)ETHREAD_GET_PTR(t, unix_netProcessor.pollCont_offset);
}
static inline PollDescriptor *
get_PollDescriptor(EThread *t)
{
  PollCont *p = get_PollCont(t);
  return p->pollDescriptor;
}

enum ThrottleType {
  ACCEPT,
  CONNECT,
};

TS_INLINE int
net_connections_to_throttle(ThrottleType t)
{
  double headroom = t == ACCEPT ? NET_THROTTLE_ACCEPT_HEADROOM : NET_THROTTLE_CONNECT_HEADROOM;
  int64_t sval    = 0;

  NET_READ_GLOBAL_DYN_SUM(net_connections_currently_open_stat, sval);
  int currently_open = (int)sval;
  // the stat can transiently go negative when multiple net threads race on updates
  if (currently_open < 0)
    currently_open = 0;
  return (int)(currently_open * headroom);
}
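
// For example, with 1000 connections currently open, ACCEPT yields
// 1000 * 1.1 = 1100 while CONNECT yields 1000, so the scaled ACCEPT count
// reaches net_connections_throttle sooner than the CONNECT count does.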

TS_INLINE void
check_shedding_warning()
{
  ink_hrtime t = Thread::get_hrtime();
  if (t - last_shedding_warning > NET_THROTTLE_MESSAGE_EVERY) {
    last_shedding_warning = t;
    RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR, "number of connections reaching shedding limit");
  }
}

TS_INLINE bool
check_net_throttle(ThrottleType t)
{
  int connections = net_connections_to_throttle(t);

  if (net_connections_throttle != 0 && connections >= net_connections_throttle)
    return true;

  return false;
}

TS_INLINE void
check_throttle_warning(ThrottleType type)
{
  ink_hrtime t = Thread::get_hrtime();
  if (t - last_throttle_warning > NET_THROTTLE_MESSAGE_EVERY) {
    last_throttle_warning = t;
    int connections       = net_connections_to_throttle(type);
    RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR,
                     "too many connections, throttling. connection_type=%s, current_connections=%d, net_connections_throttle=%d",
                     type == ACCEPT ? "ACCEPT" : "CONNECT", connections, net_connections_throttle);
  }
}

TS_INLINE int
change_net_connections_throttle(const char *token, RecDataT data_type, RecData value, void *data)
{
  (void)token;
  (void)data_type;
  (void)value;
  (void)data;
  int throttle = fds_limit - THROTTLE_FD_HEADROOM;
  if (fds_throttle == 0) {
    net_connections_throttle = fds_throttle;
  } else if (fds_throttle < 0) {
    net_connections_throttle = throttle;
  } else {
    net_connections_throttle = fds_throttle;
    if (net_connections_throttle > throttle)
      net_connections_throttle = throttle;
  }
  return 0;
}

// 1 - transient
// 0 - report as warning
// -1 - fatal
TS_INLINE int
accept_error_seriousness(int res)
{
  switch (res) {
  case -EAGAIN:
  case -ECONNABORTED:
  case -ECONNRESET: // for Linux
  case -EPIPE:      // also for Linux
    return 1;
  case -EMFILE:
  case -ENOMEM:
#if defined(ENOSR) && !defined(freebsd)
  case -ENOSR:
#endif
    ink_assert(!"throttling misconfigured: set too high");
#ifdef ENOBUFS
  // fallthrough
  case -ENOBUFS:
#endif
#ifdef ENFILE
  case -ENFILE:
#endif
    return 0;
  case -EINTR:
    ink_assert(!"should be handled at a lower level");
    return 0;
#if defined(EPROTO) && !defined(freebsd)
  case -EPROTO:
#endif
  case -EOPNOTSUPP:
  case -ENOTSOCK:
  case -ENODEV:
  case -EBADF:
  default:
    return -1;
  }
}
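
/* How a caller might act on the classification (an illustrative sketch, not a
   copy of the real accept loop):

   @code
   int res = -errno; // from a failed accept()
   switch (accept_error_seriousness(res)) {
   case 1: // transient: warn at most once per day
     check_transient_accept_error(res);
     break;
   case 0: // report as warning and carry on
     Warning("accept error: %s", strerror(-res));
     break;
   default: // -1: fatal
     Fatal("fatal accept error: %s", strerror(-res));
   }
   @endcode
*/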

TS_INLINE void
check_transient_accept_error(int res)
{
  ink_hrtime t = Thread::get_hrtime();
  if (!last_transient_accept_error || t - last_transient_accept_error > TRANSIENT_ACCEPT_ERROR_MESSAGE_EVERY) {
    last_transient_accept_error = t;
    Warning("accept thread received transient error: errno = %d", -res);
#if defined(linux)
    if (res == -ENOBUFS || res == -ENFILE)
      Warning("errno : %d consider a memory upgrade", -res);
#endif
  }
}

/** Disable reading on the NetEvent @a ne.
    @param nh Nethandler that owns @a ne.
    @param ne The @c NetEvent to modify.

    - If write is already disabled, also disable the inactivity timeout.
    - Clear the read enabled flag.
    - Remove the @c epoll READ flag.
    - Take @a ne out of the read ready list.
 */
static inline void
read_disable(NetHandler *nh, NetEvent *ne)
{
  if (!ne->write.enabled) {
    // Clear the next scheduled inactivity time, but don't clear inactivity_timeout_in,
    // so the current timeout is used when the NetEvent is reenabled and not the default inactivity timeout
    ne->next_inactivity_timeout_at = 0;
    Debug("socket", "read_disable updating inactivity_at %" PRId64 ", NetEvent=%p", ne->next_inactivity_timeout_at, ne);
  }
  ne->read.enabled = 0;
  nh->read_ready_list.remove(ne);
  ne->ep.modify(-EVENTIO_READ);
}

/** Disable writing on the NetEvent @a ne.
    @param nh Nethandler that owns @a ne.
    @param ne The @c NetEvent to modify.

    - If read is already disabled, also disable the inactivity timeout.
    - Clear the write enabled flag.
    - Remove the @c epoll WRITE flag.
    - Take @a ne out of the write ready list.
 */
static inline void
write_disable(NetHandler *nh, NetEvent *ne)
{
  if (!ne->read.enabled) {
    // Clear the next scheduled inactivity time, but don't clear inactivity_timeout_in,
    // so the current timeout is used when the NetEvent is reenabled and not the default inactivity timeout
    ne->next_inactivity_timeout_at = 0;
    Debug("socket", "write_disable updating inactivity_at %" PRId64 ", NetEvent=%p", ne->next_inactivity_timeout_at, ne);
  }
  ne->write.enabled = 0;
  nh->write_ready_list.remove(ne);
  ne->ep.modify(-EVENTIO_WRITE);
}

TS_INLINE int
EventIO::start(EventLoop l, DNSConnection *vc, int events)
{
  type        = EVENTIO_DNS_CONNECTION;
  data.dnscon = vc;
  return start_common(l, vc->fd, events);
}
TS_INLINE int
EventIO::start(EventLoop l, NetAccept *vc, int events)
{
  type    = EVENTIO_NETACCEPT;
  data.na = vc;
  return start_common(l, vc->server.fd, events);
}
TS_INLINE int
EventIO::start(EventLoop l, NetEvent *ne, int events)
{
  type    = EVENTIO_READWRITE_VC;
  data.ne = ne;
  return start_common(l, ne->get_fd(), events);
}

TS_INLINE int
EventIO::start(EventLoop l, UnixUDPConnection *vc, int events)
{
  type    = EVENTIO_UDP_CONNECTION;
  data.uc = vc;
  return start_common(l, vc->fd, events);
}

TS_INLINE int
EventIO::close()
{
  if (!this->syscall) {
    return 0;
  }

  stop();
  switch (type) {
  default:
    ink_assert(!"case");
  // fallthrough
  case EVENTIO_DNS_CONNECTION:
    return data.dnscon->close();
    break;
  case EVENTIO_NETACCEPT:
    return data.na->server.close();
    break;
  case EVENTIO_READWRITE_VC:
    return data.ne->close();
    break;
  }
  return -1;
}

TS_INLINE int
EventIO::start(EventLoop l, int afd, NetEvent *ne, int e)
{
  data.ne = ne;
  return start_common(l, afd, e);
}

TS_INLINE int
EventIO::start_common(EventLoop l, int afd, int e)
{
  if (!this->syscall) {
    return 0;
  }

  fd         = afd;
  event_loop = l;
#if TS_USE_EPOLL
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  ev.events   = e | EPOLLEXCLUSIVE;
  ev.data.ptr = this;
#ifndef USE_EDGE_TRIGGER
  events = e;
#endif
  return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
#endif
#if TS_USE_KQUEUE
  events = e;
  struct kevent ev[2];
  int n = 0;
  if (e & EVENTIO_READ)
    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (e & EVENTIO_WRITE)
    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  return kevent(l->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
#endif
#if TS_USE_PORT
  events     = e;
  int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
  Debug("iocore_eventio", "[EventIO::start] e(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, events, retval,
        retval < 0 ? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
  return retval;
#endif
}

TS_INLINE int
EventIO::modify(int e)
{
  if (!this->syscall) {
    return 0;
  }

  ink_assert(event_loop);
#if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  int new_events = events, old_events = events;
  if (e < 0)
    new_events &= ~(-e);
  else
    new_events |= e;
  events      = new_events;
  ev.events   = new_events;
  ev.data.ptr = this;
  if (!new_events)
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
  else if (!old_events)
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
  else
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_MOD, fd, &ev);
#endif
#if TS_USE_KQUEUE && !defined(USE_EDGE_TRIGGER)
  int n = 0;
  struct kevent ev[2];
  int ee = events;
  if (e < 0) {
    ee &= ~(-e);
    if ((-e) & EVENTIO_READ)
      EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
    if ((-e) & EVENTIO_WRITE)
      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
  } else {
    ee |= e;
    if (e & EVENTIO_READ)
      EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
    if (e & EVENTIO_WRITE)
      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  }
  events = ee;
  if (n)
    return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
  else
    return 0;
#endif
#if TS_USE_PORT
  int n  = 0;
  int ne = e;
  if (e < 0) {
    if (((-e) & events)) {
      ne = ~(-e) & events;
      if ((-e) & EVENTIO_READ)
        n++;
      if ((-e) & EVENTIO_WRITE)
        n++;
    }
  } else {
    if (!(e & events)) {
      ne = events | e;
      if (e & EVENTIO_READ)
        n++;
      if (e & EVENTIO_WRITE)
        n++;
    }
  }
  if (n && ne && event_loop) {
    events     = ne;
    int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
    Debug("iocore_eventio", "[EventIO::modify] e(%d), ne(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, ne, events,
          retval, retval < 0 ? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
    return retval;
  }
  return 0;
#else
  (void)e; // ATS_UNUSED
  return 0;
#endif
}

TS_INLINE int
EventIO::refresh(int e)
{
  if (!this->syscall) {
    return 0;
  }

  ink_assert(event_loop);
#if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER)
  e = e & events;
  struct kevent ev[2];
  int n = 0;
  if (e & EVENTIO_READ)
    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (e & EVENTIO_WRITE)
    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (n)
    return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
  else
    return 0;
#endif
#if TS_USE_PORT
  int n  = 0;
  int ne = e;
  if ((e & events)) {
    ne = events | e;
    if (e & EVENTIO_READ)
      n++;
    if (e & EVENTIO_WRITE)
      n++;
    if (n && ne && event_loop) {
      events     = ne;
      int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
      Debug("iocore_eventio", "[EventIO::refresh] e(%d), ne(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, ne, events,
            retval, retval < 0 ? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
      return retval;
    }
  }
  return 0;
#else
  (void)e; // ATS_UNUSED
  return 0;
#endif
}

TS_INLINE int
EventIO::stop()
{
  if (!this->syscall) {
    return 0;
  }
  if (event_loop) {
    int retval = 0;
#if TS_USE_EPOLL
    struct epoll_event ev;
    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    retval    = epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
#endif
#if TS_USE_PORT
    retval = port_dissociate(event_loop->port_fd, PORT_SOURCE_FD, fd);
    Debug("iocore_eventio", "[EventIO::stop] %d[%s]=port_dissociate(%d,%d,%d)", retval, retval < 0 ? strerror(errno) : "ok",
          event_loop->port_fd, PORT_SOURCE_FD, fd);
#endif
    event_loop = nullptr;
    return retval;
  }
  return 0;
}

TS_INLINE int
NetHandler::startIO(NetEvent *ne)
{
  ink_assert(this->mutex->thread_holding == this_ethread());
  ink_assert(ne->get_thread() == this_ethread());
  int res = 0;

  PollDescriptor *pd = get_PollDescriptor(this->thread);
  if (ne->ep.start(pd, ne, EVENTIO_READ | EVENTIO_WRITE) < 0) {
    res = errno;
    // EEXIST should be ok, though it should have been cleared before we got back here
    if (errno != EEXIST) {
      Debug("iocore_net", "NetHandler::startIO : failed on EventIO::start, errno = [%d](%s)", errno, strerror(errno));
      return -res;
    }
  }

  if (ne->read.triggered == 1) {
    read_ready_list.enqueue(ne);
  }
  ne->nh = this;
  return res;
}

TS_INLINE void
NetHandler::stopIO(NetEvent *ne)
{
  ink_release_assert(ne->nh == this);

  ne->ep.stop();

  read_ready_list.remove(ne);
  write_ready_list.remove(ne);
  if (ne->read.in_enabled_list) {
    read_enable_list.remove(ne);
    ne->read.in_enabled_list = 0;
  }
  if (ne->write.in_enabled_list) {
    write_enable_list.remove(ne);
    ne->write.in_enabled_list = 0;
  }

  ne->nh = nullptr;
}

TS_INLINE void
NetHandler::startCop(NetEvent *ne)
{
  ink_assert(this->mutex->thread_holding == this_ethread());
  ink_release_assert(ne->nh == this);
  ink_assert(!open_list.in(ne));

  open_list.enqueue(ne);
}

TS_INLINE void
NetHandler::stopCop(NetEvent *ne)
{
  ink_release_assert(ne->nh == this);

  open_list.remove(ne);
  cop_list.remove(ne);
  remove_from_keep_alive_queue(ne);
  remove_from_active_queue(ne);
}