1 /*  $Id: chan.c 10523 2021-01-17 21:52:00Z iulius $
2 **
3 **  I/O channel (and buffer) processing.
4 **
5 **  This file is the heart of innd.  Everything innd does is represented by
6 **  channels; waiting for connections, reading from network connections,
7 **  writing to network connections, writing to processes, and writing to files
8 **  are all channel operations.
9 **
10 **  Channels can be in one of three states: reading, writing, or sleeping.
11 **  The first two are represented in the select file descriptor sets.  The
12 **  last sits there until something else wakes the channel up.  CHANreadloop
13 **  is the main I/O loop for innd, calling select and then dispatching control
14 **  to whatever channels have work to do.
15 */
16 
17 #include "config.h"
18 #include "clibrary.h"
19 
20 /* Needed on AIX 4.1 to get fd_set and friends. */
21 #ifdef HAVE_SYS_SELECT_H
22 # include <sys/select.h>
23 #endif
24 
25 #include "inn/fdflag.h"
26 #include "inn/innconf.h"
27 #include "inn/network.h"
28 #include "innd.h"
29 
/* These errno values don't exist on all systems, but may be returned as an
   (ignorable) error when setting a channel's descriptor nonblocking (see the
   fdflag_nonblocking call in CHANcreate).  Define any missing ones to 0 so
   that we can unconditionally compare errno to them in the code; a failed
   call never leaves errno at 0, so the substituted comparison is harmless. */
#ifndef ENOTSOCK
# define ENOTSOCK 0
#endif
#ifndef ENOTTY
# define ENOTTY 0
#endif
40 
/* Printable names for innd's timers.
   NOTE(review): presumably indexed by the TMR_* timer identifiers (this file
   uses TMR_NNTPREAD and TMR_DATAMOVE), so the order must match them; confirm
   against the TMR_* definitions before adding or reordering entries. */
static const char * const timer_name[] = {
    "idle", "artclean", "artwrite", "artcncl", "sitesend", "overv",
    "perl", "python", "nntpread", "artparse", "artlog", "datamove"
};
45 
/* Compaction threshold, expressed as a divisor of the buffer size.  When the
   amount of free space at the beginning of a buffer is bigger than the buffer
   size divided by this value, the buffer is compacted in the read loop. */
#define COMP_THRESHOLD 10
50 
/* Global data about the channels.  The channel table is indexed by file
   descriptor: the channel for descriptor fd lives in channels.table[fd]. */
struct channels {
    fd_set read_set;            /* Channels registered as readers. */
    fd_set sleep_set;           /* Channels currently sleeping. */
    fd_set write_set;           /* Channels registered as writers. */
    int sleep_count;            /* Number of sleeping channels. */
    int max_fd;                 /* Max fd in read_set or write_set. */
    int max_sleep_fd;           /* Max fd in sleep_set. */
    int table_size;             /* Total number of channels. */
    CHANNEL *table;             /* Table of channel structs. */

    /* Special prioritized channels, for the control and remconn channels.  We
       check these first each time.  Unused slots hold NULL. */
    CHANNEL **prioritized;
    int prioritized_size;       /* Number of slots in prioritized. */
};

/* Eventually this will move into some sort of global INN state structure. */
static struct channels channels;
70 
/* Template copied into channel slots to reset them (by CHANsetup and
   CHANcreate).  We want to initialize four members of CHANnull to non-zero
   values but let all of the other members be initialized according to the
   standard rules for a static object.  However, GCC warns about incomplete
   initializer lists (since they may be a mistake), so we don't initialize
   anything at all here and instead explicitly set those four members (Type,
   State, fd and NextLog) in CHANsetup.  This is the only reason why this
   isn't const. */
