1 /*
2 **  The implementation of the innfeed Tape class.
3 **
4 **  Written by James Brister <brister@vix.com>
5 **
6 **  The implementation of the Tape class. Tapes are read-only or write-only
7 **  files that are accessed sequentially. Their basic unit of i/o is an
8 **  Article. Tapes work out of a single directory and manage all file names
9 **  themselves.
10 **
11 **  Tapes will checkpoint themselves periodically so that when innfeed exits
12 **  or crashes things can restart close to where they were last. The period
13 **  checkpointing is handled entirely by the Tape class, but the checkpoint
14 **  period needs to be set by some external user before the first tape is
15 **  created.
16 */
17 
18 #include "portable/system.h"
19 
20 #include "innfeed.h"
21 
22 #include <assert.h>
23 #include <ctype.h>
24 #include <errno.h>
25 #include <sys/param.h>
26 #include <sys/stat.h>
27 #include <syslog.h>
28 
29 #include <dirent.h>
30 typedef struct dirent DIRENTRY;
31 
32 #ifdef HAVE_SYS_TIME_H
33 #    include <sys/time.h>
34 #endif
35 #include <time.h>
36 
37 #include "inn/innconf.h"
38 #include "inn/libinn.h"
39 #include "inn/messages.h"
40 
41 #include "article.h"
42 #include "configfile.h"
43 #include "endpoint.h"
44 #include "host.h"
45 #include "tape.h"
46 
47 #if defined(INNFEED_DEBUG)
48 /* A structure for temporary storage of articles. */
49 typedef struct q_e_s {
50     Article article;
51     struct q_e_s *next;
52 } * QueueElem;
53 #endif
54 
55 /* The Tape class type. */
56 struct tape_s {
57     /* the pathname of the file the administrator can drop in by hand. */
58     char *handFilename;
59 
60     /* the pathname of the file the Tape will read from */
61     char *inputFilename;
62 
63     /* the pathname of the file the Tape will write to. */
64     char *outputFilename;
65 
66     /* the pathname of the file used in locking */
67     char *lockFilename;
68 
69     /* the peer we're doing this for. */
70     char *peerName;
71 
72     FILE *inFp;  /* input FILE */
73     FILE *outFp; /* output FILE */
74 
75     time_t lastRotated; /* time files last got switched */
76     bool checkNew;      /* set bool when we need to check for
77                            hand-crafted file. */
78 
79 #if defined(INNFEED_DEBUG)
80     /* The tape holds a small output queue in memory to avoid thrashing. */
81     QueueElem head;
82     QueueElem tail;
83     unsigned int qLength; /* amount on the queue */
84 #endif
85 
86     long outputSize; /* the current size of the output file. */
87     long lossage;
88 
89     /* the number of bytes we try to keep the output under. We actually
90        wait for the outputSize to get 10% greater than this amount before
91        shrinking the file down again. A value of zero means no limit. */
92     long outputLowLimit;
93     long outputHighLimit;
94     double backlogFactor;
95 
96     bool scribbled; /* have we scribbled a checkpoint value in
97                        the file. */
98     long tellpos;   /* for input file checkpointing. */
99     bool changed;   /* true if tape was read since last
100                        checkpoint or start. */
101 
102     /* true if articles that are output are NOT later input. */
103     bool noRotate;
104 
105     /* true if no articles should ever be spooled */
106     bool noBacklog;
107 };
108 
109 
110 static void checkpointTape(Tape tape);
111 static void removeTapeGlobally(Tape tape);
112 static void addTapeGlobally(Tape tape);
113 static void prepareFiles(Tape tape);
114 static void tapeCkNewFileCbk(TimeoutId id, void *d);
115 static void tapeCheckpointCallback(TimeoutId id, void *d);
116 #if defined(INNFEED_DEBUG)
117 static void flushTape(Tape tape);
118 #endif
119 static void tapesSetCheckNew(void);
120 static void initTape(Tape nt);
121 static void tapeCleanup(void);
122 
123 
124 /* pathname of directory we store tape files in. */
125 static char *tapeDirectory;
126 
127 /* the callback ID of the checkpoint timer callback. */
128 static TimeoutId checkPtId;
129 static TimeoutId ckNewFileId;
130 
131 /* number of seconds between tape checkpoints. */
132 static unsigned int tapeCkPtPeriod;
133 static unsigned int tapeCkNewFilePeriod;
134 
135 static time_t rotatePeriod = TAPE_ROTATE_PERIOD;
136 
137 /* global list of tapes so we can checkpoint them periodically */
138 static Tape *activeTapes;
139 
140 /* Size of the activeTapes array */
141 static size_t activeTapeSize;
142 
143 /* index of last element in activeTapes that's being used. */
144 static size_t activeTapeIdx;
145 
146 #if defined(INNFEED_DEBUG)
147 /* default limit of the size of output tapes. */
148 static long defaultSizeLimit;
149 #endif
150 
151 static unsigned int tapeHighwater;
152 
153 bool debugShrinking = false;
154 
155 
156 /* callback when config file is loaded */
157 int
tapeConfigLoadCbk(void * data)158 tapeConfigLoadCbk(void *data)
159 {
160     int rval = 1;
161     long iv;
162     int bv;
163     FILE *fp = (FILE *) data;
164     char *dir, *p;
165 
166     if (getString(topScope, "backlog-directory", &p, NO_INHERIT)) {
167         dir = concatpath(innconf->pathspool, p);
168         free(p);
169         if (tapeDirectory != NULL && strcmp(tapeDirectory, dir) != 0) {
170             warn("ME config: cannot change backlog-directory of a running"
171                  " process");
172             free(dir);
173             dir = xstrdup(tapeDirectory);
174         }
175 
176         if (!isDirectory(dir) && isDirectory(dflTapeDir)) {
177             logOrPrint(LOG_ERR, fp,
178                        "ME config: definition of backlog-directory (%s) is a"
179                        " non-existant directory. Using %s",
180                        dir, dflTapeDir);
181             free(dir);
182             dir = xstrdup(dflTapeDir);
183         } else if (!isDirectory(dir))
184             logAndExit(1, "ME config: no usable value for backlog-directory");
185     } else if (!isDirectory(dflTapeDir)) {
186         logAndExit(1, "ME config: no usable value for backlog-directory");
187         /* NOTREACHED */
188         return -1;
189     } else
190         dir = xstrdup(dflTapeDir);
191 
192     if (tapeDirectory != NULL)
193         free(tapeDirectory);
194     tapeDirectory = dir;
195 
196 
197     if (getInteger(topScope, "backlog-highwater", &iv, NO_INHERIT)) {
198         if (iv < 0) {
199             rval = 0;
200             logOrPrint(LOG_ERR, fp,
201                        "ME config: value of %s (%ld) in %s cannot be less"
202                        " than 0. Using %ld",
203                        "backlog-highwater", iv, "global scope",
204                        (long) TAPE_HIGHWATER);
205             iv = TAPE_HIGHWATER;
206         }
207     } else
208         iv = TAPE_HIGHWATER;
209     tapeHighwater = (unsigned int) iv;
210 
211 
212     if (getInteger(topScope, "backlog-rotate-period", &iv, NO_INHERIT)) {
213         if (iv < 0) {
214             rval = 0;
215             logOrPrint(LOG_ERR, fp,
216                        "ME config: value of %s (%ld) in %s cannot be less"
217                        " than 0. Using %ld",
218                        "backlog-rotate-period", iv, "global scope",
219                        (long) TAPE_ROTATE_PERIOD);
220             iv = TAPE_ROTATE_PERIOD;
221         }
222     } else
223         iv = TAPE_ROTATE_PERIOD;
224     rotatePeriod = (unsigned int) iv;
225 
226 
227     if (getInteger(topScope, "backlog-ckpt-period", &iv, NO_INHERIT)) {
228         if (iv < 0) {
229             rval = 0;
230             logOrPrint(LOG_ERR, fp,
231                        "ME config: value of %s (%ld) in %s cannot be less"
232                        " than 0. Using %ld",
233                        "backlog-ckpt-period", iv, "global scope",
234                        (long) TAPE_CHECKPOINT_PERIOD);
235             iv = TAPE_CHECKPOINT_PERIOD;
236         }
237     } else
238         iv = TAPE_CHECKPOINT_PERIOD;
239     tapeCkPtPeriod = (unsigned int) iv;
240 
241 
242     if (getInteger(topScope, "backlog-newfile-period", &iv, NO_INHERIT)) {
243         if (iv < 0) {
244             rval = 0;
245             logOrPrint(LOG_ERR, fp,
246                        "ME config: value of %s (%ld) in %s cannot be less"
247                        " than 0. Using %ld",
248                        "backlog-newfile-period", iv, "global scope",
249                        (long) TAPE_NEWFILE_PERIOD);
250             iv = TAPE_NEWFILE_PERIOD;
251         }
252     } else
253         iv = TAPE_NEWFILE_PERIOD;
254     tapeCkNewFilePeriod = (unsigned int) iv;
255 
256 
257     if (getBool(topScope, "debug-shrinking", &bv, NO_INHERIT))
258         debugShrinking = (bv ? true : false);
259 
260     return rval;
261 }
262 
263 
264 /* Create a new Tape object. There are three potential files involved in
265    I/O. 'peerName' is what  the admin may have dropped in by
266    hand. 'peerName.input' is the file that was being used as input the last
267    time things were run. 'peerName.output' is the file that was being used
268    as output. The file 'peerName' is appended to 'peerName.input' (or
269    renamed if 'peerName.input' doesn't exist). Then 'peerName.output' is
270    appeneded (or renamed) to 'peerName.input' */
271 
272 static bool inited = false;
273 Tape
newTape(const char * peerName,bool dontRotate)274 newTape(const char *peerName, bool dontRotate)
275 {
276     Tape nt = xmalloc(sizeof(struct tape_s));
277     size_t pLen = strlen(peerName);
278     size_t dLen = strlen(tapeDirectory);
279 
280     if (!inited) {
281         inited = true;
282         atexit(tapeCleanup);
283     }
284 
285     ASSERT(nt != NULL);
286 
287     if (endsIn(peerName, INPUT_TAIL))
288         die("Sorry, can't have a peer name ending in \"%s\"", INPUT_TAIL);
289 
290     if (endsIn(peerName, OUTPUT_TAIL))
291         die("Sorry, can't have a peer name ending in \"%s\"", OUTPUT_TAIL);
292 
293     if (endsIn(peerName, LOCK_TAIL))
294         die("Sorry, can't have a peer name ending in \"%s\"", LOCK_TAIL);
295 
296     nt->peerName = xstrdup(peerName);
297 
298     nt->handFilename = xmalloc(pLen + dLen + 2);
299     sprintf(nt->handFilename, "%s/%s", tapeDirectory, peerName);
300 
301     nt->lockFilename = xmalloc(pLen + dLen + strlen(LOCK_TAIL) + 2);
302     sprintf(nt->lockFilename, "%s/%s%s", tapeDirectory, peerName, LOCK_TAIL);
303 
304     nt->inputFilename = xmalloc(pLen + dLen + strlen(INPUT_TAIL) + 2);
305     sprintf(nt->inputFilename, "%s/%s%s", tapeDirectory, peerName, INPUT_TAIL);
306 
307     nt->outputFilename = xmalloc(pLen + dLen + strlen(OUTPUT_TAIL) + 2);
308     sprintf(nt->outputFilename, "%s/%s%s", tapeDirectory, peerName,
309             OUTPUT_TAIL);
310 
311     if (!lockFile(nt->lockFilename)) {
312         warn("ME lock failed for host: %s", nt->lockFilename);
313 
314         free(nt->handFilename);
315         free(nt->lockFilename);
316         free(nt->inputFilename);
317         free(nt->outputFilename);
318         free(nt);
319 
320         return NULL;
321     }
322 
323     nt->noRotate = false; /* for first time prepare */
324     initTape(nt);
325     nt->noRotate = dontRotate;
326 
327     addTapeGlobally(nt);
328 
329     if (checkPtId == 0 && tapeCkPtPeriod > 0) /* only done once. */
330         checkPtId = prepareSleep(tapeCheckpointCallback, tapeCkPtPeriod, NULL);
331 
332     if (ckNewFileId == 0 && tapeCkNewFilePeriod > 0)
333         ckNewFileId =
334             prepareSleep(tapeCkNewFileCbk, tapeCkNewFilePeriod, NULL);
335 
336     return nt;
337 }
338 
339 static void
initTape(Tape nt)340 initTape(Tape nt)
341 {
342     value *peerVal = findPeer(nt->peerName);
343     scope *s = (peerVal == NULL ? NULL : peerVal->v.scope_val);
344 
345     nt->inFp = NULL;
346     nt->outFp = NULL;
347 
348     nt->lastRotated = 0;
349     nt->checkNew = false;
350 
351 #if defined(INNFEED_DEBUG)
352     nt->head = NULL;
353     nt->tail = NULL;
354     nt->qLength = 0;
355 #endif
356 
357     nt->scribbled = false;
358     nt->tellpos = 0;
359 
360     nt->changed = false;
361 
362     nt->outputSize = 0;
363     nt->lossage = 0;
364 
365     nt->noBacklog = false;
366     nt->outputLowLimit = BLOGLIMIT;
367     nt->outputHighLimit = BLOGLIMIT_HIGH;
368     nt->backlogFactor = LIMIT_FUDGE;
369 
370     if (!talkToSelf) {
371         int bval;
372 
373         if (getBool(s, "no-backlog", &bval, INHERIT))
374             nt->noBacklog = (bval ? true : false);
375         else
376             nt->noBacklog = TAPE_DISABLE;
377 
378         if (getInteger(s, "backlog-limit", &nt->outputLowLimit, INHERIT)) {
379             if (!getReal(s, "backlog-factor", &nt->backlogFactor, INHERIT)) {
380                 if (!getInteger(s, "backlog-limit-highwater",
381                                 &nt->outputHighLimit, INHERIT)) {
382                     warn("%s no backlog-factor or backlog-limit-highwater",
383                          nt->peerName);
384                     nt->outputLowLimit = BLOGLIMIT;
385                     nt->outputHighLimit = BLOGLIMIT_HIGH;
386                     nt->backlogFactor = LIMIT_FUDGE;
387                 }
388             } else
389                 nt->outputHighLimit =
390                     (long) ((double) nt->outputLowLimit * nt->backlogFactor);
391         } else
392             warn("ME config: no definition for required key backlog-limit");
393     }
394 
395     d_printf(1, "%s spooling: %s\n", nt->peerName,
396              nt->noBacklog ? "disabled" : "enabled");
397 
398     d_printf(1, "%s tape backlog limit: [%ld %ld]\n", nt->peerName,
399              nt->outputLowLimit, nt->outputHighLimit);
400 
401     prepareFiles(nt);
402 }
403 
404 
405 void
gFlushTapes(void)406 gFlushTapes(void)
407 {
408     unsigned int i;
409 
410     notice("ME flushing tapes");
411     for (i = 0; i < activeTapeIdx; i++)
412         tapeFlush(activeTapes[i]);
413 }
414 
415 
416 /* close the input and output tapes and reinitialize everything in the
417   tape. */
418 void
tapeFlush(Tape tape)419 tapeFlush(Tape tape)
420 {
421     if (tape->inFp != NULL) {
422         checkpointTape(tape);
423         fclose(tape->inFp);
424     }
425 
426     if (tape->outFp != NULL)
427         fclose(tape->outFp);
428 
429     initTape(tape);
430 }
431 
432 
433 void
gPrintTapeInfo(FILE * fp,unsigned int indentAmt)434 gPrintTapeInfo(FILE *fp, unsigned int indentAmt)
435 {
436     char indent[INDENT_BUFFER_SIZE];
437     unsigned int i;
438 
439     for (i = 0; i < MIN(INDENT_BUFFER_SIZE - 1, indentAmt); i++)
440         indent[i] = ' ';
441     indent[i] = '\0';
442 
443     fprintf(fp, "%sGlobal Tape List : (count %lu) {\n", indent,
444             (unsigned long) activeTapeIdx);
445 
446     for (i = 0; i < activeTapeIdx; i++)
447         printTapeInfo(activeTapes[i], fp, indentAmt + INDENT_INCR);
448     fprintf(fp, "%s}\n", indent);
449 }
450 
451 
452 void
tapeLogGlobalStatus(FILE * fp)453 tapeLogGlobalStatus(FILE *fp)
454 {
455     fprintf(fp, "%sBacklog file global values:%s\n", genHtml ? "<strong>" : "",
456             genHtml ? "</strong>" : "");
457     fprintf(fp, "        directory: %s\n", tapeDirectory);
458     fprintf(fp, "    rotate period: %-3ld seconds\n", (long) rotatePeriod);
459     fprintf(fp, "checkpoint period: %-3ld seconds\n", (long) tapeCkPtPeriod);
460     fprintf(fp, "   newfile period: %-3ld seconds\n",
461             (long) tapeCkNewFilePeriod);
462     fprintf(fp, "backlog highwater: %u\n", tapeHighwater);
463     fprintf(fp, "  highwater queue: %u\n", hostHighwater);
464     fprintf(fp, "\n");
465 }
466 
467 
468 void
tapeLogStatus(Tape tape,FILE * fp)469 tapeLogStatus(Tape tape, FILE *fp)
470 {
471     if (tape == NULL)
472         fprintf(fp, "(no tape)\n");
473     else if (tape->noBacklog)
474         fprintf(fp, "                 spooling: DISABLED\n");
475     else if (tape->outputLowLimit == 0)
476         fprintf(fp, "                 spooling: UNLIMITED\n");
477     else {
478         fprintf(fp, "                 backlog low limit: %ld\n",
479                 tape->outputLowLimit);
480         fprintf(fp, "               backlog upper limit: %ld",
481                 tape->outputHighLimit);
482         if (tape->backlogFactor > 0.0)
483             fprintf(fp, " (factor %1.2f)", tape->backlogFactor);
484         fputc('\n', fp);
485         fprintf(fp, "                 backlog shrinkage: ");
486         fprintf(fp, "%ld bytes (from current file)\n", tape->lossage);
487     }
488 }
489 
490 void
printTapeInfo(Tape tape,FILE * fp,unsigned int indentAmt)491 printTapeInfo(Tape tape, FILE *fp, unsigned int indentAmt)
492 {
493     char indent[INDENT_BUFFER_SIZE];
494     unsigned int i;
495 #if defined(INNFEED_DEBUG)
496     QueueElem qe;
497 #endif
498 
499     for (i = 0; i < MIN(INDENT_BUFFER_SIZE - 1, indentAmt); i++)
500         indent[i] = ' ';
501     indent[i] = '\0';
502 
503     fprintf(fp, "%sTape : %p {\n", indent, (void *) tape);
504 
505     if (tape == NULL) {
506         fprintf(fp, "%s}\n", indent);
507         return;
508     }
509 
510     fprintf(fp, "%s    master-file : %s\n", indent, tape->handFilename);
511     fprintf(fp, "%s    input-file : %s\n", indent, tape->inputFilename);
512     fprintf(fp, "%s    output-file : %s\n", indent, tape->outputFilename);
513     fprintf(fp, "%s    lock-file : %s\n", indent, tape->lockFilename);
514     fprintf(fp, "%s    peerName : %s\n", indent, tape->peerName);
515     fprintf(fp, "%s    input-FILE : %p\n", indent, (void *) tape->inFp);
516     fprintf(fp, "%s    output-FILE : %p\n", indent, (void *) tape->outFp);
517     fprintf(fp, "%s    output-limit : %ld\n", indent, tape->outputLowLimit);
518 
519 #if defined(INNFEED_DEBUG)
520     fprintf(fp, "%s    in-memory article queue (length  %d) {\n", indent,
521             tape->qLength);
522 
523     for (qe = tape->head; qe != NULL; qe = qe->next) {
524         /* printArticleInfo(qe->article, fp, indentAmt + INDENT_INCR); */
525         fprintf(fp, "%s    %p\n", indent, qe->article);
526     }
527 
528     fprintf(fp, "%s    }\n", indent);
529 #endif
530 
531     fprintf(fp, "%s    tell-position : %ld\n", indent, (long) tape->tellpos);
532     fprintf(fp, "%s    input-FILE-changed : %s\n", indent,
533             boolToString(tape->changed));
534 
535     fprintf(fp, "%s    no-rotate : %s\n", indent,
536             boolToString(tape->noRotate));
537 
538     fprintf(fp, "%s}\n", indent);
539 }
540 
541 
542 /* delete the tape. Spools the in-memory articles to disk. */
543 void
delTape(Tape tape)544 delTape(Tape tape)
545 {
546     struct stat st;
547 
548     if (tape == NULL)
549         return;
550 
551     if (tape->outFp != NULL && fclose(tape->outFp) != 0)
552         syswarn("ME ioerr fclose %s", tape->outputFilename);
553 
554     if (stat(tape->outputFilename, &st) == 0 && st.st_size == 0) {
555         d_printf(1, "removing empty output tape: %s\n", tape->outputFilename);
556         unlink(tape->outputFilename);
557     }
558 
559     tape->outFp = NULL;
560     tape->outputSize = 0;
561 
562     if (tape->inFp != NULL) {
563         checkpointTape(tape);
564         fclose(tape->inFp);
565     }
566 
567     unlockFile(tape->lockFilename);
568 
569     freeCharP(tape->handFilename);
570     freeCharP(tape->inputFilename);
571     freeCharP(tape->outputFilename);
572     freeCharP(tape->lockFilename);
573     freeCharP(tape->peerName);
574 
575     removeTapeGlobally(tape);
576 
577     free(tape);
578 }
579 
580 
581 void
tapeTakeArticle(Tape tape,Article article)582 tapeTakeArticle(Tape tape, Article article)
583 {
584     const char *fname, *msgid;
585 
586     ASSERT(tape != NULL);
587     ASSERT(article != NULL);
588 
589     /* return immediately if spooling disabled - jgarzik */
590     if (tape->noBacklog) {
591         delArticle(article);
592         return;
593     }
594 
595     fname = artFileName(article);
596     msgid = artMsgId(article);
597     fprintf(tape->outFp, "%s %s\n", fname, msgid);
598     /* I'd rather know where I am each time, and I don't trust all
599      * fprintf's to give me character counts.  Therefore, do not use:
600      *   tape->outputSize += (return value of the previous fprintf call);
601      * nor:
602      *   tape->outputSize = ftello (tape->outFp);
603      */
604     tape->outputSize += strlen(fname) + strlen(msgid) + 2; /* " " + "\n" */
605 
606     delArticle(article);
607 
608     if (debugShrinking) {
609         struct stat sb;
610 
611         fflush(tape->outFp);
612 
613         if (fstat(fileno(tape->outFp), &sb) != 0)
614             syswarn("ME oserr fstat %s", tape->outputFilename);
615         else if (sb.st_size != tape->outputSize)
616             syslog(LOG_ERR, "fstat and ftello do not agree: %ld %ld for %s\n",
617                    (long) sb.st_size, tape->outputSize, tape->outputFilename);
618     }
619 
620     if (tape->outputHighLimit > 0
621         && tape->outputSize >= tape->outputHighLimit) {
622         long oldSize = tape->outputSize;
623         shrinkfile(tape->outFp, tape->outputLowLimit, tape->outputFilename,
624                    "a+");
625         tape->outputSize = ftello(tape->outFp);
626         tape->lossage += oldSize - tape->outputSize;
627     }
628 }
629 
630 
631 /* Pick an article off a tape and return it. NULL is returned if there
632    are no more articles. */
633 Article
getArticle(Tape tape)634 getArticle(Tape tape)
635 {
636     char line[2048]; /* ick. 1024 for filename + 1024 for msgid */
637     char *p, *q;
638     char *msgid, *filename;
639     Article art = NULL;
640     time_t now = theTime();
641 
642     ASSERT(tape != NULL);
643 
644     if (tape->inFp == NULL && (now - tape->lastRotated) > rotatePeriod)
645         prepareFiles(tape); /* will flush queue too. */
646 
647     while (tape->inFp != NULL && art == NULL) {
648         tape->changed = true;
649 
650         if (fgets(line, sizeof(line), tape->inFp) == NULL) {
651             if (ferror(tape->inFp))
652                 syswarn("ME ioerr on tape file %s", tape->inputFilename);
653             else if (!feof(tape->inFp))
654                 syswarn("ME oserr fgets %s", tape->inputFilename);
655 
656             if (fclose(tape->inFp) != 0)
657                 syswarn("ME ioerr fclose %s", tape->inputFilename);
658 
659             d_printf(1, "No more articles on tape %s\n", tape->inputFilename);
660 
661             tape->inFp = NULL;
662             tape->scribbled = false;
663 
664             unlink(tape->inputFilename);
665 
666             if ((now - tape->lastRotated) > rotatePeriod)
667                 prepareFiles(tape); /* rotate files to try next. */
668         } else {
669             msgid = filename = NULL;
670 
671             for (p = line; *p && isspace((unsigned char) *p);
672                  p++) /* eat whitespace */
673                 /* nada */;
674 
675             if (*p != '\0') {
676                 q = strchr(p, ' ');
677 
678                 if (q != NULL) {
679                     filename = p;
680                     *q = '\0';
681 
682                     for (q++; *q && isspace((unsigned char) *q); q++)
683                         /* nada */;
684 
685                     if (*q != '\0') {
686                         if (((p = strchr(q, ' ')) != NULL)
687                             || ((p = strchr(q, '\n')) != NULL))
688                             *p = '\0';
689 
690                         if (p != NULL)
691                             msgid = q;
692                         else
693                             filename = NULL; /* no trailing newline or blank */
694                     } else
695                         filename =
696                             NULL; /* line had one field and some blanks */
697                 } else
698                     filename = NULL; /* line only had one field */
699             }
700 
701             /* See if message ID looks valid. */
702             if (msgid) {
703                 for (p = msgid; *p; p++)
704                     ;
705                 if (p > msgid)
706                     p--;
707                 if (*msgid != '<' || *p != '>') {
708                     warn("ME tape invalid messageID in %s: %s",
709                          tape->inputFilename, msgid);
710                     msgid = NULL;
711                 }
712             }
713 
714             if (filename != NULL && msgid != NULL)
715                 art = newArticle(filename, msgid);
716 
717             /* art may be NULL here if the file is no longer valid. */
718         }
719     }
720 
721 #if defined(INNFEED_DEBUG)
722     /* now we either have an article or there is no more on disk */
723     if (art == NULL) {
724         int c;
725         if (tape->inFp != NULL && ((c = fgetc(tape->inFp)) != EOF))
726             ungetc(c, tape->inFp); /* shouldn't happen */
727         else if (tape->inFp != NULL) {
728             /* last article read was the end of the tape. */
729             if (fclose(tape->inFp) != 0)
730                 syswarn("ME ioerr fclose %s", tape->inputFilename);
731 
732             tape->inFp = NULL;
733             tape->scribbled = false;
734 
735             /* toss out the old input file and prepare the new one */
736             unlink(tape->inputFilename);
737 
738             if (now - tape->lastRotated > rotatePeriod)
739                 prepareFiles(tape);
740         }
741     }
742 #endif
743 
744     if (art == NULL)
745         d_printf(2, "%s All out of articles in the backlog\n", tape->peerName);
746     else
747         d_printf(2, "%s Peeled article %s from backlog\n", tape->peerName,
748                  artMsgId(art));
749 
750     return art;
751 }
752 
753 
754 /****************************************************/
755 /**                  CLASS FUNCTIONS               **/
756 /****************************************************/
757 
758 /* Cause all the Tapes to checkpoint themselves. */
759 void
checkPointTapes(void)760 checkPointTapes(void)
761 {
762     unsigned int i;
763 
764     for (i = 0; i < activeTapeIdx; i++)
765         checkpointTape(activeTapes[i]);
766 }
767 
768 
769 /* make all the tapes set their checkNew flag. */
770 static void
tapesSetCheckNew(void)771 tapesSetCheckNew(void)
772 {
773     unsigned int i;
774 
775     for (i = 0; i < activeTapeIdx; i++)
776         activeTapes[i]->checkNew = true;
777 }
778 
779 
780 /* Get the pathname of the directory tapes are stored in. */
781 const char *
getTapeDirectory(void)782 getTapeDirectory(void)
783 {
784     ASSERT(tapeDirectory != NULL);
785 
786 #if defined(INNFEED_DEBUG)
787     if (tapeDirectory == NULL) {
788         tapeDirectory = xstrdup(dflTapeDir);
789         addPointerFreedOnExit(tapeDirectory);
790     }
791 #endif
792 
793     return tapeDirectory;
794 }
795 
796 
797 #if defined(INNFEED_DEBUG)
798 void
setOutputSizeLimit(long val)799 setOutputSizeLimit(long val)
800 {
801     defaultSizeLimit = val;
802 }
803 #endif
804 
805 
806 /**********************************************************************/
807 /*                          PRIVATE FUNCTIONS                         */
808 /**********************************************************************/
809 
810 
811 /* Add a new tape to the class-level list of active tapes. */
812 static void
addTapeGlobally(Tape tape)813 addTapeGlobally(Tape tape)
814 {
815     ASSERT(tape != NULL);
816 
817     if (activeTapeSize == activeTapeIdx) {
818         unsigned int i;
819 
820         activeTapeSize += 10;
821         if (activeTapes != NULL)
822             activeTapes = xrealloc(activeTapes, sizeof(Tape) * activeTapeSize);
823         else
824             activeTapes = xmalloc(sizeof(Tape) * activeTapeSize);
825 
826         for (i = activeTapeIdx; i < activeTapeSize; i++)
827             activeTapes[i] = NULL;
828     }
829     activeTapes[activeTapeIdx++] = tape;
830 }
831 
832 
833 /* Remove a tape for the class-level list of active tapes. */
834 static void
removeTapeGlobally(Tape tape)835 removeTapeGlobally(Tape tape)
836 {
837     unsigned int i;
838 
839     if (tape == NULL)
840         return;
841 
842     ASSERT(activeTapeIdx > 0);
843 
844     for (i = 0; i < activeTapeIdx; i++)
845         if (activeTapes[i] == tape)
846             break;
847 
848     ASSERT(i < activeTapeIdx);
849 
850     for (; i < (activeTapeIdx - 1); i++)
851         activeTapes[i] = activeTapes[i + 1];
852 
853     activeTapes[--activeTapeIdx] = NULL;
854 
855     if (activeTapeIdx == 0) {
856         free(activeTapes);
857         activeTapes = NULL;
858     }
859 }
860 
861 
862 /* Have a tape checkpoint itself so that next process can pick up where
863    this one left off. */
864 static void
checkpointTape(Tape tape)865 checkpointTape(Tape tape)
866 {
867     if (tape->inFp == NULL) /* no input file being read. */
868         return;
869 
870     if (!tape->changed) /* haven't read since last checkpoint */
871     {
872         d_printf(1, "Not checkpointing unchanged tape: %s\n", tape->peerName);
873         return;
874     }
875 
876     if ((tape->tellpos = ftello(tape->inFp)) < 0) {
877         syswarn("ME oserr ftello %s", tape->inputFilename);
878         return;
879     }
880 
881     /* strlen of "18446744073709551616\n" (2^64) */
882 #define BITS64 21
883 
884     /* make sure we're not right at the beginning of the file so we can write.
885      */
886     if (tape->tellpos > BITS64) {
887         rewind(tape->inFp);
888 
889         /* scribble blanks over the first lines characters */
890         if (!tape->scribbled) {
891             int currloc = 0;
892             int c;
893 
894             while ((c = fgetc(tape->inFp)) != '\n' || currloc <= BITS64)
895                 if (c == EOF)
896                     return;
897                 else
898                     currloc++;
899 
900             rewind(tape->inFp);
901 
902             while (currloc-- > 0)
903                 fputc(' ', tape->inFp);
904 
905             rewind(tape->inFp);
906 
907             fflush(tape->inFp);
908             tape->scribbled = true;
909         }
910 
911         fprintf(tape->inFp, "%ld", tape->tellpos);
912 
913         if (fseeko(tape->inFp, tape->tellpos, SEEK_SET) != 0)
914             syswarn("ME oserr fseeko(%s,%ld,SEEK_SET)", tape->inputFilename,
915                     tape->tellpos);
916     }
917 
918     tape->changed = false;
919 }
920 
921 
922 /* Prepare the tape file(s) for input and output */
923 
924 /* For a given Tape there are
925  * three possible files: PEER.input PEER and
926  * PEER.output. PEER.input and PEER.output are private to
927  * innfeed. PEER is where a sysadmin can drop a file that (s)he
928  * wants to inject into the process. If the first line of the input file
929  * contains only an integer (possibly surrounded by spaces), then this is
930  * taken to be the position to seek to before reading.
931  *
932  * prepareFiles will process them in a manner much like the following shell
933  * commands:
934  *
935  * if [ ! -f PEER.input ]; then
936  *     if [ -f PEER ]; then mv PEER PEER.input
937  *     elif [ -f PEER.output ]; then mv PEER.output PEER; fi
938  * fi
939  *
940  * At this point PEER.input is opened for reading if it exists.
941  *
942  * The checkpoint file is left as-is unless the PEER.input file
943  * happens to be newer that the checkpoint file.
944  */
945 static void
prepareFiles(Tape tape)946 prepareFiles(Tape tape)
947 {
948     bool inpExists;
949     bool outExists;
950     bool newExists;
951 
952 #if defined(INNFEED_DEBUG)
953     /* flush any in memory articles to disk */
954     if (tape->head != NULL && tape->outFp != NULL)
955         flushTape(tape);
956 #endif
957 
958     tape->tellpos = 0;
959 
960     /*  First time through, or something external has set checkNew */
961     if (tape->lastRotated == 0 || tape->checkNew) {
962         newExists = fileExistsP(tape->handFilename);
963         if (newExists)
964             notice("%s new hand-prepared backlog file", tape->peerName);
965     } else
966         newExists = false;
967 
968     if (tape->lastRotated == 0) /* first time here */
969     {
970         inpExists = fileExistsP(tape->inputFilename);
971         outExists = fileExistsP(tape->outputFilename);
972     } else {
973         inpExists =
974             (tape->inFp != NULL) ? true : false; /* can this ever be true?? */
975         outExists =
976             (tape->outFp != NULL && tape->outputSize > 0) ? true : false;
977     }
978 
979 
980     /* move the hand-dropped file to the input file if needed. */
981     if (newExists && !inpExists) {
982         if (rename(tape->handFilename, tape->inputFilename) != 0)
983             syswarn("ME oserr rename %s, %s", tape->handFilename,
984                     tape->inputFilename);
985         else {
986             notice("%s grabbing external tape file", tape->peerName);
987             inpExists = true;
988         }
989     }
990 
991     /* now move the output file to the input file, if needed and only if in
992        not in NOROTATE mode. */
993     if (outExists && !inpExists && !tape->noRotate) {
994         if (tape->outFp != NULL) {
995             fclose(tape->outFp);
996             tape->outFp = NULL;
997         }
998 
999         if (rename(tape->outputFilename, tape->inputFilename) != 0)
1000             syswarn("ME oserr rename %s, %s", tape->outputFilename,
1001                     tape->inputFilename);
1002         else
1003             inpExists = true;
1004     }
1005 
1006     /* now open up the input file and seek to the proper position. */
1007     if (inpExists) {
1008         int c;
1009         long flength;
1010 
1011         if ((tape->inFp = fopen(tape->inputFilename, "r+")) == NULL)
1012             syswarn("ME fopen %s", tape->inputFilename);
1013         else {
1014             char buffer[64];
1015 
1016             if (fgets(buffer, sizeof(buffer) - 1, tape->inFp) == NULL) {
1017                 if (feof(tape->inFp)) {
1018                     d_printf(1, "Empty input file: %s\n", tape->inputFilename);
1019                     unlink(tape->inputFilename);
1020                 } else
1021                     syswarn("ME oserr fgets %s", tape->inputFilename);
1022 
1023                 fclose(tape->inFp);
1024                 tape->inFp = NULL;
1025                 tape->scribbled = false;
1026             } else {
1027                 unsigned int len = strlen(buffer);
1028                 long newPos = 0;
1029 
1030                 if (len > 0 && buffer[len - 1] == '\n')
1031                     buffer[--len] = '\0';
1032 
1033                 if (len > 0 && strspn(buffer, "0123456789 \n") == len) {
1034                     if (sscanf(buffer, "%ld", &newPos) == 1) {
1035                         tape->scribbled = true;
1036                         tape->tellpos = newPos;
1037                     }
1038                 }
1039 
1040                 if ((flength = fileLength(fileno(tape->inFp)))
1041                     < tape->tellpos) {
1042                     warn("ME tape short: %s %ld %ld", tape->inputFilename,
1043                          flength, tape->tellpos);
1044                     tape->tellpos = 0;
1045                 } else if (tape->tellpos == 0)
1046                     rewind(tape->inFp);
1047                 else if (fseeko(tape->inFp, tape->tellpos - 1, SEEK_SET) != 0)
1048                     syswarn("ME oserr fseeko(%s,%ld,SEEK_SET)",
1049                             tape->inputFilename, tape->tellpos);
1050                 else if ((c = fgetc(tape->inFp)) != '\n') {
1051                     while (c != EOF && c != '\n')
1052                         c = fgetc(tape->inFp);
1053 
1054                     if (c == EOF) {
1055                         fclose(tape->inFp);
1056                         unlink(tape->inputFilename);
1057                         tape->inFp = NULL;
1058                         tape->scribbled = false;
1059                         prepareFiles(tape);
1060                     } else {
1061                         long oldPos = tape->tellpos;
1062 
1063                         tape->changed = true;
1064                         checkpointTape(tape);
1065 
1066                         warn("ME internal checkpoint line boundary missed:"
1067                              " %s %ld vs. %ld",
1068                              tape->inputFilename, tape->tellpos, oldPos);
1069                     }
1070                 }
1071             }
1072         }
1073     }
1074 
1075     tape->lastRotated = theTime();
1076     tape->checkNew = false;
1077 
1078     /* now open up the output file. */
1079     if (tape->outFp == NULL) {
1080         if ((tape->outFp = fopen(tape->outputFilename, "a+")) == NULL) {
1081             syswarn("ME tape open failed (a+) %s", tape->outputFilename);
1082             return;
1083         }
1084         fseeko(tape->outFp, 0, SEEK_END);
1085         tape->outputSize = ftello(tape->outFp);
1086         tape->lossage = 0;
1087     }
1088 }
1089 
1090 
1091 static void
tapeCkNewFileCbk(TimeoutId id,void * d UNUSED)1092 tapeCkNewFileCbk(TimeoutId id, void *d UNUSED)
1093 {
1094     ASSERT(id == ckNewFileId);
1095     ASSERT(tapeCkNewFilePeriod > 0);
1096 
1097     tapesSetCheckNew();
1098 
1099     ckNewFileId = prepareSleep(tapeCkNewFileCbk, tapeCkNewFilePeriod, NULL);
1100 }
1101 
1102 
1103 /* The timer callback function that will checkpoint all the active tapes. */
1104 static void
tapeCheckpointCallback(TimeoutId id,void * d UNUSED)1105 tapeCheckpointCallback(TimeoutId id, void *d UNUSED)
1106 {
1107     ASSERT(id == checkPtId);
1108     ASSERT(tapeCkPtPeriod > 0);
1109 
1110     d_printf(1, "Checkpointing tapes\n");
1111 
1112     checkPointTapes();
1113 
1114     checkPtId = prepareSleep(tapeCheckpointCallback, tapeCkPtPeriod, NULL);
1115 }
1116 
1117 
1118 #if defined(INNFEED_DEBUG)
1119 static void
flushTape(Tape tape)1120 flushTape(Tape tape)
1121 {
1122     QueueElem elem;
1123 
1124     /* flush out queue to disk. */
1125     elem = tape->head;
1126     while (elem != NULL) {
1127         tape->head = tape->head->next;
1128         fprintf(tape->outFp, "%s %s\n", artFileName(elem->article),
1129                 artMsgId(elem->article));
1130 
1131         delArticle(elem->article);
1132 
1133         free(elem);
1134         elem = tape->head;
1135     }
1136     tape->tail = NULL;
1137     tape->qLength = 0;
1138 
1139 
1140     /* I'd rather know where I am each time, and I don't trust all
1141        fprintf's to give me character counts. */
1142     tape->outputSize = ftello(tape->outFp);
1143     if (debugShrinking) {
1144         struct stat buf;
1145         static bool logged = false;
1146 
1147         fflush(tape->outFp);
1148         if (fstat(fileno(tape->outFp), &buf) != 0 && !logged) {
1149             syslog(LOG_ERR, "fstat failed on %s", tape->outputFilename);
1150             logged = true;
1151         } else if (buf.st_size != tape->outputSize) {
1152             warn("ME fstat and ftello do not agree for %s",
1153                  tape->outputFilename);
1154             logged = true;
1155         }
1156     }
1157 
1158     if (tape->outputHighLimit > 0
1159         && tape->outputSize > tape->outputHighLimit) {
1160         shrinkfile(tape->outFp, tape->outputLowLimit, tape->outputFilename,
1161                    "a+");
1162         tape->outputSize = ftello(tape->outFp);
1163     }
1164 }
1165 #endif
1166 
1167 
1168 static void
tapeCleanup(void)1169 tapeCleanup(void)
1170 {
1171     free(tapeDirectory);
1172     tapeDirectory = NULL;
1173 }
1174