1 /* $Id: chan.c 10523 2021-01-17 21:52:00Z iulius $
2 **
3 ** I/O channel (and buffer) processing.
4 **
5 ** This file is the heart of innd. Everything innd does is represented by
6 ** channels; waiting for connections, reading from network connections,
7 ** writing to network connections, writing to processes, and writing to files
8 ** are all channel operations.
9 **
10 ** Channels can be in one of three states: reading, writing, or sleeping.
11 ** The first two are represented in the select file descriptor sets. The
12 ** last sits there until something else wakes the channel up. CHANreadloop
13 ** is the main I/O loop for innd, calling select and then dispatching control
14 ** to whatever channels have work to do.
15 */
16
17 #include "config.h"
18 #include "clibrary.h"
19
20 /* Needed on AIX 4.1 to get fd_set and friends. */
21 #ifdef HAVE_SYS_SELECT_H
22 # include <sys/select.h>
23 #endif
24
25 #include "inn/fdflag.h"
26 #include "inn/innconf.h"
27 #include "inn/network.h"
28 #include "innd.h"
29
/* These errno values don't exist on all systems, but may be returned as an
   (ignorable) error to setting the accept socket nonblocking.  Define them
   to 0 if they don't exist so that we can unconditionally compare errno to
   them in the code. */
#ifndef ENOTSOCK
# define ENOTSOCK 0
#endif
#ifndef ENOTTY
# define ENOTTY 0
#endif

/* Printable names for the innd timers.  NOTE(review): the order presumably
   has to match the TMR_* timer identifiers declared elsewhere (only
   TMR_NNTPREAD and TMR_DATAMOVE are referenced in this part of the file);
   confirm against that list before adding or reordering entries. */
static const char * const timer_name[] = {
    "idle", "artclean", "artwrite", "artcncl", "sitesend", "overv",
    "perl", "python", "nntpread", "artparse", "artlog", "datamove"
};

/* Compaction threshold as a divisor of the buffer size.  If the amount free
   at the beginning of the buffer is bigger than the quotient, it is compacted
   in the read loop. */
#define COMP_THRESHOLD 10

/* Global data about the channels. */
struct channels {
    fd_set read_set;            /* Channels waiting for input (RCHANadd). */
    fd_set sleep_set;           /* Sleeping channels (SCHANadd). */
    fd_set write_set;           /* Channels with pending output (WCHANadd). */
    int sleep_count;            /* Number of sleeping channels. */
    int max_fd;                 /* Max fd in read_set or write_set. */
    int max_sleep_fd;           /* Max fd in sleep_set. */
    int table_size;             /* Total number of channels. */
    CHANNEL *table;             /* Table of channel structs, indexed by fd. */

    /* Special prioritized channels, for the control and remconn channels.  We
       check these first each time.  Unused slots are NULL. */
    CHANNEL **prioritized;
    int prioritized_size;
};

/* Eventually this will move into some sort of global INN state structure. */
static struct channels channels;

/* Template used to reset every channel slot.  We want to initialize four
   elements of CHANnull but let all of the other elements be initialized
   according to the standard rules for a static initializer (all zero).
   However, GCC warns about incomplete initializer lists (since it may be a
   mistake), so we don't initialize anything at all here and instead
   explicitly set those four values (Type, State, fd and NextLog) in
   CHANsetup.  This is the only reason why this isn't const. */
