1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 05    Socket Functions */
10 
11 #include "squid.h"
12 
13 #if USE_POLL
14 #include "anyp/PortCfg.h"
15 #include "comm/Connection.h"
16 #include "comm/Loops.h"
17 #include "fd.h"
18 #include "fde.h"
19 #include "globals.h"
20 #include "ICP.h"
21 #include "mgr/Registration.h"
22 #include "profiler/Profiler.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 #include "StatCounters.h"
26 #include "Store.h"
27 
28 #include <cerrno>
29 #if HAVE_POLL_H
30 #include <poll.h>
31 #endif
32 
33 /* Needed for poll() on Linux at least */
34 #if USE_POLL
35 #ifndef POLLRDNORM
36 #define POLLRDNORM POLLIN
37 #endif
38 #ifndef POLLWRNORM
39 #define POLLWRNORM POLLOUT
40 #endif
41 #endif
42 
/*
 * Upper bound, in milliseconds, on the timeout handed to poll() by
 * Comm::DoSelect(). Lowered to 10 by Comm::QuickPollRequired().
 */
static int MAX_POLL_TIME = 1000;    /* see also Comm::QuickPollRequired() */

/*
 * Bit-mask helper macros. NOTE(review): none of these is referenced in the
 * visible part of this file; they look like leftovers from a select()-based
 * loop -- confirm before removing.
 */
#ifndef        howmany
#define howmany(x, y)   (((x)+((y)-1))/(y))
#endif
#ifndef        NBBY
#define        NBBY    8
#endif
#define FD_MASK_BYTES sizeof(fd_mask)
#define FD_MASK_BITS (FD_MASK_BYTES*NBBY)

/* STATIC */
/* Forward declarations for file-local helpers defined further below. */
static int fdIsTcpListen(int fd);
static int fdIsUdpListen(int fd);
static int fdIsDns(int fd);
static OBJH commIncomingStats;
static int comm_check_incoming_poll_handlers(int nfds, int *fds);
static void comm_poll_dns_incoming(void);
61 
62 /*
63  * Automatic tuning for incoming requests:
64  *
65  * INCOMING sockets are the ICP and HTTP ports.  We need to check these
66  * fairly regularly, but how often?  When the load increases, we
67  * want to check the incoming sockets more often.  If we have a lot
68  * of incoming ICP, then we need to check these sockets more than
69  * if we just have HTTP.
70  *
 * The variables 'incoming_udp_interval', 'incoming_dns_interval' and
 * 'incoming_tcp_interval' determine how many normal I/O events to process
 * before checking the corresponding incoming sockets again.  Note we store
 * each incoming_interval multiplied by a factor of (2^INCOMING_FACTOR) to
 * have some pseudo-floating point precision.
76  *
 * The variables 'udp_io_events', 'dns_io_events' and 'tcp_io_events' count
 * how many normal I/O events have been processed since the last check on the
 * incoming sockets.  When io_events > incoming_interval, it's time to check
 * incoming sockets.
81  *
82  * Every time we check incoming sockets, we count how many new messages
83  * or connections were processed.  This is used to adjust the
84  * incoming_interval for the next iteration.  The new incoming_interval
85  * is calculated as the current incoming_interval plus what we would
86  * like to see as an average number of events minus the number of
87  * events just processed.
88  *
89  *  incoming_interval = incoming_interval + target_average - number_of_events_processed
90  *
91  * There are separate incoming_interval counters for TCP-based, UDP-based, and DNS events
92  *
93  * You can see the current values of the incoming_interval's, as well as
94  * a histogram of 'incoming_events' by asking the cache manager
 * for 'comm_poll_incoming', e.g.:
96  *
97  *      % ./client mgr:comm_poll_incoming
98  *
99  * Caveats:
100  *
101  *      - We have MAX_INCOMING_INTEGER as a magic upper limit on
102  *        incoming_interval for both types of sockets.  At the
103  *        largest value the cache will effectively be idling.
104  *
105  *      - The higher the INCOMING_FACTOR, the slower the algorithm will
106  *        respond to load spikes/increases/decreases in demand. A value
107  *        between 3 and 8 is recommended.
108  */
109 
/* Largest allowed interval, expressed in whole I/O events. */
#define MAX_INCOMING_INTEGER 256
/* Number of low-order bits of a stored interval used as a fraction. */
#define INCOMING_FACTOR 5
/* MAX_INCOMING_INTEGER in the stored (left-shifted) representation. */
#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR)
static int udp_io_events = 0; ///< I/O events passed since last UDP receiver socket poll
static int dns_io_events = 0; ///< I/O events passed since last DNS socket poll
static int tcp_io_events = 0; ///< I/O events passed since last TCP listening socket poll
/* Intervals start at 16 events, stored shifted left by INCOMING_FACTOR. */
static int incoming_udp_interval = 16 << INCOMING_FACTOR;
static int incoming_dns_interval = 16 << INCOMING_FACTOR;
static int incoming_tcp_interval = 16 << INCOMING_FACTOR;
/*
 * NOTE: these macros are not pure tests. Each evaluation increments the
 * matching *_io_events counter and then compares it with the unscaled
 * interval, so every use counts as one more I/O event.
 */
