1 /*
2 Buffer. Very fast reblocking filter for speedy writing of tapes.
3 Copyright (C) 1990,1991 Lee McLoughlin
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 1, or (at your option)
8 any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 Lee McLoughlin.
20 Dept of Computing, Imperial College,
21 180 Queens Gate, London, SW7 2BZ, UK.
22
23 Email: L.McLoughlin@doc.ic.ac.uk
24 */
25
26 /* This is a reblocking process, designed to try and read from stdin
27 * and write to stdout - but to always try and keep the writing side
28 * busy. It is meant to try and stream tape writes.
29 *
30 * This program runs in two parts. The reader and the writer. They
31 * communicate using shared memory with semaphores locking the access.
32 * The shared memory implements a circular list of blocks of data.
33 *
34 * L.McLoughlin, Imperial College, 1990
35 *
36 * $Log: buffer.c,v $
37 * Revision 1.19 1995/08/24 17:46:28 lmjm
38 * Be more careful about EINTR errors
39 * Ignore child processes dying.
40 *
41 * Revision 1.18 1993/08/25 19:07:31 lmjm
42 * Added Brad Isleys patchs to read/sigchld handling.
43 *
44 * Revision 1.17 1993/06/04 10:26:39 lmjm
45 * Cleaned up error reporting.
46 * Spot when the child terminating is not mine but inherited from via exec.
47 * Use only one semaphore group.
48 * Print out why writer died on error.
49 *
50 * Revision 1.16 1993/05/28 10:47:32 lmjm
51 * Debug shutdown sequence.
52 *
53 * Revision 1.15 1992/11/23 23:32:58 lmjm
54 * Oops! This should be outside the ifdef
55 *
56 * Revision 1.14 1992/11/23 23:29:58 lmjm
57 * allow MAX_BLOCKSIZE and DEF_SHMEM to be configured
58 *
59 * Revision 1.13 1992/11/23 23:22:29 lmjm
60 * Printf's use %lu where appropriate.
61 *
62 * Revision 1.12 1992/11/23 23:17:55 lmjm
63 * Got rid of floats and use Kbyte counters instead.
64 *
65 * Revision 1.11 1992/11/03 23:11:51 lmjm
66 * Forgot Andi Karrer on the patch list.
67 *
68 * Revision 1.10 1992/11/03 22:58:41 lmjm
69 * Cleaned up the debugging prints.
70 *
71 * Revision 1.9 1992/11/03 22:53:00 lmjm
72 * Corrected stdin, stout and showevery use.
73 *
74 * Revision 1.8 1992/11/03 22:41:34 lmjm
75 * Added 2Gig patches from:
76 * Andi Karrer <karrer@bernina.ethz.ch>
77 * Rumi Zahir <rumi@iis.ethz.ch>
78 * Christoph Wicki <wicki@iis.ethz.ch>
79 *
80 * Revision 1.7 1992/07/23 20:42:03 lmjm
81 * Added 't' option to print total writen at end.
82 *
83 * Revision 1.6 1992/04/07 19:57:30 lmjm
84 * Added Kevins -B and -p options.
85 * Turn off buffering to make -S output appear ok.
86 * Added GPL.
87 *
88 * Revision 1.5 90/07/22 18:46:38 lmjm
89 * Added system 5 support.
90 *
91 * Revision 1.4 90/07/22 18:29:48 lmjm
92 * Updated arg handling to be more consistent.
93 * Make sofar printing size an option.
94 *
95 * Revision 1.3 90/05/15 23:27:46 lmjm
96 * Added -S option (show how much has been writen).
97 * Added -m option to specify how much shared memory to grab.
98 * Now tries to fill this with blocks.
99 * reader waits for writer to terminate and then frees the shared mem and sems.
100 *
101 * Revision 1.2 90/01/20 21:37:59 lmjm
102 * Reset default number of blocks and blocksize for best thruput of
103 * standard tar 10K Allow.
104 * blocks number of blocks to be changed.
105 * Don't need a hole in the circular queue since the semaphores prevent block
106 * clash.
107 *
108 * Revision 1.1 90/01/17 11:30:23 lmjm
109 * Initial revision
110 *
111 */
112 #include <unistd.h>
113 #include <stdio.h>
114 #include <signal.h>
115 #include <fcntl.h>
116 #include <errno.h>
117 #include <machine/param.h>
118 #include <sys/types.h>
119 #include <sys/stat.h>
120 #include <sys/ipc.h>
121 #include <sys/shm.h>
122 #include <sys/sem.h>
123 #include <sys/wait.h>
124 #include <sys/time.h>
125 #include <limits.h>
126 #include <stdlib.h>
127 #include <strings.h>
128 #include "sem.h"
129
/* RCS identification string; left out when the file is run through lint. */
#ifndef lint
static char *rcsid =
	"$Header: /a/swan/home/swan/staff/csg/lmjm/src/buffer/RCS/buffer.c,v 1.19 1995/08/24 17:46:28 lmjm Exp lmjm $";
