1 #ifndef lint 2 static char *sccsid = "@(#)tape.c 1.11 (Berkeley) 05/07/85"; 3 #endif 4 5 #include "dump.h" 6 #include <signal.h> 7 8 char (*tblock)[TP_BSIZE]; /* Pointer to malloc()ed buffer for tape */ 9 int writesize; /* Size of malloc()ed buffer for tape */ 10 int trecno = 0; 11 extern int ntrec; /* blocking factor on tape */ 12 13 /* 14 * Streaming dump mods (Caltech) - disk block reading and tape writing 15 * are exported to several slave processes. While one slave writes the 16 * tape, the others read disk blocks; they pass control of the tape in 17 * a ring via pipes. The parent process traverses the filesystem and 18 * sends daddr's, inode records, etc, through pipes to each slave. 19 * Speed from Eagle to TU77 on VAX/780 is about 140 Kbytes/second. 20 * #ifdef RDUMP version is CPU-limited to about 40 Kbytes/second. 21 */ 22 struct req { /* instruction packets sent to slaves */ 23 daddr_t dblk; 24 int count; 25 } *req; 26 int reqsiz; 27 28 #define SLAVES 3 /* 2 slaves read disk while 3rd writes tape */ 29 #define LAG 2 /* Write behind by LAG tape blocks (rdump) */ 30 int slavefd[SLAVES]; /* Pipes from master to each slave */ 31 int rotor; /* Current slave number */ 32 int master; /* Pid of master, for sending error signals */ 33 int trace = 0; /* Protocol trace; easily patchable with adb */ 34 #define tmsg if (trace) msg 35 36 #ifdef RDUMP 37 extern int rmtape; 38 #endif 39 40 /* 41 * Allocate tape buffer contiguous with the array of instruction packets, 42 * so they can be written with a single write call in flusht(). 43 */ 44 alloctape() 45 { 46 47 writesize = ntrec * TP_BSIZE; 48 reqsiz = ntrec * sizeof(struct req); 49 req = (struct req *)malloc(reqsiz+writesize); /* array of packets */ 50 tblock = (char (*)[TP_BSIZE]) &req[ntrec]; /* Tape buffer */ 51 return (req != NULL); 52 } 53 54 /* 55 * Send special record to be put on tape 56 */ 57 taprec(dp) 58 char *dp; 59 { 60 61 tmsg("taprec %d\n", trecno); 62 req[trecno].dblk = (daddr_t)0; 63 req[trecno].count = 1; 64 *(union u_spcl *)(*tblock++) = *(union u_spcl *)dp; 65 spcl.c_tapea++; 66 if (++trecno >= ntrec) 67 flusht(); 68 } 69 70 dmpblk(blkno, size) 71 daddr_t blkno; 72 int size; 73 { 74 int tpblks, dblkno; 75 register int avail; 76 77 if (size % TP_BSIZE != 0) 78 msg("bad size to dmpblk: %d\n", size); 79 dblkno = fsbtodb(sblock, blkno); 80 tpblks = size / TP_BSIZE; 81 while ((avail = MIN(tpblks, ntrec - trecno)) > 0) { 82 tmsg("dmpblk %d\n", avail); 83 req[trecno].dblk = dblkno; 84 req[trecno].count = avail; 85 trecno += avail; 86 spcl.c_tapea += avail; 87 if (trecno >= ntrec) 88 flusht(); 89 dblkno += avail * (TP_BSIZE / DEV_BSIZE); 90 tpblks -= avail; 91 } 92 } 93 94 int nogripe = 0; 95 96 tperror() { 97 if (pipeout) { 98 msg("Tape write error on %s\n", tape); 99 msg("Cannot recover\n"); 100 dumpabort(); 101 /* NOTREACHED */ 102 } 103 msg("Tape write error on tape %d\n", tapeno); 104 broadcast("TAPE ERROR!\n"); 105 if (!query("Do you want to restart?")) 106 dumpabort(); 107 msg("This tape will rewind. After it is rewound,\n"); 108 msg("replace the faulty tape with a new one;\n"); 109 msg("this dump volume will be rewritten.\n"); 110 nogripe = 1; 111 close_rewind(); 112 Exit(X_REWRITE); 113 } 114 115 senderr() 116 { 117 118 perror(" DUMP: pipe error in command to slave"); 119 dumpabort(); 120 } 121 122 #ifdef RDUMP 123 tflush(cnt) 124 int cnt; 125 { 126 int i; 127 128 for (i = 0; i < ntrec; i++) 129 spclrec(); 130 } 131 #endif RDUMP 132 133 flusht() 134 { 135 int sig, siz = (char *)tblock - (char *)req; 136 137 tmsg("flusht %d\n", siz); 138 sig = sigblock(1<<SIGINT-1 | 1<<SIGIOT-1); /* Don't interrupt write */ 139 if (write(slavefd[rotor], req, siz) != siz) 140 senderr(); 141 sigsetmask(sig); 142 if (++rotor >= SLAVES) rotor = 0; 143 tblock = (char (*)[TP_BSIZE]) &req[ntrec]; 144 trecno = 0; 145 asize += writesize/density; 146 asize += 7; /* inter-record gap (why fixed?) */ 147 blockswritten += ntrec; 148 if (!pipeout && asize > tsize) { 149 close_rewind(); 150 otape(); 151 } 152 timeest(); 153 } 154 155 rewind() 156 { 157 register int f; 158 159 if (pipeout) 160 return; 161 for (f = 0; f < SLAVES; f++) 162 close(slavefd[f]); 163 while (wait(NULL) >= 0) ; /* wait for any signals from slaves */ 164 msg("Tape rewinding\n"); 165 #ifdef RDUMP 166 rmtclose(); 167 while (rmtopen(tape, 0) < 0) 168 sleep(10); 169 rmtclose(); 170 #else 171 close(to); 172 while ((f = open(tape, 0)) < 0) 173 sleep (10); 174 close(f); 175 #endif 176 } 177 178 close_rewind() 179 { 180 rewind(); 181 if (!nogripe) { 182 msg("Change Tapes: Mount tape #%d\n", tapeno+1); 183 broadcast("CHANGE TAPES!\7\7\n"); 184 } 185 while (!query("Is the new tape mounted and ready to go?")) 186 if (query("Do you want to abort?")) 187 dumpabort(); 188 } 189 190 /* 191 * We implement taking and restoring checkpoints on the tape level. 192 * When each tape is opened, a new process is created by forking; this 193 * saves all of the necessary context in the parent. The child 194 * continues the dump; the parent waits around, saving the context. 195 * If the child returns X_REWRITE, then it had problems writing that tape; 196 * this causes the parent to fork again, duplicating the context, and 197 * everything continues as if nothing had happened. 198 */ 199 200 otape() 201 { 202 int parentpid; 203 int childpid; 204 int status; 205 int waitpid; 206 int interrupt(); 207 208 parentpid = getpid(); 209 210 restore_check_point: 211 signal(SIGINT, interrupt); 212 /* 213 * All signals are inherited... 214 */ 215 childpid = fork(); 216 if (childpid < 0) { 217 msg("Context save fork fails in parent %d\n", parentpid); 218 Exit(X_ABORT); 219 } 220 if (childpid != 0) { 221 /* 222 * PARENT: 223 * save the context by waiting 224 * until the child doing all of the work returns. 225 * don't catch the interrupt 226 */ 227 signal(SIGINT, SIG_IGN); 228 #ifdef TDEBUG 229 msg("Tape: %d; parent process: %d child process %d\n", 230 tapeno+1, parentpid, childpid); 231 #endif TDEBUG 232 while ((waitpid = wait(&status)) != childpid) 233 msg("Parent %d waiting for child %d has another child %d return\n", 234 parentpid, childpid, waitpid); 235 if (status & 0xFF) { 236 msg("Child %d returns LOB status %o\n", 237 childpid, status&0xFF); 238 } 239 status = (status >> 8) & 0xFF; 240 #ifdef TDEBUG 241 switch(status) { 242 case X_FINOK: 243 msg("Child %d finishes X_FINOK\n", childpid); 244 break; 245 case X_ABORT: 246 msg("Child %d finishes X_ABORT\n", childpid); 247 break; 248 case X_REWRITE: 249 msg("Child %d finishes X_REWRITE\n", childpid); 250 break; 251 default: 252 msg("Child %d finishes unknown %d\n", 253 childpid, status); 254 break; 255 } 256 #endif TDEBUG 257 switch(status) { 258 case X_FINOK: 259 Exit(X_FINOK); 260 case X_ABORT: 261 Exit(X_ABORT); 262 case X_REWRITE: 263 goto restore_check_point; 264 default: 265 msg("Bad return code from dump: %d\n", status); 266 Exit(X_ABORT); 267 } 268 /*NOTREACHED*/ 269 } else { /* we are the child; just continue */ 270 #ifdef TDEBUG 271 sleep(4); /* allow time for parent's message to get out */ 272 msg("Child on Tape %d has parent %d, my pid = %d\n", 273 tapeno+1, parentpid, getpid()); 274 #endif 275 #ifdef RDUMP 276 while ((to = rmtopen(tape, 2)) < 0) 277 #else 278 while ((to = pipeout ? 1 : creat(tape, 0666)) < 0) 279 #endif 280 if (!query("Cannot open tape. Do you want to retry the open?")) 281 dumpabort(); 282 283 enslave(); /* Share open tape file descriptor with slaves */ 284 285 asize = 0; 286 tapeno++; /* current tape sequence */ 287 newtape++; /* new tape signal */ 288 spcl.c_volume++; 289 spcl.c_type = TS_TAPE; 290 spclrec(); 291 if (tapeno > 1) 292 msg("Tape %d begins with blocks from ino %d\n", 293 tapeno, ino); 294 } 295 } 296 297 dumpabort() 298 { 299 if (master != 0 && master != getpid()) 300 kill(master, SIGIOT); 301 msg("The ENTIRE dump is aborted.\n"); 302 Exit(X_ABORT); 303 } 304 305 Exit(status) 306 { 307 #ifdef TDEBUG 308 msg("pid = %d exits with status %d\n", getpid(), status); 309 #endif TDEBUG 310 exit(status); 311 } 312 313 #define OK 020 314 char tok = OK; 315 316 enslave() 317 { 318 int prev[2], next[2], cmd[2]; /* file descriptors for pipes */ 319 int i, j, ret, slavepid; 320 321 master = getpid(); 322 signal(SIGPIPE, dumpabort); 323 signal(SIGIOT, tperror); /* SIGIOT asks for restart from checkpoint */ 324 pipe(prev); 325 for (i = rotor = 0; i < SLAVES; ++i) { 326 if ((i < SLAVES - 1 && pipe(next) < 0) || pipe(cmd) < 0 327 || (slavepid = fork()) < 0) { 328 perror(" DUMP: too many slaves"); 329 dumpabort(); 330 } 331 if (i >= SLAVES - 1) 332 next[1] = prev[1]; /* Last slave loops back */ 333 slavefd[i] = cmd[1]; 334 if (slavepid == 0) { /* Slave starts up here */ 335 for (j = 0; j <= i; j++) 336 close(slavefd[j]); 337 if (i < SLAVES - 1) { 338 close(prev[1]); 339 close(next[0]); 340 } else { /* Insert initial token */ 341 if ((ret = write(next[1], &tok, 1)) != 1) 342 ringerr(ret, "cannot start token"); 343 } 344 doslave(i, cmd[0], prev[0], next[1]); 345 close(next[1]); 346 j = read(prev[0], &tok, 1); /* Eat the final token */ 347 #ifdef RDUMP /* Read remaining acknowledges */ 348 for (; j > 0 && (tok &~ OK) > 0; tok--) { 349 if (rmtwrite2() != writesize && (tok & OK)) { 350 kill(master, SIGIOT); 351 tok &= ~OK; 352 } 353 } 354 #endif 355 Exit(X_FINOK); 356 } 357 close(cmd[0]); 358 close(next[1]); 359 close(prev[0]); 360 prev[0] = next[0]; 361 } 362 master = 0; 363 } 364 365 /* 366 * Somebody must have died, should never happen 367 */ 368 ringerr(code, msg, a1, a2) 369 int code; 370 char *msg; 371 int a1, a2; 372 { 373 char buf[BUFSIZ]; 374 375 fprintf(stderr, " DUMP: "); 376 sprintf(buf, msg, a1, a2); 377 if (code < 0) 378 perror(msg); 379 else if (code == 0) 380 fprintf(stderr, "%s: unexpected EOF\n", buf); 381 else 382 fprintf(stderr, "%s: code %d\n", buf, code); 383 kill(master, SIGPIPE); 384 Exit(X_ABORT); 385 } 386 387 int childnum; 388 sigpipe() 389 { 390 391 ringerr(childnum, "SIGPIPE raised"); 392 } 393 394 doslave(num, cmd, prev, next) 395 int num, cmd, prev, next; 396 { 397 int ret; 398 399 tmsg("slave %d\n", num); 400 signal(SIGINT, SIG_IGN); /* Master handles it */ 401 signal(SIGTERM, SIG_IGN); 402 signal(SIGPIPE, sigpipe); 403 childnum = num; 404 close(fi); 405 if ((fi = open(disk, 0)) < 0) { /* Need our own seek pointer */ 406 perror(" DUMP: can't reopen disk"); 407 kill(master, SIGPIPE); 408 Exit(X_ABORT); 409 } 410 while ((ret = readpipe(cmd, req, reqsiz)) == reqsiz) { 411 register struct req *p = req; 412 for (trecno = 0; trecno < ntrec; trecno += p->count, p += p->count) { 413 if (p->dblk) { 414 tmsg("%d READS %d\n", num, p->count); 415 bread(p->dblk, tblock[trecno], 416 p->count * TP_BSIZE); 417 } else { 418 tmsg("%d PIPEIN %d\n", num, p->count); 419 if (p->count != 1) 420 ringerr(11, "%d PIPEIN %d", num, 421 p->count); 422 if (readpipe(cmd, tblock[trecno], TP_BSIZE) != TP_BSIZE) 423 senderr(); 424 } 425 } 426 if ((ret = read(prev, &tok, 1)) != 1) 427 ringerr(ret, "read token"); /* Wait your turn */ 428 tmsg("%d WRITE\n", num); 429 #ifdef RDUMP 430 if (tok & OK) { 431 rmtwrite0(writesize); 432 rmtwrite1(tblock[0], writesize); 433 tok++; /* Number of writes in progress */ 434 } 435 if (tok > (LAG|OK) && (--tok, rmtwrite2() != writesize)) { 436 #else 437 if ((tok & OK) && 438 write(to, tblock[0], writesize) != writesize) { 439 perror(tape); 440 #endif 441 kill(master, SIGIOT); /* restart from checkpoint */ 442 tok &= ~OK; 443 } 444 if ((ret = write(next, &tok, 1)) != 1) 445 ringerr(ret, "write token"); /* Next slave's turn */ 446 } 447 if (ret != 0) 448 ringerr(ret, "partial record?"); 449 tmsg("%d CLOSE\n", num); 450 } 451 452 /* 453 * Since a read from a pipe may not return all we asked for 454 * we must loop until we get all we need 455 */ 456 readpipe(fd, buf, cnt) 457 int fd; 458 char *buf; 459 int cnt; 460 { 461 int rd, got; 462 463 for (rd = cnt; rd > 0; rd -= got) { 464 got = read(fd, buf, rd); 465 if (got < 0) 466 return (got); 467 if (got == 0) 468 return (cnt - rd); 469 buf += got; 470 } 471 return (cnt); 472 } 473