1 /*
2 $Header: /fs/mumon/aware/piercarl/Cmd./Commands./team.c,v 3.1 1994/01/13 15:03:25 piercarl Exp piercarl $
3 */
4
/* Copyright notice, kept in a string object of this translation unit. */
static char Notice[] = "Copyright 1987,1989 Piercarlo Grandi. All rights reserved.";
7
8 /*
9 This program is free software; you can redistribute it and/or
10 modify it under the terms of the GNU General Public License as
11 published by the Free Software Foundation; either version 2, or
12 (at your option) any later version.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You may have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 #undef DEBUG
25
26 /*
27 Unix programs normally do synchronous read and write, that is,
28 you read and then you write; no overlap is possible.
29
30 This is especially catastrophic for device to device copies,
31 where it is important to minimize elapsed time, by overlapping
32 activity on one with activity on another.
33
34 To obtain this, a multiprocess structure is necessary under
35 Unix. This program is functionally equivalent to a pipe, in
36 that it copies its input (fd 0) to its output (fd 1) link.
37
38 This program is executed as a Team of N processes, called
39 Guys, all of which share the same input and output links; the
40 first reads a chunk of input, awakens the second, writes the
41 chunk to its output; the second does the same, and the last
42 awakens the first.
43
44 Since this process is essentially cyclic, we use a ring of
45 pipes to synchronize the Guys. Each guy has an input pipe from
46 the upstream guy and an output pipe to the downstream guy.
47 Whenever a guy receives a READ command from the upstream, it
48 first reads a block and then passes on the READ command
49 downstream; it then waits for a WRITE command from upstream,
50 and then writes the block and after that passes the WRITE
51 command downstream. A count of how much has been processed is
52 also passed along, for statistics and verification.
53
54 Two other commands are used, one is STOP, and is sent
55 downstream from the guy that detects the end of file of the
56 input, after which the guy exits, and ABORT, which is sent
57 downstream from the guy which detects trouble in the guy
58 upstream to it, which has much the same effect.
59 */
60
/*
    Type used for volume sizes and for running byte counts.
*/
#define UOFF_T          uint64_t

/*
    Limits accepted for the input and output volume sizes, in bytes.
    Both macros are fully parenthesized so that they can be used as
    an operand of any operator (sizeof included).
*/
#define TeamLVOLSZ      ((UOFF_T) 1 << 10)                  /* Low volume size */
#define TeamHVOLSZ      ((UOFF_T) 3 * ((UOFF_T) 1 << 62))   /* High volume size */

#define TeamLBUFSZ      (64)        /* Low buffer size */
#define TeamDBUFSZ      (60*512)    /* Default buffer size */
#define TeamHBUFSZ      (1L<<26)    /* High buffer size */

#define TeamDTEAMSZ     4           /* Default # of processes */
#define TeamHTEAMSZ     16          /* High # of processes */
72
73 /*
74 External components... Probably the only system dependent part
75 of this program, as some systems have something in
76 /usr/include/sys where others have it in /usr/include.
77
78 Also, the mesg() procedure is highly system dependent... watch
79 out for locking and variable number of arguments.
80 */
81
#include <errno.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h> /* for lseek() */
#ifdef HAVE_WAIT_H
#include <sys/wait.h>
#endif

#ifdef HAVE_PARAM_H
#include <sys/param.h>
#endif
100
101
/*
    On these systems F_SETLKW is hidden again after the headers have
    been read; mesg() tests for the macro, so it then skips its
    fcntl() locking.
*/
#if defined(sun) || defined(__FreeBSD__)
#   undef F_SETLKW
#endif
109
/*
    Language extensions used by the rest of the file.  With PCG set
    they come from Extend.h, Here.h and Type.h; otherwise the
    definitions below are used.
*/
#if (PCG)
#   include "Extend.h"
#   include "Here.h"
#   include "Type.h"
#else
    /* Statement and storage class shorthands */
#   define call         (void)
#   define were         if
#   define fast         register
#   define global       /* extern */
#   define local        static

    /* switch labels that end the previous case first */
#   define when         break; case
#   define otherwise    break; default

    /* mode(struct,T) { ... }; declares struct T and the typedef T */
#   define mode(which,name) typedef which name name; which name

    /* Truth values and a typed null */