#endif

/* Empty guard: nothing is currently needed for non-alpha builds. */
#ifndef __alpha
#endif /* __alpha */

/* Boolean constants */
#define FALSE 0
#define TRUE 1

/* Size suffixes.  They are written after a number, so "10 K" expands
 * to "10 *1024" and "4 M" to "4 *1024*1024". */
#define K *1024
#define M *1024*1024
142
/* Forward declarations, grouped by role.  They are declared without
 * parameter lists, in the same old style as the definitions. */

/* Startup and shutdown */
void parse_args();
void set_handlers();
void buffer_allocate();
void get_buffer();
void start_reader_and_writer();
void byee();

/* Reader side */
void reader();
void get_next_free_block();
int fill_block();
void test_writer();
void wait_for_writer_end();
void end_writer();

/* Writer side */
void writer();
void writer_end();
void get_next_filled_block();
int data_to_write();
void write_blocks_to_stdout();
void write_block_to_stdout();

/* Helpers */
int do_size();
void report_proc();
void pr_out();
165
166 /* When showing print a note every this many bytes writen */
167 int showevery = 0;
168 #define PRINT_EVERY 10 K
169
170 /* Pause after every write */
171 unsigned write_pause;
172
173 /* This is the inter-process buffer - it implements a circular list
174 * of blocks. */
175
176 #ifdef AMPEX
177 #define MAX_BLOCKSIZE (4 M)
178 #define DEF_BLOCKSIZE MAX_BLOCKSIZE
179 #define DEF_SHMEM (32 M)
180 #endif
181
182
183 #ifndef MAX_BLOCKSIZE
184 #define MAX_BLOCKSIZE (512 K)
185 #endif
186 #ifndef DEF_BLOCKSIZE
187 #define DEF_BLOCKSIZE (10 K)
188 #endif
189
190 int blocksize = DEF_BLOCKSIZE;
191
192 /* Which process... in error reports*/
193 char *proc_string = "buffer";
194
195 /* Numbers of blocks in the queue.
196 */
197 #define MAX_BLOCKS 2048
198 int blocks = 1;
199 /* Circular increment of a buffer index */
200 #define INC(i) (((i)+1) == blocks ? 0 : ((i)+1))
201
202 /* Max amount of shared memory you can allocate - can't see a way to look
203 * this up.
204 */
205 #ifndef DEF_SHMEM
206 #define DEF_SHMEM (1 K K)
207 #endif
208 int max_shmem = DEF_SHMEM;
209
210 /* Just a flag to show unfilled */
211 #define NONE (-1)
212
213 /* the shared memory id of the buffer */
214 int buffer_id = NONE;
215 struct block {
216 int bytes;
217 char *data;
218 } *curr_block;
219
220 #define NO_BUFFER ((struct buffer *)-1)
221 struct buffer {
222 /* Id of the semaphore group */
223 int semid;
224
225 /* writer will hang trying to lock this till reader fills in a block */
226 int blocks_used_lock;
227 /* reader will hang trying to lock this till writer empties a block */
228 int blocks_free_lock;
229
230 int next_block_in;
231 int next_block_out;
232
233 struct block block[ MAX_BLOCKS ];
234
235 /* These actual space for the blocks is here - the array extends
236 * pass 1 */
237 char data_space[ 1 ];
238 } *pbuffer = NO_BUFFER;
239 int buffer_size;
240
241 int fdin = 0;
242 int fdout = 1;
243 int in_ISCHR = 0;
244 int out_ISCHR = 0;
245 int padblock = FALSE;
246 int writer_pid = 0;
247 int reader_pid = 0;
248 int free_shm = 1;
249 int percent = 0;
250 int debug = 0;
251 int Zflag = 0;
252 int writer_status = 0;
253 char *progname = "buffer";
254
255 char print_total = 0;
256 /* Number of K output */
257 unsigned long outk = 0;
258
259 struct timeval starttime;
260
261 int
main(argc,argv)262 main( argc, argv )
263 int argc;
264 char **argv;
265 {
266 parse_args( argc, argv );
267
268 set_handlers();
269
270 buffer_allocate();
271
272 gettimeofday(&starttime, NULL);
273
274 start_reader_and_writer();
275
276 byee( 0 );
277
278 /* NOTREACHED */
279 exit( 0 );
280 }
281
282 void
parse_args(argc,argv)283 parse_args( argc, argv )
284 int argc;
285 char **argv;
286 {
287 int c;
288 int iflag = 0;
289 int oflag = 0;
290 int zflag = 0;
291 extern char *optarg;
292 char blocks_given = FALSE;
293 struct stat buf;
294
295
296 while( (c = getopt( argc, argv, "BS:Zdm:s:b:p:u:ti:o:z:" )) != -1 ){
297 switch( c ){
298 case 't': /* Print to stderr the total no of bytes writen */
299 print_total++;
300 break;
301 case 'u': /* pause after write for given microseconds */
302 write_pause = atoi( optarg );
303 break;
304 case 'B': /* Pad last block */
305 padblock = TRUE;
306 break;
307 case 'Z': /* Zero by lseek on the tape device */
308 Zflag = TRUE;
309 break;
310 case 'i': /* Input file */
311 iflag++;
312 if( iflag > 1 ){
313 report_proc();
314 fprintf( stderr, "-i given twice\n" );
315 byee( -1 );
316 }
317 if( (fdin = open( optarg, O_RDONLY )) < 0 ){
318 report_proc();
319 perror( "cannot open input file" );
320 fprintf( stderr, "filename: %s\n", optarg );
321 byee ( -1 );
322 }
323 break;
324 case 'o': /* Output file */
325 oflag++;
326 if( oflag > 1 ){
327 report_proc();
328 fprintf( stderr, "-o given twice\n" );
329 byee( -1 );
330 }
331 if( (fdout = open( optarg, O_WRONLY | O_CREAT | O_TRUNC, 0666 )) < 0 ){
332 report_proc();
333 perror( "cannot open output file" );
334 fprintf( stderr, "filename: %s\n", optarg );
335 byee ( -1 );
336 }
337 break;
338 case 'S':
339 /* Show every once in a while how much is printed */
340 showevery = do_size( optarg );
341 if( showevery <= 0 )
342 showevery = PRINT_EVERY;
343 break;
344 case 'd': /* debug */
345 debug++;
346 if( debug == 1 ){
347 setbuf( stdout, NULL );
348 setbuf( stderr, NULL );
349 fprintf( stderr, "debugging turned on\n" );
350 }
351 break;
352 case 'm':
353 /* Max size of shared memory lump */
354 max_shmem = do_size( optarg );
355
356 if( max_shmem < (sizeof( struct buffer ) + (blocksize * blocks)) ){
357 fprintf( stderr, "max_shmem %d too low\n", max_shmem );
358 byee( -1 );
359 }
360 break;
361 case 'b':
362 /* Number of blocks */
363 blocks_given = TRUE;
364 blocks = atoi( optarg );
365 if( (blocks <= 0) || (MAX_BLOCKS < blocks) ){
366 fprintf( stderr, "blocks %d out of range\n", blocks );
367 byee( -1 );
368 }
369 break;
370 case 'p': /* percent to wait before dumping */
371 percent = atoi( optarg );
372
373 if( (percent < 0) || (100 < percent) ){
374 fprintf( stderr, "percent %d out of range\n", percent );
375 byee( -1 );
376 }
377 if( debug )
378 fprintf( stderr, "percent set to %d\n", percent );
379 break;
380 case 'z':
381 zflag++;
382 /* FALL THRU */
383 case 's': /* Size of a block */
384 blocksize = do_size( optarg );
385
386 if( (blocksize <= 0) || (MAX_BLOCKSIZE < blocksize) ){
387 fprintf( stderr, "blocksize %d out of range\n", blocksize );
388 byee( -1 );
389 }
390 break;
391 default:
392 fprintf( stderr, "Usage: %s [-B] [-t] [-S size] [-m memsize] [-b blocks] [-p percent] [-s blocksize] [-u pause] [-i infile] [-o outfile] [-z size]\n",
393 progname );
394 fprintf( stderr, "-B = blocked device - pad out last block\n" );
395 fprintf( stderr, "-t = show total amount written at end\n" );
396 fprintf( stderr, "-S size = show amount written every size bytes\n" );
397 fprintf( stderr, "-m size = size of shared mem chunk to grab\n" );
398 fprintf( stderr, "-b num = number of blocks in queue\n" );
399 fprintf( stderr, "-p percent = don't start writing until percent blocks filled\n" );
400 fprintf( stderr, "-s size = size of a block\n" );
401 fprintf( stderr, "-u usecs = microseconds to sleep after each write\n" );
402 fprintf( stderr, "-i infile = file to read from\n" );
403 fprintf( stderr, "-o outfile = file to write to\n" );
404 fprintf( stderr, "-z size = combined -S/-s flag\n" );
405 byee( -1 );
406 }
407 }
408
409 if (argc > optind) {
410 fprintf( stderr, "too many arguments\n" );
411 byee( -1 );
412 }
413
414 if (zflag) showevery = blocksize;
415
416 /* If -b was not given try and work out the max buffer size */
417 if( !blocks_given ){
418 blocks = (max_shmem - sizeof( struct buffer )) / blocksize;
419 if( blocks <= 0 ){
420 fprintf( stderr, "Cannot handle blocks that big, aborting!\n" );
421 byee( -1 );
422 }
423 if( MAX_BLOCKS < blocks ){
424 fprintf( stderr, "Cannot handle that many blocks, aborting!\n" );
425 byee( -1 );
426 }
427 }
428
429 /* check if fdin or fdout are character special files */
430 if( fstat( fdin, &buf ) != 0 ){
431 report_proc();
432 perror( "can't stat input file" );
433 byee( -1 );
434 }
435 in_ISCHR = S_ISCHR( buf.st_mode );
436 if( fstat( fdout, &buf ) != 0 ){
437 report_proc();
438 perror( "can't stat output file" );
439 byee( -1 );
440 }
441 out_ISCHR = S_ISCHR( buf.st_mode );
442 }
443
444 /* The interrupt handler */
445 void
shutdown()446 shutdown()
447 {
448 static int shutting;
449 if( shutting ){
450 if( debug )
451 fprintf( stderr, "%s: ALREADY SHUTTING!\n", proc_string );
452 return;
453 }
454 shutting = 1;
455 if( debug )
456 fprintf( stderr, "%s: shutdown on signal\n", proc_string );
457
458 byee( -1 );
459 }
460
461 /* Shutdown because the child has ended */
462 void
child_shutdown()463 child_shutdown()
464 {
465 /* Find out which child has died. (They may not be my
466 * children if buffer was exec'd on top of something that had
467 * childred.)
468 */
469 int deadpid;
470
471 while( (deadpid = waitpid( -1, &writer_status, WNOHANG )) &&
472 deadpid != -1 && deadpid != 0 ){
473 if( debug > 2 )
474 fprintf( stderr, "child_shutdown %d: 0x%04x\n", deadpid, writer_status );
475 if( deadpid == writer_pid ){
476 if( debug > 2 )
477 fprintf( stderr, "writer has ended\n" );
478 writer_pid = 0;
479 byee( 0 );
480 }
481 }
482 }
483
484 void
set_handlers()485 set_handlers()
486 {
487 if( debug )
488 fprintf( stderr, "%s: setting handlers\n", proc_string );
489
490 signal( SIGHUP, shutdown );
491 signal( SIGINT, shutdown );
492 signal( SIGQUIT, shutdown );
493 signal( SIGTERM, shutdown );
494 #ifdef SIGCHLD
495 signal( SIGCHLD, child_shutdown );
496 #else
497 #ifdef SIGCLD
498 signal( SIGCLD, child_shutdown );
499 #endif
500 #endif
501 }
502
503 void
buffer_allocate()504 buffer_allocate()
505 {
506 /* Allow for the data space */
507 buffer_size = sizeof( struct buffer ) +
508 ((blocks * blocksize) - sizeof( char ));
509
510 /* Create the space for the buffer */
511 buffer_id = shmget( IPC_PRIVATE,
512 buffer_size,
513 IPC_CREAT|S_IREAD|S_IWRITE );
514 if( buffer_id < 0 ){
515 report_proc();
516 perror( "couldn't create shared memory segment" );
517 byee( -1 );
518 }
519
520 get_buffer();
521
522 if( debug )
523 fprintf( stderr, "%s pbuffer is 0x%08lx, buffer_size is %d [%d x %d]\n",
524 proc_string,
525 (unsigned long)pbuffer, buffer_size, blocks, blocksize );
526
527 #ifdef SYS5
528 memset( (char *)pbuffer, '\0', buffer_size );
529 #else
530 bzero( (char *)pbuffer, buffer_size );
531 #endif
532 pbuffer->semid = -1;
533 pbuffer->blocks_used_lock = -1;
534 pbuffer->blocks_free_lock = -1;
535
536 pbuffer->semid = new_sems( 2 ); /* Get a read and a write sem */
537 pbuffer->blocks_used_lock = 0;
538 /* Start it off locked - it is unlocked when a buffer gets filled in */
539 lock( pbuffer->semid, pbuffer->blocks_used_lock );
540
541 pbuffer->blocks_free_lock = 1;
542 /* Initializing the semaphore to "blocks - 1" causes a hang when using option
543 * "-p 100" because it always keeps one block free, so we'll never reach 100% fill
544 * level. However, there doesn't seem to be a good reason to keep one block free,
545 * so we initialize the semaphore to "blocks" instead. */
546 sem_set( pbuffer->semid, pbuffer->blocks_free_lock, blocks );
547
548 /* Detattach the shared memory so the fork doesnt do anything odd */
549 shmdt( (char *)pbuffer );
550 pbuffer = NO_BUFFER;
551 }
552
553 void
buffer_remove()554 buffer_remove()
555 {
556 static char removing = FALSE;
557
558 /* Avoid accidental recursion */
559 if( removing )
560 return;
561 removing = TRUE;
562
563 /* Buffer not yet created */
564 if( buffer_id == NONE )
565 return;
566
567 /* There should be a buffer so this must be after its detached it
568 * but before the fork picks it up */
569 if( pbuffer == NO_BUFFER )
570 get_buffer();
571
572 if( debug )
573 fprintf( stderr, "%s: removing semaphores and buffer\n", proc_string );
574 remove_sems( pbuffer->semid );
575
576 if( shmctl( buffer_id, IPC_RMID, (struct shmid_ds *)0 ) == -1 ){
577 report_proc();
578 perror( "failed to remove shared memory buffer" );
579 }
580 }
581
582 void
get_buffer()583 get_buffer()
584 {
585 int b;
586
587 /* Grab the buffer space */
588 pbuffer = (struct buffer *)shmat( buffer_id, (char *)0, 0 );
589 if( pbuffer == NO_BUFFER ){
590 report_proc();
591 perror( "failed to attach shared memory" );
592 byee( -1 );
593 }
594
595 /* Setup the data space pointers */
596 for( b = 0; b < blocks; b++ )
597 pbuffer->block[ b ].data =
598 &pbuffer->data_space[ b * blocksize ];
599
600 }
601
602 void
start_reader_and_writer()603 start_reader_and_writer()
604 {
605 fflush( stdout );
606 fflush( stderr );
607
608 if( (writer_pid = fork()) == -1 ){
609 report_proc();
610 perror( "unable to fork" );
611 byee( -1 );
612 }
613 else if( writer_pid == 0 ){
614 free_shm = 0;
615 proc_string = "buffer (writer)";
616 reader_pid = getppid();
617
618 /* Never trust fork() to propogate signals - reset them */
619 set_handlers();
620
621 writer();
622 }
623 else {
624 proc_string = "buffer (reader)";
625 reader();
626
627 wait_for_writer_end();
628 }
629 }
630
631 /* Read from stdin into the buffer */
632 void
reader()633 reader()
634 {
635 if( debug )
636 fprintf( stderr, "R: Entering reader\n" );
637
638 get_buffer();
639
640 while( 1 ){
641 get_next_free_block();
642 if( ! fill_block() )
643 break;
644 }
645
646 if( debug )
647 fprintf( stderr, "R: Exiting reader\n" );
648 }
649
650 void
get_next_free_block()651 get_next_free_block()
652 {
653 test_writer();
654
655 /* Maybe wait till there is room in the buffer */
656 lock( pbuffer->semid, pbuffer->blocks_free_lock );
657
658 curr_block = &pbuffer->block[ pbuffer->next_block_in ];
659
660 pbuffer->next_block_in = INC( pbuffer->next_block_in );
661 }
662
663 int
fill_block()664 fill_block()
665 {
666 int bytes = 0;
667 char *start;
668 int toread;
669 static char eof_reached = 0;
670
671 if( eof_reached ){
672 curr_block->bytes = 0;
673 unlock( pbuffer->semid, pbuffer->blocks_used_lock );
674 return 0;
675 }
676
677 start = curr_block->data;
678 toread = blocksize;
679
680 /* Fill the block with input. This reblocks the input. */
681 while( toread != 0 ){
682 bytes = read( fdin, start, toread );
683 if( bytes <= 0 ){
684 /* catch interrupted system calls for death
685 * of children in pipeline */
686 if( bytes < 0 && errno == EINTR )
687 continue;
688 break;
689 }
690 start += bytes;
691 toread -= bytes;
692 }
693
694 if( bytes == 0 )
695 eof_reached = 1;
696
697 if( bytes < 0 ){
698 report_proc();
699 perror( "failed to read input" );
700 byee( -1 );
701 }
702
703 /* number of bytes available. Zero will be taken as eof */
704 if( !padblock || toread == blocksize )
705 curr_block->bytes = blocksize - toread;
706 else {
707 if( toread ) bzero( start, toread );
708 curr_block->bytes = blocksize;
709 }
710
711 if( debug > 1 )
712 fprintf( stderr, "R: got %d bytes\n", curr_block->bytes );
713
714 unlock( pbuffer->semid, pbuffer->blocks_used_lock );
715
716 return curr_block->bytes;
717 }
718
719 /* Write the buffer to stdout */
720 void
writer()721 writer()
722 {
723 int filled = 0;
724 int maxfilled = (blocks * percent) / 100;
725 int first_block = 0;
726
727 if( debug )
728 fprintf( stderr, "\tW: Entering writer\n blocks = %d\n maxfilled = %d\n",
729 blocks,
730 maxfilled );
731
732 get_buffer();
733
734 while( 1 ){
735 if( !filled )
736 first_block = pbuffer->next_block_out;
737 get_next_filled_block();
738 if( !data_to_write() )
739 break;
740
741 filled++;
742 if( debug > 1 )
743 fprintf( stderr, "W: filled = %d\n", filled );
744 if( filled >= maxfilled ){
745 if( debug > 1 )
746 fprintf( stderr, "W: writing\n" );
747 write_blocks_to_stdout( filled, first_block );
748 filled = 0;
749 }
750 }
751
752 write_blocks_to_stdout( filled, first_block );
753
754 if( showevery ){
755 pr_out();
756 fprintf( stderr, "\n" );
757 }
758
759 if( print_total ){
760 fprintf( stderr, "Kilobytes Out %lu\n", outk );
761 }
762
763 if( debug )
764 fprintf( stderr, "\tW: Exiting writer\n" );
765 }
766
767 void
get_next_filled_block()768 get_next_filled_block()
769 {
770 /* Hang till some data is available */
771 lock( pbuffer->semid, pbuffer->blocks_used_lock );
772
773 curr_block = &pbuffer->block[ pbuffer->next_block_out ];
774
775 pbuffer->next_block_out = INC( pbuffer->next_block_out );
776 }
777
778 int
data_to_write()779 data_to_write()
780 {
781 return curr_block->bytes;
782 }
783
784 void
write_blocks_to_stdout(filled,first_block)785 write_blocks_to_stdout( filled, first_block )
786 int filled;
787 int first_block;
788 {
789 pbuffer->next_block_out = first_block;
790
791 while( filled-- ){
792 curr_block = &pbuffer->block[ pbuffer->next_block_out ];
793 pbuffer->next_block_out = INC( pbuffer->next_block_out );
794 write_block_to_stdout();
795 }
796 }
797
798 void
write_block_to_stdout()799 write_block_to_stdout()
800 {
801 static unsigned long out = 0;
802 static unsigned long last_gb = 0;
803 static unsigned long next_k = 0;
804 int written;
805
806 if( next_k == 0 && showevery ){
807 if( debug > 3 )
808 fprintf( stderr, "W: next_k = %lu showevery = %d\n", next_k, showevery );
809 showevery = showevery / 1024;
810 next_k = showevery;
811 }
812
813 if( (written = write( fdout, curr_block->data, curr_block->bytes )) != curr_block->bytes ){
814 report_proc();
815 perror( "write of data failed" );
816 fprintf( stderr, "bytes to write=%d, bytes written=%d, total written %10luK\n", curr_block->bytes, written, outk );
817 byee( -1 );
818 }
819
820 if( write_pause ){
821 usleep( write_pause );
822 }
823
824 out = curr_block->bytes / 1024;
825 outk += out;
826 last_gb += out;
827
828 /*
829 * on character special devices (tapes), do an lseek() every 1 Gb,
830 * to overcome the 2Gb limit. This resets the file offset to
831 * zero, but -- at least on exabyte SCSI drives -- does not perform
832 * any actual action on the tape.
833 */
834 if( Zflag && last_gb >= 1 K K ){
835 last_gb = 0;
836 if( in_ISCHR )
837 (void) lseek( fdin, 0, SEEK_SET);
838 if( out_ISCHR )
839 (void) lseek( fdout, 0, SEEK_SET);
840 }
841 if( showevery ){
842 if( debug > 3 )
843 fprintf( stderr, "W: outk = %lu, next_k = %lu\n",
844 outk, next_k );
845 if( outk >= next_k ){
846 pr_out();
847 next_k += showevery;
848 }
849 }
850
851 unlock( pbuffer->semid, pbuffer->blocks_free_lock );
852 }
853
854
855 void
byee(exit_val)856 byee( exit_val )
857 int exit_val;
858 {
859 if( writer_pid != 0 ){
860 if( exit_val != 0 ){
861 /* I am shutting down due to an error.
862 * Shut the writer down or else it will try to access
863 * the freed up locks */
864 end_writer();
865 }
866 wait_for_writer_end();
867 }
868
869 if( free_shm ){
870 buffer_remove();
871 }
872
873 #ifdef SIGCHLD
874 signal( SIGCHLD, SIG_IGN );
875 #else
876 #ifdef SIGCLD
877 signal( SIGCLD, SIG_IGN );
878 #endif
879 #endif
880
881 /* If the child died or was killed show this in the exit value */
882 if( writer_status ){
883 if( WEXITSTATUS( writer_status ) || WIFSIGNALED( writer_status ) ){
884 if( debug )
885 fprintf( stderr, "writer died badly: 0x%04x\n", writer_status );
886 exit( -2 );
887 }
888 }
889
890 exit( exit_val );
891 }
892
893 /* Kill off the writer */
894 void
end_writer()895 end_writer()
896 {
897 if( writer_pid )
898 kill( writer_pid, SIGHUP );
899 }
900
901 void
wait_for_writer_end()902 wait_for_writer_end()
903 {
904 int deadpid;
905
906 /* Now wait for the writer to finish */
907 while( writer_pid && ((deadpid = wait( &writer_status )) != writer_pid) &&
908 deadpid != -1 )
909 ;
910 }
911
912 void
test_writer()913 test_writer()
914 {
915 /* Has the writer gone unexpectedly? */
916 if( writer_pid == 0 ){
917 fprintf( stderr, "writer has died unexpectedly\n" );
918 byee( -1 );
919 }
920 }
921
/* Given a string of <num>[<suff>] returns a num
 * suff =
 * m/M for 1meg
 * k/K for 1k
 * b/B for 512
 *
 * arg - the string to convert; leading white space and a sign are
 *       accepted.  Any other character after the number is ignored.
 *
 * Returns the size in bytes.  Returns 0 when arg is NULL, does not
 * start with a number, or describes a size that does not fit in an int
 * (every caller treats 0 as "no usable value").
 */