static CHANNEL CHANnull;
78 
79 
80 /*
81 **  Returns true if socket activation is used and the channel given as
82 **  as argument is used for socket activation.  Returns false otherwise.
83 */
84 bool
CHANsystemdsa(CHANNEL * cp)85 CHANsystemdsa(CHANNEL *cp)
86 {
87     const char *s;
88     int count;
89 
90     s = getenv("INN_SD_LISTEN_FDS_COUNT");
91     if (s == NULL)
92         return false;
93 
94     count = atoi(s);
95 
96     if (cp->fd >= SD_LISTEN_FDS_START && cp->fd < SD_LISTEN_FDS_START + count)
97         return true;
98 
99     return false;
100 }
101 
102 
103 /*
104 **  Tear down our world.  Free all of the allocated channels and clear all
105 **  global state data.  This function can also be used to initialize the
106 **  channels structure to a known empty state.
107 */
108 void
CHANshutdown(void)109 CHANshutdown(void)
110 {
111     CHANNEL *cp;
112     int i;
113 
114     FD_ZERO(&channels.read_set);
115     FD_ZERO(&channels.sleep_set);
116     FD_ZERO(&channels.write_set);
117     channels.sleep_count = 0;
118     channels.max_fd = -1;
119     channels.max_sleep_fd = -1;
120 
121     if (channels.table != NULL) {
122         cp = channels.table;
123         for (i = channels.table_size; --i >= 0; cp++) {
124             if (CHANsystemdsa(cp))
125                 continue;
126             if (cp->Type != CTfree)
127                 CHANclose(cp, CHANname(cp));
128             if (cp->In.data)
129                 free(cp->In.data);
130             if (cp->Out.data)
131                 free(cp->Out.data);
132         }
133     }
134     free(channels.table);
135     channels.table = NULL;
136     channels.table_size = 0;
137     if (channels.prioritized_size > 0) {
138         free(channels.prioritized);
139         channels.prioritized_size = 0;
140     }
141 }
142 
143 
144 /*
145 **  Initialize the global I/O channel state and prepare for creation of new
146 **  channels.  Takes the number of channels to pre-create, which currently
147 **  must be as large as the largest file descriptor that will be used for a
148 **  channel.
149 */
150 void
CHANsetup(int count)151 CHANsetup(int count)
152 {
153     CHANNEL *cp;
154     int i;
155 
156     CHANshutdown();
157     channels.table_size = count;
158     channels.table = xmalloc(count * sizeof(CHANNEL));
159 
160     /* Finish initializing CHANnull, since we can't do this entirely with a
161        static initializer without having to list every element in the
162        incredibly long channel struct and update it whenever the channel
163        struct changes. */
164     CHANnull.Type = CTfree;
165     CHANnull.State = CSerror;
166     CHANnull.fd = -1;
167     CHANnull.NextLog = innconf->chaninacttime;
168 
169     /* Now, we can use CHANnull to initialize all of the other channels. */
170     for (cp = channels.table; --count >= 0; cp++)
171         *cp = CHANnull;
172 
173     /* Reserve three slots for prioritized channels (control and AF_INET and
174        AF_INET6 remconn).  If we end up with more somehow, we'll resize in
175        CHANcreate. */
176     channels.prioritized_size = 3;
177     channels.prioritized = xmalloc(3 * sizeof(CHANNEL *));
178     for (i = 0; i < 3; i++)
179         channels.prioritized[i] = NULL;
180 }
181 
182 
183 /*
184 **  Create a channel from a file descriptor.  Takes the channel type, the
185 **  initial state, the reader callback, and the writer callback.
186 */
187 CHANNEL *
CHANcreate(int fd,enum channel_type type,enum channel_state state,innd_callback_func reader,innd_callback_func write_done)188 CHANcreate(int fd, enum channel_type type, enum channel_state state,
189            innd_callback_func reader, innd_callback_func write_done)
190 {
191     CHANNEL *cp;
192     int i, j, size;
193     struct buffer in  = { 0, 0, 0, NULL };
194     struct buffer out = { 0, 0, 0, NULL };
195 
196     /* Currently, we do no dynamic allocation, but instead assume that the
197        channel table is sized large enough to hold all possible file
198        descriptors.  The difficulty with dynamically resizing is that it may
199        invalidate any pointers into the channels, which hold all sorts of
200        random things, including article data.
201 
202        FIXME: Design a better data structure that can be resized easily. */
203     cp = &channels.table[fd];
204 
205     /* Don't lose the existing buffers when overwriting with CHANnull. */
206     in = cp->In;
207     buffer_resize(&in, START_BUFF_SIZE);
208     in.used = 0;
209     in.left = in.size;
210     out = cp->Out;
211     buffer_resize(&out, SMBUF);
212     buffer_set(&out, "", 0);
213 
214     /* Set up the channel's info.  Note that we don't have to initialize
215        anything that's already set properly to zero in CHANnull. */
216     *cp = CHANnull;
217     cp->av = NULL;
218     cp->fd = fd;
219     cp->Type = type;
220     cp->State = state;
221     cp->Reader = reader;
222     cp->WriteDone = write_done;
223     cp->Started = Now.tv_sec;
224     cp->Started_checkpoint = Now.tv_sec;
225     cp->LastActive = Now.tv_sec;
226     cp->In = in;
227     cp->Out = out;
228     cp->Tracing = Tracing;
229     HashClear(&cp->CurrentMessageIDHash);
230     ARTprepare(cp);
231 
232     fdflag_close_exec(fd, true);
233 
234 #ifndef _HPUX_SOURCE
235     /* Stupid HPUX 11.00 has a broken listen/accept where setting the listen
236        socket to nonblocking prevents you from successfully setting the
237        socket returned by accept(2) back to blocking mode, no matter what,
238        resulting in all kinds of funny behaviour, data loss, etc. etc.  */
239     if (!fdflag_nonblocking(fd, true) && errno != ENOTSOCK && errno != ENOTTY)
240         syswarn("%s cant nonblock %d", LogName, fd);
241 #endif
242 
243     /* Note the control and remconn channels, for efficiency. */
244     if (type == CTcontrol || type == CTremconn) {
245         for (i = 0; i < channels.prioritized_size; i++)
246             if (channels.prioritized[i] == NULL)
247                 break;
248         if (i >= channels.prioritized_size) {
249             size = channels.prioritized_size + 1;
250             channels.prioritized
251                 = xrealloc(channels.prioritized, size * sizeof(CHANNEL *));
252             for (j = channels.prioritized_size; j < size; j++)
253                 channels.prioritized[j] = NULL;
254             channels.prioritized_size = size;
255         }
256         channels.prioritized[i] = cp;
257     }
258 
259     /* Return a pointer to the new channel. */
260     return cp;
261 }
262 
263 
264 /*
265 **  Start or stop tracing a channel.  If we're turning on tracing, dump a
266 **  bunch of information about that particular channel.
267 */
268 void
CHANtracing(CHANNEL * cp,bool flag)269 CHANtracing(CHANNEL *cp, bool flag)
270 {
271     char *name;
272     char addr[INET6_ADDRSTRLEN] = "?";
273 
274     name = CHANname(cp);
275     notice("%s trace %s", name, flag ? "on" : "off");
276     cp->Tracing = flag;
277     if (flag) {
278         notice("%s trace badwrites %lu blockwrites %lu badreads %lu", name,
279                cp->BadWrites, cp->BlockedWrites, cp->BadReads);
280         network_sockaddr_sprint(addr, sizeof(addr),
281                                 (struct sockaddr *) &cp->Address);
282         notice("%s trace address %s lastactive %ld nextlog %ld", name,
283                addr, (long) cp->LastActive, (long) cp->NextLog);
284         if (FD_ISSET(cp->fd, &channels.sleep_set))
285             notice("%s trace sleeping %ld 0x%p", name, (long) cp->Waketime,
286                    (void *) cp->Waker);
287         if (FD_ISSET(cp->fd, &channels.read_set))
288             notice("%s trace reading %lu %s", name,
289                    (unsigned long) cp->In.used,
290                    MaxLength(cp->In.data, cp->In.data));
291         if (FD_ISSET(cp->fd, &channels.write_set))
292             notice("%s trace writing %lu %s", name,
293                    (unsigned long) cp->Out.left,
294                    MaxLength(cp->Out.data, cp->Out.data));
295     }
296 }
297 
298 
299 /*
300 **  Close an NNTP channel.  Called by CHANclose to do the special handling for
301 **  NNTP channels, which carry quite a bit more data than other channels.
302 **
303 **  FIXME: There really should be a type-specific channel closing callback
304 **  that can take care of this, and this will be mandatory when all the
305 **  NNTP-specific stuff is moved out of the core channel struct.
306 */
307 static void
CHANclose_nntp(CHANNEL * cp,const char * name)308 CHANclose_nntp(CHANNEL *cp, const char *name)
309 {
310     WIPprecomfree(cp);
311     NCclearwip(cp);
312     if (cp->State == CScancel)
313         notice("%s closed seconds %ld cancels %ld", name,
314                (long)(Now.tv_sec - cp->Started), cp->Received);
315     else {
316         notice("%s checkpoint seconds %ld accepted %ld refused %ld rejected %ld"
317                " duplicate %ld accepted size %.0f duplicate size %.0f"
318                " rejected size %.0f", name,
319                (long)(Now.tv_sec - cp->Started_checkpoint),
320                cp->Received - cp->Received_checkpoint,
321                cp->Refused - cp->Refused_checkpoint,
322                cp->Rejected - cp->Rejected_checkpoint,
323                cp->Duplicate - cp->Duplicate_checkpoint,
324                (double) (cp->Size - cp->Size_checkpoint),
325                (double) (cp->DuplicateSize - cp->DuplicateSize_checkpoint),
326                (double) (cp->RejectSize - cp->RejectSize_checkpoint));
327         notice("%s closed seconds %ld accepted %ld refused %ld rejected %ld"
328                " duplicate %ld accepted size %.0f duplicate size %.0f"
329                " rejected size %.0f", name, (long)(Now.tv_sec - cp->Started),
330                cp->Received, cp->Refused, cp->Rejected, cp->Duplicate,
331                (double) cp->Size, (double) cp->DuplicateSize,
332                (double) cp->RejectSize);
333     }
334     if (cp->Data.Newsgroups.Data != NULL) {
335         free(cp->Data.Newsgroups.Data);
336         cp->Data.Newsgroups.Data = NULL;
337     }
338     if (cp->Data.Newsgroups.List != NULL) {
339         free(cp->Data.Newsgroups.List);
340         cp->Data.Newsgroups.List = NULL;
341     }
342     if (cp->Data.Distribution.Data != NULL) {
343         free(cp->Data.Distribution.Data);
344         cp->Data.Distribution.Data = NULL;
345     }
346     if (cp->Data.Distribution.List != NULL) {
347         free(cp->Data.Distribution.List);
348         cp->Data.Distribution.List = NULL;
349     }
350     if (cp->Data.Path.Data != NULL) {
351         free(cp->Data.Path.Data);
352         cp->Data.Path.Data = NULL;
353     }
354     if (cp->Data.Path.List != NULL) {
355         free(cp->Data.Path.List);
356         cp->Data.Path.List = NULL;
357     }
358     if (cp->Data.Overview.size != 0) {
359         free(cp->Data.Overview.data);
360         cp->Data.Overview.data = NULL;
361         cp->Data.Overview.size = 0;
362         cp->Data.Overview.left = 0;
363         cp->Data.Overview.used = 0;
364     }
365     if (cp->Data.XrefBufLength != 0) {
366         free(cp->Data.Xref);
367         cp->Data.Xref = NULL;
368         cp->Data.XrefBufLength = 0;
369     }
370 
371     /* If this was a peer who was connection-limited, take note that one of
372        their connections was just closed and possibly wake up another one.
373 
374        FIXME: This is a butt-ugly way of handling this and a layering
375        violation to boot.  This needs to happen in the NC code, possibly via a
376        channel closing callback. */
377     if (cp->MaxCnx > 0) {
378         char *label, *tmplabel;
379         int tfd;
380         CHANNEL *tempchan;
381 
382         label = RClabelname(cp);
383         if (label != NULL) {
384             for (tfd = 0; tfd <= channels.max_fd; tfd++) {
385                 if (tfd == cp->fd)
386                     continue;
387                 tempchan = &channels.table[tfd];
388                 if (tempchan->fd < 0 || tempchan->Type != CTnntp)
389                     continue;
390                 tmplabel = RClabelname(tempchan);
391                 if (tmplabel == NULL)
392                     continue;
393                 if (strcmp(label, tmplabel) == 0 && tempchan->ActiveCnx != 0) {
394                     tempchan->ActiveCnx = cp->ActiveCnx;
395                     RCHANadd(tempchan);
396                     break;
397                 }
398             }
399         }
400     }
401 }
402 
403 
404 /*
405 **  Close a channel.
406 */
407 void
CHANclose(CHANNEL * cp,const char * name)408 CHANclose(CHANNEL *cp, const char *name)
409 {
410     int i;
411 
412     if (cp->Type == CTfree)
413         warn("%s internal closing free channel %d", name, cp->fd);
414     else {
415         if (cp->Type == CTnntp)
416             CHANclose_nntp(cp, name);
417         else if (cp->Type == CTreject)
418             notice("%s %ld", name, cp->Rejected); /* Use cp->Rejected for the response code. */
419         else if (cp->Out.left)
420             warn("%s closed lost %lu", name, (unsigned long) cp->Out.left);
421         else
422             notice("%s closed", name);
423         WCHANremove(cp);
424         RCHANremove(cp);
425         SCHANremove(cp);
426         if (cp->fd >= 0 && close(cp->fd) < 0)
427             syswarn("%s cant close %s", LogName, name);
428         for (i = 0; i < channels.prioritized_size; i++)
429             if (channels.prioritized[i] == cp)
430                 channels.prioritized[i] = NULL;
431     }
432 
433     /* Mark it unused. */
434     cp->Type = CTfree;
435     cp->State = CSerror;
436     cp->fd = -1;
437     if (cp->Argument != NULL)
438         free(cp->Argument);
439     cp->Argument = NULL;
440     cp->ActiveCnx = 0;
441 
442     /* Free the buffers if they got big. */
443     if (cp->In.size > BIG_BUFFER) {
444         cp->In.size = 0;
445         cp->In.used = 0;
446         cp->In.left = 0;
447         free(cp->In.data);
448         cp->In.data = NULL;
449     }
450     if (cp->Out.size > BIG_BUFFER) {
451         cp->Out.size = 0;
452         cp->Out.used = 0;
453         cp->Out.left = 0;
454         free(cp->Out.data);
455         cp->Out.data = NULL;
456     }
457 
458     /* Always free the Sendid buffer. */
459     if (cp->Sendid.size > 0) {
460         cp->Sendid.size = 0;
461         cp->Sendid.used = 0;
462         cp->Sendid.left = 0;
463         free(cp->Sendid.data);
464         cp->Sendid.data = NULL;
465     }
466 
467     /* Free the space allocated for NNTP commands. */
468     if (cp->av != NULL) {
469         free(cp->av[0]);
470         free(cp->av);
471         cp->av = NULL;
472     }
473 }
474 
475 
476 /*
477 **  Return a printable name for the channel.
478 **
479 **  FIXME: Another layering violation.  Each type of channel should register a
480 **  callback or a name rather than making the general code know how to name
481 **  channels.
482 */
483 char *
CHANname(CHANNEL * cp)484 CHANname(CHANNEL *cp)
485 {
486     int i;
487     SITE *sp;
488     const char *site;
489     pid_t pid;
490 
491     switch (cp->Type) {
492     default:
493         snprintf(cp->Name, sizeof(cp->Name), "?%d(#%d@%ld)?", cp->Type,
494                  cp->fd, (long) (cp - channels.table));
495         break;
496     case CTany:
497         snprintf(cp->Name, sizeof(cp->Name), "any:%d", cp->fd);
498         break;
499     case CTfree:
500         snprintf(cp->Name, sizeof(cp->Name), "free:%d", cp->fd);
501         break;
502     case CTremconn:
503         snprintf(cp->Name, sizeof(cp->Name), "remconn:%d", cp->fd);
504         break;
505     case CTreject:
506         snprintf(cp->Name, sizeof(cp->Name), "%s rejected",
507                  cp->Address.ss_family == 0 ? "localhost" : RChostname(cp));
508         break;
509     case CTnntp:
510         snprintf(cp->Name, sizeof(cp->Name), "%s:%d",
511                  cp->Address.ss_family == 0 ? "localhost" : RChostname(cp),
512                  cp->fd);
513         break;
514     case CTlocalconn:
515         snprintf(cp->Name, sizeof(cp->Name), "localconn:%d", cp->fd);
516         break;
517     case CTcontrol:
518         snprintf(cp->Name, sizeof(cp->Name), "control:%d", cp->fd);
519         break;
520     case CTexploder:
521     case CTfile:
522     case CTprocess:
523         /* Find the site that has this channel. */
524         site = "?";
525         pid = 0;
526         for (i = nSites, sp = Sites; --i >= 0; sp++)
527             if (sp->Channel == cp) {
528                 site = sp->Name;
529                 if (cp->Type != CTfile)
530                     pid = sp->pid;
531                 break;
532             }
533         if (pid == 0)
534             snprintf(cp->Name, sizeof(cp->Name), "%s:%d:%s",
535                      MaxLength(site, site), cp->fd,
536                      cp->Type == CTfile ? "file" : "proc");
537         else
538             snprintf(cp->Name, sizeof(cp->Name), "%s:%d:%s:%ld",
539                      MaxLength(site, site), cp->fd,
540                      cp->Type == CTfile ? "file" : "proc", (long) pid);
541         break;
542     }
543     return cp->Name;
544 }
545 
546 
547 /*
548 **  Return the channel for a specified descriptor.
549 */
550 CHANNEL *
CHANfromdescriptor(int fd)551 CHANfromdescriptor(int fd)
552 {
553     if (fd < 0 || fd > channels.table_size)
554         return NULL;
555     return &channels.table[fd];
556 }
557 
558 
559 /*
560 **  Iterate over all channels of a specified type.  The next channel is
561 **  returned and its index is put into ip, which serves as the cursor.
562 **
563 **  FIXME: Make ip an opaque cursor.
564 */
565 CHANNEL *
CHANiter(int * ip,enum channel_type type)566 CHANiter(int *ip, enum channel_type type)
567 {
568     CHANNEL *cp;
569     int i;
570 
571     for (i = *ip; i >= 0 && i < channels.table_size; i++) {
572         cp = &channels.table[i];
573         if (cp->Type == CTfree && cp->fd == -1)
574             continue;
575         if (type == CTany || cp->Type == type) {
576             *ip = ++i;
577             return cp;
578         }
579     }
580     return NULL;
581 }
582 
583 
584 /*
585 **  When removing a channel from the read and write masks, we want to lower
586 **  the last file descriptor if we removed the highest one.  Called from both
587 **  RCHANremove and WCHANremove.
588 */
589 static void
CHANresetlast(int fd)590 CHANresetlast(int fd)
591 {
592     if (fd == channels.max_fd)
593         while (   !FD_ISSET(channels.max_fd, &channels.read_set)
594                && !FD_ISSET(channels.max_fd, &channels.write_set)
595                && channels.max_fd > 1)
596             channels.max_fd--;
597 }
598 
599 
600 /*
601 **  When removing a channel from the sleep mask, we want to lower the last
602 **  file descriptor if we removed the highest one.  Called from SCHANremove.
603 */
604 static void
CHANresetlastsleeping(int fd)605 CHANresetlastsleeping(int fd)
606 {
607     if (fd == channels.max_sleep_fd) {
608         while (   !FD_ISSET(channels.max_sleep_fd, &channels.sleep_set)
609                && channels.max_sleep_fd > 1)
610             channels.max_sleep_fd--;
611     }
612 }
613 
614 
615 /*
616 **  Mark a channel as an active reader.
617 */
618 void
RCHANadd(CHANNEL * cp)619 RCHANadd(CHANNEL *cp)
620 {
621     FD_SET(cp->fd, &channels.read_set);
622     if (cp->fd > channels.max_fd)
623         channels.max_fd = cp->fd;
624 
625     /* For non-NNTP channels, start reading at the beginning of the buffer. */
626     if (cp->Type != CTnntp)
627         cp->In.used = 0;
628 }
629 
630 
631 /*
632 **  Remove a channel from the set of readers.
633 */
634 void
RCHANremove(CHANNEL * cp)635 RCHANremove(CHANNEL *cp)
636 {
637     if (FD_ISSET(cp->fd, &channels.read_set)) {
638         FD_CLR(cp->fd, &channels.read_set);
639         CHANresetlast(cp->fd);
640     }
641 }
642 
643 
644 /*
645 **  Put a channel to sleep and call a function when it wakes.  Note that arg
646 **  must be NULL or allocated memory, since it will be freed later.
647 */
648 void
SCHANadd(CHANNEL * cp,time_t wake,void * event,innd_callback_func waker,void * arg)649 SCHANadd(CHANNEL *cp, time_t wake, void *event, innd_callback_func waker,
650          void *arg)
651 {
652     if (!CHANsleeping(cp)) {
653         channels.sleep_count++;
654         FD_SET(cp->fd, &channels.sleep_set);
655     }
656     if (cp->fd > channels.max_sleep_fd)
657         channels.max_sleep_fd = cp->fd;
658     cp->Waketime = wake;
659     cp->Waker = waker;
660     if (cp->Argument != arg) {
661         free(cp->Argument);
662         cp->Argument = arg;
663     }
664     cp->Event = event;
665 }
666 
667 
668 /*
669 **  Take a channel off the sleep list.
670 */
671 void
SCHANremove(CHANNEL * cp)672 SCHANremove(CHANNEL *cp)
673 {
674     if (!CHANsleeping(cp))
675         return;
676     FD_CLR(cp->fd, &channels.sleep_set);
677     channels.sleep_count--;
678     cp->Waketime = 0;
679 
680     /* If this was the highest descriptor, get a new highest. */
681     CHANresetlastsleeping(cp->fd);
682 }
683 
684 
685 /*
686 **  Is a channel on the sleep list?
687 */
688 bool
CHANsleeping(CHANNEL * cp)689 CHANsleeping(CHANNEL *cp)
690 {
691     return FD_ISSET(cp->fd, &channels.sleep_set);
692 }
693 
694 
695 /*
696 **  Wake up channels waiting for a specific event.  (Or rather, mark them
697 **  ready to wake up; they'll actually be woken up the next time through the
698 **  main loop.)
699 */
700 void
SCHANwakeup(void * event)701 SCHANwakeup(void *event)
702 {
703     CHANNEL *cp;
704     int i;
705 
706     for (cp = channels.table, i = channels.table_size; --i >= 0; cp++)
707         if (cp->Type != CTfree && cp->Event == event && CHANsleeping(cp))
708             cp->Waketime = 0;
709 }
710 
711 
712 /*
713 **  Mark a channel as an active writer.  Don't reset the Out->left field
714 **  since we could have buffered I/O already in there.
715 */
716 void
WCHANadd(CHANNEL * cp)717 WCHANadd(CHANNEL *cp)
718 {
719     if (cp->Out.left > 0) {
720         FD_SET(cp->fd, &channels.write_set);
721         if (cp->fd > channels.max_fd)
722             channels.max_fd = cp->fd;
723     }
724 }
725 
726 
727 /*
728 **  Remove a channel from the set of writers.
729 */
730 void
WCHANremove(CHANNEL * cp)731 WCHANremove(CHANNEL *cp)
732 {
733     if (FD_ISSET(cp->fd, &channels.write_set)) {
734         FD_CLR(cp->fd, &channels.write_set);
735         CHANresetlast(cp->fd);
736 
737         /* No data left -- reset used so we don't grow the buffer. */
738         if (cp->Out.left <= 0) {
739             cp->Out.used = 0;
740             cp->Out.left = 0;
741         }
742     }
743 }
744 
745 
746 /*
747 **  Set a channel to start off with the contents of an existing channel.
748 */
749 void
WCHANsetfrombuffer(CHANNEL * cp,struct buffer * bp)750 WCHANsetfrombuffer(CHANNEL *cp, struct buffer *bp)
751 {
752     WCHANset(cp, &bp->data[bp->used], bp->left);
753 }
754 
755 
756 /*
757 **  Internal function to resize the in buffer for a given channel to the new
758 **  size given, adjusting pointers as required.
759 */
760 static void
CHANresize(CHANNEL * cp,size_t size)761 CHANresize(CHANNEL *cp, size_t size)
762 {
763     struct buffer *bp;
764     char *p;
765     size_t change;
766     ptrdiff_t offset;
767     int i;
768     HDRCONTENT *hc = cp->Data.HdrContent;
769 
770     bp = &cp->In;
771     change = size - bp->size;
772     bp->size = size;
773     bp->left += change;
774     p = bp->data;
775 
776     /* Reallocate the buffer and adjust offets if realloc moved the location
777        of the memory region.  Only adjust offets if we're in a state where we
778        care about the header contents.
779 
780        FIXME: This is invalid C, although it will work on most (all?)  common
781        systems.  The pointers need to be reduced to offets and then turned
782        back into relative pointers rather than adjusting the pointers
783        directly, since as soon as realloc is called, pointers into the old
784        space become invalid and may not be used further, even for arithmetic.
785        (Not to mention that two pointers to different objects may not be
786        compared and arithmetic may not be performed on them. */
787     TMRstart(TMR_DATAMOVE);
788     bp->data = xrealloc(bp->data, bp->size);
789     offset = p - bp->data;
790     if (offset != 0) {
791         if (cp->State == CSgetheader || cp->State == CSgetbody ||
792             cp->State == CSeatarticle) {
793             if (cp->Data.BytesHeader != NULL)
794                 cp->Data.BytesHeader -= offset;
795             for (i = 0; i < MAX_ARTHEADER; i++, hc++) {
796                 if (hc->Value != NULL)
797                     hc->Value -= offset;
798             }
799         }
800     }
801     TMRstop(TMR_DATAMOVE);
802 }
803 
804 
805 /*
806 **  Read in text data, return the amount we read.
807 */
808 int
CHANreadtext(CHANNEL * cp)809 CHANreadtext(CHANNEL *cp)
810 {
811     struct buffer *bp;
812     char *name;
813     int oerrno, maxbyte;
814     ssize_t count;
815 
816     /* Grow buffer if we're getting close to current limit.
817 
818        FIXME: The In buffer doesn't use the normal meanings of .used and
819        .left.  */
820     bp = &cp->In;
821     bp->left = bp->size - bp->used;
822     if (bp->left <= LOW_WATER)
823         CHANresize(cp, bp->size + GROW_AMOUNT(bp->size));
824 
825     /* Read in whatever is there, up to some reasonable limit.
826 
827        We want to limit the amount of time devoted to processing the incoming
828        data for any given channel.  There's no easy way of doing that, though,
829        so we restrict the data size instead.
830 
831        If the data is part of a single large article, then reading and
832        processing many kilobytes at a time costs very little.  If the data is
833        a long list of CHECK commands from a streaming feed, then every line of
834        data will require a history lookup, and we probably don't want to do
835        more than about 10 of those per channel on each cycle of the main
836        select() loop (otherwise we might take too long before giving other
837        channels a turn).  10 lines of CHECK commands suggests a limit of about
838        1KB of data, or less, which is the innconf->maxcmdreadsize default.  If
839        innconf->maxcmdreadsize is 0, there is no limit.
840 
841        Reduce the read size only if we're reading commands.
842 
843        FIXME: A better approach would be to limit the number of commands we
844        process for each channel. */
845     if (innconf->maxcmdreadsize == 0 || cp->State != CSgetcmd
846         || bp->left < innconf->maxcmdreadsize)
847         maxbyte = bp->left;
848     else
849         maxbyte = innconf->maxcmdreadsize;
850     TMRstart(TMR_NNTPREAD);
851     count = read(cp->fd, &bp->data[bp->used], maxbyte);
852     TMRstop(TMR_NNTPREAD);
853 
854     /* Solaris (at least 2.4 through 2.6) will occasionally return EAGAIN in
855        response to a read even if the file descriptor already selected true
856        for reading, apparently due to some internal resource exhaustion.  In
857        that case, return -2, which will drop back out to the main loop and go
858        on to the next file descriptor, as if the descriptor never selected
859        true.  This check will probably never trigger on platforms other than
860        Solaris. */
861     if (count < 0) {
862         if (errno == EAGAIN)
863             return -2;
864         oerrno = errno;
865         name = CHANname(cp);
866         errno = oerrno;
867         sysnotice("%s cant read", name);
868         return -1;
869     }
870     if (count == 0) {
871         name = CHANname(cp);
872         notice("%s readclose", name);
873         return 0;
874     }
875     bp->used += count;
876     bp->left -= count;
877     return count;
878 }
879 
880 
/*
**  Internal function to write out channel data.
**
**  The whole buffer is offered to write(2) first, retrying on EINTR.  A
**  positive result, or any error other than EMSGSIZE, is returned as-is.
**  If I/O backs up a lot, we can get EMSGSIZE on some systems.  When that
**  happens, or when nothing was written, the data is written in BUFSIZ-sized
**  chunks instead (stdio's BUFSIZ is assumed to be a good chunk value).
**  Returns the number of bytes written if any were written, otherwise the
**  last write(2) result (0 or -1 with errno set).
**
**  FIXME: Roll this code into xwrite.
*/
static int
CHANwrite(int fd, const char *data, long length)
{
    const char *cursor;
    ssize_t n;

    for (;;) {
        n = write(fd, data, length);
        if (n >= 0 || errno != EINTR)
            break;
    }
    if (n > 0)
        return n;
    if (n < 0 && errno != EMSGSIZE)
        return n;

    /* Fall back to writing the data piece by piece. */
    cursor = data;
    n = 0;
    while (length > 0) {
        long chunk = (length > BUFSIZ) ? BUFSIZ : length;

        n = write(fd, cursor, chunk);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            break;
        cursor += n;
        length -= n;
    }

    /* Report partial progress if there was any, otherwise the last result. */
    if (cursor != data)
        return cursor - data;
    return n;
}
917 
918 
919 /*
920 **  Try to flush out the buffer.  Use this only on file channels!
921 */
922 bool
WCHANflush(CHANNEL * cp)923 WCHANflush(CHANNEL *cp)
924 {
925     struct buffer *bp;
926     ssize_t count;
927 
928     /* Write it. */
929     for (bp = &cp->Out; bp->left > 0; bp->left -= count, bp->used += count) {
930         count = CHANwrite(cp->fd, &bp->data[bp->used], bp->left);
931         if (count <= 0) {
932             syswarn("%s cant flush count %lu", CHANname(cp),
933                     (unsigned long) bp->left);
934             return false;
935         }
936         if (count == 0) {
937             warn("%s cant flush count %lu", CHANname(cp),
938                  (unsigned long) bp->left);
939             return false;
940         }
941     }
942     WCHANremove(cp);
943     return true;
944 }
945 
946 
947 /*
948 **  Standard wakeup routine called after a write channel was put to sleep.
949 */
950 static void
CHANwakeup(CHANNEL * cp)951 CHANwakeup(CHANNEL *cp)
952 {
953     notice("%s wakeup", CHANname(cp));
954     WCHANadd(cp);
955 }
956 
957 
958 /*
959 **  Attempting to write would block; stop output or give up.
960 */
961 static void
CHANwritesleep(CHANNEL * cp,const char * name)962 CHANwritesleep(CHANNEL *cp, const char *name)
963 {
964     unsigned long bad, delay;
965 
966     bad = ++(cp->BlockedWrites);
967     if (bad > innconf->badiocount)
968         switch (cp->Type) {
969         default:
970             break;
971         case CTreject:
972         case CTnntp:
973         case CTfile:
974         case CTexploder:
975         case CTprocess:
976             warn("%s blocked closing", name);
977             SITEchanclose(cp);
978             CHANclose(cp, name);
979             return;
980         }
981     delay = bad * innconf->blockbackoff;
982     warn("%s blocked sleeping %lu", name, delay);
983     SCHANadd(cp, Now.tv_sec + delay, NULL, CHANwakeup, NULL);
984 }
985 
986 
987 /*
988 **  We got an unknown error in select.  Find out the culprit.  This is not
989 **  enabled yet; it's been disabled for years, and is expensive.
990 */
991 static void UNUSED
CHANdiagnose(void)992 CHANdiagnose(void)
993 {
994     fd_set test;
995     int fd;
996     struct timeval tv;
997 
998     FD_ZERO(&test);
999     for (fd = channels.max_fd; fd >= 0; fd--) {
1000         if (FD_ISSET(fd, &channels.read_set)) {
1001             FD_SET(fd, &test);
1002             tv.tv_sec = 0;
1003             tv.tv_usec = 0;
1004             if (select(fd + 1, &test, NULL, NULL, &tv) < 0 && errno != EINTR) {
1005                 warn("%s bad read file %d", LogName, fd);
1006                 FD_CLR(fd, &channels.read_set);
1007                 /* Probably do something about the file descriptor here; call
1008                    CHANclose on it? */
1009             }
1010             FD_CLR(fd, &test);
1011         }
1012         if (FD_ISSET(fd, &channels.write_set)) {
1013             FD_SET(fd, &test);
1014             tv.tv_sec = 0;
1015             tv.tv_usec = 0;
1016             if (select(fd + 1, NULL, &test, NULL, &tv) < 0 && errno != EINTR) {
1017                 warn("%s bad write file %d", LogName, fd);
1018                 FD_CLR(fd, &channels.write_set);
1019                 /* Probably do something about the file descriptor here; call
1020                    CHANclose on it? */
1021             }
1022             FD_CLR(fd, &test);
1023         }
1024     }
1025 }
1026 
1027 
1028 /*
1029 **  Count the number of active connections for a given peer.
1030 **
1031 **  FIXME: This seems like an overly cumbersome way to do this, and also seems
1032 **  like a layering violation.  Do we really have to do this here?
1033 */
1034 void
CHANcount_active(CHANNEL * cp)1035 CHANcount_active(CHANNEL *cp)
1036 {
1037     int found;
1038     CHANNEL *tempchan;
1039     char *label, *tmplabel;
1040     int fd;
1041 
1042     if (cp->fd < 0 || cp->Type != CTnntp || cp->ActiveCnx == 0)
1043         return;
1044     found = 1;
1045     label = RClabelname(cp);
1046     if (label == NULL)
1047         return;
1048     for (fd = 0; fd <= channels.max_fd; fd++) {
1049         tempchan = &channels.table[fd];
1050         tmplabel = RClabelname(tempchan);
1051         if (tmplabel == NULL)
1052             continue;
1053         if (strcmp(label, tmplabel) == 0 && tempchan->ActiveCnx != 0)
1054             found++;
1055     }
1056     cp->ActiveCnx = found;
1057 }
1058 
1059 
1060 /*
1061 **  Handle a file descriptor that selects ready to read.  Dispatch to the
1062 **  reader function, and then resize its input buffer if needed.
1063 */
1064 static void
CHANhandle_read(CHANNEL * cp)1065 CHANhandle_read(CHANNEL *cp)
1066 {
1067     size_t size;
1068 
1069     if (cp->Type == CTfree) {
1070         warn("%s %d free but was in RMASK", CHANname(cp), cp->fd);
1071         RCHANremove(cp);
1072         close(cp->fd);
1073         cp->fd = -1;
1074         return;
1075     }
1076     cp->LastActive = Now.tv_sec;
1077     (*cp->Reader)(cp);
1078 
1079     /* Check and see if the buffer is grossly overallocated and shrink if
1080        needed. */
1081     if (cp->In.size <= BIG_BUFFER)
1082         return;
1083     if (cp->In.used == 0)
1084         CHANresize(cp, START_BUFF_SIZE);
1085     else if ((cp->In.size / cp->In.used) > 10) {
1086         size = cp->In.used * 2;
1087         if (size < START_BUFF_SIZE)
1088             size = START_BUFF_SIZE;
1089         CHANresize(cp, size);
1090     }
1091 }
1092 
1093 
1094 /*
1095 **  Handle a file descriptor that selects ready to write.  Write out the
1096 **  pending data.
1097 */
1098 static void
CHANhandle_write(CHANNEL * cp)1099 CHANhandle_write(CHANNEL *cp)
1100 {
1101     struct buffer *bp;
1102     ssize_t count;
1103     int oerrno;
1104     const char *name;
1105 
1106     if (cp->Type == CTfree) {
1107         warn("%s %d free but was in WMASK", CHANname(cp), cp->fd);
1108         WCHANremove(cp);
1109         close(cp->fd);
1110         cp->fd = -1;
1111         return;
1112     }
1113     bp = &cp->Out;
1114     if (bp->left == 0) {
1115         /* Should not be possible. */
1116         WCHANremove(cp);
1117         return;
1118     }
1119     cp->LastActive = Now.tv_sec;
1120     count = CHANwrite(cp->fd, &bp->data[bp->used], bp->left);
1121     if (count <= 0) {
1122         oerrno = errno;
1123         name = CHANname(cp);
1124         errno = oerrno;
1125         if (oerrno == EWOULDBLOCK)
1126             oerrno = EAGAIN;
1127         if (count < 0)
1128             sysnotice("%s cant write", name);
1129         else
1130             notice("%s cant write", name);
1131         cp->BadWrites++;
1132         if (count < 0 && oerrno == EPIPE) {
1133             SITEchanclose(cp);
1134             CHANclose(cp, name);
1135         } else if (count < 0 && oerrno == EAGAIN) {
1136             WCHANremove(cp);
1137             CHANwritesleep(cp, name);
1138         } else if (cp->BadWrites >= innconf->badiocount) {
1139             warn("%s sleeping", name);
1140             WCHANremove(cp);
1141             SCHANadd(cp, Now.tv_sec + innconf->pauseretrytime, NULL,
1142                      CHANwakeup, NULL);
1143         }
1144     } else {
1145         cp->BadWrites = 0;
1146         cp->BlockedWrites = 0;
1147         bp->left -= count;
1148         bp->used += count;
1149         if (bp->left > 0)
1150             buffer_compact(bp);
1151         else {
1152             WCHANremove(cp);
1153             (*cp->WriteDone)(cp);
1154         }
1155     }
1156 }
1157 
1158 
1159 /*
1160 **  Main I/O loop.  Wait for data, call the channel's handler when there is
1161 **  something to read or when the queued write is finished.  In order to be
1162 **  fair (i.e., don't always give descriptor n priority over n+1), we remember
1163 **  where we last had something and pick up from there.
1164 **
1165 **  Yes, the main code has really wandered over to the side a lot.
1166 */
1167 void
CHANreadloop(void)1168 CHANreadloop(void)
1169 {
1170     int i, startpoint, count, lastfd;
1171     CHANNEL *cp;
1172     fd_set rdfds, wrfds;
1173     struct timeval tv;
1174     unsigned long silence;
1175     const char *name;
1176     time_t last_sync;
1177     int fd = 0;
1178 
1179     STATUSinit();
1180     gettimeofday(&Now, NULL);
1181     last_sync = Now.tv_sec;
1182 
1183     while (1) {
1184         /* See if any processes died. */
1185         PROCscan();
1186 
1187         /* Wait for data, note the time. */
1188         rdfds = channels.read_set;
1189         wrfds = channels.write_set;
1190         tv = TimeOut;
1191         if (innconf->timer != 0) {
1192             unsigned long now = TMRnow();
1193 
1194             if (now < 1000 * innconf->timer)
1195                 tv.tv_sec = innconf->timer - now / 1000;
1196             else {
1197                 TMRsummary("ME", timer_name);
1198                 InndHisLogStats();
1199                 tv.tv_sec = innconf->timer;
1200             }
1201         }
1202 
1203         /* Mask signals when not in select to prevent a signal handler
1204            from accessing data that the main code is mutating. */
1205         TMRstart(TMR_IDLE);
1206         xsignal_unmask();
1207         count = select(channels.max_fd + 1, &rdfds, &wrfds, NULL, &tv);
1208         xsignal_mask();
1209         TMRstop(TMR_IDLE);
1210 
1211         if (count < 0) {
1212             if (errno != EINTR) {
1213                 syswarn("%s cant select", LogName);
1214 #ifdef INND_FIND_BAD_FDS
1215                 CHANdiagnose();
1216 #endif
1217             }
1218             continue;
1219         }
1220 
1221         STATUSmainloophook();
1222         if (GotTerminate) {
1223 #ifdef DO_PERL
1224             PLmode(Mode, OMshutdown, (char *) "exiting due to signal");
1225 #endif
1226 #ifdef DO_PYTHON
1227             PYmode(Mode, OMshutdown, (char *) "exiting due to signal");
1228 #endif
1229             notice("%s exiting due to signal", LogName);
1230             CleanupAndExit(0, NULL);
1231         }
1232 
1233         /* Update the "reasonably accurate" time. */
1234         gettimeofday(&Now, NULL);
1235         if (Now.tv_sec > last_sync + TimeOut.tv_sec) {
1236             HISsync(History);
1237             if (ICDactivedirty != 0) {
1238                 ICDwriteactive();
1239                 ICDactivedirty = 0;
1240             }
1241             last_sync = Now.tv_sec;
1242         }
1243 
1244         /* If no channels are active, flush and skip if nobody's sleeping. */
1245         if (count == 0) {
1246             if (Mode == OMrunning)
1247                 ICDwrite();
1248             if (channels.sleep_count == 0)
1249                 continue;
1250         }
1251 
1252         /* Try the prioritized channels first. */
1253         for (i = 0; i < channels.prioritized_size; i++) {
1254             int pfd;
1255 
1256             if (channels.prioritized[i] == NULL)
1257                 continue;
1258             pfd = channels.prioritized[i]->fd;
1259             if (FD_ISSET(pfd, &channels.read_set) && FD_ISSET(pfd, &rdfds)) {
1260                 count--;
1261                 if (count > 4)
1262                     count = 4; /* might be more requests */
1263                 (*channels.prioritized[i]->Reader)(channels.prioritized[i]);
1264                 FD_CLR(pfd, &rdfds);
1265             }
1266         }
1267 
1268         /* Loop through all active channels.  Somebody could have closed a
1269            channel so we double-check the global mask before looking at what
1270            select returned.  The code here is written so that a channel could
1271            be reading and writing and sleeping at the same time, even though
1272            that's not possible.  (Just as well, since in SysVr4 the count
1273            would be wrong.) */
1274         lastfd = channels.max_fd;
1275         if (lastfd < channels.max_sleep_fd)
1276             lastfd = channels.max_sleep_fd;
1277         if (fd > lastfd)
1278             fd = 0;
1279         startpoint = fd;
1280         do {
1281             cp = &channels.table[fd];
1282 
1283             /* Check to see if this peer has too many open connections, and if
1284                so, either close or make inactive this connection. */
1285             if (cp->Type == CTnntp && cp->MaxCnx > 0 && cp->HoldTime > 0) {
1286                 CHANcount_active(cp);
1287                 if (cp->ActiveCnx > cp->MaxCnx && cp->fd > 0) {
1288                     if (cp->Started + cp->HoldTime < Now.tv_sec)
1289                         CHANclose(cp, CHANname(cp));
1290                     else {
1291                         if (fd >= lastfd)
1292                             fd = 0;
1293                         else
1294                             fd++;
1295                         cp->ActiveCnx = 0;
1296                         RCHANremove(cp);
1297                     }
1298                     continue;
1299                 }
1300             }
1301 
1302             /* Anything to read? */
1303             if (FD_ISSET(fd, &channels.read_set) && FD_ISSET(fd, &rdfds)) {
1304                 count--;
1305                 CHANhandle_read(cp);
1306             }
1307 
1308             /* Possibly recheck for dead children so we don't get SIGPIPE on
1309                readerless channels. */
1310             if (PROCneedscan)
1311                 PROCscan();
1312 
1313             /* Ready to write? */
1314             if (FD_ISSET(fd, &channels.write_set) && FD_ISSET(fd, &wrfds)) {
1315                 count--;
1316                 CHANhandle_write(cp);
1317             }
1318 
1319             /* Coming off a sleep? */
1320             if (FD_ISSET(fd, &channels.sleep_set)
1321                 && cp->Waketime <= Now.tv_sec) {
1322                 if (cp->Type == CTfree) {
1323                     warn("%s %d free but was in SMASK", CHANname(cp), fd);
1324                     FD_CLR(fd, &channels.sleep_set);
1325                     channels.sleep_count--;
1326                     CHANresetlastsleeping(fd);
1327                     close(fd);
1328                     cp->fd = -1;
1329                 } else {
1330                     cp->LastActive = Now.tv_sec;
1331                     SCHANremove(cp);
1332                     if (cp->Waker != NULL) {
1333                         (*cp->Waker)(cp);
1334                     } else {
1335                         name = CHANname(cp);
1336                         warn("%s %d sleeping without Waker", name, fd);
1337                         SITEchanclose(cp);
1338                         CHANclose(cp, name);
1339                     }
1340                 }
1341             }
1342 
1343             /* Toss CTreject channel early if it's inactive. */
1344             if (cp->Type == CTreject
1345                 && cp->LastActive + REJECT_TIMEOUT < Now.tv_sec) {
1346                 name = CHANname(cp);
1347                 notice("%s timeout reject", name);
1348                 CHANclose(cp, name);
1349             }
1350 
1351             /* Has this channel been inactive very long? */
1352             if (cp->Type == CTnntp
1353                 && cp->LastActive + cp->NextLog < Now.tv_sec) {
1354                 name = CHANname(cp);
1355                 silence = Now.tv_sec - cp->LastActive;
1356                 cp->NextLog += innconf->chaninacttime;
1357                 notice("%s inactive %ld", name, silence / 60L);
1358                 if (silence > innconf->peertimeout) {
1359                     notice("%s timeout", name);
1360                     CHANclose(cp, name);
1361                 }
1362             }
1363 
1364             /* Bump pointer, modulo the table size. */
1365             if (fd >= lastfd)
1366                 fd = 0;
1367             else
1368                 fd++;
1369 
1370             /* If there is nothing to do, break out.
1371 
1372                FIXME: Do we really want to keep going until sleep_count is
1373                zero?  That means that when there are sleeping channels, we do
1374                a full traversal every time through the select loop. */
1375             if (count == 0 && channels.sleep_count == 0)
1376                 break;
1377         } while (fd != startpoint);
1378     }
1379 }
1380