1 /*
2 ** The implementation of the innfeed Tape class.
3 **
4 ** Written by James Brister <brister@vix.com>
5 **
6 ** The implementation of the Tape class. Tapes are read-only or write-only
7 ** files that are accessed sequentially. Their basic unit of i/o is an
8 ** Article. Tapes work out of a single directory and manage all file names
9 ** themselves.
10 **
11 ** Tapes will checkpoint themselves periodically so that when innfeed exits
12 ** or crashes things can restart close to where they were last. The period
13 ** checkpointing is handled entirely by the Tape class, but the checkpoint
14 ** period needs to be set by some external user before the first tape is
15 ** created.
16 */
17
18 #include "portable/system.h"
19
20 #include "innfeed.h"
21
22 #include <assert.h>
23 #include <ctype.h>
24 #include <errno.h>
25 #include <sys/param.h>
26 #include <sys/stat.h>
27 #include <syslog.h>
28
29 #include <dirent.h>
30 typedef struct dirent DIRENTRY;
31
32 #ifdef HAVE_SYS_TIME_H
33 # include <sys/time.h>
34 #endif
35 #include <time.h>
36
37 #include "inn/innconf.h"
38 #include "inn/libinn.h"
39 #include "inn/messages.h"
40
41 #include "article.h"
42 #include "configfile.h"
43 #include "endpoint.h"
44 #include "host.h"
45 #include "tape.h"
46
47 #if defined(INNFEED_DEBUG)
48 /* A structure for temporary storage of articles. */
49 typedef struct q_e_s {
50 Article article;
51 struct q_e_s *next;
52 } * QueueElem;
53 #endif
54
55 /* The Tape class type. */
56 struct tape_s {
57 /* the pathname of the file the administrator can drop in by hand. */
58 char *handFilename;
59
60 /* the pathname of the file the Tape will read from */
61 char *inputFilename;
62
63 /* the pathname of the file the Tape will write to. */
64 char *outputFilename;
65
66 /* the pathname of the file used in locking */
67 char *lockFilename;
68
69 /* the peer we're doing this for. */
70 char *peerName;
71
72 FILE *inFp; /* input FILE */
73 FILE *outFp; /* output FILE */
74
75 time_t lastRotated; /* time files last got switched */
76 bool checkNew; /* set bool when we need to check for
77 hand-crafted file. */
78
79 #if defined(INNFEED_DEBUG)
80 /* The tape holds a small output queue in memory to avoid thrashing. */
81 QueueElem head;
82 QueueElem tail;
83 unsigned int qLength; /* amount on the queue */
84 #endif
85
86 long outputSize; /* the current size of the output file. */
87 long lossage;
88
89 /* the number of bytes we try to keep the output under. We actually
90 wait for the outputSize to get 10% greater than this amount before
91 shrinking the file down again. A value of zero means no limit. */
92 long outputLowLimit;
93 long outputHighLimit;
94 double backlogFactor;
95
96 bool scribbled; /* have we scribbled a checkpoint value in
97 the file. */
98 long tellpos; /* for input file checkpointing. */
99 bool changed; /* true if tape was read since last
100 checkpoint or start. */
101
102 /* true if articles that are output are NOT later input. */
103 bool noRotate;
104
105 /* true if no articles should ever be spooled */
106 bool noBacklog;
107 };
108
109
110 static void checkpointTape(Tape tape);
111 static void removeTapeGlobally(Tape tape);
112 static void addTapeGlobally(Tape tape);
113 static void prepareFiles(Tape tape);
114 static void tapeCkNewFileCbk(TimeoutId id, void *d);
115 static void tapeCheckpointCallback(TimeoutId id, void *d);
116 #if defined(INNFEED_DEBUG)
117 static void flushTape(Tape tape);
118 #endif
119 static void tapesSetCheckNew(void);
120 static void initTape(Tape nt);
121 static void tapeCleanup(void);
122
123
124 /* pathname of directory we store tape files in. */
125 static char *tapeDirectory;
126
127 /* the callback ID of the checkpoint timer callback. */
128 static TimeoutId checkPtId;
129 static TimeoutId ckNewFileId;
130
131 /* number of seconds between tape checkpoints. */
132 static unsigned int tapeCkPtPeriod;
133 static unsigned int tapeCkNewFilePeriod;
134
135 static time_t rotatePeriod = TAPE_ROTATE_PERIOD;
136
137 /* global list of tapes so we can checkpoint them periodically */
138 static Tape *activeTapes;
139
140 /* Size of the activeTapes array */
141 static size_t activeTapeSize;
142
143 /* index of last element in activeTapes that's being used. */
144 static size_t activeTapeIdx;
145
146 #if defined(INNFEED_DEBUG)
147 /* default limit of the size of output tapes. */
148 static long defaultSizeLimit;
149 #endif
150
151 static unsigned int tapeHighwater;
152
153 bool debugShrinking = false;
154
155
156 /* callback when config file is loaded */
157 int
tapeConfigLoadCbk(void * data)158 tapeConfigLoadCbk(void *data)
159 {
160 int rval = 1;
161 long iv;
162 int bv;
163 FILE *fp = (FILE *) data;
164 char *dir, *p;
165
166 if (getString(topScope, "backlog-directory", &p, NO_INHERIT)) {
167 dir = concatpath(innconf->pathspool, p);
168 free(p);
169 if (tapeDirectory != NULL && strcmp(tapeDirectory, dir) != 0) {
170 warn("ME config: cannot change backlog-directory of a running"
171 " process");
172 free(dir);
173 dir = xstrdup(tapeDirectory);
174 }
175
176 if (!isDirectory(dir) && isDirectory(dflTapeDir)) {
177 logOrPrint(LOG_ERR, fp,
178 "ME config: definition of backlog-directory (%s) is a"
179 " non-existant directory. Using %s",
180 dir, dflTapeDir);
181 free(dir);
182 dir = xstrdup(dflTapeDir);
183 } else if (!isDirectory(dir))
184 logAndExit(1, "ME config: no usable value for backlog-directory");
185 } else if (!isDirectory(dflTapeDir)) {
186 logAndExit(1, "ME config: no usable value for backlog-directory");
187 /* NOTREACHED */
188 return -1;
189 } else
190 dir = xstrdup(dflTapeDir);
191
192 if (tapeDirectory != NULL)
193 free(tapeDirectory);
194 tapeDirectory = dir;
195
196
197 if (getInteger(topScope, "backlog-highwater", &iv, NO_INHERIT)) {
198 if (iv < 0) {
199 rval = 0;
200 logOrPrint(LOG_ERR, fp,
201 "ME config: value of %s (%ld) in %s cannot be less"
202 " than 0. Using %ld",
203 "backlog-highwater", iv, "global scope",
204 (long) TAPE_HIGHWATER);
205 iv = TAPE_HIGHWATER;
206 }
207 } else
208 iv = TAPE_HIGHWATER;
209 tapeHighwater = (unsigned int) iv;
210
211
212 if (getInteger(topScope, "backlog-rotate-period", &iv, NO_INHERIT)) {
213 if (iv < 0) {
214 rval = 0;
215 logOrPrint(LOG_ERR, fp,
216 "ME config: value of %s (%ld) in %s cannot be less"
217 " than 0. Using %ld",
218 "backlog-rotate-period", iv, "global scope",
219 (long) TAPE_ROTATE_PERIOD);
220 iv = TAPE_ROTATE_PERIOD;
221 }
222 } else
223 iv = TAPE_ROTATE_PERIOD;
224 rotatePeriod = (unsigned int) iv;
225
226
227 if (getInteger(topScope, "backlog-ckpt-period", &iv, NO_INHERIT)) {
228 if (iv < 0) {
229 rval = 0;
230 logOrPrint(LOG_ERR, fp,
231 "ME config: value of %s (%ld) in %s cannot be less"
232 " than 0. Using %ld",
233 "backlog-ckpt-period", iv, "global scope",
234 (long) TAPE_CHECKPOINT_PERIOD);
235 iv = TAPE_CHECKPOINT_PERIOD;
236 }
237 } else
238 iv = TAPE_CHECKPOINT_PERIOD;
239 tapeCkPtPeriod = (unsigned int) iv;
240
241
242 if (getInteger(topScope, "backlog-newfile-period", &iv, NO_INHERIT)) {
243 if (iv < 0) {
244 rval = 0;
245 logOrPrint(LOG_ERR, fp,
246 "ME config: value of %s (%ld) in %s cannot be less"
247 " than 0. Using %ld",
248 "backlog-newfile-period", iv, "global scope",
249 (long) TAPE_NEWFILE_PERIOD);
250 iv = TAPE_NEWFILE_PERIOD;
251 }
252 } else
253 iv = TAPE_NEWFILE_PERIOD;
254 tapeCkNewFilePeriod = (unsigned int) iv;
255
256
257 if (getBool(topScope, "debug-shrinking", &bv, NO_INHERIT))
258 debugShrinking = (bv ? true : false);
259
260 return rval;
261 }
262
263
264 /* Create a new Tape object. There are three potential files involved in
265 I/O. 'peerName' is what the admin may have dropped in by
266 hand. 'peerName.input' is the file that was being used as input the last
267 time things were run. 'peerName.output' is the file that was being used
268 as output. The file 'peerName' is appended to 'peerName.input' (or
269 renamed if 'peerName.input' doesn't exist). Then 'peerName.output' is
270 appeneded (or renamed) to 'peerName.input' */
271
272 static bool inited = false;
273 Tape
newTape(const char * peerName,bool dontRotate)274 newTape(const char *peerName, bool dontRotate)
275 {
276 Tape nt = xmalloc(sizeof(struct tape_s));
277 size_t pLen = strlen(peerName);
278 size_t dLen = strlen(tapeDirectory);
279
280 if (!inited) {
281 inited = true;
282 atexit(tapeCleanup);
283 }
284
285 ASSERT(nt != NULL);
286
287 if (endsIn(peerName, INPUT_TAIL))
288 die("Sorry, can't have a peer name ending in \"%s\"", INPUT_TAIL);
289
290 if (endsIn(peerName, OUTPUT_TAIL))
291 die("Sorry, can't have a peer name ending in \"%s\"", OUTPUT_TAIL);
292
293 if (endsIn(peerName, LOCK_TAIL))
294 die("Sorry, can't have a peer name ending in \"%s\"", LOCK_TAIL);
295
296 nt->peerName = xstrdup(peerName);
297
298 nt->handFilename = xmalloc(pLen + dLen + 2);
299 sprintf(nt->handFilename, "%s/%s", tapeDirectory, peerName);
300
301 nt->lockFilename = xmalloc(pLen + dLen + strlen(LOCK_TAIL) + 2);
302 sprintf(nt->lockFilename, "%s/%s%s", tapeDirectory, peerName, LOCK_TAIL);
303
304 nt->inputFilename = xmalloc(pLen + dLen + strlen(INPUT_TAIL) + 2);
305 sprintf(nt->inputFilename, "%s/%s%s", tapeDirectory, peerName, INPUT_TAIL);
306
307 nt->outputFilename = xmalloc(pLen + dLen + strlen(OUTPUT_TAIL) + 2);
308 sprintf(nt->outputFilename, "%s/%s%s", tapeDirectory, peerName,
309 OUTPUT_TAIL);
310
311 if (!lockFile(nt->lockFilename)) {
312 warn("ME lock failed for host: %s", nt->lockFilename);
313
314 free(nt->handFilename);
315 free(nt->lockFilename);
316 free(nt->inputFilename);
317 free(nt->outputFilename);
318 free(nt);
319
320 return NULL;
321 }
322
323 nt->noRotate = false; /* for first time prepare */
324 initTape(nt);
325 nt->noRotate = dontRotate;
326
327 addTapeGlobally(nt);
328
329 if (checkPtId == 0 && tapeCkPtPeriod > 0) /* only done once. */
330 checkPtId = prepareSleep(tapeCheckpointCallback, tapeCkPtPeriod, NULL);
331
332 if (ckNewFileId == 0 && tapeCkNewFilePeriod > 0)
333 ckNewFileId =
334 prepareSleep(tapeCkNewFileCbk, tapeCkNewFilePeriod, NULL);
335
336 return nt;
337 }
338
339 static void
initTape(Tape nt)340 initTape(Tape nt)
341 {
342 value *peerVal = findPeer(nt->peerName);
343 scope *s = (peerVal == NULL ? NULL : peerVal->v.scope_val);
344
345 nt->inFp = NULL;
346 nt->outFp = NULL;
347
348 nt->lastRotated = 0;
349 nt->checkNew = false;
350
351 #if defined(INNFEED_DEBUG)
352 nt->head = NULL;
353 nt->tail = NULL;
354 nt->qLength = 0;
355 #endif
356
357 nt->scribbled = false;
358 nt->tellpos = 0;
359
360 nt->changed = false;
361
362 nt->outputSize = 0;
363 nt->lossage = 0;
364
365 nt->noBacklog = false;
366 nt->outputLowLimit = BLOGLIMIT;
367 nt->outputHighLimit = BLOGLIMIT_HIGH;
368 nt->backlogFactor = LIMIT_FUDGE;
369
370 if (!talkToSelf) {
371 int bval;
372
373 if (getBool(s, "no-backlog", &bval, INHERIT))
374 nt->noBacklog = (bval ? true : false);
375 else
376 nt->noBacklog = TAPE_DISABLE;
377
378 if (getInteger(s, "backlog-limit", &nt->outputLowLimit, INHERIT)) {
379 if (!getReal(s, "backlog-factor", &nt->backlogFactor, INHERIT)) {
380 if (!getInteger(s, "backlog-limit-highwater",
381 &nt->outputHighLimit, INHERIT)) {
382 warn("%s no backlog-factor or backlog-limit-highwater",
383 nt->peerName);
384 nt->outputLowLimit = BLOGLIMIT;
385 nt->outputHighLimit = BLOGLIMIT_HIGH;
386 nt->backlogFactor = LIMIT_FUDGE;
387 }
388 } else
389 nt->outputHighLimit =
390 (long) ((double) nt->outputLowLimit * nt->backlogFactor);
391 } else
392 warn("ME config: no definition for required key backlog-limit");
393 }
394
395 d_printf(1, "%s spooling: %s\n", nt->peerName,
396 nt->noBacklog ? "disabled" : "enabled");
397
398 d_printf(1, "%s tape backlog limit: [%ld %ld]\n", nt->peerName,
399 nt->outputLowLimit, nt->outputHighLimit);
400
401 prepareFiles(nt);
402 }
403
404
405 void
gFlushTapes(void)406 gFlushTapes(void)
407 {
408 unsigned int i;
409
410 notice("ME flushing tapes");
411 for (i = 0; i < activeTapeIdx; i++)
412 tapeFlush(activeTapes[i]);
413 }
414
415
416 /* close the input and output tapes and reinitialize everything in the
417 tape. */
418 void
tapeFlush(Tape tape)419 tapeFlush(Tape tape)
420 {
421 if (tape->inFp != NULL) {
422 checkpointTape(tape);
423 fclose(tape->inFp);
424 }
425
426 if (tape->outFp != NULL)
427 fclose(tape->outFp);
428
429 initTape(tape);
430 }
431
432
433 void
gPrintTapeInfo(FILE * fp,unsigned int indentAmt)434 gPrintTapeInfo(FILE *fp, unsigned int indentAmt)
435 {
436 char indent[INDENT_BUFFER_SIZE];
437 unsigned int i;
438
439 for (i = 0; i < MIN(INDENT_BUFFER_SIZE - 1, indentAmt); i++)
440 indent[i] = ' ';
441 indent[i] = '\0';
442
443 fprintf(fp, "%sGlobal Tape List : (count %lu) {\n", indent,
444 (unsigned long) activeTapeIdx);
445
446 for (i = 0; i < activeTapeIdx; i++)
447 printTapeInfo(activeTapes[i], fp, indentAmt + INDENT_INCR);
448 fprintf(fp, "%s}\n", indent);
449 }
450
451
452 void
tapeLogGlobalStatus(FILE * fp)453 tapeLogGlobalStatus(FILE *fp)
454 {
455 fprintf(fp, "%sBacklog file global values:%s\n", genHtml ? "<strong>" : "",
456 genHtml ? "</strong>" : "");
457 fprintf(fp, " directory: %s\n", tapeDirectory);
458 fprintf(fp, " rotate period: %-3ld seconds\n", (long) rotatePeriod);
459 fprintf(fp, "checkpoint period: %-3ld seconds\n", (long) tapeCkPtPeriod);
460 fprintf(fp, " newfile period: %-3ld seconds\n",
461 (long) tapeCkNewFilePeriod);
462 fprintf(fp, "backlog highwater: %u\n", tapeHighwater);
463 fprintf(fp, " highwater queue: %u\n", hostHighwater);
464 fprintf(fp, "\n");
465 }
466
467
468 void
tapeLogStatus(Tape tape,FILE * fp)469 tapeLogStatus(Tape tape, FILE *fp)
470 {
471 if (tape == NULL)
472 fprintf(fp, "(no tape)\n");
473 else if (tape->noBacklog)
474 fprintf(fp, " spooling: DISABLED\n");
475 else if (tape->outputLowLimit == 0)
476 fprintf(fp, " spooling: UNLIMITED\n");
477 else {
478 fprintf(fp, " backlog low limit: %ld\n",
479 tape->outputLowLimit);
480 fprintf(fp, " backlog upper limit: %ld",
481 tape->outputHighLimit);
482 if (tape->backlogFactor > 0.0)
483 fprintf(fp, " (factor %1.2f)", tape->backlogFactor);
484 fputc('\n', fp);
485 fprintf(fp, " backlog shrinkage: ");
486 fprintf(fp, "%ld bytes (from current file)\n", tape->lossage);
487 }
488 }
489
490 void
printTapeInfo(Tape tape,FILE * fp,unsigned int indentAmt)491 printTapeInfo(Tape tape, FILE *fp, unsigned int indentAmt)
492 {
493 char indent[INDENT_BUFFER_SIZE];
494 unsigned int i;
495 #if defined(INNFEED_DEBUG)
496 QueueElem qe;
497 #endif
498
499 for (i = 0; i < MIN(INDENT_BUFFER_SIZE - 1, indentAmt); i++)
500 indent[i] = ' ';
501 indent[i] = '\0';
502
503 fprintf(fp, "%sTape : %p {\n", indent, (void *) tape);
504
505 if (tape == NULL) {
506 fprintf(fp, "%s}\n", indent);
507 return;
508 }
509
510 fprintf(fp, "%s master-file : %s\n", indent, tape->handFilename);
511 fprintf(fp, "%s input-file : %s\n", indent, tape->inputFilename);
512 fprintf(fp, "%s output-file : %s\n", indent, tape->outputFilename);
513 fprintf(fp, "%s lock-file : %s\n", indent, tape->lockFilename);
514 fprintf(fp, "%s peerName : %s\n", indent, tape->peerName);
515 fprintf(fp, "%s input-FILE : %p\n", indent, (void *) tape->inFp);
516 fprintf(fp, "%s output-FILE : %p\n", indent, (void *) tape->outFp);
517 fprintf(fp, "%s output-limit : %ld\n", indent, tape->outputLowLimit);
518
519 #if defined(INNFEED_DEBUG)
520 fprintf(fp, "%s in-memory article queue (length %d) {\n", indent,
521 tape->qLength);
522
523 for (qe = tape->head; qe != NULL; qe = qe->next) {
524 /* printArticleInfo(qe->article, fp, indentAmt + INDENT_INCR); */
525 fprintf(fp, "%s %p\n", indent, qe->article);
526 }
527
528 fprintf(fp, "%s }\n", indent);
529 #endif
530
531 fprintf(fp, "%s tell-position : %ld\n", indent, (long) tape->tellpos);
532 fprintf(fp, "%s input-FILE-changed : %s\n", indent,
533 boolToString(tape->changed));
534
535 fprintf(fp, "%s no-rotate : %s\n", indent,
536 boolToString(tape->noRotate));
537
538 fprintf(fp, "%s}\n", indent);
539 }
540
541
542 /* delete the tape. Spools the in-memory articles to disk. */
543 void
delTape(Tape tape)544 delTape(Tape tape)
545 {
546 struct stat st;
547
548 if (tape == NULL)
549 return;
550
551 if (tape->outFp != NULL && fclose(tape->outFp) != 0)
552 syswarn("ME ioerr fclose %s", tape->outputFilename);
553
554 if (stat(tape->outputFilename, &st) == 0 && st.st_size == 0) {
555 d_printf(1, "removing empty output tape: %s\n", tape->outputFilename);
556 unlink(tape->outputFilename);
557 }
558
559 tape->outFp = NULL;
560 tape->outputSize = 0;
561
562 if (tape->inFp != NULL) {
563 checkpointTape(tape);
564 fclose(tape->inFp);
565 }
566
567 unlockFile(tape->lockFilename);
568
569 freeCharP(tape->handFilename);
570 freeCharP(tape->inputFilename);
571 freeCharP(tape->outputFilename);
572 freeCharP(tape->lockFilename);
573 freeCharP(tape->peerName);
574
575 removeTapeGlobally(tape);
576
577 free(tape);
578 }
579
580
581 void
tapeTakeArticle(Tape tape,Article article)582 tapeTakeArticle(Tape tape, Article article)
583 {
584 const char *fname, *msgid;
585
586 ASSERT(tape != NULL);
587 ASSERT(article != NULL);
588
589 /* return immediately if spooling disabled - jgarzik */
590 if (tape->noBacklog) {
591 delArticle(article);
592 return;
593 }
594
595 fname = artFileName(article);
596 msgid = artMsgId(article);
597 fprintf(tape->outFp, "%s %s\n", fname, msgid);
598 /* I'd rather know where I am each time, and I don't trust all
599 * fprintf's to give me character counts. Therefore, do not use:
600 * tape->outputSize += (return value of the previous fprintf call);
601 * nor:
602 * tape->outputSize = ftello (tape->outFp);
603 */
604 tape->outputSize += strlen(fname) + strlen(msgid) + 2; /* " " + "\n" */
605
606 delArticle(article);
607
608 if (debugShrinking) {
609 struct stat sb;
610
611 fflush(tape->outFp);
612
613 if (fstat(fileno(tape->outFp), &sb) != 0)
614 syswarn("ME oserr fstat %s", tape->outputFilename);
615 else if (sb.st_size != tape->outputSize)
616 syslog(LOG_ERR, "fstat and ftello do not agree: %ld %ld for %s\n",
617 (long) sb.st_size, tape->outputSize, tape->outputFilename);
618 }
619
620 if (tape->outputHighLimit > 0
621 && tape->outputSize >= tape->outputHighLimit) {
622 long oldSize = tape->outputSize;
623 shrinkfile(tape->outFp, tape->outputLowLimit, tape->outputFilename,
624 "a+");
625 tape->outputSize = ftello(tape->outFp);
626 tape->lossage += oldSize - tape->outputSize;
627 }
628 }
629
630
631 /* Pick an article off a tape and return it. NULL is returned if there
632 are no more articles. */
633 Article
getArticle(Tape tape)634 getArticle(Tape tape)
635 {
636 char line[2048]; /* ick. 1024 for filename + 1024 for msgid */
637 char *p, *q;
638 char *msgid, *filename;
639 Article art = NULL;
640 time_t now = theTime();
641
642 ASSERT(tape != NULL);
643
644 if (tape->inFp == NULL && (now - tape->lastRotated) > rotatePeriod)
645 prepareFiles(tape); /* will flush queue too. */
646
647 while (tape->inFp != NULL && art == NULL) {
648 tape->changed = true;
649
650 if (fgets(line, sizeof(line), tape->inFp) == NULL) {
651 if (ferror(tape->inFp))
652 syswarn("ME ioerr on tape file %s", tape->inputFilename);
653 else if (!feof(tape->inFp))
654 syswarn("ME oserr fgets %s", tape->inputFilename);
655
656 if (fclose(tape->inFp) != 0)
657 syswarn("ME ioerr fclose %s", tape->inputFilename);
658
659 d_printf(1, "No more articles on tape %s\n", tape->inputFilename);
660
661 tape->inFp = NULL;
662 tape->scribbled = false;
663
664 unlink(tape->inputFilename);
665
666 if ((now - tape->lastRotated) > rotatePeriod)
667 prepareFiles(tape); /* rotate files to try next. */
668 } else {
669 msgid = filename = NULL;
670
671 for (p = line; *p && isspace((unsigned char) *p);
672 p++) /* eat whitespace */
673 /* nada */;
674
675 if (*p != '\0') {
676 q = strchr(p, ' ');
677
678 if (q != NULL) {
679 filename = p;
680 *q = '\0';
681
682 for (q++; *q && isspace((unsigned char) *q); q++)
683 /* nada */;
684
685 if (*q != '\0') {
686 if (((p = strchr(q, ' ')) != NULL)
687 || ((p = strchr(q, '\n')) != NULL))
688 *p = '\0';
689
690 if (p != NULL)
691 msgid = q;
692 else
693 filename = NULL; /* no trailing newline or blank */
694 } else
695 filename =
696 NULL; /* line had one field and some blanks */
697 } else
698 filename = NULL; /* line only had one field */
699 }
700
701 /* See if message ID looks valid. */
702 if (msgid) {
703 for (p = msgid; *p; p++)
704 ;
705 if (p > msgid)
706 p--;
707 if (*msgid != '<' || *p != '>') {
708 warn("ME tape invalid messageID in %s: %s",
709 tape->inputFilename, msgid);
710 msgid = NULL;
711 }
712 }
713
714 if (filename != NULL && msgid != NULL)
715 art = newArticle(filename, msgid);
716
717 /* art may be NULL here if the file is no longer valid. */
718 }
719 }
720
721 #if defined(INNFEED_DEBUG)
722 /* now we either have an article or there is no more on disk */
723 if (art == NULL) {
724 int c;
725 if (tape->inFp != NULL && ((c = fgetc(tape->inFp)) != EOF))
726 ungetc(c, tape->inFp); /* shouldn't happen */
727 else if (tape->inFp != NULL) {
728 /* last article read was the end of the tape. */
729 if (fclose(tape->inFp) != 0)
730 syswarn("ME ioerr fclose %s", tape->inputFilename);
731
732 tape->inFp = NULL;
733 tape->scribbled = false;
734
735 /* toss out the old input file and prepare the new one */
736 unlink(tape->inputFilename);
737
738 if (now - tape->lastRotated > rotatePeriod)
739 prepareFiles(tape);
740 }
741 }
742 #endif
743
744 if (art == NULL)
745 d_printf(2, "%s All out of articles in the backlog\n", tape->peerName);
746 else
747 d_printf(2, "%s Peeled article %s from backlog\n", tape->peerName,
748 artMsgId(art));
749
750 return art;
751 }
752
753
754 /****************************************************/
755 /** CLASS FUNCTIONS **/
756 /****************************************************/
757
758 /* Cause all the Tapes to checkpoint themselves. */
759 void
checkPointTapes(void)760 checkPointTapes(void)
761 {
762 unsigned int i;
763
764 for (i = 0; i < activeTapeIdx; i++)
765 checkpointTape(activeTapes[i]);
766 }
767
768
769 /* make all the tapes set their checkNew flag. */
770 static void
tapesSetCheckNew(void)771 tapesSetCheckNew(void)
772 {
773 unsigned int i;
774
775 for (i = 0; i < activeTapeIdx; i++)
776 activeTapes[i]->checkNew = true;
777 }
778
779
780 /* Get the pathname of the directory tapes are stored in. */
781 const char *
getTapeDirectory(void)782 getTapeDirectory(void)
783 {
784 ASSERT(tapeDirectory != NULL);
785
786 #if defined(INNFEED_DEBUG)
787 if (tapeDirectory == NULL) {
788 tapeDirectory = xstrdup(dflTapeDir);
789 addPointerFreedOnExit(tapeDirectory);
790 }
791 #endif
792
793 return tapeDirectory;
794 }
795
796
797 #if defined(INNFEED_DEBUG)
798 void
setOutputSizeLimit(long val)799 setOutputSizeLimit(long val)
800 {
801 defaultSizeLimit = val;
802 }
803 #endif
804
805
806 /**********************************************************************/
807 /* PRIVATE FUNCTIONS */
808 /**********************************************************************/
809
810
811 /* Add a new tape to the class-level list of active tapes. */
812 static void
addTapeGlobally(Tape tape)813 addTapeGlobally(Tape tape)
814 {
815 ASSERT(tape != NULL);
816
817 if (activeTapeSize == activeTapeIdx) {
818 unsigned int i;
819
820 activeTapeSize += 10;
821 if (activeTapes != NULL)
822 activeTapes = xrealloc(activeTapes, sizeof(Tape) * activeTapeSize);
823 else
824 activeTapes = xmalloc(sizeof(Tape) * activeTapeSize);
825
826 for (i = activeTapeIdx; i < activeTapeSize; i++)
827 activeTapes[i] = NULL;
828 }
829 activeTapes[activeTapeIdx++] = tape;
830 }
831
832
833 /* Remove a tape for the class-level list of active tapes. */
834 static void
removeTapeGlobally(Tape tape)835 removeTapeGlobally(Tape tape)
836 {
837 unsigned int i;
838
839 if (tape == NULL)
840 return;
841
842 ASSERT(activeTapeIdx > 0);
843
844 for (i = 0; i < activeTapeIdx; i++)
845 if (activeTapes[i] == tape)
846 break;
847
848 ASSERT(i < activeTapeIdx);
849
850 for (; i < (activeTapeIdx - 1); i++)
851 activeTapes[i] = activeTapes[i + 1];
852
853 activeTapes[--activeTapeIdx] = NULL;
854
855 if (activeTapeIdx == 0) {
856 free(activeTapes);
857 activeTapes = NULL;
858 }
859 }
860
861
862 /* Have a tape checkpoint itself so that next process can pick up where
863 this one left off. */
864 static void
checkpointTape(Tape tape)865 checkpointTape(Tape tape)
866 {
867 if (tape->inFp == NULL) /* no input file being read. */
868 return;
869
870 if (!tape->changed) /* haven't read since last checkpoint */
871 {
872 d_printf(1, "Not checkpointing unchanged tape: %s\n", tape->peerName);
873 return;
874 }
875
876 if ((tape->tellpos = ftello(tape->inFp)) < 0) {
877 syswarn("ME oserr ftello %s", tape->inputFilename);
878 return;
879 }
880
881 /* strlen of "18446744073709551616\n" (2^64) */
882 #define BITS64 21
883
884 /* make sure we're not right at the beginning of the file so we can write.
885 */
886 if (tape->tellpos > BITS64) {
887 rewind(tape->inFp);
888
889 /* scribble blanks over the first lines characters */
890 if (!tape->scribbled) {
891 int currloc = 0;
892 int c;
893
894 while ((c = fgetc(tape->inFp)) != '\n' || currloc <= BITS64)
895 if (c == EOF)
896 return;
897 else
898 currloc++;
899
900 rewind(tape->inFp);
901
902 while (currloc-- > 0)
903 fputc(' ', tape->inFp);
904
905 rewind(tape->inFp);
906
907 fflush(tape->inFp);
908 tape->scribbled = true;
909 }
910
911 fprintf(tape->inFp, "%ld", tape->tellpos);
912
913 if (fseeko(tape->inFp, tape->tellpos, SEEK_SET) != 0)
914 syswarn("ME oserr fseeko(%s,%ld,SEEK_SET)", tape->inputFilename,
915 tape->tellpos);
916 }
917
918 tape->changed = false;
919 }
920
921
922 /* Prepare the tape file(s) for input and output */
923
924 /* For a given Tape there are
925 * three possible files: PEER.input PEER and
926 * PEER.output. PEER.input and PEER.output are private to
927 * innfeed. PEER is where a sysadmin can drop a file that (s)he
928 * wants to inject into the process. If the first line of the input file
929 * contains only an integer (possibly surrounded by spaces), then this is
930 * taken to be the position to seek to before reading.
931 *
932 * prepareFiles will process them in a manner much like the following shell
933 * commands:
934 *
935 * if [ ! -f PEER.input ]; then
936 * if [ -f PEER ]; then mv PEER PEER.input
937 * elif [ -f PEER.output ]; then mv PEER.output PEER; fi
938 * fi
939 *
940 * At this point PEER.input is opened for reading if it exists.
941 *
942 * The checkpoint file is left as-is unless the PEER.input file
943 * happens to be newer that the checkpoint file.
944 */
945 static void
prepareFiles(Tape tape)946 prepareFiles(Tape tape)
947 {
948 bool inpExists;
949 bool outExists;
950 bool newExists;
951
952 #if defined(INNFEED_DEBUG)
953 /* flush any in memory articles to disk */
954 if (tape->head != NULL && tape->outFp != NULL)
955 flushTape(tape);
956 #endif
957
958 tape->tellpos = 0;
959
960 /* First time through, or something external has set checkNew */
961 if (tape->lastRotated == 0 || tape->checkNew) {
962 newExists = fileExistsP(tape->handFilename);
963 if (newExists)
964 notice("%s new hand-prepared backlog file", tape->peerName);
965 } else
966 newExists = false;
967
968 if (tape->lastRotated == 0) /* first time here */
969 {
970 inpExists = fileExistsP(tape->inputFilename);
971 outExists = fileExistsP(tape->outputFilename);
972 } else {
973 inpExists =
974 (tape->inFp != NULL) ? true : false; /* can this ever be true?? */
975 outExists =
976 (tape->outFp != NULL && tape->outputSize > 0) ? true : false;
977 }
978
979
980 /* move the hand-dropped file to the input file if needed. */
981 if (newExists && !inpExists) {
982 if (rename(tape->handFilename, tape->inputFilename) != 0)
983 syswarn("ME oserr rename %s, %s", tape->handFilename,
984 tape->inputFilename);
985 else {
986 notice("%s grabbing external tape file", tape->peerName);
987 inpExists = true;
988 }
989 }
990
991 /* now move the output file to the input file, if needed and only if in
992 not in NOROTATE mode. */
993 if (outExists && !inpExists && !tape->noRotate) {
994 if (tape->outFp != NULL) {
995 fclose(tape->outFp);
996 tape->outFp = NULL;
997 }
998
999 if (rename(tape->outputFilename, tape->inputFilename) != 0)
1000 syswarn("ME oserr rename %s, %s", tape->outputFilename,
1001 tape->inputFilename);
1002 else
1003 inpExists = true;
1004 }
1005
1006 /* now open up the input file and seek to the proper position. */
1007 if (inpExists) {
1008 int c;
1009 long flength;
1010
1011 if ((tape->inFp = fopen(tape->inputFilename, "r+")) == NULL)
1012 syswarn("ME fopen %s", tape->inputFilename);
1013 else {
1014 char buffer[64];
1015
1016 if (fgets(buffer, sizeof(buffer) - 1, tape->inFp) == NULL) {
1017 if (feof(tape->inFp)) {
1018 d_printf(1, "Empty input file: %s\n", tape->inputFilename);
1019 unlink(tape->inputFilename);
1020 } else
1021 syswarn("ME oserr fgets %s", tape->inputFilename);
1022
1023 fclose(tape->inFp);
1024 tape->inFp = NULL;
1025 tape->scribbled = false;
1026 } else {
1027 unsigned int len = strlen(buffer);
1028 long newPos = 0;
1029
1030 if (len > 0 && buffer[len - 1] == '\n')
1031 buffer[--len] = '\0';
1032
1033 if (len > 0 && strspn(buffer, "0123456789 \n") == len) {
1034 if (sscanf(buffer, "%ld", &newPos) == 1) {
1035 tape->scribbled = true;
1036 tape->tellpos = newPos;
1037 }
1038 }
1039
1040 if ((flength = fileLength(fileno(tape->inFp)))
1041 < tape->tellpos) {
1042 warn("ME tape short: %s %ld %ld", tape->inputFilename,
1043 flength, tape->tellpos);
1044 tape->tellpos = 0;
1045 } else if (tape->tellpos == 0)
1046 rewind(tape->inFp);
1047 else if (fseeko(tape->inFp, tape->tellpos - 1, SEEK_SET) != 0)
1048 syswarn("ME oserr fseeko(%s,%ld,SEEK_SET)",
1049 tape->inputFilename, tape->tellpos);
1050 else if ((c = fgetc(tape->inFp)) != '\n') {
1051 while (c != EOF && c != '\n')
1052 c = fgetc(tape->inFp);
1053
1054 if (c == EOF) {
1055 fclose(tape->inFp);
1056 unlink(tape->inputFilename);
1057 tape->inFp = NULL;
1058 tape->scribbled = false;
1059 prepareFiles(tape);
1060 } else {
1061 long oldPos = tape->tellpos;
1062
1063 tape->changed = true;
1064 checkpointTape(tape);
1065
1066 warn("ME internal checkpoint line boundary missed:"
1067 " %s %ld vs. %ld",
1068 tape->inputFilename, tape->tellpos, oldPos);
1069 }
1070 }
1071 }
1072 }
1073 }
1074
1075 tape->lastRotated = theTime();
1076 tape->checkNew = false;
1077
1078 /* now open up the output file. */
1079 if (tape->outFp == NULL) {
1080 if ((tape->outFp = fopen(tape->outputFilename, "a+")) == NULL) {
1081 syswarn("ME tape open failed (a+) %s", tape->outputFilename);
1082 return;
1083 }
1084 fseeko(tape->outFp, 0, SEEK_END);
1085 tape->outputSize = ftello(tape->outFp);
1086 tape->lossage = 0;
1087 }
1088 }
1089
1090
1091 static void
tapeCkNewFileCbk(TimeoutId id,void * d UNUSED)1092 tapeCkNewFileCbk(TimeoutId id, void *d UNUSED)
1093 {
1094 ASSERT(id == ckNewFileId);
1095 ASSERT(tapeCkNewFilePeriod > 0);
1096
1097 tapesSetCheckNew();
1098
1099 ckNewFileId = prepareSleep(tapeCkNewFileCbk, tapeCkNewFilePeriod, NULL);
1100 }
1101
1102
1103 /* The timer callback function that will checkpoint all the active tapes. */
1104 static void
tapeCheckpointCallback(TimeoutId id,void * d UNUSED)1105 tapeCheckpointCallback(TimeoutId id, void *d UNUSED)
1106 {
1107 ASSERT(id == checkPtId);
1108 ASSERT(tapeCkPtPeriod > 0);
1109
1110 d_printf(1, "Checkpointing tapes\n");
1111
1112 checkPointTapes();
1113
1114 checkPtId = prepareSleep(tapeCheckpointCallback, tapeCkPtPeriod, NULL);
1115 }
1116
1117
1118 #if defined(INNFEED_DEBUG)
1119 static void
flushTape(Tape tape)1120 flushTape(Tape tape)
1121 {
1122 QueueElem elem;
1123
1124 /* flush out queue to disk. */
1125 elem = tape->head;
1126 while (elem != NULL) {
1127 tape->head = tape->head->next;
1128 fprintf(tape->outFp, "%s %s\n", artFileName(elem->article),
1129 artMsgId(elem->article));
1130
1131 delArticle(elem->article);
1132
1133 free(elem);
1134 elem = tape->head;
1135 }
1136 tape->tail = NULL;
1137 tape->qLength = 0;
1138
1139
1140 /* I'd rather know where I am each time, and I don't trust all
1141 fprintf's to give me character counts. */
1142 tape->outputSize = ftello(tape->outFp);
1143 if (debugShrinking) {
1144 struct stat buf;
1145 static bool logged = false;
1146
1147 fflush(tape->outFp);
1148 if (fstat(fileno(tape->outFp), &buf) != 0 && !logged) {
1149 syslog(LOG_ERR, "fstat failed on %s", tape->outputFilename);
1150 logged = true;
1151 } else if (buf.st_size != tape->outputSize) {
1152 warn("ME fstat and ftello do not agree for %s",
1153 tape->outputFilename);
1154 logged = true;
1155 }
1156 }
1157
1158 if (tape->outputHighLimit > 0
1159 && tape->outputSize > tape->outputHighLimit) {
1160 shrinkfile(tape->outFp, tape->outputLowLimit, tape->outputFilename,
1161 "a+");
1162 tape->outputSize = ftello(tape->outFp);
1163 }
1164 }
1165 #endif
1166
1167
1168 static void
tapeCleanup(void)1169 tapeCleanup(void)
1170 {
1171 free(tapeDirectory);
1172 tapeDirectory = NULL;
1173 }
1174