int
do_size( arg )
	char *arg;
{
	long value;
	long scale = 1;
	char *end;

	if( arg == NULL )
		return 0;

	errno = 0;
	value = strtol( arg, &end, 10 );
	if( end == arg || errno == ERANGE )
		return 0;

	switch( *end ){
	case 'm':
	case 'M':
		scale = 1024L * 1024L;
		break;
	case 'k':
	case 'K':
		scale = 1024L;
		break;
	case 'b':
	case 'B':
		scale = 512L;
		break;
	default:
		break;
	}

	/* Refuse anything whose scaled value would overflow an int */
	if( value > INT_MAX / scale || value < INT_MIN / scale )
		return 0;

	return (int)(value * scale);
}
954
955 void
pr_out()956 pr_out()
957 {
958 struct timeval now;
959 unsigned long ms_delta, k_per_s;
960
961 gettimeofday(&now, NULL);
962 ms_delta = (now.tv_sec - starttime.tv_sec) * 1000
963 + (now.tv_usec - starttime.tv_usec) / 1000;
964 if (ms_delta) {
965 /* Use increased accuracy for small amounts of data,
966 * decreased accuracy for *huge* throughputs > 4.1GB/s
967 * to avoid division by 0. This will overflow if your
968 * machine's throughput exceeds 4TB/s - you deserve to
969 * loose if you're still using 32 bit longs on such a
970 * beast ;-)
971 * <mbuck@debian.org>
972 */
973 if (outk < ULONG_MAX / 1000) {
974 k_per_s = (outk * 1000) / ms_delta;
975 } else if (ms_delta >= 1000) {
976 k_per_s = outk / (ms_delta / 1000);
977 } else {
978 k_per_s = (outk / ms_delta) * 1000;
979 }
980 fprintf( stderr, " %10luK, %10luK/s\r", outk, k_per_s );
981 } else {
982 if (outk) {
983 fprintf( stderr, " %10luK, ?K/s\r", outk );
984 } else {
985 fprintf( stderr, " 0K, 0K/s\r");
986 }
987 }
988 }
989
/* Replacements for BSD routines, compiled only when SYS5 is defined. */
#ifdef SYS5
#include <sys/time.h>

#ifndef __alpha
/* Zero l bytes starting at b.  Declared int (the old default return
 * type) although no value is returned. */
int
bzero( b, l )
	char *b;
	unsigned l;
{
	memset( b, '\0', l );
}
#endif /* __alpha */

/* SIGALRM handler used by usleep(): it only has to exist so that the
 * signal interrupts pause(). */
int
usleep_back()
{
}

/* Sleep for u microseconds using the real-time interval timer.  The
 * previous timer setting is put back afterwards. */
void
usleep( u )
	unsigned u;
{
	struct itimerval previous, timer;

	signal( SIGALRM, usleep_back );

	/* One-shot timer: expire once after u microseconds, no reload */
	timer.it_value.tv_sec = u / 1000000;
	timer.it_value.tv_usec = u % 1000000;
	timer.it_interval.tv_sec = 0;
	timer.it_interval.tv_usec = 0;

	setitimer( ITIMER_REAL, &timer, &previous );
	pause();
	setitimer( ITIMER_REAL, &previous, NULL );
}
#endif
1021
1022 /* Called before error reports */
1023 void
report_proc()1024 report_proc()
1025 {
1026 fprintf( stderr, "%s: ", proc_string );
1027 }
1028