1 /*
2     Buffer.  Very fast reblocking filter speedy writing of tapes.
3     Copyright (C) 1990,1991  Lee McLoughlin
4 
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 1, or (at your option)
8     any later version.
9 
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program; if not, write to the Free Software
17     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 
19     Lee McLoughlin.
20     Dept of Computing, Imperial College,
21     180 Queens Gate, London, SW7 2BZ, UK.
22 
23     Email: L.McLoughlin@doc.ic.ac.uk
24 */
25 
26 /* This is a reblocking process, designed to try and read from stdin
27  * and write to stdout - but to always try and keep the writing side
28  * busy.  It is meant to try and stream tape writes.
29  *
30  * This program runs in two parts.  The reader and the writer.  They
31  * communicate using shared memory with semaphores locking the access.
32  * The shared memory implements a circular list of blocks of data.
33  *
34  * L.McLoughlin, Imperial College, 1990
35  *
36  * $Log: buffer.c,v $
37  * Revision 1.19  1995/08/24  17:46:28  lmjm
38  * Be more careful abour EINTR errors
39  * Ingnore child processes dying.
40  *
41  * Revision 1.18  1993/08/25  19:07:31  lmjm
42  * Added Brad Isleys patchs to read/sigchld handling.
43  *
44  * Revision 1.17  1993/06/04  10:26:39  lmjm
45  * Cleaned up error reporting.
46  * Spot when the child terminating is not mine but inherited from via exec.
47  * Use only one semaphore group.
48  * Print out why writer died on error.
49  *
50  * Revision 1.16  1993/05/28  10:47:32  lmjm
51  * Debug shutdown sequence.
52  *
53  * Revision 1.15  1992/11/23  23:32:58  lmjm
54  * Oops!  This should be outside the ifdef
55  *
56  * Revision 1.14  1992/11/23  23:29:58  lmjm
57  * allow MAX_BLOCKSIZE and DEF_SHMEM to be configured
58  *
59  * Revision 1.13  1992/11/23  23:22:29  lmjm
60  * Printf's use %lu where appropriate.
61  *
62  * Revision 1.12  1992/11/23  23:17:55  lmjm
63  * Got rid of floats and use Kbyte counters instead.
64  *
65  * Revision 1.11  1992/11/03  23:11:51  lmjm
66  * Forgot Andi Karrer on the patch list.
67  *
68  * Revision 1.10  1992/11/03  22:58:41  lmjm
69  * Cleaned up the debugging prints.
70  *
71  * Revision 1.9  1992/11/03  22:53:00  lmjm
72  * Corrected stdin, stout and showevery use.
73  *
74  * Revision 1.8  1992/11/03  22:41:34  lmjm
75  * Added 2Gig patches from:
76  * Andi Karrer <karrer@bernina.ethz.ch>
77  * Rumi Zahir <rumi@iis.ethz.ch>
78  * Christoph Wicki <wicki@iis.ethz.ch>
79  *
80  * Revision 1.7  1992/07/23  20:42:03  lmjm
81  * Added 't' option to print total writen at end.
82  *
83  * Revision 1.6  1992/04/07  19:57:30  lmjm
84  * Added Kevins -B and -p options.
85  * Turn off buffering to make -S output appear ok.
86  * Added GPL.
87  *
88  * Revision 1.5  90/07/22  18:46:38  lmjm
89  * Added system 5 support.
90  *
91  * Revision 1.4  90/07/22  18:29:48  lmjm
92  * Updated arg handling to be more consistent.
93  * Make sofar printing size an option.
94  *
95  * Revision 1.3  90/05/15  23:27:46  lmjm
96  * Added -S option (show how much has been writen).
97  * Added -m option to specify how much shared memory to grab.
98  * Now tries to fill this with blocks.
99  * reader waits for writer to terminate and then frees the shared mem and sems.
100  *
101  * Revision 1.2  90/01/20  21:37:59  lmjm
102  * Reset default number of  blocks and blocksize for best thruput of
103  * standard tar 10K Allow.
104  * blocks number of blocks to be changed.
105  * Don't need a hole in the circular queue since the semaphores prevent block
106  * clash.
107  *
108  * Revision 1.1  90/01/17  11:30:23  lmjm
109  * Initial revision
110  *
111  */
112 #include <unistd.h>
113 #include <stdio.h>
114 #include <signal.h>
115 #include <fcntl.h>
116 #include <errno.h>
117 #include <machine/param.h>
118 #include <sys/types.h>
119 #include <sys/stat.h>
120 #include <sys/ipc.h>
121 #include <sys/shm.h>
122 #include <sys/sem.h>
123 #include <sys/wait.h>
124 #include <sys/time.h>
125 #include <limits.h>
126 #include <stdlib.h>
127 #include <strings.h>
128 #include "sem.h"
129 
130 #ifndef lint
131 static char *rcsid = "$Header: /a/swan/home/swan/staff/csg/lmjm/src/buffer/RCS/buffer.c,v 1.19 1995/08/24 17:46:28 lmjm Exp lmjm $";
132 #endif
133 
134 #ifndef __alpha
135 #endif /* __alpha */
136 
137 /* General macros */
138 #define TRUE 1
139 #define FALSE 0
140 #define K *1024
141 #define M *1024*1024
142 
143 /* Some forward declarations */
144 void byee();
145 void start_reader_and_writer();
146 void parse_args();
147 void set_handlers();
148 void buffer_allocate();
149 void report_proc();
150 int do_size();
151 void get_buffer();
152 void reader();
153 void writer();
154 void writer_end();
155 void wait_for_writer_end();
156 void get_next_free_block();
157 void test_writer();
158 int fill_block();
159 void get_next_filled_block();
160 int data_to_write();
161 void write_blocks_to_stdout();
162 void write_block_to_stdout();
163 void pr_out();
164 void end_writer();
165 
166 /* When showing print a note every this many bytes writen */
167 int showevery = 0;
168 #define PRINT_EVERY 10 K
169 
170 /* Pause after every write */
171 unsigned write_pause;
172 
173 /* This is the inter-process buffer - it implements a circular list
174  * of blocks. */
175 
176 #ifdef AMPEX
177 #define MAX_BLOCKSIZE (4 M)
178 #define DEF_BLOCKSIZE MAX_BLOCKSIZE
179 #define DEF_SHMEM (32 M)
180 #endif
181 
182 
183 #ifndef MAX_BLOCKSIZE
184 #define MAX_BLOCKSIZE (512 K)
185 #endif
186 #ifndef DEF_BLOCKSIZE
187 #define DEF_BLOCKSIZE (10 K)
188 #endif
189 
190 int blocksize = DEF_BLOCKSIZE;
191 
192 /* Which process... in error reports*/
193 char *proc_string = "buffer";
194 
195 /* Numbers of blocks in the queue.
196  */
197 #define MAX_BLOCKS 2048
198 int blocks = 1;
199 /* Circular increment of a buffer index */
200 #define INC(i) (((i)+1) == blocks ? 0 : ((i)+1))
201 
202 /* Max amount of shared memory you can allocate - can't see a way to look
203  * this up.
204  */
205 #ifndef DEF_SHMEM
206 #define DEF_SHMEM (1 K K)
207 #endif
208 int max_shmem = DEF_SHMEM;
209 
210 /* Just a flag to show unfilled */
211 #define NONE (-1)
212 
213 /* the shared memory id of the buffer */
214 int buffer_id = NONE;
215 struct block {
216 	int bytes;
217 	char *data;
218 } *curr_block;
219 
220 #define NO_BUFFER ((struct buffer *)-1)
221 struct buffer {
222 	/* Id of the semaphore group */
223 	int semid;
224 
225 	/* writer will hang trying to lock this till reader fills in a block */
226 	int blocks_used_lock;
227 	/* reader will hang trying to lock this till writer empties a block */
228 	int blocks_free_lock;
229 
230 	int next_block_in;
231 	int next_block_out;
232 
233 	struct block block[ MAX_BLOCKS ];
234 
235 	/* These actual space for the blocks is here - the array extends
236 	 * pass 1 */
237 	char data_space[ 1 ];
238 } *pbuffer = NO_BUFFER;
239 int buffer_size;
240 
241 int fdin	= 0;
242 int fdout	= 1;
243 int in_ISCHR	= 0;
244 int out_ISCHR	= 0;
245 int padblock	= FALSE;
246 int writer_pid	= 0;
247 int reader_pid	= 0;
248 int free_shm	= 1;
249 int percent	= 0;
250 int debug	= 0;
251 int Zflag	= 0;
252 int writer_status = 0;
253 char *progname = "buffer";
254 
255 char print_total = 0;
256 /* Number of K output */
257 unsigned long outk = 0;
258 
259 struct timeval starttime;
260 
261 int
main(argc,argv)262 main( argc, argv )
263 	int argc;
264 	char **argv;
265 {
266 	parse_args( argc, argv );
267 
268 	set_handlers();
269 
270 	buffer_allocate();
271 
272 	gettimeofday(&starttime, NULL);
273 
274 	start_reader_and_writer();
275 
276 	byee( 0 );
277 
278 	/* NOTREACHED */
279 	exit( 0 );
280 }
281 
282 void
parse_args(argc,argv)283 parse_args( argc, argv )
284 	int argc;
285 	char **argv;
286 {
287 	int c;
288 	int iflag = 0;
289 	int oflag = 0;
290 	int zflag = 0;
291 	extern char *optarg;
292 	char blocks_given = FALSE;
293 	struct stat buf;
294 
295 
296 	while( (c = getopt( argc, argv, "BS:Zdm:s:b:p:u:ti:o:z:" )) != -1 ){
297 		switch( c ){
298 		case 't': /* Print to stderr the total no of bytes writen */
299 			print_total++;
300 			break;
301 		case 'u': /* pause after write for given microseconds */
302 			write_pause = atoi( optarg );
303 			break;
304 		case 'B':   /* Pad last block */
305 			padblock = TRUE;
306 			break;
307 		case 'Z':   /* Zero by lseek on the tape device */
308 			Zflag = TRUE;
309 			break;
310 		case 'i': /* Input file */
311 			iflag++;
312 			if( iflag > 1 ){
313 				report_proc();
314 				fprintf( stderr, "-i given twice\n" );
315 				byee( -1 );
316 			}
317 			if( (fdin = open( optarg, O_RDONLY )) < 0 ){
318 				report_proc();
319 				perror( "cannot open input file" );
320 				fprintf( stderr, "filename: %s\n", optarg );
321 				byee ( -1 );
322 			}
323 			break;
324 		case 'o': /* Output file */
325 			oflag++;
326 			if( oflag > 1 ){
327 				report_proc();
328 				fprintf( stderr, "-o given twice\n" );
329 				byee( -1 );
330 			}
331 			if( (fdout = open( optarg, O_WRONLY | O_CREAT | O_TRUNC, 0666 )) < 0 ){
332 				report_proc();
333 				perror( "cannot open output file" );
334 				fprintf( stderr, "filename: %s\n", optarg );
335 				byee ( -1 );
336 			}
337 			break;
338 		case 'S':
339 			/* Show every once in a while how much is printed */
340 			showevery = do_size( optarg );
341 			if( showevery <= 0 )
342 				showevery = PRINT_EVERY;
343 			break;
344 		case 'd':	/* debug */
345 			debug++;
346 			if( debug == 1 ){
347 				setbuf( stdout, NULL );
348 				setbuf( stderr, NULL );
349 				fprintf( stderr, "debugging turned on\n" );
350 			}
351 			break;
352 		case 'm':
353 			/* Max size of shared memory lump */
354 			max_shmem = do_size( optarg );
355 
356 			if( max_shmem < (sizeof( struct buffer ) + (blocksize * blocks)) ){
357 				fprintf( stderr, "max_shmem %d too low\n", max_shmem );
358 				byee( -1 );
359 			}
360 			break;
361 		case 'b':
362 			/* Number of blocks */
363 			blocks_given = TRUE;
364 			blocks = atoi( optarg );
365 			if( (blocks <= 0) || (MAX_BLOCKS < blocks) ){
366 				fprintf( stderr, "blocks %d out of range\n", blocks );
367 				byee( -1 );
368 			}
369 			break;
370 		case 'p':	/* percent to wait before dumping */
371 			percent = atoi( optarg );
372 
373 			if( (percent < 0) || (100 < percent) ){
374 				fprintf( stderr, "percent %d out of range\n", percent );
375 				byee( -1 );
376 			}
377 			if( debug )
378 				fprintf( stderr, "percent set to %d\n", percent );
379 			break;
380 		case 'z':
381 			zflag++;
382 			/* FALL THRU */
383 		case 's':	/* Size of a block */
384 			blocksize = do_size( optarg );
385 
386 			if( (blocksize <= 0) || (MAX_BLOCKSIZE < blocksize) ){
387 				fprintf( stderr, "blocksize %d out of range\n", blocksize );
388 				byee( -1 );
389 			}
390 			break;
391 		default:
392 			fprintf( stderr, "Usage: %s [-B] [-t] [-S size] [-m memsize] [-b blocks] [-p percent] [-s blocksize] [-u pause] [-i infile] [-o outfile] [-z size]\n",
393 				progname );
394 			fprintf( stderr, "-B = blocked device - pad out last block\n" );
395 			fprintf( stderr, "-t = show total amount written at end\n" );
396 			fprintf( stderr, "-S size = show amount written every size bytes\n" );
397 			fprintf( stderr, "-m size = size of shared mem chunk to grab\n" );
398 			fprintf( stderr, "-b num = number of blocks in queue\n" );
399 			fprintf( stderr, "-p percent = don't start writing until percent blocks filled\n" );
400 			fprintf( stderr, "-s size = size of a block\n" );
401 			fprintf( stderr, "-u usecs = microseconds to sleep after each write\n" );
402 			fprintf( stderr, "-i infile = file to read from\n" );
403 			fprintf( stderr, "-o outfile = file to write to\n" );
404 			fprintf( stderr, "-z size = combined -S/-s flag\n" );
405 			byee( -1 );
406 		}
407 	}
408 
409 	if (argc > optind) {
410 		fprintf( stderr, "too many arguments\n" );
411 		byee( -1 );
412 	}
413 
414 	if (zflag) showevery = blocksize;
415 
416 	/* If -b was not given try and work out the max buffer size */
417 	if( !blocks_given ){
418 		blocks = (max_shmem - sizeof( struct buffer )) / blocksize;
419 		if( blocks <= 0 ){
420 			fprintf( stderr, "Cannot handle blocks that big, aborting!\n" );
421 			byee( -1 );
422 		}
423 		if( MAX_BLOCKS < blocks  ){
424 			fprintf( stderr, "Cannot handle that many blocks, aborting!\n" );
425 			byee( -1 );
426 		}
427 	}
428 
429 	/* check if fdin or fdout are character special files */
430 	if( fstat( fdin, &buf ) != 0 ){
431 		report_proc();
432 		perror( "can't stat input file" );
433 		byee( -1 );
434 	}
435 	in_ISCHR = S_ISCHR( buf.st_mode );
436 	if( fstat( fdout, &buf ) != 0 ){
437 		report_proc();
438 		perror( "can't stat output file" );
439 		byee( -1 );
440 	}
441 	out_ISCHR = S_ISCHR( buf.st_mode );
442 }
443 
444 /* The interrupt handler */
445 void
shutdown()446 shutdown()
447 {
448 	static int shutting;
449 	if( shutting ){
450 		if( debug )
451 			fprintf( stderr, "%s: ALREADY SHUTTING!\n", proc_string );
452 		return;
453 	}
454 	shutting = 1;
455 	if( debug )
456 		fprintf( stderr, "%s: shutdown on signal\n", proc_string );
457 
458 	byee( -1 );
459 }
460 
461 /* Shutdown because the child has ended */
462 void
child_shutdown()463 child_shutdown()
464 {
465 	/* Find out which child has died.  (They may not be my
466 	 * children if buffer was exec'd on top of something that had
467 	 * childred.)
468 	 */
469 	int deadpid;
470 
471 	while( (deadpid = waitpid( -1, &writer_status, WNOHANG )) &&
472 		deadpid != -1 && deadpid != 0 ){
473 		if( debug > 2 )
474 			fprintf( stderr, "child_shutdown %d: 0x%04x\n", deadpid, writer_status );
475 		if( deadpid == writer_pid ){
476 			if( debug > 2 )
477 				fprintf( stderr, "writer has ended\n" );
478 			writer_pid = 0;
479 			byee( 0 );
480 		}
481 	}
482 }
483 
484 void
set_handlers()485 set_handlers()
486 {
487 	if( debug )
488 		fprintf( stderr, "%s: setting handlers\n", proc_string );
489 
490 	signal( SIGHUP, shutdown );
491 	signal( SIGINT, shutdown );
492 	signal( SIGQUIT, shutdown );
493 	signal( SIGTERM, shutdown );
494 #ifdef SIGCHLD
495 	signal( SIGCHLD, child_shutdown );
496 #else
497 #ifdef SIGCLD
498 	signal( SIGCLD, child_shutdown );
499 #endif
500 #endif
501 }
502 
503 void
buffer_allocate()504 buffer_allocate()
505 {
506 	/* Allow for the data space */
507 	buffer_size = sizeof( struct buffer ) +
508 		((blocks * blocksize) - sizeof( char ));
509 
510 	/* Create the space for the buffer */
511 	buffer_id = shmget( IPC_PRIVATE,
512 			   buffer_size,
513 			   IPC_CREAT|S_IREAD|S_IWRITE );
514 	if( buffer_id < 0 ){
515 		report_proc();
516 		perror( "couldn't create shared memory segment" );
517 		byee( -1 );
518 	}
519 
520 	get_buffer();
521 
522 	if( debug )
523 		fprintf( stderr, "%s pbuffer is 0x%08lx, buffer_size is %d [%d x %d]\n",
524 			proc_string,
525 			(unsigned long)pbuffer, buffer_size, blocks, blocksize );
526 
527 #ifdef SYS5
528 	memset( (char *)pbuffer, '\0', buffer_size );
529 #else
530 	bzero( (char *)pbuffer, buffer_size );
531 #endif
532 	pbuffer->semid = -1;
533 	pbuffer->blocks_used_lock = -1;
534 	pbuffer->blocks_free_lock = -1;
535 
536 	pbuffer->semid = new_sems( 2 );	/* Get a read and a write sem */
537 	pbuffer->blocks_used_lock = 0;
538 	/* Start it off locked - it is unlocked when a buffer gets filled in */
539 	lock( pbuffer->semid, pbuffer->blocks_used_lock );
540 
541 	pbuffer->blocks_free_lock = 1;
542 	/* Initializing the semaphore to "blocks - 1" causes a hang when using option
543 	 * "-p 100" because it always keeps one block free, so we'll never reach 100% fill
544 	 * level. However, there doesn't seem to be a good reason to keep one block free,
545 	 * so we initialize the semaphore to "blocks" instead. */
546 	sem_set( pbuffer->semid, pbuffer->blocks_free_lock, blocks );
547 
548 	/* Detattach the shared memory so the fork doesnt do anything odd */
549 	shmdt( (char *)pbuffer );
550 	pbuffer = NO_BUFFER;
551 }
552 
553 void
buffer_remove()554 buffer_remove()
555 {
556 	static char removing = FALSE;
557 
558 	/* Avoid accidental recursion */
559 	if( removing )
560 		return;
561 	removing = TRUE;
562 
563 	/* Buffer not yet created */
564 	if( buffer_id == NONE )
565 		return;
566 
567 	/* There should be a buffer so this must be after its detached it
568 	 * but before the fork picks it up */
569 	if( pbuffer == NO_BUFFER )
570 		get_buffer();
571 
572 	if( debug )
573 		fprintf( stderr, "%s: removing semaphores and buffer\n", proc_string );
574 	remove_sems( pbuffer->semid );
575 
576 	if( shmctl( buffer_id, IPC_RMID, (struct shmid_ds *)0 ) == -1 ){
577 		report_proc();
578 		perror( "failed to remove shared memory buffer" );
579 	}
580 }
581 
582 void
get_buffer()583 get_buffer()
584 {
585 	int b;
586 
587 	/* Grab the buffer space */
588 	pbuffer = (struct buffer *)shmat( buffer_id, (char *)0, 0 );
589 	if( pbuffer == NO_BUFFER ){
590 		report_proc();
591 		perror( "failed to attach shared memory" );
592 		byee( -1 );
593 	}
594 
595 	/* Setup the data space pointers */
596 	for( b = 0; b < blocks; b++ )
597 		pbuffer->block[ b ].data =
598 			&pbuffer->data_space[ b * blocksize ];
599 
600 }
601 
602 void
start_reader_and_writer()603 start_reader_and_writer()
604 {
605 	fflush( stdout );
606 	fflush( stderr );
607 
608 	if( (writer_pid = fork()) == -1 ){
609 		report_proc();
610 		perror( "unable to fork" );
611 		byee( -1 );
612 	}
613 	else if( writer_pid == 0 ){
614 		free_shm = 0;
615 		proc_string = "buffer (writer)";
616 		reader_pid = getppid();
617 
618 		/* Never trust fork() to propogate signals - reset them */
619 		set_handlers();
620 
621 		writer();
622 	}
623 	else {
624 		proc_string = "buffer (reader)";
625 		reader();
626 
627 		wait_for_writer_end();
628 	}
629 }
630 
631 /* Read from stdin into the buffer */
632 void
reader()633 reader()
634 {
635 	if( debug )
636 		fprintf( stderr, "R: Entering reader\n" );
637 
638 	get_buffer();
639 
640 	while( 1 ){
641 		get_next_free_block();
642 		if( ! fill_block() )
643 			break;
644 	}
645 
646 	if( debug )
647 		fprintf( stderr, "R: Exiting reader\n" );
648 }
649 
650 void
get_next_free_block()651 get_next_free_block()
652 {
653 	test_writer();
654 
655 	/* Maybe wait till there is room in the buffer */
656 	lock( pbuffer->semid, pbuffer->blocks_free_lock );
657 
658 	curr_block = &pbuffer->block[ pbuffer->next_block_in ];
659 
660 	pbuffer->next_block_in = INC( pbuffer->next_block_in );
661 }
662 
663 int
fill_block()664 fill_block()
665 {
666 	int bytes = 0;
667 	char *start;
668 	int toread;
669 	static char eof_reached = 0;
670 
671 	if( eof_reached ){
672 		curr_block->bytes = 0;
673 		unlock( pbuffer->semid, pbuffer->blocks_used_lock );
674 		return 0;
675 	}
676 
677 	start = curr_block->data;
678 	toread = blocksize;
679 
680 	/* Fill the block with input.  This reblocks the input. */
681 	while( toread != 0 ){
682 		bytes = read( fdin, start, toread );
683 		if( bytes <= 0 ){
684 			/* catch interrupted system calls for death
685 			 * of children in pipeline */
686 			if( bytes < 0 && errno == EINTR )
687 				continue;
688 			break;
689 		}
690 		start += bytes;
691 		toread -= bytes;
692 	}
693 
694 	if( bytes == 0 )
695 		eof_reached = 1;
696 
697 	if( bytes < 0 ){
698 		report_proc();
699 		perror( "failed to read input" );
700 		byee( -1 );
701 	}
702 
703 	/* number of bytes available. Zero will be taken as eof */
704 	if( !padblock || toread == blocksize )
705 		curr_block->bytes = blocksize - toread;
706 	else {
707 		if( toread ) bzero( start, toread );
708 		curr_block->bytes = blocksize;
709 	}
710 
711 	if( debug > 1 )
712 		fprintf( stderr, "R: got %d bytes\n", curr_block->bytes );
713 
714 	unlock( pbuffer->semid, pbuffer->blocks_used_lock );
715 
716 	return curr_block->bytes;
717 }
718 
719 /* Write the buffer to stdout */
720 void
writer()721 writer()
722 {
723 	int filled = 0;
724 	int maxfilled = (blocks * percent) / 100;
725 	int first_block = 0;
726 
727 	if( debug )
728 		fprintf( stderr, "\tW: Entering writer\n blocks = %d\n maxfilled = %d\n",
729 			blocks,
730 			maxfilled );
731 
732 	get_buffer();
733 
734 	while( 1 ){
735 		if( !filled )
736 			first_block = pbuffer->next_block_out;
737 		get_next_filled_block();
738 		if( !data_to_write() )
739 			break;
740 
741 		filled++;
742 		if( debug > 1 )
743 			fprintf( stderr, "W: filled = %d\n", filled );
744 		if( filled >= maxfilled ){
745 			if( debug > 1 )
746 				fprintf( stderr, "W: writing\n" );
747 			write_blocks_to_stdout( filled, first_block );
748 			filled = 0;
749 		}
750 	}
751 
752 	write_blocks_to_stdout( filled, first_block );
753 
754 	if( showevery ){
755 		pr_out();
756 		fprintf( stderr, "\n" );
757 	}
758 
759 	if( print_total ){
760 		fprintf( stderr, "Kilobytes Out %lu\n", outk );
761 	}
762 
763 	if( debug )
764 		fprintf( stderr, "\tW: Exiting writer\n" );
765 }
766 
767 void
get_next_filled_block()768 get_next_filled_block()
769 {
770 	/* Hang till some data is available */
771 	lock( pbuffer->semid, pbuffer->blocks_used_lock );
772 
773 	curr_block = &pbuffer->block[ pbuffer->next_block_out ];
774 
775 	pbuffer->next_block_out = INC( pbuffer->next_block_out );
776 }
777 
778 int
data_to_write()779 data_to_write()
780 {
781 	return curr_block->bytes;
782 }
783 
784 void
write_blocks_to_stdout(filled,first_block)785 write_blocks_to_stdout( filled, first_block )
786 	int filled;
787 	int first_block;
788 {
789 	pbuffer->next_block_out = first_block;
790 
791 	while( filled-- ){
792 		curr_block = &pbuffer->block[ pbuffer->next_block_out ];
793 		pbuffer->next_block_out = INC( pbuffer->next_block_out );
794 		write_block_to_stdout();
795 	}
796 }
797 
798 void
write_block_to_stdout()799 write_block_to_stdout()
800 {
801 	static unsigned long out = 0;
802 	static unsigned long last_gb = 0;
803 	static unsigned long next_k = 0;
804 	int written;
805 
806 	if( next_k == 0 && showevery ){
807 		if( debug > 3 )
808 			fprintf( stderr, "W: next_k = %lu showevery = %d\n", next_k, showevery );
809 		showevery = showevery / 1024;
810 		next_k = showevery;
811 	}
812 
813 	if( (written = write( fdout, curr_block->data, curr_block->bytes )) != curr_block->bytes ){
814 		report_proc();
815 		perror( "write of data failed" );
816 		fprintf( stderr, "bytes to write=%d, bytes written=%d, total written %10luK\n", curr_block->bytes, written, outk );
817 		byee( -1 );
818 	}
819 
820 	if( write_pause ){
821 		usleep( write_pause );
822 	}
823 
824 	out = curr_block->bytes / 1024;
825 	outk += out;
826 	last_gb += out;
827 
828 	/*
829 	 * on character special devices (tapes), do an lseek() every 1 Gb,
830 	 * to overcome the 2Gb limit. This resets the file offset to
831 	 * zero, but -- at least on exabyte SCSI drives -- does not perform
832 	 * any actual action on the tape.
833 	 */
834 	if( Zflag && last_gb >= 1 K K ){
835 		last_gb = 0;
836 		if( in_ISCHR )
837 			(void) lseek( fdin, 0, SEEK_SET);
838 		if( out_ISCHR )
839 			(void) lseek( fdout, 0, SEEK_SET);
840 	}
841 	if( showevery ){
842 		if( debug > 3 )
843 			fprintf( stderr, "W: outk = %lu, next_k = %lu\n",
844 				outk, next_k );
845 		if( outk >= next_k ){
846 			pr_out();
847 			next_k += showevery;
848 		}
849 	}
850 
851 	unlock( pbuffer->semid, pbuffer->blocks_free_lock );
852 }
853 
854 
855 void
byee(exit_val)856 byee( exit_val )
857 	int exit_val;
858 {
859 	if( writer_pid != 0 ){
860 		if( exit_val != 0 ){
861 			/* I am shutting down due to an error.
862 			 * Shut the writer down or else it will try to access
863 			 * the freed up locks */
864 			end_writer();
865 		}
866 		wait_for_writer_end();
867 	}
868 
869 	if( free_shm ){
870 		buffer_remove();
871 	}
872 
873 #ifdef SIGCHLD
874 	signal( SIGCHLD, SIG_IGN );
875 #else
876 #ifdef SIGCLD
877 	signal( SIGCLD, SIG_IGN );
878 #endif
879 #endif
880 
881 	/* If the child died or was killed show this in the exit value */
882 	if( writer_status ){
883 		if( WEXITSTATUS( writer_status ) || WIFSIGNALED( writer_status ) ){
884 			if( debug )
885 				fprintf( stderr, "writer died badly: 0x%04x\n", writer_status );
886 			exit( -2 );
887 		}
888 	}
889 
890 	exit( exit_val );
891 }
892 
893 /* Kill off the writer */
894 void
end_writer()895 end_writer()
896 {
897 	if( writer_pid )
898 		kill( writer_pid, SIGHUP );
899 }
900 
901 void
wait_for_writer_end()902 wait_for_writer_end()
903 {
904 	int deadpid;
905 
906 	/* Now wait for the writer to finish */
907 	while( writer_pid && ((deadpid = wait( &writer_status )) != writer_pid) &&
908 		deadpid != -1 )
909 		;
910 }
911 
912 void
test_writer()913 test_writer()
914 {
915 	/* Has the writer gone unexpectedly? */
916 	if( writer_pid == 0 ){
917 		fprintf( stderr, "writer has died unexpectedly\n" );
918 		byee( -1 );
919 	}
920 }
921 
922 /* Given a string of <num>[<suff>] returns a num
923  * suff =
924  *   m/M for 1meg
925  *   k/K for 1k
926  *   b/B for 512
927  */
928 int
do_size(arg)929 do_size( arg )
930 	char *arg;
931 {
932 	int ret = 0;
933 
934 	char unit = '\0';
935 	sscanf( arg, "%d%c", &ret, &unit  );
936 
937 	switch( unit ){
938 	case 'm':
939 	case 'M':
940 		ret = ret K K;
941 		break;
942 	case 'k':
943 	case 'K':
944 		ret = ret K;
945 		break;
946 	case 'b':
947 	case 'B':
948 		ret *= 512;
949 		break;
950 	}
951 
952 	return ret;
953 }
954 
955 void
pr_out()956 pr_out()
957 {
958 	struct timeval now;
959 	unsigned long ms_delta, k_per_s;
960 
961 	gettimeofday(&now, NULL);
962 	ms_delta = (now.tv_sec - starttime.tv_sec) * 1000
963 		+ (now.tv_usec - starttime.tv_usec) / 1000;
964 	if (ms_delta) {
965 		/* Use increased accuracy for small amounts of data,
966 		 * decreased accuracy for *huge* throughputs > 4.1GB/s
967 		 * to avoid division by 0. This will overflow if your
968 		 * machine's throughput exceeds 4TB/s - you deserve to
969 		 * loose if you're still using 32 bit longs on such a
970 		 * beast ;-)
971 		 * <mbuck@debian.org>
972 		 */
973 		if (outk < ULONG_MAX / 1000) {
974 			k_per_s = (outk * 1000) / ms_delta;
975 		} else if (ms_delta >= 1000) {
976 			k_per_s = outk / (ms_delta / 1000);
977 		} else {
978 			k_per_s = (outk / ms_delta) * 1000;
979 		}
980 		fprintf( stderr, " %10luK, %10luK/s\r", outk, k_per_s );
981 	} else {
982 		if (outk) {
983 			fprintf( stderr, " %10luK,          ?K/s\r", outk );
984 		} else {
985 			fprintf( stderr, "          0K,          0K/s\r");
986 		}
987 	}
988 }
989 
990 #ifdef SYS5
991 #include <sys/time.h>
992 
993 #ifndef __alpha
bzero(b,l)994 bzero( b, l )
995 	char *b;
996 	unsigned l;
997 {
998 	memset( b, '\0', l );
999 }
1000 #endif /* __alpha */
1001 
usleep_back()1002 usleep_back()
1003 {
1004 }
1005 
1006 void
usleep(u)1007 usleep( u )
1008 	unsigned u;
1009 {
1010 	struct itimerval old, t;
1011 	signal( SIGALRM, usleep_back );
1012 	t.it_interval.tv_sec = 0;
1013 	t.it_interval.tv_usec = 0;
1014 	t.it_value.tv_sec = u / 1000000;
1015 	t.it_value.tv_usec = u % 1000000;
1016 	setitimer( ITIMER_REAL, &t, &old );
1017 	pause();
1018 	setitimer( ITIMER_REAL, &old, NULL );
1019 }
1020 #endif
1021 
1022 /* Called before error reports */
1023 void
report_proc()1024 report_proc()
1025 {
1026 	fprintf( stderr, "%s: ", proc_string );
1027 }
1028