#define commCheckUdpIncoming (++udp_io_events > (incoming_udp_interval>> INCOMING_FACTOR))
#define commCheckDnsIncoming (++dns_io_events > (incoming_dns_interval>> INCOMING_FACTOR))
#define commCheckTcpIncoming (++tcp_io_events > (incoming_tcp_interval>> INCOMING_FACTOR))
122 
123 void
SetSelect(int fd,unsigned int type,PF * handler,void * client_data,time_t timeout)124 Comm::SetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout)
125 {
126     fde *F = &fd_table[fd];
127     assert(fd >= 0);
128     assert(F->flags.open || (!handler && !client_data && !timeout));
129     debugs(5, 5, HERE << "FD " << fd << ", type=" << type <<
130            ", handler=" << handler << ", client_data=" << client_data <<
131            ", timeout=" << timeout);
132 
133     if (type & COMM_SELECT_READ) {
134         F->read_handler = handler;
135         F->read_data = client_data;
136     }
137 
138     if (type & COMM_SELECT_WRITE) {
139         F->write_handler = handler;
140         F->write_data = client_data;
141     }
142 
143     if (timeout)
144         F->timeout = squid_curtime + timeout;
145 }
146 
147 static int
fdIsUdpListen(int fd)148 fdIsUdpListen(int fd)
149 {
150     if (icpIncomingConn != NULL && icpIncomingConn->fd == fd)
151         return 1;
152 
153     if (icpOutgoingConn != NULL && icpOutgoingConn->fd == fd)
154         return 1;
155 
156     return 0;
157 }
158 
159 static int
fdIsDns(int fd)160 fdIsDns(int fd)
161 {
162     if (fd == DnsSocketA)
163         return 1;
164 
165     if (fd == DnsSocketB)
166         return 1;
167 
168     return 0;
169 }
170 
171 static int
fdIsTcpListen(int fd)172 fdIsTcpListen(int fd)
173 {
174     for (AnyP::PortCfgPointer s = HttpPortList; s != NULL; s = s->next) {
175         if (s->listenConn != NULL && s->listenConn->fd == fd)
176             return 1;
177     }
178 
179     return 0;
180 }
181 
182 static int
comm_check_incoming_poll_handlers(int nfds,int * fds)183 comm_check_incoming_poll_handlers(int nfds, int *fds)
184 {
185     int i;
186     int fd;
187     PF *hdl = NULL;
188     int npfds;
189 
190     struct pollfd pfds[3 + MAXTCPLISTENPORTS];
191     PROF_start(comm_check_incoming);
192     incoming_sockets_accepted = 0;
193 
194     for (i = npfds = 0; i < nfds; ++i) {
195         int events;
196         fd = fds[i];
197         events = 0;
198 
199         if (fd_table[fd].read_handler)
200             events |= POLLRDNORM;
201 
202         if (fd_table[fd].write_handler)
203             events |= POLLWRNORM;
204 
205         if (events) {
206             pfds[npfds].fd = fd;
207             pfds[npfds].events = events;
208             pfds[npfds].revents = 0;
209             ++npfds;
210         }
211     }
212 
213     if (!nfds) {
214         PROF_stop(comm_check_incoming);
215         return -1;
216     }
217 
218     getCurrentTime();
219     ++ statCounter.syscalls.selects;
220 
221     if (poll(pfds, npfds, 0) < 1) {
222         PROF_stop(comm_check_incoming);
223         return incoming_sockets_accepted;
224     }
225 
226     for (i = 0; i < npfds; ++i) {
227         int revents;
228 
229         if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
230             continue;
231 
232         if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
233             if ((hdl = fd_table[fd].read_handler)) {
234                 fd_table[fd].read_handler = NULL;
235                 hdl(fd, fd_table[fd].read_data);
236             } else if (pfds[i].events & POLLRDNORM)
237                 debugs(5, DBG_IMPORTANT, "comm_poll_incoming: FD " << fd << " NULL read handler");
238         }
239 
240         if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
241             if ((hdl = fd_table[fd].write_handler)) {
242                 fd_table[fd].write_handler = NULL;
243                 hdl(fd, fd_table[fd].write_data);
244             } else if (pfds[i].events & POLLWRNORM)
245                 debugs(5, DBG_IMPORTANT, "comm_poll_incoming: FD " << fd << " NULL write_handler");
246         }
247     }
248 
249     PROF_stop(comm_check_incoming);
250     return incoming_sockets_accepted;
251 }
252 
253 static void
comm_poll_udp_incoming(void)254 comm_poll_udp_incoming(void)
255 {
256     int nfds = 0;
257     int fds[2];
258     int nevents;
259     udp_io_events = 0;
260 
261     if (Comm::IsConnOpen(icpIncomingConn)) {
262         fds[nfds] = icpIncomingConn->fd;
263         ++nfds;
264     }
265 
266     if (icpIncomingConn != icpOutgoingConn && Comm::IsConnOpen(icpOutgoingConn)) {
267         fds[nfds] = icpOutgoingConn->fd;
268         ++nfds;
269     }
270 
271     if (nfds == 0)
272         return;
273 
274     nevents = comm_check_incoming_poll_handlers(nfds, fds);
275 
276     incoming_udp_interval += Config.comm_incoming.udp.average - nevents;
277 
278     if (incoming_udp_interval < Config.comm_incoming.udp.min_poll)
279         incoming_udp_interval = Config.comm_incoming.udp.min_poll;
280 
281     if (incoming_udp_interval > MAX_INCOMING_INTERVAL)
282         incoming_udp_interval = MAX_INCOMING_INTERVAL;
283 
284     if (nevents > INCOMING_UDP_MAX)
285         nevents = INCOMING_UDP_MAX;
286 
287     statCounter.comm_udp_incoming.count(nevents);
288 }
289 
290 static void
comm_poll_tcp_incoming(void)291 comm_poll_tcp_incoming(void)
292 {
293     int nfds = 0;
294     int fds[MAXTCPLISTENPORTS];
295     int j;
296     int nevents;
297     tcp_io_events = 0;
298 
299     // XXX: only poll sockets that won't be deferred. But how do we identify them?
300 
301     for (j = 0; j < NHttpSockets; ++j) {
302         if (HttpSockets[j] < 0)
303             continue;
304 
305         fds[nfds] = HttpSockets[j];
306         ++nfds;
307     }
308 
309     nevents = comm_check_incoming_poll_handlers(nfds, fds);
310     incoming_tcp_interval = incoming_tcp_interval
311                             + Config.comm_incoming.tcp.average - nevents;
312 
313     if (incoming_tcp_interval < Config.comm_incoming.tcp.min_poll)
314         incoming_tcp_interval = Config.comm_incoming.tcp.min_poll;
315 
316     if (incoming_tcp_interval > MAX_INCOMING_INTERVAL)
317         incoming_tcp_interval = MAX_INCOMING_INTERVAL;
318 
319     if (nevents > INCOMING_TCP_MAX)
320         nevents = INCOMING_TCP_MAX;
321 
322     statCounter.comm_tcp_incoming.count(nevents);
323 }
324 
325 /* poll all sockets; call handlers for those that are ready. */
326 Comm::Flag
DoSelect(int msec)327 Comm::DoSelect(int msec)
328 {
329     struct pollfd pfds[SQUID_MAXFD];
330 
331     PF *hdl = NULL;
332     int fd;
333     int maxfd;
334     unsigned long nfds;
335     unsigned long npending;
336     int num;
337     int calldns = 0, calludp = 0, calltcp = 0;
338     double timeout = current_dtime + (msec / 1000.0);
339 
340     do {
341         double start;
342         getCurrentTime();
343         start = current_dtime;
344 
345         if (commCheckUdpIncoming)
346             comm_poll_udp_incoming();
347 
348         if (commCheckDnsIncoming)
349             comm_poll_dns_incoming();
350 
351         if (commCheckTcpIncoming)
352             comm_poll_tcp_incoming();
353 
354         PROF_start(comm_poll_prep_pfds);
355 
356         calldns = calludp = calltcp = 0;
357 
358         nfds = 0;
359 
360         npending = 0;
361 
362         maxfd = Biggest_FD + 1;
363 
364         for (int i = 0; i < maxfd; ++i) {
365             int events;
366             events = 0;
367             /* Check each open socket for a handler. */
368 
369             if (fd_table[i].read_handler)
370                 events |= POLLRDNORM;
371 
372             if (fd_table[i].write_handler)
373                 events |= POLLWRNORM;
374 
375             if (events) {
376                 pfds[nfds].fd = i;
377                 pfds[nfds].events = events;
378                 pfds[nfds].revents = 0;
379                 ++nfds;
380 
381                 if ((events & POLLRDNORM) && fd_table[i].flags.read_pending)
382                     ++npending;
383             }
384         }
385 
386         PROF_stop(comm_poll_prep_pfds);
387 
388         if (npending)
389             msec = 0;
390 
391         if (msec > MAX_POLL_TIME)
392             msec = MAX_POLL_TIME;
393 
394         /* nothing to do
395          *
396          * Note that this will only ever trigger when there are no log files
397          * and stdout/err/in are all closed too.
398          */
399         if (nfds == 0 && npending == 0) {
400             if (shutting_down)
401                 return Comm::SHUTDOWN;
402             else
403                 return Comm::IDLE;
404         }
405 
406         for (;;) {
407             PROF_start(comm_poll_normal);
408             ++ statCounter.syscalls.selects;
409             num = poll(pfds, nfds, msec);
410             int xerrno = errno;
411             ++ statCounter.select_loops;
412             PROF_stop(comm_poll_normal);
413 
414             if (num >= 0 || npending > 0)
415                 break;
416 
417             if (ignoreErrno(xerrno))
418                 continue;
419 
420             debugs(5, DBG_CRITICAL, MYNAME << "poll failure: " << xstrerr(xerrno));
421 
422             assert(xerrno != EINVAL);
423 
424             return Comm::COMM_ERROR;
425 
426             /* NOTREACHED */
427         }
428 
429         getCurrentTime();
430 
431         debugs(5, num ? 5 : 8, "comm_poll: " << num << "+" << npending << " FDs ready");
432         statCounter.select_fds_hist.count(num);
433 
434         if (num == 0 && npending == 0)
435             continue;
436 
437         /* scan each socket but the accept socket. Poll this
438          * more frequently to minimize losses due to the 5 connect
439          * limit in SunOS */
440         PROF_start(comm_handle_ready_fd);
441 
442         for (size_t loopIndex = 0; loopIndex < nfds; ++loopIndex) {
443             fde *F;
444             int revents = pfds[loopIndex].revents;
445             fd = pfds[loopIndex].fd;
446 
447             if (fd == -1)
448                 continue;
449 
450             if (fd_table[fd].flags.read_pending)
451                 revents |= POLLIN;
452 
453             if (revents == 0)
454                 continue;
455 
456             if (fdIsUdpListen(fd)) {
457                 calludp = 1;
458                 continue;
459             }
460 
461             if (fdIsDns(fd)) {
462                 calldns = 1;
463                 continue;
464             }
465 
466             if (fdIsTcpListen(fd)) {
467                 calltcp = 1;
468                 continue;
469             }
470 
471             F = &fd_table[fd];
472 
473             if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
474                 debugs(5, 6, "comm_poll: FD " << fd << " ready for reading");
475 
476                 if ((hdl = F->read_handler)) {
477                     PROF_start(comm_read_handler);
478                     F->read_handler = NULL;
479                     F->flags.read_pending = false;
480                     hdl(fd, F->read_data);
481                     PROF_stop(comm_read_handler);
482                     ++ statCounter.select_fds;
483 
484                     if (commCheckUdpIncoming)
485                         comm_poll_udp_incoming();
486 
487                     if (commCheckDnsIncoming)
488                         comm_poll_dns_incoming();
489 
490                     if (commCheckTcpIncoming)
491                         comm_poll_tcp_incoming();
492                 }
493             }
494 
495             if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
496                 debugs(5, 6, "comm_poll: FD " << fd << " ready for writing");
497 
498                 if ((hdl = F->write_handler)) {
499                     PROF_start(comm_write_handler);
500                     F->write_handler = NULL;
501                     hdl(fd, F->write_data);
502                     PROF_stop(comm_write_handler);
503                     ++ statCounter.select_fds;
504 
505                     if (commCheckUdpIncoming)
506                         comm_poll_udp_incoming();
507 
508                     if (commCheckDnsIncoming)
509                         comm_poll_dns_incoming();
510 
511                     if (commCheckTcpIncoming)
512                         comm_poll_tcp_incoming();
513                 }
514             }
515 
516             if (revents & POLLNVAL) {
517                 AsyncCall::Pointer ch;
518                 debugs(5, DBG_CRITICAL, "WARNING: FD " << fd << " has handlers, but it's invalid.");
519                 debugs(5, DBG_CRITICAL, "FD " << fd << " is a " << fdTypeStr[F->type]);
520                 debugs(5, DBG_CRITICAL, "--> " << F->desc);
521                 debugs(5, DBG_CRITICAL, "tmout:" << F->timeoutHandler << "read:" <<
522                        F->read_handler << " write:" << F->write_handler);
523 
524                 for (ch = F->closeHandler; ch != NULL; ch = ch->Next())
525                     debugs(5, DBG_CRITICAL, " close handler: " << ch);
526 
527                 if (F->closeHandler != NULL) {
528                     commCallCloseHandlers(fd);
529                 } else if (F->timeoutHandler != NULL) {
530                     debugs(5, DBG_CRITICAL, "comm_poll: Calling Timeout Handler");
531                     ScheduleCallHere(F->timeoutHandler);
532                 }
533 
534                 F->closeHandler = NULL;
535                 F->timeoutHandler = NULL;
536                 F->read_handler = NULL;
537                 F->write_handler = NULL;
538 
539                 if (F->flags.open)
540                     fd_close(fd);
541             }
542         }
543 
544         PROF_stop(comm_handle_ready_fd);
545 
546         if (calludp)
547             comm_poll_udp_incoming();
548 
549         if (calldns)
550             comm_poll_dns_incoming();
551 
552         if (calltcp)
553             comm_poll_tcp_incoming();
554 
555         getCurrentTime();
556 
557         statCounter.select_time += (current_dtime - start);
558 
559         return Comm::OK;
560     } while (timeout > current_dtime);
561 
562     debugs(5, 8, "comm_poll: time out: " << squid_curtime << ".");
563 
564     return Comm::TIMEOUT;
565 }
566 
567 static void
comm_poll_dns_incoming(void)568 comm_poll_dns_incoming(void)
569 {
570     int nfds = 0;
571     int fds[2];
572     int nevents;
573     dns_io_events = 0;
574 
575     if (DnsSocketA < 0 && DnsSocketB < 0)
576         return;
577 
578     if (DnsSocketA >= 0) {
579         fds[nfds] = DnsSocketA;
580         ++nfds;
581     }
582 
583     if (DnsSocketB >= 0) {
584         fds[nfds] = DnsSocketB;
585         ++nfds;
586     }
587 
588     nevents = comm_check_incoming_poll_handlers(nfds, fds);
589 
590     if (nevents < 0)
591         return;
592 
593     incoming_dns_interval += Config.comm_incoming.dns.average - nevents;
594 
595     if (incoming_dns_interval < Config.comm_incoming.dns.min_poll)
596         incoming_dns_interval = Config.comm_incoming.dns.min_poll;
597 
598     if (incoming_dns_interval > MAX_INCOMING_INTERVAL)
599         incoming_dns_interval = MAX_INCOMING_INTERVAL;
600 
601     if (nevents > INCOMING_DNS_MAX)
602         nevents = INCOMING_DNS_MAX;
603 
604     statCounter.comm_dns_incoming.count(nevents);
605 }
606 
607 static void
commPollRegisterWithCacheManager(void)608 commPollRegisterWithCacheManager(void)
609 {
610     Mgr::RegisterAction("comm_poll_incoming",
611                         "comm_incoming() stats",
612                         commIncomingStats, 0, 1);
613 }
614 
615 void
SelectLoopInit(void)616 Comm::SelectLoopInit(void)
617 {
618     commPollRegisterWithCacheManager();
619 }
620 
621 static void
commIncomingStats(StoreEntry * sentry)622 commIncomingStats(StoreEntry * sentry)
623 {
624     storeAppendPrintf(sentry, "Current incoming_udp_interval: %d\n",
625                       incoming_udp_interval >> INCOMING_FACTOR);
626     storeAppendPrintf(sentry, "Current incoming_dns_interval: %d\n",
627                       incoming_dns_interval >> INCOMING_FACTOR);
628     storeAppendPrintf(sentry, "Current incoming_tcp_interval: %d\n",
629                       incoming_tcp_interval >> INCOMING_FACTOR);
630     storeAppendPrintf(sentry, "\n");
631     storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
632     storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_udp_incoming() call:\n");
633     statCounter.comm_udp_incoming.dump(sentry, statHistIntDumper);
634     storeAppendPrintf(sentry, "DNS Messages handled per comm_poll_dns_incoming() call:\n");
635     statCounter.comm_dns_incoming.dump(sentry, statHistIntDumper);
636     storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_tcp_incoming() call:\n");
637     statCounter.comm_tcp_incoming.dump(sentry, statHistIntDumper);
638 }
639 
640 /* Called by async-io or diskd to speed up the polling */
641 void
QuickPollRequired(void)642 Comm::QuickPollRequired(void)
643 {
644     MAX_POLL_TIME = 10;
645 }
646 
647 #endif /* USE_POLL */
648 
649