1 /*
2   $Header: /fs/mumon/aware/piercarl/Cmd./Commands./team.c,v 3.1 1994/01/13 15:03:25 piercarl Exp piercarl $
3 */
4 
5 static char Notice[] =
6   "Copyright 1987,1989 Piercarlo Grandi. All rights reserved.";
7 
8 /*
9   This program is free software; you can redistribute it and/or
10   modify it under the terms of the GNU General Public License as
11   published by the Free Software Foundation; either version 2, or
12   (at your option) any later version.
13 
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License for more details.
18 
19   You may have received a copy of the GNU General Public License
20   along with this program; if not, write to the Free Software
21   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23 
24 #undef DEBUG
25 
26 /*
27   Unix programs normally do synchronous read and write, that is,
28   you read and then you write; no overlap is possible.
29 
30   This is especially catastrophic for device to device copies,
31   whereit is important to minimize elapsed time, by overlapping
32   activity on one with activity on another.
33 
34   To obtain this, a multiprocess structure is necessary under
35   Unix.  This program is functionally equivalento to a pipe, in
36   that it copies its input (fd 0) to its output (fd 1) link.
37 
38   This programs is executed as a Team of N processes, called
39   Guys, all of which share the same input and output links; the
40   first reads a chunk of input, awakens the second, writes the
41   chunk to its output; the second does the same, and the last
42   awakens the first.
43 
44   Since this process is essentially cyclic, we use a ring of
45   pipes to synchronize the Guys.  Each guy has un input pipe from
46   the upstream guy and an output pipe to the downstream guy.
47   Whenever a guy receives a READ command from the upstream, it
48   first reads a block and then passes on the READ command
49   downstream; it then waits for a WRITE command from upstream,
50   and then writes the block and after that passes the WRITE
51   command downstream. A count of how much has been processed is
52   also passwd along, for statistics and verification.
53 
54   Two other commands are used, one is STOP, and is sent
55   downstream from the guy that detects the end of file of the
56   input, after which the guy exits, and ABORT, which is sent
57   downstream from the guy which detects trouble in the guy
58   upstream to it, which has much the same effect.
59 */
60 
61 #define UOFF_T uint64_t
62 
63 #define TeamLVOLSZ	(UOFF_T)(1L<<10)
64 #define TeamHVOLSZ	((UOFF_T) 3 * ((UOFF_T) 1 << 62))
65 
66 #define TeamLBUFSZ	(64)		/* Low buffer size		*/
67 #define TeamDBUFSZ	(60*512)	/* Default buffer size		*/
68 #define TeamHBUFSZ	(1L<<26)	/* High buffer size		*/
69 
70 #define TeamDTEAMSZ	4		/* Default # of processes	*/
71 #define TeamHTEAMSZ	16		/* High # of processes		*/
72 
73 /*
74   External components...  Probably the only system dependent part
75   of this program, as some systems have something in
76   /usr/include/sys where others have it in /usr/include.
77 
78   Also, the mesg() procedure is highly system dependent...  watch
79   out for locking and variable number of arguments.
80 */
81 
82 #include <errno.h>
83 #include <signal.h>
84 #include <stdio.h>
85 #include <sys/types.h>
86 #include <sys/file.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <time.h>
90 #include <stdlib.h>
91 #include <string.h>
92 #include <unistd.h>	/* for lseek() */
93 #ifdef HAVE_WAIT_H
94 #include <sys/wait.h>
95 #endif
96 
97 #ifdef HAVE_PARAM_H
98 #include <sys/param.h>
99 #endif
100 
101 
102 #ifdef sun
103 # undef F_SETLKW
104 #endif
105 
106 #ifdef __FreeBSD__
107 # undef F_SETLKW
108 #endif
109 
110 #if (PCG)
111 # include "Extend.h"
112 # include "Here.h"
113 # include "Type.h"
114 #else
115 # define call		(void)
116 # define were		if
117 # define fast		register
118 # define global     	/* extern */
119 # define local		static
120 # define when		break; case
121 # define otherwise	break; default
122 # define mode(which,name) typedef which name name; which name
123 # define bool	int
124 # define true 	   	1
125 # define false	    	0
126 # define nil(type)	((type) 0)
127 # define scalar	    	int
128   typedef char	    	*pointer;
129 # if (defined(SMALL_M))
130     typedef unsigned    address;
131 # else
132     typedef long	address;
133 # endif
134 # if (__STDC__)
135 #   define of(list)	list
136 #   define on(list)
137 #   define is(list)	(list)
138 #   define _		,
139 #   define noparms	void
140 # else
141 #   define void 	int
142 #   define const	/* const */
143 #   define of(list)	()
144 #   define on(list)	list
145 #   define is(list)	list;
146 #   define _		;
147 #   define noparms
148 # endif
149 #endif
150 
151 #ifdef DEBUG
152 # define Mesg(list) mesg list
153 #else
154 # define Mesg(list)
155 #endif
156 
157 /*VARARGS1*/
mesg(a,b,c,d,e,f,g,h,i)158 mesg(a,b,c,d,e,f,g,h,i)
159   char *a;
160   int b,c,d,e,f,g,h,i;
161 {
162 # if (defined F_SETLKW)
163     struct flock l;
164     l.l_whence = 0; l.l_start = 0L; l.l_len = 0L;
165     l.l_type = F_WRLCK; fcntl(fileno(stderr),F_SETLKW,&l);
166 # endif
167 # if (defined LOCK_EX)
168     flock(fileno(stderr),LOCK_EX);
169 # endif
170   fprintf(stderr,a,b,c,d,e,f,g,h,i);
171 # if (defined LOCK_EX)
172     flock(fileno(stderr),LOCK_UN);
173 # endif
174 # if (defined F_SETLKW)
175     l.l_type = F_UNLCK; fcntl(fileno(stderr),F_SETLKW,&l);
176 # endif
177 }
178 
179 local bool		verbose = false;
180 local bool		report = true;
181 local bool		guyhaderror = false;
182 
183 local time_t		origin;
184 extern char		*optarg;
185 extern int		optind;
186 
187 /*
188   The  regular Unix read and write calls are not guaranteed to process
189   all  the  bytes  requested.  These  procedures guarantee that if the
190   request is for N bytes, all of them are read or written unless there
191   is an error or eof.
192 */
193 
194 #define FdCLOSED    0
195 #define FdOPEN	    1
196 #define FdEOF 	    2
197 #define FdERROR     3
198 
mode(struct,Fd)199 mode(struct,Fd)
200 {
201   int 	    fd;
202   short	    status;
203   UOFF_T   size;
204 };
205 
206 local Fd		FdIn,FdOut;
207 
FdOpen(fd,ffd,size)208 local bool		FdOpen on((fd,ffd,size)) is
209 (
210   fast Fd		  *fd
211 _ int			  ffd
212 _ UOFF_T		  size
213 )
214 {
215   fd->status = (ffd >= 0) ? FdOPEN :   FdCLOSED;
216   fd->fd	= ffd;
217   fd->size	= size;
218 
219   Mesg(("FdOpen fd %d\n",ffd));
220 
221   return ffd >= 0;
222 }
223 
FdClose(fd)224 local bool		FdClose on((fd)) is
225 (
226   fast Fd		  *fd
227 )
228 {
229   int 			  ffd;
230 
231   ffd = fd->fd;
232   Mesg(("FdClose fd %d\n",fd->fd));
233   fd->status = FdCLOSED;
234   fd->fd = -1;
235 
236   return close(ffd) >= 0;
237 }
238 
FdCopy(to,from)239 local bool		FdCopy on((to,from)) is
240 (
241   fast Fd		  *to
242 _ fast Fd		  *from
243 )
244 {
245   to->size	= from->size;
246   to->status	= from->status;
247   to->fd	= dup(from->fd);
248   Mesg(("FdCopy of %d is %d\n",from->fd,to->fd));
249   return to->fd >= 0;
250 }
251 
FdSet(to,from)252 local void		FdSet on((to,from)) is
253 (
254   fast Fd		  *to
255 _ fast Fd		  *from
256 )
257 {
258   if (from->fd < 0)
259     mesg("team: set an invalid fd\n");
260   to->size	= from->size;
261   to->status	= from->status;
262   to->fd	= from->fd;
263 }
264 
FdRetry(fd,which,done,space)265 local UOFF_T	FdRetry on((fd,which,done,space)) is
266 (
267   fast Fd		  *fd
268 _ char			  *which
269 _ UOFF_T		  done
270 _ UOFF_T		  space
271 )
272 {
273   int			  tty;
274   char			  reply[2];
275   struct stat		  st;
276 
277   if (fstat(fd->fd,&st) < 0)
278   {
279     perror(which);
280     return 0;
281   }
282 
283   st.st_mode &= S_IFMT;
284   if (st.st_mode != S_IFCHR && st.st_mode != S_IFBLK)
285     return 0;
286 
287   if (!isatty(fileno(stderr)))
288     return 0;
289 
290   if ((tty = open("/dev/tty",0)) < 0)
291   {
292     perror("/dev/tty");
293     return 0;
294   }
295 
296   do
297   {
298 #if (defined i386 || defined sun)
299 #  if !(defined(BSD) && (BSD >= 199306))
300     extern char		*(sys_errlist[]);
301 #  endif
302 #  if (defined(BSD) && (BSD >= 199306)) && __STDC__
303     const
304 #  endif
305     char		*errmsg = sys_errlist[errno];
306 #else
307     char		errmsg[32];
308     (void) sprintf(errmsg,"Error %d",errno);
309 #endif
310     if (errno)
311       mesg("'%s' on %s after %quk. Continue [cyn] ? ",errmsg,which,done>>10);
312     else
313       mesg("EOF on %s after %quk. Continue [cyn] ? ",which,done>>10);
314 
315     read(tty,reply,sizeof reply);
316   }
317   while (strchr("cCyYnN",reply[0]) == 0);
318 
319   call close(tty);
320 
321   if (strchr("nN",reply[0]) != 0)
322     return 0L;
323 
324   errno = 0;
325 
326   if (strchr("cC",reply[0]) != 0)
327   {
328     call lseek(fd->fd,0L,0);
329     return fd->size;
330   }
331 
332   return space;
333 }
334 
FdCanDo(remaining,available)335 local unsigned		FdCanDo on((remaining,available)) is
336 (
337   fast address		  remaining
338 _ fast UOFF_T		  available
339 )
340 {
341   return (remaining < available)
342     ? (unsigned) remaining : (unsigned) available;
343 }
344 
FdRead(fd,buffer,todo,done)345 local address		FdRead on((fd,buffer,todo,done)) is
346 (
347   fast Fd		  *fd
348 _ pointer		  buffer
349 _ fast address		  todo
350 _ UOFF_T		  done
351 )
352 {
353   fast UOFF_T		  space;
354   fast int		  bytesRead;
355   fast address		  justDone;
356 
357   switch (fd->status)
358   {
359   when FdEOF:     return 0;
360   when FdERROR:   return -1;
361   when FdCLOSED:  return -1;
362 
363   when FdOPEN:
364 
365     space = fd->size - done%fd->size;
366 
367     for (justDone = 0; space != 0L && justDone < todo;)
368     {
369       bytesRead = read(fd->fd,buffer+justDone,
370 	     FdCanDo(todo-justDone,space-justDone));
371 
372       if (bytesRead <= 0 || (justDone += bytesRead) == space)
373 	space = FdRetry(fd,"input",done+justDone,space-justDone);
374     }
375 
376     if (bytesRead == 0) fd->status = FdEOF;
377     if (bytesRead < 0)	fd->status = FdERROR;
378 
379     Mesg(("FdRead %d reads %d last %d\n",fd->fd,justDone,bytesRead));
380 
381     return (justDone == 0) ? bytesRead : justDone;
382   }
383   /*NOTREACHED*/
384 }
385 
FdWrite(fd,buffer,todo,done)386 local address		FdWrite on((fd,buffer,todo,done)) is
387 (
388   fast Fd		  *fd
389 _ pointer		  buffer
390 _ fast address		  todo
391 _ UOFF_T		  done
392 )
393 {
394   fast UOFF_T		  space;
395   fast int		  bytesWritten;
396   fast address		  justDone;
397 
398   switch (fd->status)
399   {
400   when FdEOF:     return 0;
401   when FdERROR:   return -1;
402   when FdCLOSED:  return -1;
403 
404   when FdOPEN:
405 
406     space = fd->size - done%fd->size;
407 
408     for (justDone = 0; space != 0L && justDone < todo;)
409     {
410       bytesWritten = write(fd->fd,buffer+justDone,
411 	      FdCanDo(todo-justDone,space-justDone));
412 
413       if (bytesWritten <= 0 || (justDone += bytesWritten) == space)
414 	space = FdRetry(fd,"output",done+justDone,space-justDone);
415     }
416 
417     Mesg(("FdWrite %d writes %d last %d\n",fd->fd,justDone,bytesWritten));
418 
419     if (bytesWritten == 0)  fd->status =   FdEOF;
420     if (bytesWritten < 0)   fd->status =   FdERROR;
421 
422     return (justDone == 0) ? bytesWritten : justDone;
423   }
424   /*NOTREACHED*/
425 }
426 
427 /*
428   A Token is scalar   value   representing a command.
429 */
430 
431 typedef short scalar Token;
432 
433 #define TokenREAD   0
434 #define TokenWRITE  1
435 #define TokenSTOP   2
436 #define TokenABORT  -1
437 
438 /*
439   Here we represent Streams as Fds; this is is not entirely
440   appropriate, as Fds have also a volume size, and relatively
441   high overhead write and read functions.  Well, we just take
442   some liberties with abstraction levels here.  Actually we
443   should have an Fd abstraction for stream pipes and a Vol
444   abstraction for input and output...
445 */
446 
StreamPipe(downstream,upstream)447 local bool		StreamPipe on((downstream,upstream)) is
448 (
449   fast Fd		  *downstream
450 _ fast Fd		  *upstream
451 )
452 {
453   int			  links[2];
454 
455   if (pipe(links) < 0)
456   {
457     perror("team: opening links");
458     return false;
459   }
460 
461   Mesg(("StreamPipe fd downstream %d upstream %d\n",links[1],links[0]));
462 
463   return FdOpen(downstream,links[1],TeamHVOLSZ)
464     && FdOpen(upstream,links[0],TeamHVOLSZ);
465 }
466 
mode(struct,StreamMsg)467 mode(struct,StreamMsg)
468 {
469   Token			  token;
470   short			  status;
471   UOFF_T		  done;
472 };
473 
StreamSend(fd,token,status,done)474 local bool		StreamSend on((fd,token,status,done)) is
475 (
476   fast Fd		  *fd
477 _ Token 		  token
478 _ short 		  status
479 _ UOFF_T		  done
480 )
481 {
482   fast int		  n;
483   StreamMsg		  message;
484 
485   message.token = token;
486   message.status = status;
487   message.done = done;
488 
489   n = write(fd->fd,(pointer) &message,(unsigned) sizeof message);
490 
491   Mesg(("StreamSend fd %u n %d token %d\n",fd->fd,n,token));
492 
493   return n == sizeof message;
494 }
495 
StreamReceive(fd,tokenp,statusp,donep)496 local bool		StreamReceive on((fd,tokenp,statusp,donep)) is
497 (
498   fast Fd		  *fd
499 _ Token 		  *tokenp
500 _ short 		  *statusp
501 _ UOFF_T		  *donep
502 )
503 {
504   fast int		  n;
505   StreamMsg		  message;
506 
507   n = read(fd->fd,(pointer) &message,(unsigned) sizeof message);
508   *tokenp = message.token;
509   *statusp = message.status;
510   *donep = message.done;
511 
512   Mesg(("StreamReceive fd %u n %d token %d\n",fd->fd,n,*tokenp));
513 
514   return n == sizeof message;
515 }
516 /*
517   A guy is an instance of the input to output copier. It is attached
518   to a relay station, with an upstream link, from which commands
519   arrive, and a downward link, to which they are relayed once they are
520   executed.
521 */
522 
mode(struct,Guy)523 mode(struct,Guy)
524 {
525   int			  pid;
526   Fd			  upStream;
527   Fd			  downStream;
528 };
529 
GuyOpen(guy,pid,upstream,downstream)530 local bool		GuyOpen on((guy,pid,upstream,downstream)) is
531 (
532   fast Guy		  *guy
533 _ int			  pid
534 _ Fd			  *upstream
535 _ Fd			  *downstream
536 )
537 {
538   Mesg(("GuyOpen pid %u upstream %u downstream %u\n",
539     pid,upstream->fd,downstream->fd));
540 
541   guy->pid = pid;
542   FdSet(&guy->upStream,upstream);
543   FdSet(&guy->downStream,downstream);
544 
545   return true;
546 }
547 
548 #define GuySEND(guy,token,status,done)   \
549   StreamSend(&guy->downStream,token,status,done)
550 
551 #define GuyRECEIVE(guy,tokenp,statusp,donep) \
552   StreamReceive(&guy->upStream,tokenp,statusp,donep)
553 
554 local bool		GuyStop of((Guy *,char *,UOFF_T));
555 
GuyStart(guy,bufsize)556 local bool		GuyStart on((guy,bufsize)) is
557 (
558   fast Guy		  *guy
559 _ address		  bufsize
560 )
561 {
562   fast char		  *buffer;
563   Token 		  token;
564   short 		  status;
565   UOFF_T		  done;
566   bool			  received;
567   static int 		  bytesRead,bytesWritten;
568 
569   Mesg(("GuyStart guy %#o bufsize %u\n",guy,bufsize));
570 
571   buffer = (pointer) malloc((unsigned) bufsize);
572   if (buffer == nil(pointer))
573   {
574     mesg("team: guy %d cannot allocate %u bytes\n",
575       guy->pid,bufsize);
576     return false;
577   }
578 
579   while ((received = GuyRECEIVE(guy,&token,&status,&done)) && token != TokenSTOP)
580   switch (token)
581   {
582   when TokenREAD:
583     FdIn.status = status;
584 
585     Mesg(("GuyStart reading %d chars\n",bufsize));
586     bytesRead = FdRead(&FdIn,(pointer) buffer,bufsize,done);
587     Mesg(("GuyStart reads %d chars\n",bytesRead));
588 
589     if (bytesRead == 0) GuyStop(guy,nil(char *),done);
590     if (bytesRead < 0) GuyStop(guy,"error on guy read",done);
591 
592     done += bytesRead;
593 
594     if (verbose)
595       mesg("%quk read   \r",done>>10);
596 
597     if (!GuySEND(guy,TokenREAD,FdIn.status,done))
598       GuyStop(guy,"guy cannot send READ",done);
599 
600   when TokenWRITE:
601     FdOut.status = status;
602 
603     Mesg(("GuyStart writing %d chars\n",bytesRead));
604     bytesWritten = FdWrite(&FdOut,(pointer) buffer,(address) bytesRead,done);
605     Mesg(("GuyStart writes %d chars\n",bytesWritten));
606 
607     if (bytesWritten == 0)  GuyStop(guy,"eof on guy write",done);
608     if (bytesWritten < 0)   GuyStop(guy,"error on guy write",done);
609 
610     done += bytesWritten;
611 
612     if (verbose)
613       mesg("%quk written\r",done>>10);
614 
615     if (!GuySEND(guy,TokenWRITE,FdOut.status,done))
616       GuyStop(guy,"guy cannot send WRITE",done);
617 
618   when TokenABORT:
619     GuyStop(guy,"guy was aborted",0L);
620 
621   otherwise:
622     GuyStop(guy,"impossible token on ring",done);
623   }
624 
625   /* free((char *) buffer); */
626 
627   GuyStop(guy,(received) ? nil(char *) : "error on upstream receive",0L);
628   /*NOTREACHED*/
629 
630   /*return true;*/
631 }
632 
GuyStop(guy,errormsg,done)633 local bool		GuyStop on((guy,errormsg,done)) is
634 (
635   fast Guy		  *guy
636 _ char			  *errormsg
637 _ UOFF_T		  done
638 )
639 {
640   Mesg(("GuyStop guy %#o\n",guy));
641 
642   if (done)
643   {
644     if (report)
645       mesg("%qu kilobytes, %lu seconds\r\n",
646 	  done>>10,(UOFF_T) (time((time_t *) 0)-origin));
647     else if (verbose)
648       mesg("\n");
649   }
650 
651   if (errormsg != nil(char *))
652   {
653     mesg("team: guy pid %u: %s\n",guy->pid,errormsg);
654     call GuySEND(guy,TokenABORT,FdERROR,0L);
655     exit(2);
656     /*NOTREACHED*/
657   }
658 
659   if (!GuySEND(guy,TokenSTOP,FdEOF,0L))
660   {
661     exit(1);
662     /*NOTREACHED*/
663   }
664 
665   exit(0);
666   /*NOTREACHED*/
667 }
668 
GuyClose(guy)669 local bool		GuyClose on((guy)) is
670 (
671   fast Guy		  *guy
672 )
673 {
674   return FdClose(&guy->upStream) && FdClose(&guy->downStream);
675 }
676 
677 /*
678   A team is made up of a ring of guys; each guy copies a blockfrom its
679   input to its ouput, and is driven by tokens sent to it by the
680   previous guy on a pipe.
681 */
682 
mode(struct,Team)683 mode(struct,Team)
684 {
685   Guy			  *guys;
686   short unsigned	  size;
687   short unsigned	  active;
688 };
689 
TeamOpen(team,nominalsize)690 local bool		TeamOpen on((team,nominalsize)) is
691 (
692   Team			  *team
693 _ short unsigned	  nominalsize
694 )
695 {
696   Mesg(("TeamOpen nominalsize %u\n",nominalsize));
697 
698   team->size     = 0;
699   team->active   = 0;
700 
701   team->guys = (Guy *) calloc(sizeof (Guy),nominalsize);
702 
703   for (team->size = 0; team->size < nominalsize; team->size++);
704 
705   were (team->guys == nil(Guy *))
706     return false;
707 
708   return true;
709 }
710 
TeamStart(team,bufsize,isize,osize)711 local bool		TeamStart on((team,bufsize,isize,osize)) is
712 (
713   fast Team		  *team
714 _ address		  bufsize
715 _ UOFF_T		  isize
716 _ UOFF_T		  osize
717 )
718 {
719   /*
720     When generating each guy, we pass it an upstream link that
721     is the downstream of the previous guy, and create a new
722     downstream link that will be the next upstream.
723 
724     At each turn we obviously close the old downstream once it
725     has been passed to the forked guy.
726 
727     A special case are the first and last guys; the upstreamof
728     the first guy shall be the downstream of the last.  This
729     goes against the grain of our main logic, where the
730     upstream is expected to already exist and the downstream
731     must be created.
732 
733     This means that the last and first guys are created in a
734     special way.  When creating the first guy we shall create
735     its upstreamlink as well as its downstream, and we shall
736     save that in a special variable, last_downstream.  This we
737     shall use as the downstreamof the last guy.
738 
739     We shall also keep it open in the team manager (parent
740     process) because we shall use it to do the initial send of
741     the read and write tokens that will circulate in the relay
742     ring, activating the guys.
743 
744     Of course because of this each guy will inherit this link
745     as wellas its upstream and downstream, but shall graciously
746     close it.
747   */
748 
749   Fd 		    last_downstream;
750   Fd 		    this_upstream;
751   Fd 		    this_downstream;
752   Fd 		    next_upstream;
753 
754   Mesg(("TeamStart team %#o size %u bufsize %u\n",
755     team,team->size,bufsize));
756 
757   call FdOpen(&FdIn,0,isize); call FdOpen(&FdOut,1,osize);
758 
759   for (team->active = 0; team->active < team->size; team->active++)
760   {
761     fast Guy		*guy;
762     fast int 		pid;
763 
764     guy = team->guys+team->active;
765 
766     if (team->active == 0)
767     {
768       if (!StreamPipe(&last_downstream,&this_upstream))
769       {
770 	perror("cannot open first link");
771 	return false;
772       }
773 
774       if (!StreamPipe(&this_downstream,&next_upstream))
775       {
776 	perror("cannot  open link");
777 	return false;
778       }
779     }
780     else if (team->active < (team->size-1))
781     {
782       if (!StreamPipe(&this_downstream,&next_upstream))
783       {
784 	perror("cannot  open link");
785 	return  false;
786       }
787     }
788     else /*if (team->active == team->size-1)*/
789     {
790       FdSet(&this_downstream,&last_downstream);
791       if (!FdCopy(&last_downstream,&this_downstream))
792 	perror("team: cannot copy last downstream");
793     }
794 
795     Mesg(("TeamStart going to fork for guy %#o\n",guy));
796 
797     pid = fork();
798 
799     if (pid > 0)
800     {
801       Mesg(("TeamStart forked guy %#o as pid %u\n",guy,pid));
802       guy->pid = pid;
803 
804       if (!FdClose(&this_upstream))
805 	perror("cannot close this upstream link");
806       if (!FdClose(&this_downstream))
807 	perror("cannot close this downstream link");
808 
809       FdSet(&this_upstream,&next_upstream);
810     }
811     else if (pid == 0)
812     {
813       pid = getpid();
814 
815       /* Set SIGPIPE handling back to the default in the guys */
816       signal(SIGPIPE, SIG_DFL);
817 
818       if (!FdClose(&last_downstream))
819 	perror("cannot close inherited first link");
820 
821       if (!GuyOpen(guy,pid,&this_upstream,&this_downstream))
822 	GuyStop(guy,"cannot open guy",0L);
823       if (!GuyStart(guy,bufsize))
824 	GuyStop(guy,"cannot start guy",0L);
825       if (!GuyClose(guy))
826 	perror("cannot close guy");
827 
828       /*NOTREACHED*/
829     }
830     else if (pid < 0)
831     {
832       perror("team: forking a guy");
833       return false;
834     }
835   }
836 
837   if (!StreamSend(&last_downstream,TokenREAD,FdOPEN,0L) && errno != EPIPE)
838   {
839     perror("cannot send first READ token");
840     return false;
841   }
842 
843   if (!StreamSend(&last_downstream,TokenWRITE,FdOPEN,0L) && errno != EPIPE)
844   {
845     perror("cannot send first WRITE token");
846     return false;
847   }
848 
849   if (!FdClose(&last_downstream))
850     perror("cannot close first link");
851 
852   return true;
853 }
854 
TeamWait(team)855 local bool		TeamWait on((team)) is
856 (
857   fast Team		    *team
858 )
859 {
860   while (team->active != 0)
861   {
862     int 		    guypid;
863     int 		    status;
864 
865     guypid = wait(&status);
866     if (guypid >= 0)
867     {
868       fast short unsigned guyno;
869 
870       for (guyno = 0; guyno < team->size; guyno++)
871 	if (guypid == team->guys[guyno].pid)
872 	{
873 	  team->guys[guyno].pid = -1;
874 	  break;
875 	}
876     }
877     else
878     {
879       mesg("team: no guys, believed %u left\n",team->active);
880       return true;
881     }
882 
883     --team->active;
884 
885 #ifdef WIFEXITED
886     /* If a guy had an error, its exit status is 2.  Also catch a killed guy */
887     if ((WIFEXITED(status) && WEXITSTATUS(status) == 2) ||
888 	(WIFSIGNALED(status) && WTERMSIG(status) != SIGPIPE)) {
889       guyhaderror = true;
890     }
891 #endif
892 
893     if (status != 0   && team->active != 0)
894       return false;
895   }
896 
897   return true;
898 }
899 
TeamStop(team)900 local bool		TeamStop on((team)) is
901 (
902   fast Team		  *team
903 )
904 {
905   fast short unsigned	  guyno;
906 
907   Mesg(("TeamStop team %#o\n",team));
908 
909   for (guyno = 0; guyno < team->size; guyno++)
910   {
911     fast Guy		    *guy;
912 
913     guy = team->guys+guyno;
914     if (guy->pid >= 0)
915     {
916       /*kill(guy->pid,SIGKILL);*/
917       --team->active;
918     }
919   }
920 
921   return team->active == 0;
922 }
923 
TeamClose(team)924 local bool		TeamClose on((team)) is
925 (
926   fast Team		  *team
927 )
928 {
929   for (team->size; team->size != 0; --team->size)
930     continue;
931 
932   free(team->guys);
933 
934   return true;
935 }
936 
usage(noparms)937 local void		usage of((noparms))
938 {
939   fprintf(stderr,"\
940 syntax: team [-[vr]] [-iI[bkm] [-oO[bkm] [N[bkm] [P]]\n\
941   copies standard input to output\n\
942   -v gives ongoing report, -r final report\n\
943   I is input volume size (default %qum)\n\
944   O is output volume size (default %qum)\n\
945   N is buffer size (default %luk)\n\
946   P is number of processes (default %u)\n\
947   (postfix b means *512, k means *1KB, m means *1MB)\n\
948 ",
949       TeamHVOLSZ>>20,TeamHVOLSZ>>20,
950       TeamDBUFSZ>>10,TeamDTEAMSZ);
951 
952   exit(1);
953   /*NOTREACHED*/
954 }
955 
atos(s)956 local UOFF_T	atos on((s)) is
957 (
958   fast char		  *s
959 )
960 {
961   fast UOFF_T		  l;
962 
963   for (
964     s, l = 0L;
965     *s >= '0' && *s <= '9';
966     s++
967   )
968     l = l*10L + (UOFF_T) (*s-'0');
969 
970   if (*s == 'b') l *= (1L<<9);
971   if (*s == 'k') l *= (1L<<10);
972   if (*s == 'm') l *= (1L<<20);
973 
974   return l;
975 }
976 
main(argc,argv)977 global int		main on((argc,argv)) is
978 (
979   int			  argc
980 _ char			  *(argv[])
981 )
982 {
983   Team			  team;
984   short unsigned	  teamsize;
985 
986   address		  bufsize;
987   UOFF_T		  isize;
988   UOFF_T		  osize;
989   int			  opt;
990 
991   teamsize = TeamDTEAMSZ;
992   bufsize = TeamDBUFSZ;
993   isize = TeamHVOLSZ;
994   osize = TeamHVOLSZ;
995   optind = 1;
996 
997   while ((opt = getopt(argc,argv,"vri:o:")) != -1)
998     switch (opt)
999     {
1000     when 'i':
1001       isize = atos(optarg);
1002       if (isize < TeamLVOLSZ || isize > TeamHVOLSZ)
1003       {
1004 	fprintf(stderr,"team: invalid input volume size %qu\n",isize);
1005 	usage();
1006       }
1007 
1008     when 'o':
1009       osize = atos(optarg);
1010       if (osize < TeamLVOLSZ || osize > TeamHVOLSZ)
1011       {
1012 	fprintf(stderr,"team: invalid output volume size %qu\n",osize);
1013 	usage();
1014       }
1015 
1016     when 'v':
1017       verbose ^= 1;
1018 
1019     when 'r':
1020       report ^= 1;
1021 
1022     otherwise:
1023       usage();
1024     }
1025 
1026   argc -= optind, argv += optind;
1027 
1028   if (argc != 0)
1029   {
1030     bufsize = (address) atos(argv[0]);
1031     if (bufsize < TeamLBUFSZ || bufsize > TeamHBUFSZ)
1032     {
1033       fprintf(stderr,"team: invalid block size %u\n",
1034 	bufsize);
1035       usage();
1036     }
1037     --argc, argv++;
1038   }
1039 
1040   if (argc != 0)
1041   {
1042     teamsize = atoi(argv[0]);
1043     if (teamsize < 2 || teamsize > TeamHTEAMSZ)
1044     {
1045       fprintf(stderr,"team: invalid # of processes %d\n",teamsize);
1046       usage();
1047     }
1048     --argc, argv++;
1049   }
1050 
1051   if (argc != 0)   usage();
1052 
1053   if (!TeamOpen(&team,teamsize))
1054   {
1055     mesg("team: cannot setup the team with %u guys\n",teamsize);
1056     return 1;
1057   }
1058 
1059   origin = time((time_t *) 0);
1060 
1061   /*
1062    * Ignore SIGPIPE in the parent as it affects the exit status reporting.
1063    */
1064   signal(SIGPIPE, SIG_IGN);
1065 
1066   if (!TeamStart(&team,bufsize,isize,osize))
1067   {
1068     mesg("team: cannot start the team\n");
1069     return 1;
1070   }
1071 
1072   if (!TeamWait(&team))
1073   {
1074     mesg("team: stop remaining %u guys\n",team.active);
1075 
1076     if (!TeamStop(&team))
1077     {
1078       mesg("team: cannot stop the team\n");
1079       return 1;
1080     }
1081   }
1082 
1083   if (!TeamClose(&team))
1084   {
1085     mesg("team: cannot close the team\n");
1086     return 1;
1087   }
1088 
1089   if (guyhaderror)
1090   {
1091     mesg("team: guy had error\n");
1092     return 1;
1093   }
1094 
1095   return 0;
1096 }
1097