static CHANNEL CHANnull;
78
79
80 /*
81 ** Returns true if socket activation is used and the channel given as
82 ** as argument is used for socket activation. Returns false otherwise.
83 */
84 bool
CHANsystemdsa(CHANNEL * cp)85 CHANsystemdsa(CHANNEL *cp)
86 {
87 const char *s;
88 int count;
89
90 s = getenv("INN_SD_LISTEN_FDS_COUNT");
91 if (s == NULL)
92 return false;
93
94 count = atoi(s);
95
96 if (cp->fd >= SD_LISTEN_FDS_START && cp->fd < SD_LISTEN_FDS_START + count)
97 return true;
98
99 return false;
100 }
101
102
103 /*
104 ** Tear down our world. Free all of the allocated channels and clear all
105 ** global state data. This function can also be used to initialize the
106 ** channels structure to a known empty state.
107 */
108 void
CHANshutdown(void)109 CHANshutdown(void)
110 {
111 CHANNEL *cp;
112 int i;
113
114 FD_ZERO(&channels.read_set);
115 FD_ZERO(&channels.sleep_set);
116 FD_ZERO(&channels.write_set);
117 channels.sleep_count = 0;
118 channels.max_fd = -1;
119 channels.max_sleep_fd = -1;
120
121 if (channels.table != NULL) {
122 cp = channels.table;
123 for (i = channels.table_size; --i >= 0; cp++) {
124 if (CHANsystemdsa(cp))
125 continue;
126 if (cp->Type != CTfree)
127 CHANclose(cp, CHANname(cp));
128 if (cp->In.data)
129 free(cp->In.data);
130 if (cp->Out.data)
131 free(cp->Out.data);
132 }
133 }
134 free(channels.table);
135 channels.table = NULL;
136 channels.table_size = 0;
137 if (channels.prioritized_size > 0) {
138 free(channels.prioritized);
139 channels.prioritized_size = 0;
140 }
141 }
142
143
144 /*
145 ** Initialize the global I/O channel state and prepare for creation of new
146 ** channels. Takes the number of channels to pre-create, which currently
147 ** must be as large as the largest file descriptor that will be used for a
148 ** channel.
149 */
150 void
CHANsetup(int count)151 CHANsetup(int count)
152 {
153 CHANNEL *cp;
154 int i;
155
156 CHANshutdown();
157 channels.table_size = count;
158 channels.table = xmalloc(count * sizeof(CHANNEL));
159
160 /* Finish initializing CHANnull, since we can't do this entirely with a
161 static initializer without having to list every element in the
162 incredibly long channel struct and update it whenever the channel
163 struct changes. */
164 CHANnull.Type = CTfree;
165 CHANnull.State = CSerror;
166 CHANnull.fd = -1;
167 CHANnull.NextLog = innconf->chaninacttime;
168
169 /* Now, we can use CHANnull to initialize all of the other channels. */
170 for (cp = channels.table; --count >= 0; cp++)
171 *cp = CHANnull;
172
173 /* Reserve three slots for prioritized channels (control and AF_INET and
174 AF_INET6 remconn). If we end up with more somehow, we'll resize in
175 CHANcreate. */
176 channels.prioritized_size = 3;
177 channels.prioritized = xmalloc(3 * sizeof(CHANNEL *));
178 for (i = 0; i < 3; i++)
179 channels.prioritized[i] = NULL;
180 }
181
182
183 /*
184 ** Create a channel from a file descriptor. Takes the channel type, the
185 ** initial state, the reader callback, and the writer callback.
186 */
187 CHANNEL *
CHANcreate(int fd,enum channel_type type,enum channel_state state,innd_callback_func reader,innd_callback_func write_done)188 CHANcreate(int fd, enum channel_type type, enum channel_state state,
189 innd_callback_func reader, innd_callback_func write_done)
190 {
191 CHANNEL *cp;
192 int i, j, size;
193 struct buffer in = { 0, 0, 0, NULL };
194 struct buffer out = { 0, 0, 0, NULL };
195
196 /* Currently, we do no dynamic allocation, but instead assume that the
197 channel table is sized large enough to hold all possible file
198 descriptors. The difficulty with dynamically resizing is that it may
199 invalidate any pointers into the channels, which hold all sorts of
200 random things, including article data.
201
202 FIXME: Design a better data structure that can be resized easily. */
203 cp = &channels.table[fd];
204
205 /* Don't lose the existing buffers when overwriting with CHANnull. */
206 in = cp->In;
207 buffer_resize(&in, START_BUFF_SIZE);
208 in.used = 0;
209 in.left = in.size;
210 out = cp->Out;
211 buffer_resize(&out, SMBUF);
212 buffer_set(&out, "", 0);
213
214 /* Set up the channel's info. Note that we don't have to initialize
215 anything that's already set properly to zero in CHANnull. */
216 *cp = CHANnull;
217 cp->av = NULL;
218 cp->fd = fd;
219 cp->Type = type;
220 cp->State = state;
221 cp->Reader = reader;
222 cp->WriteDone = write_done;
223 cp->Started = Now.tv_sec;
224 cp->Started_checkpoint = Now.tv_sec;
225 cp->LastActive = Now.tv_sec;
226 cp->In = in;
227 cp->Out = out;
228 cp->Tracing = Tracing;
229 HashClear(&cp->CurrentMessageIDHash);
230 ARTprepare(cp);
231
232 fdflag_close_exec(fd, true);
233
234 #ifndef _HPUX_SOURCE
235 /* Stupid HPUX 11.00 has a broken listen/accept where setting the listen
236 socket to nonblocking prevents you from successfully setting the
237 socket returned by accept(2) back to blocking mode, no matter what,
238 resulting in all kinds of funny behaviour, data loss, etc. etc. */
239 if (!fdflag_nonblocking(fd, true) && errno != ENOTSOCK && errno != ENOTTY)
240 syswarn("%s cant nonblock %d", LogName, fd);
241 #endif
242
243 /* Note the control and remconn channels, for efficiency. */
244 if (type == CTcontrol || type == CTremconn) {
245 for (i = 0; i < channels.prioritized_size; i++)
246 if (channels.prioritized[i] == NULL)
247 break;
248 if (i >= channels.prioritized_size) {
249 size = channels.prioritized_size + 1;
250 channels.prioritized
251 = xrealloc(channels.prioritized, size * sizeof(CHANNEL *));
252 for (j = channels.prioritized_size; j < size; j++)
253 channels.prioritized[j] = NULL;
254 channels.prioritized_size = size;
255 }
256 channels.prioritized[i] = cp;
257 }
258
259 /* Return a pointer to the new channel. */
260 return cp;
261 }
262
263
264 /*
265 ** Start or stop tracing a channel. If we're turning on tracing, dump a
266 ** bunch of information about that particular channel.
267 */
268 void
CHANtracing(CHANNEL * cp,bool flag)269 CHANtracing(CHANNEL *cp, bool flag)
270 {
271 char *name;
272 char addr[INET6_ADDRSTRLEN] = "?";
273
274 name = CHANname(cp);
275 notice("%s trace %s", name, flag ? "on" : "off");
276 cp->Tracing = flag;
277 if (flag) {
278 notice("%s trace badwrites %lu blockwrites %lu badreads %lu", name,
279 cp->BadWrites, cp->BlockedWrites, cp->BadReads);
280 network_sockaddr_sprint(addr, sizeof(addr),
281 (struct sockaddr *) &cp->Address);
282 notice("%s trace address %s lastactive %ld nextlog %ld", name,
283 addr, (long) cp->LastActive, (long) cp->NextLog);
284 if (FD_ISSET(cp->fd, &channels.sleep_set))
285 notice("%s trace sleeping %ld 0x%p", name, (long) cp->Waketime,
286 (void *) cp->Waker);
287 if (FD_ISSET(cp->fd, &channels.read_set))
288 notice("%s trace reading %lu %s", name,
289 (unsigned long) cp->In.used,
290 MaxLength(cp->In.data, cp->In.data));
291 if (FD_ISSET(cp->fd, &channels.write_set))
292 notice("%s trace writing %lu %s", name,
293 (unsigned long) cp->Out.left,
294 MaxLength(cp->Out.data, cp->Out.data));
295 }
296 }
297
298
299 /*
300 ** Close an NNTP channel. Called by CHANclose to do the special handling for
301 ** NNTP channels, which carry quite a bit more data than other channels.
302 **
303 ** FIXME: There really should be a type-specific channel closing callback
304 ** that can take care of this, and this will be mandatory when all the
305 ** NNTP-specific stuff is moved out of the core channel struct.
306 */
307 static void
CHANclose_nntp(CHANNEL * cp,const char * name)308 CHANclose_nntp(CHANNEL *cp, const char *name)
309 {
310 WIPprecomfree(cp);
311 NCclearwip(cp);
312 if (cp->State == CScancel)
313 notice("%s closed seconds %ld cancels %ld", name,
314 (long)(Now.tv_sec - cp->Started), cp->Received);
315 else {
316 notice("%s checkpoint seconds %ld accepted %ld refused %ld rejected %ld"
317 " duplicate %ld accepted size %.0f duplicate size %.0f"
318 " rejected size %.0f", name,
319 (long)(Now.tv_sec - cp->Started_checkpoint),
320 cp->Received - cp->Received_checkpoint,
321 cp->Refused - cp->Refused_checkpoint,
322 cp->Rejected - cp->Rejected_checkpoint,
323 cp->Duplicate - cp->Duplicate_checkpoint,
324 (double) (cp->Size - cp->Size_checkpoint),
325 (double) (cp->DuplicateSize - cp->DuplicateSize_checkpoint),
326 (double) (cp->RejectSize - cp->RejectSize_checkpoint));
327 notice("%s closed seconds %ld accepted %ld refused %ld rejected %ld"
328 " duplicate %ld accepted size %.0f duplicate size %.0f"
329 " rejected size %.0f", name, (long)(Now.tv_sec - cp->Started),
330 cp->Received, cp->Refused, cp->Rejected, cp->Duplicate,
331 (double) cp->Size, (double) cp->DuplicateSize,
332 (double) cp->RejectSize);
333 }
334 if (cp->Data.Newsgroups.Data != NULL) {
335 free(cp->Data.Newsgroups.Data);
336 cp->Data.Newsgroups.Data = NULL;
337 }
338 if (cp->Data.Newsgroups.List != NULL) {
339 free(cp->Data.Newsgroups.List);
340 cp->Data.Newsgroups.List = NULL;
341 }
342 if (cp->Data.Distribution.Data != NULL) {
343 free(cp->Data.Distribution.Data);
344 cp->Data.Distribution.Data = NULL;
345 }
346 if (cp->Data.Distribution.List != NULL) {
347 free(cp->Data.Distribution.List);
348 cp->Data.Distribution.List = NULL;
349 }
350 if (cp->Data.Path.Data != NULL) {
351 free(cp->Data.Path.Data);
352 cp->Data.Path.Data = NULL;
353 }
354 if (cp->Data.Path.List != NULL) {
355 free(cp->Data.Path.List);
356 cp->Data.Path.List = NULL;
357 }
358 if (cp->Data.Overview.size != 0) {
359 free(cp->Data.Overview.data);
360 cp->Data.Overview.data = NULL;
361 cp->Data.Overview.size = 0;
362 cp->Data.Overview.left = 0;
363 cp->Data.Overview.used = 0;
364 }
365 if (cp->Data.XrefBufLength != 0) {
366 free(cp->Data.Xref);
367 cp->Data.Xref = NULL;
368 cp->Data.XrefBufLength = 0;
369 }
370
371 /* If this was a peer who was connection-limited, take note that one of
372 their connections was just closed and possibly wake up another one.
373
374 FIXME: This is a butt-ugly way of handling this and a layering
375 violation to boot. This needs to happen in the NC code, possibly via a
376 channel closing callback. */
377 if (cp->MaxCnx > 0) {
378 char *label, *tmplabel;
379 int tfd;
380 CHANNEL *tempchan;
381
382 label = RClabelname(cp);
383 if (label != NULL) {
384 for (tfd = 0; tfd <= channels.max_fd; tfd++) {
385 if (tfd == cp->fd)
386 continue;
387 tempchan = &channels.table[tfd];
388 if (tempchan->fd < 0 || tempchan->Type != CTnntp)
389 continue;
390 tmplabel = RClabelname(tempchan);
391 if (tmplabel == NULL)
392 continue;
393 if (strcmp(label, tmplabel) == 0 && tempchan->ActiveCnx != 0) {
394 tempchan->ActiveCnx = cp->ActiveCnx;
395 RCHANadd(tempchan);
396 break;
397 }
398 }
399 }
400 }
401 }
402
403
404 /*
405 ** Close a channel.
406 */
407 void
CHANclose(CHANNEL * cp,const char * name)408 CHANclose(CHANNEL *cp, const char *name)
409 {
410 int i;
411
412 if (cp->Type == CTfree)
413 warn("%s internal closing free channel %d", name, cp->fd);
414 else {
415 if (cp->Type == CTnntp)
416 CHANclose_nntp(cp, name);
417 else if (cp->Type == CTreject)
418 notice("%s %ld", name, cp->Rejected); /* Use cp->Rejected for the response code. */
419 else if (cp->Out.left)
420 warn("%s closed lost %lu", name, (unsigned long) cp->Out.left);
421 else
422 notice("%s closed", name);
423 WCHANremove(cp);
424 RCHANremove(cp);
425 SCHANremove(cp);
426 if (cp->fd >= 0 && close(cp->fd) < 0)
427 syswarn("%s cant close %s", LogName, name);
428 for (i = 0; i < channels.prioritized_size; i++)
429 if (channels.prioritized[i] == cp)
430 channels.prioritized[i] = NULL;
431 }
432
433 /* Mark it unused. */
434 cp->Type = CTfree;
435 cp->State = CSerror;
436 cp->fd = -1;
437 if (cp->Argument != NULL)
438 free(cp->Argument);
439 cp->Argument = NULL;
440 cp->ActiveCnx = 0;
441
442 /* Free the buffers if they got big. */
443 if (cp->In.size > BIG_BUFFER) {
444 cp->In.size = 0;
445 cp->In.used = 0;
446 cp->In.left = 0;
447 free(cp->In.data);
448 cp->In.data = NULL;
449 }
450 if (cp->Out.size > BIG_BUFFER) {
451 cp->Out.size = 0;
452 cp->Out.used = 0;
453 cp->Out.left = 0;
454 free(cp->Out.data);
455 cp->Out.data = NULL;
456 }
457
458 /* Always free the Sendid buffer. */
459 if (cp->Sendid.size > 0) {
460 cp->Sendid.size = 0;
461 cp->Sendid.used = 0;
462 cp->Sendid.left = 0;
463 free(cp->Sendid.data);
464 cp->Sendid.data = NULL;
465 }
466
467 /* Free the space allocated for NNTP commands. */
468 if (cp->av != NULL) {
469 free(cp->av[0]);
470 free(cp->av);
471 cp->av = NULL;
472 }
473 }
474
475
476 /*
477 ** Return a printable name for the channel.
478 **
479 ** FIXME: Another layering violation. Each type of channel should register a
480 ** callback or a name rather than making the general code know how to name
481 ** channels.
482 */
483 char *
CHANname(CHANNEL * cp)484 CHANname(CHANNEL *cp)
485 {
486 int i;
487 SITE *sp;
488 const char *site;
489 pid_t pid;
490
491 switch (cp->Type) {
492 default:
493 snprintf(cp->Name, sizeof(cp->Name), "?%d(#%d@%ld)?", cp->Type,
494 cp->fd, (long) (cp - channels.table));
495 break;
496 case CTany:
497 snprintf(cp->Name, sizeof(cp->Name), "any:%d", cp->fd);
498 break;
499 case CTfree:
500 snprintf(cp->Name, sizeof(cp->Name), "free:%d", cp->fd);
501 break;
502 case CTremconn:
503 snprintf(cp->Name, sizeof(cp->Name), "remconn:%d", cp->fd);
504 break;
505 case CTreject:
506 snprintf(cp->Name, sizeof(cp->Name), "%s rejected",
507 cp->Address.ss_family == 0 ? "localhost" : RChostname(cp));
508 break;
509 case CTnntp:
510 snprintf(cp->Name, sizeof(cp->Name), "%s:%d",
511 cp->Address.ss_family == 0 ? "localhost" : RChostname(cp),
512 cp->fd);
513 break;
514 case CTlocalconn:
515 snprintf(cp->Name, sizeof(cp->Name), "localconn:%d", cp->fd);
516 break;
517 case CTcontrol:
518 snprintf(cp->Name, sizeof(cp->Name), "control:%d", cp->fd);
519 break;
520 case CTexploder:
521 case CTfile:
522 case CTprocess:
523 /* Find the site that has this channel. */
524 site = "?";
525 pid = 0;
526 for (i = nSites, sp = Sites; --i >= 0; sp++)
527 if (sp->Channel == cp) {
528 site = sp->Name;
529 if (cp->Type != CTfile)
530 pid = sp->pid;
531 break;
532 }
533 if (pid == 0)
534 snprintf(cp->Name, sizeof(cp->Name), "%s:%d:%s",
535 MaxLength(site, site), cp->fd,
536 cp->Type == CTfile ? "file" : "proc");
537 else
538 snprintf(cp->Name, sizeof(cp->Name), "%s:%d:%s:%ld",
539 MaxLength(site, site), cp->fd,
540 cp->Type == CTfile ? "file" : "proc", (long) pid);
541 break;
542 }
543 return cp->Name;
544 }
545
546
547 /*
548 ** Return the channel for a specified descriptor.
549 */
550 CHANNEL *
CHANfromdescriptor(int fd)551 CHANfromdescriptor(int fd)
552 {
553 if (fd < 0 || fd > channels.table_size)
554 return NULL;
555 return &channels.table[fd];
556 }
557
558
559 /*
560 ** Iterate over all channels of a specified type. The next channel is
561 ** returned and its index is put into ip, which serves as the cursor.
562 **
563 ** FIXME: Make ip an opaque cursor.
564 */
565 CHANNEL *
CHANiter(int * ip,enum channel_type type)566 CHANiter(int *ip, enum channel_type type)
567 {
568 CHANNEL *cp;
569 int i;
570
571 for (i = *ip; i >= 0 && i < channels.table_size; i++) {
572 cp = &channels.table[i];
573 if (cp->Type == CTfree && cp->fd == -1)
574 continue;
575 if (type == CTany || cp->Type == type) {
576 *ip = ++i;
577 return cp;
578 }
579 }
580 return NULL;
581 }
582
583
584 /*
585 ** When removing a channel from the read and write masks, we want to lower
586 ** the last file descriptor if we removed the highest one. Called from both
587 ** RCHANremove and WCHANremove.
588 */
589 static void
CHANresetlast(int fd)590 CHANresetlast(int fd)
591 {
592 if (fd == channels.max_fd)
593 while ( !FD_ISSET(channels.max_fd, &channels.read_set)
594 && !FD_ISSET(channels.max_fd, &channels.write_set)
595 && channels.max_fd > 1)
596 channels.max_fd--;
597 }
598
599
600 /*
601 ** When removing a channel from the sleep mask, we want to lower the last
602 ** file descriptor if we removed the highest one. Called from SCHANremove.
603 */
604 static void
CHANresetlastsleeping(int fd)605 CHANresetlastsleeping(int fd)
606 {
607 if (fd == channels.max_sleep_fd) {
608 while ( !FD_ISSET(channels.max_sleep_fd, &channels.sleep_set)
609 && channels.max_sleep_fd > 1)
610 channels.max_sleep_fd--;
611 }
612 }
613
614
615 /*
616 ** Mark a channel as an active reader.
617 */
618 void
RCHANadd(CHANNEL * cp)619 RCHANadd(CHANNEL *cp)
620 {
621 FD_SET(cp->fd, &channels.read_set);
622 if (cp->fd > channels.max_fd)
623 channels.max_fd = cp->fd;
624
625 /* For non-NNTP channels, start reading at the beginning of the buffer. */
626 if (cp->Type != CTnntp)
627 cp->In.used = 0;
628 }
629
630
631 /*
632 ** Remove a channel from the set of readers.
633 */
634 void
RCHANremove(CHANNEL * cp)635 RCHANremove(CHANNEL *cp)
636 {
637 if (FD_ISSET(cp->fd, &channels.read_set)) {
638 FD_CLR(cp->fd, &channels.read_set);
639 CHANresetlast(cp->fd);
640 }
641 }
642
643
644 /*
645 ** Put a channel to sleep and call a function when it wakes. Note that arg
646 ** must be NULL or allocated memory, since it will be freed later.
647 */
648 void
SCHANadd(CHANNEL * cp,time_t wake,void * event,innd_callback_func waker,void * arg)649 SCHANadd(CHANNEL *cp, time_t wake, void *event, innd_callback_func waker,
650 void *arg)
651 {
652 if (!CHANsleeping(cp)) {
653 channels.sleep_count++;
654 FD_SET(cp->fd, &channels.sleep_set);
655 }
656 if (cp->fd > channels.max_sleep_fd)
657 channels.max_sleep_fd = cp->fd;
658 cp->Waketime = wake;
659 cp->Waker = waker;
660 if (cp->Argument != arg) {
661 free(cp->Argument);
662 cp->Argument = arg;
663 }
664 cp->Event = event;
665 }
666
667
668 /*
669 ** Take a channel off the sleep list.
670 */
671 void
SCHANremove(CHANNEL * cp)672 SCHANremove(CHANNEL *cp)
673 {
674 if (!CHANsleeping(cp))
675 return;
676 FD_CLR(cp->fd, &channels.sleep_set);
677 channels.sleep_count--;
678 cp->Waketime = 0;
679
680 /* If this was the highest descriptor, get a new highest. */
681 CHANresetlastsleeping(cp->fd);
682 }
683
684
685 /*
686 ** Is a channel on the sleep list?
687 */
688 bool
CHANsleeping(CHANNEL * cp)689 CHANsleeping(CHANNEL *cp)
690 {
691 return FD_ISSET(cp->fd, &channels.sleep_set);
692 }
693
694
695 /*
696 ** Wake up channels waiting for a specific event. (Or rather, mark them
697 ** ready to wake up; they'll actually be woken up the next time through the
698 ** main loop.)
699 */
700 void
SCHANwakeup(void * event)701 SCHANwakeup(void *event)
702 {
703 CHANNEL *cp;
704 int i;
705
706 for (cp = channels.table, i = channels.table_size; --i >= 0; cp++)
707 if (cp->Type != CTfree && cp->Event == event && CHANsleeping(cp))
708 cp->Waketime = 0;
709 }
710
711
712 /*
713 ** Mark a channel as an active writer. Don't reset the Out->left field
714 ** since we could have buffered I/O already in there.
715 */
716 void
WCHANadd(CHANNEL * cp)717 WCHANadd(CHANNEL *cp)
718 {
719 if (cp->Out.left > 0) {
720 FD_SET(cp->fd, &channels.write_set);
721 if (cp->fd > channels.max_fd)
722 channels.max_fd = cp->fd;
723 }
724 }
725
726
727 /*
728 ** Remove a channel from the set of writers.
729 */
730 void
WCHANremove(CHANNEL * cp)731 WCHANremove(CHANNEL *cp)
732 {
733 if (FD_ISSET(cp->fd, &channels.write_set)) {
734 FD_CLR(cp->fd, &channels.write_set);
735 CHANresetlast(cp->fd);
736
737 /* No data left -- reset used so we don't grow the buffer. */
738 if (cp->Out.left <= 0) {
739 cp->Out.used = 0;
740 cp->Out.left = 0;
741 }
742 }
743 }
744
745
746 /*
747 ** Set a channel to start off with the contents of an existing channel.
748 */
749 void
WCHANsetfrombuffer(CHANNEL * cp,struct buffer * bp)750 WCHANsetfrombuffer(CHANNEL *cp, struct buffer *bp)
751 {
752 WCHANset(cp, &bp->data[bp->used], bp->left);
753 }
754
755
756 /*
757 ** Internal function to resize the in buffer for a given channel to the new
758 ** size given, adjusting pointers as required.
759 */
760 static void
CHANresize(CHANNEL * cp,size_t size)761 CHANresize(CHANNEL *cp, size_t size)
762 {
763 struct buffer *bp;
764 char *p;
765 size_t change;
766 ptrdiff_t offset;
767 int i;
768 HDRCONTENT *hc = cp->Data.HdrContent;
769
770 bp = &cp->In;
771 change = size - bp->size;
772 bp->size = size;
773 bp->left += change;
774 p = bp->data;
775
776 /* Reallocate the buffer and adjust offets if realloc moved the location
777 of the memory region. Only adjust offets if we're in a state where we
778 care about the header contents.
779
780 FIXME: This is invalid C, although it will work on most (all?) common
781 systems. The pointers need to be reduced to offets and then turned
782 back into relative pointers rather than adjusting the pointers
783 directly, since as soon as realloc is called, pointers into the old
784 space become invalid and may not be used further, even for arithmetic.
785 (Not to mention that two pointers to different objects may not be
786 compared and arithmetic may not be performed on them. */
787 TMRstart(TMR_DATAMOVE);
788 bp->data = xrealloc(bp->data, bp->size);
789 offset = p - bp->data;
790 if (offset != 0) {
791 if (cp->State == CSgetheader || cp->State == CSgetbody ||
792 cp->State == CSeatarticle) {
793 if (cp->Data.BytesHeader != NULL)
794 cp->Data.BytesHeader -= offset;
795 for (i = 0; i < MAX_ARTHEADER; i++, hc++) {
796 if (hc->Value != NULL)
797 hc->Value -= offset;
798 }
799 }
800 }
801 TMRstop(TMR_DATAMOVE);
802 }
803
804
805 /*
806 ** Read in text data, return the amount we read.
807 */
808 int
CHANreadtext(CHANNEL * cp)809 CHANreadtext(CHANNEL *cp)
810 {
811 struct buffer *bp;
812 char *name;
813 int oerrno, maxbyte;
814 ssize_t count;
815
816 /* Grow buffer if we're getting close to current limit.
817
818 FIXME: The In buffer doesn't use the normal meanings of .used and
819 .left. */
820 bp = &cp->In;
821 bp->left = bp->size - bp->used;
822 if (bp->left <= LOW_WATER)
823 CHANresize(cp, bp->size + GROW_AMOUNT(bp->size));
824
825 /* Read in whatever is there, up to some reasonable limit.
826
827 We want to limit the amount of time devoted to processing the incoming
828 data for any given channel. There's no easy way of doing that, though,
829 so we restrict the data size instead.
830
831 If the data is part of a single large article, then reading and
832 processing many kilobytes at a time costs very little. If the data is
833 a long list of CHECK commands from a streaming feed, then every line of
834 data will require a history lookup, and we probably don't want to do
835 more than about 10 of those per channel on each cycle of the main
836 select() loop (otherwise we might take too long before giving other
837 channels a turn). 10 lines of CHECK commands suggests a limit of about
838 1KB of data, or less, which is the innconf->maxcmdreadsize default. If
839 innconf->maxcmdreadsize is 0, there is no limit.
840
841 Reduce the read size only if we're reading commands.
842
843 FIXME: A better approach would be to limit the number of commands we
844 process for each channel. */
845 if (innconf->maxcmdreadsize == 0 || cp->State != CSgetcmd
846 || bp->left < innconf->maxcmdreadsize)
847 maxbyte = bp->left;
848 else
849 maxbyte = innconf->maxcmdreadsize;
850 TMRstart(TMR_NNTPREAD);
851 count = read(cp->fd, &bp->data[bp->used], maxbyte);
852 TMRstop(TMR_NNTPREAD);
853
854 /* Solaris (at least 2.4 through 2.6) will occasionally return EAGAIN in
855 response to a read even if the file descriptor already selected true
856 for reading, apparently due to some internal resource exhaustion. In
857 that case, return -2, which will drop back out to the main loop and go
858 on to the next file descriptor, as if the descriptor never selected
859 true. This check will probably never trigger on platforms other than
860 Solaris. */
861 if (count < 0) {
862 if (errno == EAGAIN)
863 return -2;
864 oerrno = errno;
865 name = CHANname(cp);
866 errno = oerrno;
867 sysnotice("%s cant read", name);
868 return -1;
869 }
870 if (count == 0) {
871 name = CHANname(cp);
872 notice("%s readclose", name);
873 return 0;
874 }
875 bp->used += count;
876 bp->left -= count;
877 return count;
878 }
879
880
881 /*
882 ** Internal function to write out channel data.
883 **
884 ** If I/O backs up a lot, we can get EMSGSIZE on some systems. If that
885 ** happens we want to do the I/O in chunks. We assume stdio's BUFSIZ is a
886 ** good chunk value.
887 **
888 ** FIXME: Roll this code into xwrite.
889 */
890 static int
CHANwrite(int fd,const char * data,long length)891 CHANwrite(int fd, const char *data, long length)
892 {
893 ssize_t count;
894 const char *p;
895
896 /* Try the standard case -- write it all. */
897 do {
898 count = write(fd, data, length);
899 } while (count < 0 && errno == EINTR);
900 if (count > 0 || (count < 0 && errno != EMSGSIZE))
901 return count;
902
903 /* We got EMSGSIZE, so write it in pieces. */
904 for (p = data, count = 0; length > 0; p += count, length -= count) {
905 count = write(fd, p, (length > BUFSIZ ? BUFSIZ : length));
906 if (count < 0 && errno == EINTR) {
907 count = 0;
908 continue;
909 }
910 if (count <= 0)
911 break;
912 }
913
914 /* Return error, or partial results if we got something. */
915 return (p == data) ? count : (p - data);
916 }
917
918
919 /*
920 ** Try to flush out the buffer. Use this only on file channels!
921 */
922 bool
WCHANflush(CHANNEL * cp)923 WCHANflush(CHANNEL *cp)
924 {
925 struct buffer *bp;
926 ssize_t count;
927
928 /* Write it. */
929 for (bp = &cp->Out; bp->left > 0; bp->left -= count, bp->used += count) {
930 count = CHANwrite(cp->fd, &bp->data[bp->used], bp->left);
931 if (count <= 0) {
932 syswarn("%s cant flush count %lu", CHANname(cp),
933 (unsigned long) bp->left);
934 return false;
935 }
936 if (count == 0) {
937 warn("%s cant flush count %lu", CHANname(cp),
938 (unsigned long) bp->left);
939 return false;
940 }
941 }
942 WCHANremove(cp);
943 return true;
944 }
945
946
947 /*
948 ** Standard wakeup routine called after a write channel was put to sleep.
949 */
950 static void
CHANwakeup(CHANNEL * cp)951 CHANwakeup(CHANNEL *cp)
952 {
953 notice("%s wakeup", CHANname(cp));
954 WCHANadd(cp);
955 }
956
957
958 /*
959 ** Attempting to write would block; stop output or give up.
960 */
961 static void
CHANwritesleep(CHANNEL * cp,const char * name)962 CHANwritesleep(CHANNEL *cp, const char *name)
963 {
964 unsigned long bad, delay;
965
966 bad = ++(cp->BlockedWrites);
967 if (bad > innconf->badiocount)
968 switch (cp->Type) {
969 default:
970 break;
971 case CTreject:
972 case CTnntp:
973 case CTfile:
974 case CTexploder:
975 case CTprocess:
976 warn("%s blocked closing", name);
977 SITEchanclose(cp);
978 CHANclose(cp, name);
979 return;
980 }
981 delay = bad * innconf->blockbackoff;
982 warn("%s blocked sleeping %lu", name, delay);
983 SCHANadd(cp, Now.tv_sec + delay, NULL, CHANwakeup, NULL);
984 }
985
986
987 /*
988 ** We got an unknown error in select. Find out the culprit. This is not
989 ** enabled yet; it's been disabled for years, and is expensive.
990 */
991 static void UNUSED
CHANdiagnose(void)992 CHANdiagnose(void)
993 {
994 fd_set test;
995 int fd;
996 struct timeval tv;
997
998 FD_ZERO(&test);
999 for (fd = channels.max_fd; fd >= 0; fd--) {
1000 if (FD_ISSET(fd, &channels.read_set)) {
1001 FD_SET(fd, &test);
1002 tv.tv_sec = 0;
1003 tv.tv_usec = 0;
1004 if (select(fd + 1, &test, NULL, NULL, &tv) < 0 && errno != EINTR) {
1005 warn("%s bad read file %d", LogName, fd);
1006 FD_CLR(fd, &channels.read_set);
1007 /* Probably do something about the file descriptor here; call
1008 CHANclose on it? */
1009 }
1010 FD_CLR(fd, &test);
1011 }
1012 if (FD_ISSET(fd, &channels.write_set)) {
1013 FD_SET(fd, &test);
1014 tv.tv_sec = 0;
1015 tv.tv_usec = 0;
1016 if (select(fd + 1, NULL, &test, NULL, &tv) < 0 && errno != EINTR) {
1017 warn("%s bad write file %d", LogName, fd);
1018 FD_CLR(fd, &channels.write_set);
1019 /* Probably do something about the file descriptor here; call
1020 CHANclose on it? */
1021 }
1022 FD_CLR(fd, &test);
1023 }
1024 }
1025 }
1026
1027
1028 /*
1029 ** Count the number of active connections for a given peer.
1030 **
1031 ** FIXME: This seems like an overly cumbersome way to do this, and also seems
1032 ** like a layering violation. Do we really have to do this here?
1033 */
1034 void
CHANcount_active(CHANNEL * cp)1035 CHANcount_active(CHANNEL *cp)
1036 {
1037 int found;
1038 CHANNEL *tempchan;
1039 char *label, *tmplabel;
1040 int fd;
1041
1042 if (cp->fd < 0 || cp->Type != CTnntp || cp->ActiveCnx == 0)
1043 return;
1044 found = 1;
1045 label = RClabelname(cp);
1046 if (label == NULL)
1047 return;
1048 for (fd = 0; fd <= channels.max_fd; fd++) {
1049 tempchan = &channels.table[fd];
1050 tmplabel = RClabelname(tempchan);
1051 if (tmplabel == NULL)
1052 continue;
1053 if (strcmp(label, tmplabel) == 0 && tempchan->ActiveCnx != 0)
1054 found++;
1055 }
1056 cp->ActiveCnx = found;
1057 }
1058
1059
1060 /*
1061 ** Handle a file descriptor that selects ready to read. Dispatch to the
1062 ** reader function, and then resize its input buffer if needed.
1063 */
1064 static void
CHANhandle_read(CHANNEL * cp)1065 CHANhandle_read(CHANNEL *cp)
1066 {
1067 size_t size;
1068
1069 if (cp->Type == CTfree) {
1070 warn("%s %d free but was in RMASK", CHANname(cp), cp->fd);
1071 RCHANremove(cp);
1072 close(cp->fd);
1073 cp->fd = -1;
1074 return;
1075 }
1076 cp->LastActive = Now.tv_sec;
1077 (*cp->Reader)(cp);
1078
1079 /* Check and see if the buffer is grossly overallocated and shrink if
1080 needed. */
1081 if (cp->In.size <= BIG_BUFFER)
1082 return;
1083 if (cp->In.used == 0)
1084 CHANresize(cp, START_BUFF_SIZE);
1085 else if ((cp->In.size / cp->In.used) > 10) {
1086 size = cp->In.used * 2;
1087 if (size < START_BUFF_SIZE)
1088 size = START_BUFF_SIZE;
1089 CHANresize(cp, size);
1090 }
1091 }
1092
1093
1094 /*
1095 ** Handle a file descriptor that selects ready to write. Write out the
1096 ** pending data.
1097 */
1098 static void
CHANhandle_write(CHANNEL * cp)1099 CHANhandle_write(CHANNEL *cp)
1100 {
1101 struct buffer *bp;
1102 ssize_t count;
1103 int oerrno;
1104 const char *name;
1105
1106 if (cp->Type == CTfree) {
1107 warn("%s %d free but was in WMASK", CHANname(cp), cp->fd);
1108 WCHANremove(cp);
1109 close(cp->fd);
1110 cp->fd = -1;
1111 return;
1112 }
1113 bp = &cp->Out;
1114 if (bp->left == 0) {
1115 /* Should not be possible. */
1116 WCHANremove(cp);
1117 return;
1118 }
1119 cp->LastActive = Now.tv_sec;
1120 count = CHANwrite(cp->fd, &bp->data[bp->used], bp->left);
1121 if (count <= 0) {
1122 oerrno = errno;
1123 name = CHANname(cp);
1124 errno = oerrno;
1125 if (oerrno == EWOULDBLOCK)
1126 oerrno = EAGAIN;
1127 if (count < 0)
1128 sysnotice("%s cant write", name);
1129 else
1130 notice("%s cant write", name);
1131 cp->BadWrites++;
1132 if (count < 0 && oerrno == EPIPE) {
1133 SITEchanclose(cp);
1134 CHANclose(cp, name);
1135 } else if (count < 0 && oerrno == EAGAIN) {
1136 WCHANremove(cp);
1137 CHANwritesleep(cp, name);
1138 } else if (cp->BadWrites >= innconf->badiocount) {
1139 warn("%s sleeping", name);
1140 WCHANremove(cp);
1141 SCHANadd(cp, Now.tv_sec + innconf->pauseretrytime, NULL,
1142 CHANwakeup, NULL);
1143 }
1144 } else {
1145 cp->BadWrites = 0;
1146 cp->BlockedWrites = 0;
1147 bp->left -= count;
1148 bp->used += count;
1149 if (bp->left > 0)
1150 buffer_compact(bp);
1151 else {
1152 WCHANremove(cp);
1153 (*cp->WriteDone)(cp);
1154 }
1155 }
1156 }
1157
1158
1159 /*
1160 ** Main I/O loop. Wait for data, call the channel's handler when there is
1161 ** something to read or when the queued write is finished. In order to be
1162 ** fair (i.e., don't always give descriptor n priority over n+1), we remember
1163 ** where we last had something and pick up from there.
1164 **
1165 ** Yes, the main code has really wandered over to the side a lot.
1166 */
1167 void
CHANreadloop(void)1168 CHANreadloop(void)
1169 {
1170 int i, startpoint, count, lastfd;
1171 CHANNEL *cp;
1172 fd_set rdfds, wrfds;
1173 struct timeval tv;
1174 unsigned long silence;
1175 const char *name;
1176 time_t last_sync;
1177 int fd = 0;
1178
1179 STATUSinit();
1180 gettimeofday(&Now, NULL);
1181 last_sync = Now.tv_sec;
1182
1183 while (1) {
1184 /* See if any processes died. */
1185 PROCscan();
1186
1187 /* Wait for data, note the time. */
1188 rdfds = channels.read_set;
1189 wrfds = channels.write_set;
1190 tv = TimeOut;
1191 if (innconf->timer != 0) {
1192 unsigned long now = TMRnow();
1193
1194 if (now < 1000 * innconf->timer)
1195 tv.tv_sec = innconf->timer - now / 1000;
1196 else {
1197 TMRsummary("ME", timer_name);
1198 InndHisLogStats();
1199 tv.tv_sec = innconf->timer;
1200 }
1201 }
1202
1203 /* Mask signals when not in select to prevent a signal handler
1204 from accessing data that the main code is mutating. */
1205 TMRstart(TMR_IDLE);
1206 xsignal_unmask();
1207 count = select(channels.max_fd + 1, &rdfds, &wrfds, NULL, &tv);
1208 xsignal_mask();
1209 TMRstop(TMR_IDLE);
1210
1211 if (count < 0) {
1212 if (errno != EINTR) {
1213 syswarn("%s cant select", LogName);
1214 #ifdef INND_FIND_BAD_FDS
1215 CHANdiagnose();
1216 #endif
1217 }
1218 continue;
1219 }
1220
1221 STATUSmainloophook();
1222 if (GotTerminate) {
1223 #ifdef DO_PERL
1224 PLmode(Mode, OMshutdown, (char *) "exiting due to signal");
1225 #endif
1226 #ifdef DO_PYTHON
1227 PYmode(Mode, OMshutdown, (char *) "exiting due to signal");
1228 #endif
1229 notice("%s exiting due to signal", LogName);
1230 CleanupAndExit(0, NULL);
1231 }
1232
1233 /* Update the "reasonably accurate" time. */
1234 gettimeofday(&Now, NULL);
1235 if (Now.tv_sec > last_sync + TimeOut.tv_sec) {
1236 HISsync(History);
1237 if (ICDactivedirty != 0) {
1238 ICDwriteactive();
1239 ICDactivedirty = 0;
1240 }
1241 last_sync = Now.tv_sec;
1242 }
1243
1244 /* If no channels are active, flush and skip if nobody's sleeping. */
1245 if (count == 0) {
1246 if (Mode == OMrunning)
1247 ICDwrite();
1248 if (channels.sleep_count == 0)
1249 continue;
1250 }
1251
1252 /* Try the prioritized channels first. */
1253 for (i = 0; i < channels.prioritized_size; i++) {
1254 int pfd;
1255
1256 if (channels.prioritized[i] == NULL)
1257 continue;
1258 pfd = channels.prioritized[i]->fd;
1259 if (FD_ISSET(pfd, &channels.read_set) && FD_ISSET(pfd, &rdfds)) {
1260 count--;
1261 if (count > 4)
1262 count = 4; /* might be more requests */
1263 (*channels.prioritized[i]->Reader)(channels.prioritized[i]);
1264 FD_CLR(pfd, &rdfds);
1265 }
1266 }
1267
1268 /* Loop through all active channels. Somebody could have closed a
1269 channel so we double-check the global mask before looking at what
1270 select returned. The code here is written so that a channel could
1271 be reading and writing and sleeping at the same time, even though
1272 that's not possible. (Just as well, since in SysVr4 the count
1273 would be wrong.) */
1274 lastfd = channels.max_fd;
1275 if (lastfd < channels.max_sleep_fd)
1276 lastfd = channels.max_sleep_fd;
1277 if (fd > lastfd)
1278 fd = 0;
1279 startpoint = fd;
1280 do {
1281 cp = &channels.table[fd];
1282
1283 /* Check to see if this peer has too many open connections, and if
1284 so, either close or make inactive this connection. */
1285 if (cp->Type == CTnntp && cp->MaxCnx > 0 && cp->HoldTime > 0) {
1286 CHANcount_active(cp);
1287 if (cp->ActiveCnx > cp->MaxCnx && cp->fd > 0) {
1288 if (cp->Started + cp->HoldTime < Now.tv_sec)
1289 CHANclose(cp, CHANname(cp));
1290 else {
1291 if (fd >= lastfd)
1292 fd = 0;
1293 else
1294 fd++;
1295 cp->ActiveCnx = 0;
1296 RCHANremove(cp);
1297 }
1298 continue;
1299 }
1300 }
1301
1302 /* Anything to read? */
1303 if (FD_ISSET(fd, &channels.read_set) && FD_ISSET(fd, &rdfds)) {
1304 count--;
1305 CHANhandle_read(cp);
1306 }
1307
1308 /* Possibly recheck for dead children so we don't get SIGPIPE on
1309 readerless channels. */
1310 if (PROCneedscan)
1311 PROCscan();
1312
1313 /* Ready to write? */
1314 if (FD_ISSET(fd, &channels.write_set) && FD_ISSET(fd, &wrfds)) {
1315 count--;
1316 CHANhandle_write(cp);
1317 }
1318
1319 /* Coming off a sleep? */
1320 if (FD_ISSET(fd, &channels.sleep_set)
1321 && cp->Waketime <= Now.tv_sec) {
1322 if (cp->Type == CTfree) {
1323 warn("%s %d free but was in SMASK", CHANname(cp), fd);
1324 FD_CLR(fd, &channels.sleep_set);
1325 channels.sleep_count--;
1326 CHANresetlastsleeping(fd);
1327 close(fd);
1328 cp->fd = -1;
1329 } else {
1330 cp->LastActive = Now.tv_sec;
1331 SCHANremove(cp);
1332 if (cp->Waker != NULL) {
1333 (*cp->Waker)(cp);
1334 } else {
1335 name = CHANname(cp);
1336 warn("%s %d sleeping without Waker", name, fd);
1337 SITEchanclose(cp);
1338 CHANclose(cp, name);
1339 }
1340 }
1341 }
1342
1343 /* Toss CTreject channel early if it's inactive. */
1344 if (cp->Type == CTreject
1345 && cp->LastActive + REJECT_TIMEOUT < Now.tv_sec) {
1346 name = CHANname(cp);
1347 notice("%s timeout reject", name);
1348 CHANclose(cp, name);
1349 }
1350
1351 /* Has this channel been inactive very long? */
1352 if (cp->Type == CTnntp
1353 && cp->LastActive + cp->NextLog < Now.tv_sec) {
1354 name = CHANname(cp);
1355 silence = Now.tv_sec - cp->LastActive;
1356 cp->NextLog += innconf->chaninacttime;
1357 notice("%s inactive %ld", name, silence / 60L);
1358 if (silence > innconf->peertimeout) {
1359 notice("%s timeout", name);
1360 CHANclose(cp, name);
1361 }
1362 }
1363
1364 /* Bump pointer, modulo the table size. */
1365 if (fd >= lastfd)
1366 fd = 0;
1367 else
1368 fd++;
1369
1370 /* If there is nothing to do, break out.
1371
1372 FIXME: Do we really want to keep going until sleep_count is
1373 zero? That means that when there are sleeping channels, we do
1374 a full traversal every time through the select loop. */
1375 if (count == 0 && channels.sleep_count == 0)
1376 break;
1377 } while (fd != startpoint);
1378 }
1379 }
1380