#   define bool         int
#   define true         1
#   define false        0
#   define nil(type)    ((type) 0)

#   define scalar       int
    typedef char *pointer;
#   ifdef SMALL_M
        typedef unsigned address;
#   else
        typedef long address;
#   endif

    /*
        Function headers are written once as
            type name on((a,b)) is ( type a _ type b )
        and prototypes as
            type name of((type,type));
        The macros below turn that into a prototyped definition when
        __STDC__ is set, and into an old style one otherwise.
    */
#   if (__STDC__)
#       define of(list)     list
#       define on(list)
#       define is(list)     (list)
#       define _            ,
#       define noparms      void
#   else
#       define void         int
#       define const        /* const */
#       define of(list)     ()
#       define on(list)     list
#       define is(list)     list;
#       define _            ;
#       define noparms
#   endif
#endif
150
/*
    Trace macro.  It is used with a doubled parenthesis,
        Mesg(("format",args));
    which becomes a call of mesg() when DEBUG is defined and expands
    to nothing otherwise.
*/
#if defined(DEBUG)
#   define Mesg(list)   mesg list
#else
#   define Mesg(list)
#endif
156
/*
    Print a printf-style message on stderr.

    The write is bracketed by a lock on the descriptor of stderr,
    taken with fcntl(F_SETLKW) and/or flock(LOCK_EX), whichever the
    headers make available (presumably because all the processes of
    a team write to the same stderr).  The results of the locking
    calls are not checked.

    The arguments after the format are forwarded with <stdarg.h>, so
    that pointers and 64 bit counts reach fprintf's engine with their
    own type and size; they used to be squeezed through eight int
    parameters.

    Returns what vfprintf returns: the number of characters printed,
    or a negative value on output error.
*/
int mesg(const char *format, ...)
{
    va_list args;
    int printed;
#   if (defined F_SETLKW)
    struct flock l;

    l.l_whence = SEEK_SET; l.l_start = 0L; l.l_len = 0L;
    l.l_type = F_WRLCK; fcntl(fileno(stderr),F_SETLKW,&l);
#   endif
#   if (defined LOCK_EX)
    flock(fileno(stderr),LOCK_EX);
#   endif

    va_start(args,format);
    printed = vfprintf(stderr,format,args);
    va_end(args);

#   if (defined LOCK_EX)
    flock(fileno(stderr),LOCK_UN);
#   endif
#   if (defined F_SETLKW)
    l.l_type = F_UNLCK; fcntl(fileno(stderr),F_SETLKW,&l);
#   endif

    return printed;
}
178
179 local bool verbose = false;
180 local bool report = true;
181 local bool guyhaderror = false;
182
183 local time_t origin;
184 extern char *optarg;
185 extern int optind;
186
187 /*
188 The regular Unix read and write calls are not guaranteed to process
189 all the bytes requested. These procedures guarantee that if the
190 request is for N bytes, all of them are read or written unless there
191 is an error or eof.
192 */
193
194 #define FdCLOSED 0
195 #define FdOPEN 1
196 #define FdEOF 2
197 #define FdERROR 3
198
mode(struct,Fd)199 mode(struct,Fd)
200 {
201 int fd;
202 short status;
203 UOFF_T size;
204 };
205
206 local Fd FdIn,FdOut;
207
FdOpen(fd,ffd,size)208 local bool FdOpen on((fd,ffd,size)) is
209 (
210 fast Fd *fd
211 _ int ffd
212 _ UOFF_T size
213 )
214 {
215 fd->status = (ffd >= 0) ? FdOPEN : FdCLOSED;
216 fd->fd = ffd;
217 fd->size = size;
218
219 Mesg(("FdOpen fd %d\n",ffd));
220
221 return ffd >= 0;
222 }
223
FdClose(fd)224 local bool FdClose on((fd)) is
225 (
226 fast Fd *fd
227 )
228 {
229 int ffd;
230
231 ffd = fd->fd;
232 Mesg(("FdClose fd %d\n",fd->fd));
233 fd->status = FdCLOSED;
234 fd->fd = -1;
235
236 return close(ffd) >= 0;
237 }
238
FdCopy(to,from)239 local bool FdCopy on((to,from)) is
240 (
241 fast Fd *to
242 _ fast Fd *from
243 )
244 {
245 to->size = from->size;
246 to->status = from->status;
247 to->fd = dup(from->fd);
248 Mesg(("FdCopy of %d is %d\n",from->fd,to->fd));
249 return to->fd >= 0;
250 }
251
FdSet(to,from)252 local void FdSet on((to,from)) is
253 (
254 fast Fd *to
255 _ fast Fd *from
256 )
257 {
258 if (from->fd < 0)
259 mesg("team: set an invalid fd\n");
260 to->size = from->size;
261 to->status = from->status;
262 to->fd = from->fd;
263 }
264
FdRetry(fd,which,done,space)265 local UOFF_T FdRetry on((fd,which,done,space)) is
266 (
267 fast Fd *fd
268 _ char *which
269 _ UOFF_T done
270 _ UOFF_T space
271 )
272 {
273 int tty;
274 char reply[2];
275 struct stat st;
276
277 if (fstat(fd->fd,&st) < 0)
278 {
279 perror(which);
280 return 0;
281 }
282
283 st.st_mode &= S_IFMT;
284 if (st.st_mode != S_IFCHR && st.st_mode != S_IFBLK)
285 return 0;
286
287 if (!isatty(fileno(stderr)))
288 return 0;
289
290 if ((tty = open("/dev/tty",0)) < 0)
291 {
292 perror("/dev/tty");
293 return 0;
294 }
295
296 do
297 {
298 #if (defined i386 || defined sun)
299 # if !(defined(BSD) && (BSD >= 199306))
300 extern char *(sys_errlist[]);
301 # endif
302 # if (defined(BSD) && (BSD >= 199306)) && __STDC__
303 const
304 # endif
305 char *errmsg = sys_errlist[errno];
306 #else
307 char errmsg[32];
308 (void) sprintf(errmsg,"Error %d",errno);
309 #endif
310 if (errno)
311 mesg("'%s' on %s after %quk. Continue [cyn] ? ",errmsg,which,done>>10);
312 else
313 mesg("EOF on %s after %quk. Continue [cyn] ? ",which,done>>10);
314
315 read(tty,reply,sizeof reply);
316 }
317 while (strchr("cCyYnN",reply[0]) == 0);
318
319 call close(tty);
320
321 if (strchr("nN",reply[0]) != 0)
322 return 0L;
323
324 errno = 0;
325
326 if (strchr("cC",reply[0]) != 0)
327 {
328 call lseek(fd->fd,0L,0);
329 return fd->size;
330 }
331
332 return space;
333 }
334
FdCanDo(remaining,available)335 local unsigned FdCanDo on((remaining,available)) is
336 (
337 fast address remaining
338 _ fast UOFF_T available
339 )
340 {
341 return (remaining < available)
342 ? (unsigned) remaining : (unsigned) available;
343 }
344
FdRead(fd,buffer,todo,done)345 local address FdRead on((fd,buffer,todo,done)) is
346 (
347 fast Fd *fd
348 _ pointer buffer
349 _ fast address todo
350 _ UOFF_T done
351 )
352 {
353 fast UOFF_T space;
354 fast int bytesRead;
355 fast address justDone;
356
357 switch (fd->status)
358 {
359 when FdEOF: return 0;
360 when FdERROR: return -1;
361 when FdCLOSED: return -1;
362
363 when FdOPEN:
364
365 space = fd->size - done%fd->size;
366
367 for (justDone = 0; space != 0L && justDone < todo;)
368 {
369 bytesRead = read(fd->fd,buffer+justDone,
370 FdCanDo(todo-justDone,space-justDone));
371
372 if (bytesRead <= 0 || (justDone += bytesRead) == space)
373 space = FdRetry(fd,"input",done+justDone,space-justDone);
374 }
375
376 if (bytesRead == 0) fd->status = FdEOF;
377 if (bytesRead < 0) fd->status = FdERROR;
378
379 Mesg(("FdRead %d reads %d last %d\n",fd->fd,justDone,bytesRead));
380
381 return (justDone == 0) ? bytesRead : justDone;
382 }
383 /*NOTREACHED*/
384 }
385
FdWrite(fd,buffer,todo,done)386 local address FdWrite on((fd,buffer,todo,done)) is
387 (
388 fast Fd *fd
389 _ pointer buffer
390 _ fast address todo
391 _ UOFF_T done
392 )
393 {
394 fast UOFF_T space;
395 fast int bytesWritten;
396 fast address justDone;
397
398 switch (fd->status)
399 {
400 when FdEOF: return 0;
401 when FdERROR: return -1;
402 when FdCLOSED: return -1;
403
404 when FdOPEN:
405
406 space = fd->size - done%fd->size;
407
408 for (justDone = 0; space != 0L && justDone < todo;)
409 {
410 bytesWritten = write(fd->fd,buffer+justDone,
411 FdCanDo(todo-justDone,space-justDone));
412
413 if (bytesWritten <= 0 || (justDone += bytesWritten) == space)
414 space = FdRetry(fd,"output",done+justDone,space-justDone);
415 }
416
417 Mesg(("FdWrite %d writes %d last %d\n",fd->fd,justDone,bytesWritten));
418
419 if (bytesWritten == 0) fd->status = FdEOF;
420 if (bytesWritten < 0) fd->status = FdERROR;
421
422 return (justDone == 0) ? bytesWritten : justDone;
423 }
424 /*NOTREACHED*/
425 }
426
427 /*
428 A Token is scalar value representing a command.
429 */
430
431 typedef short scalar Token;
432
433 #define TokenREAD 0
434 #define TokenWRITE 1
435 #define TokenSTOP 2
436 #define TokenABORT -1
437
438 /*
439 Here we represent Streams as Fds; this is not entirely
440 appropriate, as Fds have also a volume size, and relatively
441 high overhead write and read functions. Well, we just take
442 some liberties with abstraction levels here. Actually we
443 should have an Fd abstraction for stream pipes and a Vol
444 abstraction for input and output...
445 */
446
StreamPipe(downstream,upstream)447 local bool StreamPipe on((downstream,upstream)) is
448 (
449 fast Fd *downstream
450 _ fast Fd *upstream
451 )
452 {
453 int links[2];
454
455 if (pipe(links) < 0)
456 {
457 perror("team: opening links");
458 return false;
459 }
460
461 Mesg(("StreamPipe fd downstream %d upstream %d\n",links[1],links[0]));
462
463 return FdOpen(downstream,links[1],TeamHVOLSZ)
464 && FdOpen(upstream,links[0],TeamHVOLSZ);
465 }
466
/* What is written on a pipe of the ring for each command */
mode(struct,StreamMsg)
{
    Token               token;      /* the command */
    short               status;     /* an Fd status passed along with it */
    UOFF_T              done;       /* count of bytes processed so far */
};
473
StreamSend(fd,token,status,done)474 local bool StreamSend on((fd,token,status,done)) is
475 (
476 fast Fd *fd
477 _ Token token
478 _ short status
479 _ UOFF_T done
480 )
481 {
482 fast int n;
483 StreamMsg message;
484
485 message.token = token;
486 message.status = status;
487 message.done = done;
488
489 n = write(fd->fd,(pointer) &message,(unsigned) sizeof message);
490
491 Mesg(("StreamSend fd %u n %d token %d\n",fd->fd,n,token));
492
493 return n == sizeof message;
494 }
495
StreamReceive(fd,tokenp,statusp,donep)496 local bool StreamReceive on((fd,tokenp,statusp,donep)) is
497 (
498 fast Fd *fd
499 _ Token *tokenp
500 _ short *statusp
501 _ UOFF_T *donep
502 )
503 {
504 fast int n;
505 StreamMsg message;
506
507 n = read(fd->fd,(pointer) &message,(unsigned) sizeof message);
508 *tokenp = message.token;
509 *statusp = message.status;
510 *donep = message.done;
511
512 Mesg(("StreamReceive fd %u n %d token %d\n",fd->fd,n,*tokenp));
513
514 return n == sizeof message;
515 }
516 /*
517 A guy is an instance of the input to output copier. It is attached
518 to a relay station, with an upstream link, from which commands
519 arrive, and a downward link, to which they are relayed once they are
520 executed.
521 */
522
/* One process of the team and its two links on the ring */
mode(struct,Guy)
{
    int                 pid;            /* process id; TeamWait sets it to -1 */
    Fd                  upStream;       /* commands are received from here */
    Fd                  downStream;     /* and relayed to here */
};
529
GuyOpen(guy,pid,upstream,downstream)530 local bool GuyOpen on((guy,pid,upstream,downstream)) is
531 (
532 fast Guy *guy
533 _ int pid
534 _ Fd *upstream
535 _ Fd *downstream
536 )
537 {
538 Mesg(("GuyOpen pid %u upstream %u downstream %u\n",
539 pid,upstream->fd,downstream->fd));
540
541 guy->pid = pid;
542 FdSet(&guy->upStream,upstream);
543 FdSet(&guy->downStream,downstream);
544
545 return true;
546 }
547
548 #define GuySEND(guy,token,status,done) \
549 StreamSend(&guy->downStream,token,status,done)
550
551 #define GuyRECEIVE(guy,tokenp,statusp,donep) \
552 StreamReceive(&guy->upStream,tokenp,statusp,donep)
553
554 local bool GuyStop of((Guy *,char *,UOFF_T));
555
GuyStart(guy,bufsize)556 local bool GuyStart on((guy,bufsize)) is
557 (
558 fast Guy *guy
559 _ address bufsize
560 )
561 {
562 fast char *buffer;
563 Token token;
564 short status;
565 UOFF_T done;
566 bool received;
567 static int bytesRead,bytesWritten;
568
569 Mesg(("GuyStart guy %#o bufsize %u\n",guy,bufsize));
570
571 buffer = (pointer) malloc((unsigned) bufsize);
572 if (buffer == nil(pointer))
573 {
574 mesg("team: guy %d cannot allocate %u bytes\n",
575 guy->pid,bufsize);
576 return false;
577 }
578
579 while ((received = GuyRECEIVE(guy,&token,&status,&done)) && token != TokenSTOP)
580 switch (token)
581 {
582 when TokenREAD:
583 FdIn.status = status;
584
585 Mesg(("GuyStart reading %d chars\n",bufsize));
586 bytesRead = FdRead(&FdIn,(pointer) buffer,bufsize,done);
587 Mesg(("GuyStart reads %d chars\n",bytesRead));
588
589 if (bytesRead == 0) GuyStop(guy,nil(char *),done);
590 if (bytesRead < 0) GuyStop(guy,"error on guy read",done);
591
592 done += bytesRead;
593
594 if (verbose)
595 mesg("%quk read \r",done>>10);
596
597 if (!GuySEND(guy,TokenREAD,FdIn.status,done))
598 GuyStop(guy,"guy cannot send READ",done);
599
600 when TokenWRITE:
601 FdOut.status = status;
602
603 Mesg(("GuyStart writing %d chars\n",bytesRead));
604 bytesWritten = FdWrite(&FdOut,(pointer) buffer,(address) bytesRead,done);
605 Mesg(("GuyStart writes %d chars\n",bytesWritten));
606
607 if (bytesWritten == 0) GuyStop(guy,"eof on guy write",done);
608 if (bytesWritten < 0) GuyStop(guy,"error on guy write",done);
609
610 done += bytesWritten;
611
612 if (verbose)
613 mesg("%quk written\r",done>>10);
614
615 if (!GuySEND(guy,TokenWRITE,FdOut.status,done))
616 GuyStop(guy,"guy cannot send WRITE",done);
617
618 when TokenABORT:
619 GuyStop(guy,"guy was aborted",0L);
620
621 otherwise:
622 GuyStop(guy,"impossible token on ring",done);
623 }
624
625 /* free((char *) buffer); */
626
627 GuyStop(guy,(received) ? nil(char *) : "error on upstream receive",0L);
628 /*NOTREACHED*/
629
630 /*return true;*/
631 }
632
GuyStop(guy,errormsg,done)633 local bool GuyStop on((guy,errormsg,done)) is
634 (
635 fast Guy *guy
636 _ char *errormsg
637 _ UOFF_T done
638 )
639 {
640 Mesg(("GuyStop guy %#o\n",guy));
641
642 if (done)
643 {
644 if (report)
645 mesg("%qu kilobytes, %lu seconds\r\n",
646 done>>10,(UOFF_T) (time((time_t *) 0)-origin));
647 else if (verbose)
648 mesg("\n");
649 }
650
651 if (errormsg != nil(char *))
652 {
653 mesg("team: guy pid %u: %s\n",guy->pid,errormsg);
654 call GuySEND(guy,TokenABORT,FdERROR,0L);
655 exit(2);
656 /*NOTREACHED*/
657 }
658
659 if (!GuySEND(guy,TokenSTOP,FdEOF,0L))
660 {
661 exit(1);
662 /*NOTREACHED*/
663 }
664
665 exit(0);
666 /*NOTREACHED*/
667 }
668
GuyClose(guy)669 local bool GuyClose on((guy)) is
670 (
671 fast Guy *guy
672 )
673 {
674 return FdClose(&guy->upStream) && FdClose(&guy->downStream);
675 }
676
677 /*
678 A team is made up of a ring of guys; each guy copies a block from its
679 input to its output, and is driven by tokens sent to it by the
680 previous guy on a pipe.
681 */
682
/* The ring of guys, as seen by the parent process */
mode(struct,Team)
{
    Guy                 *guys;      /* array of size guys, from calloc */
    short unsigned      size;       /* how many guys the team has */
    short unsigned      active;     /* how many are started and not yet waited for */
};
689
TeamOpen(team,nominalsize)690 local bool TeamOpen on((team,nominalsize)) is
691 (
692 Team *team
693 _ short unsigned nominalsize
694 )
695 {
696 Mesg(("TeamOpen nominalsize %u\n",nominalsize));
697
698 team->size = 0;
699 team->active = 0;
700
701 team->guys = (Guy *) calloc(sizeof (Guy),nominalsize);
702
703 for (team->size = 0; team->size < nominalsize; team->size++);
704
705 were (team->guys == nil(Guy *))
706 return false;
707
708 return true;
709 }
710
TeamStart(team,bufsize,isize,osize)711 local bool TeamStart on((team,bufsize,isize,osize)) is
712 (
713 fast Team *team
714 _ address bufsize
715 _ UOFF_T isize
716 _ UOFF_T osize
717 )
718 {
719 /*
720 When generating each guy, we pass it an upstream link that
721 is the downstream of the previous guy, and create a new
722 downstream link that will be the next upstream.
723
724 At each turn we obviously close the old downstream once it
725 has been passed to the forked guy.
726
727 A special case are the first and last guys; the upstreamof
728 the first guy shall be the downstream of the last. This
729 goes against the grain of our main logic, where the
730 upstream is expected to already exist and the downstream
731 must be created.
732
733 This means that the last and first guys are created in a
734 special way. When creating the first guy we shall create
735 its upstreamlink as well as its downstream, and we shall
736 save that in a special variable, last_downstream. This we
737 shall use as the downstreamof the last guy.
738
739 We shall also keep it open in the team manager (parent
740 process) because we shall use it to do the initial send of
741 the read and write tokens that will circulate in the relay
742 ring, activating the guys.
743
744 Of course because of this each guy will inherit this link
745 as wellas its upstream and downstream, but shall graciously
746 close it.
747 */
748
749 Fd last_downstream;
750 Fd this_upstream;
751 Fd this_downstream;
752 Fd next_upstream;
753
754 Mesg(("TeamStart team %#o size %u bufsize %u\n",
755 team,team->size,bufsize));
756
757 call FdOpen(&FdIn,0,isize); call FdOpen(&FdOut,1,osize);
758
759 for (team->active = 0; team->active < team->size; team->active++)
760 {
761 fast Guy *guy;
762 fast int pid;
763
764 guy = team->guys+team->active;
765
766 if (team->active == 0)
767 {
768 if (!StreamPipe(&last_downstream,&this_upstream))
769 {
770 perror("cannot open first link");
771 return false;
772 }
773
774 if (!StreamPipe(&this_downstream,&next_upstream))
775 {
776 perror("cannot open link");
777 return false;
778 }
779 }
780 else if (team->active < (team->size-1))
781 {
782 if (!StreamPipe(&this_downstream,&next_upstream))
783 {
784 perror("cannot open link");
785 return false;
786 }
787 }
788 else /*if (team->active == team->size-1)*/
789 {
790 FdSet(&this_downstream,&last_downstream);
791 if (!FdCopy(&last_downstream,&this_downstream))
792 perror("team: cannot copy last downstream");
793 }
794
795 Mesg(("TeamStart going to fork for guy %#o\n",guy));
796
797 pid = fork();
798
799 if (pid > 0)
800 {
801 Mesg(("TeamStart forked guy %#o as pid %u\n",guy,pid));
802 guy->pid = pid;
803
804 if (!FdClose(&this_upstream))
805 perror("cannot close this upstream link");
806 if (!FdClose(&this_downstream))
807 perror("cannot close this downstream link");
808
809 FdSet(&this_upstream,&next_upstream);
810 }
811 else if (pid == 0)
812 {
813 pid = getpid();
814
815 /* Set SIGPIPE handling back to the default in the guys */
816 signal(SIGPIPE, SIG_DFL);
817
818 if (!FdClose(&last_downstream))
819 perror("cannot close inherited first link");
820
821 if (!GuyOpen(guy,pid,&this_upstream,&this_downstream))
822 GuyStop(guy,"cannot open guy",0L);
823 if (!GuyStart(guy,bufsize))
824 GuyStop(guy,"cannot start guy",0L);
825 if (!GuyClose(guy))
826 perror("cannot close guy");
827
828 /*NOTREACHED*/
829 }
830 else if (pid < 0)
831 {
832 perror("team: forking a guy");
833 return false;
834 }
835 }
836
837 if (!StreamSend(&last_downstream,TokenREAD,FdOPEN,0L) && errno != EPIPE)
838 {
839 perror("cannot send first READ token");
840 return false;
841 }
842
843 if (!StreamSend(&last_downstream,TokenWRITE,FdOPEN,0L) && errno != EPIPE)
844 {
845 perror("cannot send first WRITE token");
846 return false;
847 }
848
849 if (!FdClose(&last_downstream))
850 perror("cannot close first link");
851
852 return true;
853 }
854
TeamWait(team)855 local bool TeamWait on((team)) is
856 (
857 fast Team *team
858 )
859 {
860 while (team->active != 0)
861 {
862 int guypid;
863 int status;
864
865 guypid = wait(&status);
866 if (guypid >= 0)
867 {
868 fast short unsigned guyno;
869
870 for (guyno = 0; guyno < team->size; guyno++)
871 if (guypid == team->guys[guyno].pid)
872 {
873 team->guys[guyno].pid = -1;
874 break;
875 }
876 }
877 else
878 {
879 mesg("team: no guys, believed %u left\n",team->active);
880 return true;
881 }
882
883 --team->active;
884
885 #ifdef WIFEXITED
886 /* If a guy had an error, its exit status is 2. Also catch a killed guy */
887 if ((WIFEXITED(status) && WEXITSTATUS(status) == 2) ||
888 (WIFSIGNALED(status) && WTERMSIG(status) != SIGPIPE)) {
889 guyhaderror = true;
890 }
891 #endif
892
893 if (status != 0 && team->active != 0)
894 return false;
895 }
896
897 return true;
898 }
899
TeamStop(team)900 local bool TeamStop on((team)) is
901 (
902 fast Team *team
903 )
904 {
905 fast short unsigned guyno;
906
907 Mesg(("TeamStop team %#o\n",team));
908
909 for (guyno = 0; guyno < team->size; guyno++)
910 {
911 fast Guy *guy;
912
913 guy = team->guys+guyno;
914 if (guy->pid >= 0)
915 {
916 /*kill(guy->pid,SIGKILL);*/
917 --team->active;
918 }
919 }
920
921 return team->active == 0;
922 }
923
TeamClose(team)924 local bool TeamClose on((team)) is
925 (
926 fast Team *team
927 )
928 {
929 for (team->size; team->size != 0; --team->size)
930 continue;
931
932 free(team->guys);
933
934 return true;
935 }
936
usage(noparms)937 local void usage of((noparms))
938 {
939 fprintf(stderr,"\
940 syntax: team [-[vr]] [-iI[bkm] [-oO[bkm] [N[bkm] [P]]\n\
941 copies standard input to output\n\
942 -v gives ongoing report, -r final report\n\
943 I is input volume size (default %qum)\n\
944 O is output volume size (default %qum)\n\
945 N is buffer size (default %luk)\n\
946 P is number of processes (default %u)\n\
947 (postfix b means *512, k means *1KB, m means *1MB)\n\
948 ",
949 TeamHVOLSZ>>20,TeamHVOLSZ>>20,
950 TeamDBUFSZ>>10,TeamDTEAMSZ);
951
952 exit(1);
953 /*NOTREACHED*/
954 }
955
atos(s)956 local UOFF_T atos on((s)) is
957 (
958 fast char *s
959 )
960 {
961 fast UOFF_T l;
962
963 for (
964 s, l = 0L;
965 *s >= '0' && *s <= '9';
966 s++
967 )
968 l = l*10L + (UOFF_T) (*s-'0');
969
970 if (*s == 'b') l *= (1L<<9);
971 if (*s == 'k') l *= (1L<<10);
972 if (*s == 'm') l *= (1L<<20);
973
974 return l;
975 }
976
main(argc,argv)977 global int main on((argc,argv)) is
978 (
979 int argc
980 _ char *(argv[])
981 )
982 {
983 Team team;
984 short unsigned teamsize;
985
986 address bufsize;
987 UOFF_T isize;
988 UOFF_T osize;
989 int opt;
990
991 teamsize = TeamDTEAMSZ;
992 bufsize = TeamDBUFSZ;
993 isize = TeamHVOLSZ;
994 osize = TeamHVOLSZ;
995 optind = 1;
996
997 while ((opt = getopt(argc,argv,"vri:o:")) != -1)
998 switch (opt)
999 {
1000 when 'i':
1001 isize = atos(optarg);
1002 if (isize < TeamLVOLSZ || isize > TeamHVOLSZ)
1003 {
1004 fprintf(stderr,"team: invalid input volume size %qu\n",isize);
1005 usage();
1006 }
1007
1008 when 'o':
1009 osize = atos(optarg);
1010 if (osize < TeamLVOLSZ || osize > TeamHVOLSZ)
1011 {
1012 fprintf(stderr,"team: invalid output volume size %qu\n",osize);
1013 usage();
1014 }
1015
1016 when 'v':
1017 verbose ^= 1;
1018
1019 when 'r':
1020 report ^= 1;
1021
1022 otherwise:
1023 usage();
1024 }
1025
1026 argc -= optind, argv += optind;
1027
1028 if (argc != 0)
1029 {
1030 bufsize = (address) atos(argv[0]);
1031 if (bufsize < TeamLBUFSZ || bufsize > TeamHBUFSZ)
1032 {
1033 fprintf(stderr,"team: invalid block size %u\n",
1034 bufsize);
1035 usage();
1036 }
1037 --argc, argv++;
1038 }
1039
1040 if (argc != 0)
1041 {
1042 teamsize = atoi(argv[0]);
1043 if (teamsize < 2 || teamsize > TeamHTEAMSZ)
1044 {
1045 fprintf(stderr,"team: invalid # of processes %d\n",teamsize);
1046 usage();
1047 }
1048 --argc, argv++;
1049 }
1050
1051 if (argc != 0) usage();
1052
1053 if (!TeamOpen(&team,teamsize))
1054 {
1055 mesg("team: cannot setup the team with %u guys\n",teamsize);
1056 return 1;
1057 }
1058
1059 origin = time((time_t *) 0);
1060
1061 /*
1062 * Ignore SIGPIPE in the parent as it affects the exit status reporting.
1063 */
1064 signal(SIGPIPE, SIG_IGN);
1065
1066 if (!TeamStart(&team,bufsize,isize,osize))
1067 {
1068 mesg("team: cannot start the team\n");
1069 return 1;
1070 }
1071
1072 if (!TeamWait(&team))
1073 {
1074 mesg("team: stop remaining %u guys\n",team.active);
1075
1076 if (!TeamStop(&team))
1077 {
1078 mesg("team: cannot stop the team\n");
1079 return 1;
1080 }
1081 }
1082
1083 if (!TeamClose(&team))
1084 {
1085 mesg("team: cannot close the team\n");
1086 return 1;
1087 }
1088
1089 if (guyhaderror)
1090 {
1091 mesg("team: guy had error\n");
1092 return 1;
1093 }
1094
1095 return 0;
1096 }
1097