/*
    $Header: /fs/mumon/aware/piercarl/Cmd./Commands./team.c,v 3.1 1994/01/13 15:03:25 piercarl Exp piercarl $
*/

static char Notice[] =
    "Copyright 1987,1989 Piercarlo Grandi. All rights reserved.";

/*
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
    published by the Free Software Foundation; either version 2, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You may have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

#undef DEBUG

/*
    Unix programs normally do synchronous read and write, that is,
    you read and then you write; no overlap is possible.

    This is especially catastrophic for device to device copies,
    where it is important to minimize elapsed time, by overlapping
    activity on one with activity on another.

    To obtain this, a multiprocess structure is necessary under
    Unix.  This program is functionally equivalent to a pipe, in
    that it copies its input (fd 0) to its output (fd 1) link.

    This program is executed as a Team of N processes, called Guys,
    all of which share the same input and output links; the first
    reads a chunk of input, awakens the second, writes the chunk to
    its output; the second does the same, and the last awakens the
    first.

    Since this process is essentially cyclic, we use a ring of pipes
    to synchronize the Guys.  Each guy has an input pipe from the
    upstream guy and an output pipe to the downstream guy.  Whenever
    a guy receives a READ command from the upstream, it first reads
    a block and then passes on the READ command downstream; it then
    waits for a WRITE command from upstream, and then writes the
    block and after that passes the WRITE command downstream.  A
    count of how much has been processed is also passed along, for
    statistics and verification.

    Two other commands are used: STOP, which is sent downstream from
    the guy that detects the end of file of the input, after which
    the guy exits, and ABORT, which is sent downstream from the guy
    which detects trouble in the guy upstream to it, which has much
    the same effect.
*/

#define UOFF_T		uint64_t

#define TeamLVOLSZ	((UOFF_T) 1 << 10)
#define TeamHVOLSZ	((UOFF_T) 3 * ((UOFF_T) 1 << 62))

#define TeamLBUFSZ	(64)		/* Low buffer size		*/
#define TeamDBUFSZ	(60*512)	/* Default buffer size		*/
#define TeamHBUFSZ	(1L<<26)	/* High buffer size		*/

#define TeamDTEAMSZ	4		/* Default # of processes	*/
#define TeamHTEAMSZ	16		/* High # of processes		*/

/*
    External components...  Probably the only system dependent part
    of this program, as some systems have something in
    /usr/include/sys where others have it in /usr/include.

    mesg() is a variadic function, so an ANSI C compiler is required;
    its locking of stderr is still system dependent.
*/

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <fcntl.h>
#include <unistd.h>	/* for lseek() */
#include <stdarg.h>	/* for mesg() */
#include <stdint.h>	/* for uint64_t */
#include <inttypes.h>	/* for PRIu64 */
#ifdef HAVE_WAIT_H
#include <sys/wait.h>
#endif
#ifdef HAVE_PARAM_H
#include <sys/param.h>
#endif

#ifdef sun
#   undef F_SETLKW
#endif
#ifdef __FreeBSD__
#   undef F_SETLKW
#endif

#if (PCG)
#   include "Extend.h"
#   include "Here.h"
#   include "Type.h"
#else
#   define call		(void)
#   define were		if
#   define fast		register
#   define global	/* extern */
#   define local	static
#   define when		break; case
#   define otherwise	break; default
#   define mode(which,name) typedef which name name; which name
#   define bool		int
#   define true		1
#   define false	0
#   define nil(type)	((type) 0)
#   define scalar	int
    typedef char	*pointer;
#   if (defined(SMALL_M))
    typedef unsigned	address;
#   else
    typedef long	address;
#   endif
#   if (__STDC__)
#	define of(list)	list
#	define on(list)
#	define is(list)	(list)
#	define _	,
#	define noparms	void
#   else
#	define void	int
#	define const	/* const */
#	define of(list)	()
#	define on(list)	list
#	define is(list)	list;
#	define _	;
#	define noparms
#   endif
#endif

#ifdef DEBUG
#   define Mesg(list)	mesg list
#else
#   define Mesg(list)
#endif

/*
    Print a printf-style message on stderr.  Where the platform offers
    them, an fcntl() write lock and/or a flock() exclusive lock are
    taken on stderr around the output and released afterwards, because
    every guy of the team writes to the same stderr.

    The arguments are forwarded with a va_list, so pointers and 64 bit
    values reach vfprintf() with their real types.
*/
void mesg(const char *format, ...)
{
    va_list args;
#   if (defined F_SETLKW)
    struct flock l;

    memset(&l,0,sizeof l);
    l.l_whence = 0; l.l_start = 0L; l.l_len = 0L;
    l.l_type = F_WRLCK;
    fcntl(fileno(stderr),F_SETLKW,&l);
#   endif
#   if (defined LOCK_EX)
    flock(fileno(stderr),LOCK_EX);
#   endif

    va_start(args,format);
    vfprintf(stderr,format,args);
    va_end(args);

#   if (defined LOCK_EX)
    flock(fileno(stderr),LOCK_UN);
#   endif
#   if (defined F_SETLKW)
    l.l_type = F_UNLCK;
    fcntl(fileno(stderr),F_SETLKW,&l);
#   endif
}

local bool	verbose = false;	/* -v: ongoing progress report	*/
local bool	report = true;		/* -r: final report		*/
local bool	guyhaderror = false;	/* set by TeamWait()		*/
local time_t	origin;			/* start time, for the report	*/

extern char	*optarg;
extern int	optind;

/*
    The regular Unix read and write calls are not guaranteed to
    process all the bytes requested.  These procedures guarantee that
    if the request is for N bytes, all of them are read or written
    unless there is an error or eof.
*/

#define FdCLOSED	0
#define FdOPEN		1
#define FdEOF		2
#define FdERROR		3

/* A file descriptor together with its status and its volume size. */
mode(struct,Fd)
{
    int		fd;
    short	status;
    UOFF_T	size;
};

local Fd	FdIn,FdOut;

/*
    Initialize *fd around descriptor ffd with volume size size.
    Returns true when ffd is a non negative descriptor.
*/
local bool FdOpen(fast Fd *fd, int ffd, UOFF_T size)
{
    fd->status = (ffd >= 0) ? FdOPEN : FdCLOSED;
    fd->fd = ffd;
    fd->size = size;

    Mesg(("FdOpen fd %d\n",ffd));

    return ffd >= 0;
}

/* Mark *fd closed and close its descriptor; true if close() worked. */
local bool FdClose(fast Fd *fd)
{
    int ffd;

    ffd = fd->fd;
    Mesg(("FdClose fd %d\n",fd->fd));
    fd->status = FdCLOSED;
    fd->fd = -1;

    return close(ffd) >= 0;
}

/* Make *to a dup() of *from; true if the duplication worked. */
local bool FdCopy(fast Fd *to, fast Fd *from)
{
    to->size = from->size;
    to->status = from->status;
    to->fd = dup(from->fd);

    Mesg(("FdCopy of %d is %d\n",from->fd,to->fd));

    return to->fd >= 0;
}

/* Make *to an alias of *from (same descriptor number, no dup()). */
local void FdSet(fast Fd *to, fast Fd *from)
{
    if (from->fd < 0)
	mesg("team: set an invalid fd\n");

    to->size = from->size;
    to->status = from->status;
    to->fd = from->fd;
}

/*
    Called when a read or write hit eof, an error, or the end of the
    volume.  If fd is a character or block device and stderr is a
    terminal, ask on /dev/tty whether to go on:

	n	give up: returns 0;
	c	rewind the descriptor and returns the full volume size;
	y	carry on: returns space unchanged.

    Returns 0 as well whenever the question cannot be asked or the
    answer cannot be read.
*/
local UOFF_T FdRetry(fast Fd *fd, char *which, UOFF_T done, UOFF_T space)
{
    int		failure;
    int		tty;
    int		got;
    char	reply[2];
    struct stat	st;

    /* Remember why we are here before the calls below change errno. */
    failure = errno;

    if (fstat(fd->fd,&st) < 0)
    {
	perror(which);
	return 0;
    }

    st.st_mode &= S_IFMT;
    if (st.st_mode != S_IFCHR && st.st_mode != S_IFBLK)
	return 0;

    if (!isatty(fileno(stderr)))
	return 0;

    if ((tty = open("/dev/tty",O_RDONLY)) < 0)
    {
	perror("/dev/tty");
	return 0;
    }

    do
    {
	if (failure)
	    mesg("'%s' on %s after %" PRIu64 "k. Continue [cyn] ? ",
		strerror(failure),which,done>>10);
	else
	    mesg("EOF on %s after %" PRIu64 "k. Continue [cyn] ? ",
		which,done>>10);

	got = read(tty,reply,sizeof reply);
	if (got <= 0)
	{
	    /* No answer can be had: same outcome as answering 'n'. */
	    call close(tty);
	    return 0;
	}
    }
    while (strchr("cCyYnN",reply[0]) == 0);

    call close(tty);

    if (strchr("nN",reply[0]) != 0)
	return 0L;

    errno = 0;

    if (strchr("cC",reply[0]) != 0)
    {
	call lseek(fd->fd,0L,0);
	return fd->size;
    }

    return space;
}

/* The smaller of remaining and available, as a read/write count. */
local unsigned FdCanDo(fast address remaining, fast UOFF_T available)
{
    return ((UOFF_T) remaining < available)
	? (unsigned) remaining : (unsigned) available;
}

/*
    Read up to todo bytes from fd into buffer; done is how much has
    already been read from the input overall, and is used to find how
    much room is left on the current volume.

    Returns the number of bytes read, or the result of the last read()
    (0 for eof, negative for error) if nothing was read.  The status of
    fd is set to FdEOF or FdERROR accordingly.
*/
local address FdRead(fast Fd *fd, pointer buffer, fast address todo,
    UOFF_T done)
{
    fast UOFF_T		space;
    fast int		bytesRead = 0;
    fast address	justDone;

    switch (fd->status)
    {
    when FdEOF:	    return 0;
    when FdERROR:   return -1;
    when FdCLOSED:  return -1;

    when FdOPEN:

	space = fd->size - done%fd->size;

	for (justDone = 0; space != 0L && justDone < todo;)
	{
	    bytesRead = read(fd->fd,buffer+justDone,
		FdCanDo(todo-justDone,space-justDone));

	    /* Eof, error, or volume exhausted: ask what to do. */
	    if (bytesRead <= 0 || (justDone += bytesRead) == space)
		space = FdRetry(fd,"input",done+justDone,space-justDone);
	}

	if (bytesRead == 0)	fd->status = FdEOF;
	if (bytesRead < 0)	fd->status = FdERROR;

	Mesg(("FdRead %d reads %ld last %d\n",fd->fd,(long) justDone,bytesRead));

	return (justDone == 0) ? bytesRead : justDone;
    }

    /* Unknown status: treat as an error. */
    return -1;
}

/*
    Write todo bytes from buffer to fd; done is how much has already
    been written to the output overall.  Same conventions as FdRead().
*/
local address FdWrite(fast Fd *fd, pointer buffer, fast address todo,
    UOFF_T done)
{
    fast UOFF_T		space;
    fast int		bytesWritten = 0;
    fast address	justDone;

    switch (fd->status)
    {
    when FdEOF:	    return 0;
    when FdERROR:   return -1;
    when FdCLOSED:  return -1;

    when FdOPEN:

	space = fd->size - done%fd->size;

	for (justDone = 0; space != 0L && justDone < todo;)
	{
	    bytesWritten = write(fd->fd,buffer+justDone,
		FdCanDo(todo-justDone,space-justDone));

	    /* Eof, error, or volume exhausted: ask what to do. */
	    if (bytesWritten <= 0 || (justDone += bytesWritten) == space)
		space = FdRetry(fd,"output",done+justDone,space-justDone);
	}

	Mesg(("FdWrite %d writes %ld last %d\n",
	    fd->fd,(long) justDone,bytesWritten));

	if (bytesWritten == 0)	fd->status = FdEOF;
	if (bytesWritten < 0)	fd->status = FdERROR;

	return (justDone == 0) ? bytesWritten : justDone;
    }

    /* Unknown status: treat as an error. */
    return -1;
}

/*
    A Token is a scalar value representing a command.
*/

typedef short scalar	Token;

#define TokenREAD	0
#define TokenWRITE	1
#define TokenSTOP	2
#define TokenABORT	(-1)

/*
    Here we represent Streams as Fds; this is not entirely
    appropriate, as Fds have also a volume size, and relatively high
    overhead write and read functions.  Well, we just take some
    liberties with abstraction levels here.  Actually we should have
    an Fd abstraction for stream pipes and a Vol abstraction for input
    and output...
*/

/*
    Create a pipe; *downstream becomes its write end and *upstream its
    read end.  Returns false if the pipe cannot be created.
*/
local bool StreamPipe(fast Fd *downstream, fast Fd *upstream)
{
    int links[2];

    if (pipe(links) < 0)
    {
	perror("team: opening links");
	return false;
    }

    Mesg(("StreamPipe fd downstream %d upstream %d\n",links[1],links[0]));

    return FdOpen(downstream,links[1],TeamHVOLSZ)
	&& FdOpen(upstream,links[0],TeamHVOLSZ);
}

/* What travels on the ring: a command, an Fd status, a byte count. */
mode(struct,StreamMsg)
{
    Token	token;
    short	status;
    UOFF_T	done;
};

/*
    Write one message on fd.  Returns true if the whole message was
    written by a single write().
*/
local bool StreamSend(fast Fd *fd, Token token, short status, UOFF_T done)
{
    fast int	n;
    StreamMsg	message;

    /* Zero the padding too, so no indeterminate bytes hit the pipe. */
    memset(&message,0,sizeof message);
    message.token = token;
    message.status = status;
    message.done = done;

    n = write(fd->fd,(pointer) &message,(unsigned) sizeof message);

    Mesg(("StreamSend fd %d n %d token %d\n",fd->fd,n,token));

    return n == (int) sizeof message;
}

/*
    Read one message from fd into *tokenp, *statusp and *donep.
    Returns true if a whole message was read by a single read(); when
    it returns false the three outputs hold zeroes or a partial
    message and should not be relied upon.
*/
local bool StreamReceive(fast Fd *fd, Token *tokenp, short *statusp,
    UOFF_T *donep)
{
    fast int	n;
    StreamMsg	message;

    memset(&message,0,sizeof message);

    n = read(fd->fd,(pointer) &message,(unsigned) sizeof message);
    *tokenp = message.token;
    *statusp = message.status;
    *donep = message.done;

    Mesg(("StreamReceive fd %d n %d token %d\n",fd->fd,n,*tokenp));

    return n == (int) sizeof message;
}

/*
    A guy is an instance of the input to output copier.  It is
    attached to a relay station, with an upstream link, from which
    commands arrive, and a downward link, to which they are relayed
    once they are executed.
*/

mode(struct,Guy)
{
    int	pid;
    Fd	upStream;
    Fd	downStream;
};

/* Record the pid and the two links of a guy.  Always returns true. */
local bool GuyOpen(fast Guy *guy, int pid, Fd *upstream, Fd *downstream)
{
    Mesg(("GuyOpen pid %d upstream %d downstream %d\n",
	pid,upstream->fd,downstream->fd));

    guy->pid = pid;
    FdSet(&guy->upStream,upstream);
    FdSet(&guy->downStream,downstream);

    return true;
}

#define GuySEND(guy,token,status,done)		\
    StreamSend(&(guy)->downStream,token,status,done)

#define GuyRECEIVE(guy,tokenp,statusp,donep)	\
    StreamReceive(&(guy)->upStream,tokenp,statusp,donep)

local bool GuyStop(Guy *guy, char *errormsg, UOFF_T done);

/*
    The main loop of a guy: allocate a buffer of bufsize bytes, then
    obey the tokens arriving from upstream until STOP arrives or
    something goes wrong.  On READ a block is read from FdIn and READ
    is relayed; on WRITE the block last read is written to FdOut and
    WRITE is relayed.

    Returns false only if the buffer cannot be allocated; every other
    path ends in GuyStop(), which exits the process.
*/
local bool GuyStart(fast Guy *guy, address bufsize)
{
    fast char	*buffer;
    Token	token;
    short	status;
    UOFF_T	done;
    bool	received;
    static int	bytesRead,bytesWritten;

    Mesg(("GuyStart guy %p bufsize %ld\n",(void *) guy,(long) bufsize));

    buffer = (pointer) malloc((size_t) bufsize);
    if (buffer == nil(pointer))
    {
	mesg("team: guy %d cannot allocate %ld bytes\n",
	    guy->pid,(long) bufsize);
	return false;
    }

    while ((received = GuyRECEIVE(guy,&token,&status,&done))
	&& token != TokenSTOP)
    switch (token)
    {
    when TokenREAD:
	FdIn.status = status;

	Mesg(("GuyStart reading %ld chars\n",(long) bufsize));
	bytesRead = FdRead(&FdIn,(pointer) buffer,bufsize,done);
	Mesg(("GuyStart reads %d chars\n",bytesRead));

	if (bytesRead == 0) GuyStop(guy,nil(char *),done);
	if (bytesRead < 0)  GuyStop(guy,"error on guy read",done);

	done += bytesRead;

	if (verbose)
	    mesg("%" PRIu64 "k read \r",done>>10);

	if (!GuySEND(guy,TokenREAD,FdIn.status,done))
	    GuyStop(guy,"guy cannot send READ",done);

    when TokenWRITE:
	FdOut.status = status;

	Mesg(("GuyStart writing %d chars\n",bytesRead));
	bytesWritten = FdWrite(&FdOut,(pointer) buffer,
	    (address) bytesRead,done);
	Mesg(("GuyStart writes %d chars\n",bytesWritten));

	if (bytesWritten == 0) GuyStop(guy,"eof on guy write",done);
	if (bytesWritten < 0)  GuyStop(guy,"error on guy write",done);

	done += bytesWritten;

	if (verbose)
	    mesg("%" PRIu64 "k written\r",done>>10);

	if (!GuySEND(guy,TokenWRITE,FdOut.status,done))
	    GuyStop(guy,"guy cannot send WRITE",done);

    when TokenABORT:
	GuyStop(guy,"guy was aborted",0L);

    otherwise:
	GuyStop(guy,"impossible token on ring",done);
    }

    /* free((char *) buffer); */

    GuyStop(guy,(received) ? nil(char *) : "error on upstream receive",0L);
    /*NOTREACHED*/

    return true;
}

/*
    Terminate the calling guy; never returns.

    If done is not zero the final report (or, failing that, a newline
    to end the verbose progress line) is printed.  With an error
    message, ABORT is sent downstream and the exit status is 2;
    otherwise STOP is sent downstream and the exit status is 0, or 1
    if STOP could not be sent.
*/
local bool GuyStop(fast Guy *guy, char *errormsg, UOFF_T done)
{
    Mesg(("GuyStop guy %p\n",(void *) guy));

    if (done)
    {
	if (report)
	    mesg("%" PRIu64 " kilobytes, %lu seconds\r\n",
		done>>10,(unsigned long) (time((time_t *) 0)-origin));
	else if (verbose)
	    mesg("\n");
    }

    if (errormsg != nil(char *))
    {
	mesg("team: guy pid %d: %s\n",guy->pid,errormsg);
	call GuySEND(guy,TokenABORT,FdERROR,0L);
	exit(2);
	/*NOTREACHED*/
    }

    if (!GuySEND(guy,TokenSTOP,FdEOF,0L))
    {
	exit(1);
	/*NOTREACHED*/
    }

    exit(0);
    /*NOTREACHED*/

    return true;
}

/* Close both links of a guy; true if both closes worked. */
local bool GuyClose(fast Guy *guy)
{
    return FdClose(&guy->upStream) && FdClose(&guy->downStream);
}

/*
    A team is made up of a ring of guys; each guy copies a block from
    its input to its output, and is driven by tokens sent to it by the
    previous guy on a pipe.
*/

mode(struct,Team)
{
    Guy			*guys;
    short unsigned	size;
    short unsigned	active;
};

/*
    Allocate room for nominalsize guys.  Returns false, leaving the
    team empty, if the allocation fails.
*/
local bool TeamOpen(Team *team, short unsigned nominalsize)
{
    Mesg(("TeamOpen nominalsize %u\n",(unsigned) nominalsize));

    team->size = 0;
    team->active = 0;

    team->guys = (Guy *) calloc(nominalsize,sizeof (Guy));
    were (team->guys == nil(Guy *))
	return false;

    team->size = nominalsize;

    return true;
}

/*
    Fork the guys, link them in a ring of pipes, and inject the first
    READ and WRITE tokens.  isize and osize are the input and output
    volume sizes, bufsize the size of the buffer of each guy.
    Returns false if a pipe, a fork or the initial send fails.
*/
local bool TeamStart(fast Team *team, address bufsize, UOFF_T isize,
    UOFF_T osize)
{
    /*
	When generating each guy, we pass it an upstream link that
	is the downstream of the previous guy, and create a new
	downstream link that will be the next upstream.

	At each turn we obviously close the old downstream once it
	has been passed to the forked guy.

	A special case are the first and last guys; the upstream of
	the first guy shall be the downstream of the last.  This goes
	against the grain of our main logic, where the upstream is
	expected to already exist and the downstream must be created.

	This means that the last and first guys are created in a
	special way.  When creating the first guy we shall create its
	upstream link as well as its downstream, and we shall save
	that in a special variable, last_downstream.  This we shall
	use as the downstream of the last guy.

	We shall also keep it open in the team manager (parent
	process) because we shall use it to do the initial send of
	the read and write tokens that will circulate in the relay
	ring, activating the guys.

	Of course because of this each guy will inherit this link as
	well as its upstream and downstream, but shall graciously
	close it.
    */

    Fd	last_downstream;
    Fd	this_upstream;
    Fd	this_downstream;
    Fd	next_upstream;

    Mesg(("TeamStart team %p size %u bufsize %ld\n",
	(void *) team,(unsigned) team->size,(long) bufsize));

    call FdOpen(&FdIn,0,isize);
    call FdOpen(&FdOut,1,osize);

    for (team->active = 0; team->active < team->size; team->active++)
    {
	fast Guy	*guy;
	fast int	pid;

	guy = team->guys+team->active;

	if (team->active == 0)
	{
	    if (!StreamPipe(&last_downstream,&this_upstream))
	    {
		perror("cannot open first link");
		return false;
	    }

	    if (!StreamPipe(&this_downstream,&next_upstream))
	    {
		perror("cannot open link");
		return false;
	    }
	}
	else if (team->active < (team->size-1))
	{
	    if (!StreamPipe(&this_downstream,&next_upstream))
	    {
		perror("cannot open link");
		return false;
	    }
	}
	else /*if (team->active == team->size-1)*/
	{
	    /*
		The last guy writes on the original descriptor; the
		parent keeps a duplicate for the initial tokens.
	    */
	    FdSet(&this_downstream,&last_downstream);
	    if (!FdCopy(&last_downstream,&this_downstream))
		perror("team: cannot copy last downstream");
	}

	Mesg(("TeamStart going to fork for guy %p\n",(void *) guy));

	pid = fork();

	if (pid > 0)
	{
	    Mesg(("TeamStart forked guy %p as pid %d\n",(void *) guy,pid));
	    guy->pid = pid;

	    if (!FdClose(&this_upstream))
		perror("cannot close this upstream link");
	    if (!FdClose(&this_downstream))
		perror("cannot close this downstream link");

	    FdSet(&this_upstream,&next_upstream);
	}
	else if (pid == 0)
	{
	    pid = getpid();

	    /* Set SIGPIPE handling back to the default in the guys */
	    signal(SIGPIPE, SIG_DFL);

	    if (!FdClose(&last_downstream))
		perror("cannot close inherited first link");

	    if (!GuyOpen(guy,pid,&this_upstream,&this_downstream))
		GuyStop(guy,"cannot open guy",0L);

	    if (!GuyStart(guy,bufsize))
		GuyStop(guy,"cannot start guy",0L);

	    if (!GuyClose(guy))
		perror("cannot close guy");

	    /*
		Not expected to be reached, as GuyStart() and GuyStop()
		exit; but a guy must never fall back into this loop
		and start forking guys of its own.
	    */
	    exit(0);
	    /*NOTREACHED*/
	}
	else if (pid < 0)
	{
	    perror("team: forking a guy");
	    return false;
	}
    }

    if (!StreamSend(&last_downstream,TokenREAD,FdOPEN,0L)
	&& errno != EPIPE)
    {
	perror("cannot send first READ token");
	return false;
    }

    if (!StreamSend(&last_downstream,TokenWRITE,FdOPEN,0L)
	&& errno != EPIPE)
    {
	perror("cannot send first WRITE token");
	return false;
    }

    if (!FdClose(&last_downstream))
	perror("cannot close first link");

    return true;
}

/*
    Wait for the guys to terminate.  Returns false as soon as a guy
    terminates with a non zero status while others are still active;
    returns true when no guys are left (or wait() finds none).
*/
local bool TeamWait(fast Team *team)
{
    while (team->active != 0)
    {
	int guypid;
	int status;

	guypid = wait(&status);
	if (guypid >= 0)
	{
	    fast short unsigned guyno;

	    for (guyno = 0; guyno < team->size; guyno++)
		if (guypid == team->guys[guyno].pid)
		{
		    team->guys[guyno].pid = -1;
		    break;
		}
	}
	else
	{
	    mesg("team: no guys, believed %u left\n",
		(unsigned) team->active);
	    return true;
	}

	--team->active;

#ifdef WIFEXITED
	/* If a guy had an error, its exit status is 2.  Also catch a
	   killed guy */
	if ((WIFEXITED(status) && WEXITSTATUS(status) == 2)
	    || (WIFSIGNALED(status) && WTERMSIG(status) != SIGPIPE))
	{
	    guyhaderror = true;
	}
#endif

	if (status != 0 && team->active != 0)
	    return false;
    }

    return true;
}

/*
    Account for the guys that have not been waited for yet (they are
    not killed: the kill() is disabled).  Returns true if that brings
    the active count down to zero.
*/
local bool TeamStop(fast Team *team)
{
    fast short unsigned guyno;

    Mesg(("TeamStop team %p\n",(void *) team));

    for (guyno = 0; guyno < team->size; guyno++)
    {
	fast Guy *guy;

	guy = team->guys+guyno;
	if (guy->pid >= 0)
	{
	    /*kill(guy->pid,SIGKILL);*/
	    --team->active;
	}
    }

    return team->active == 0;
}

/* Release the table of guys.  Always returns true. */
local bool TeamClose(fast Team *team)
{
    team->size = 0;

    free(team->guys);
    team->guys = nil(Guy *);

    return true;
}

/* Print the command syntax on stderr and exit with status 1. */
local void usage(void)
{
    fprintf(stderr,
	"syntax: team [-[vr]] [-iI[bkm]] [-oO[bkm]] [N[bkm] [P]]\n"
	"    copies standard input to output\n"
	"    -v gives ongoing report, -r final report\n"
	"    I is input volume size (default %" PRIu64 "m)\n"
	"    O is output volume size (default %" PRIu64 "m)\n"
	"    N is buffer size (default %luk)\n"
	"    P is number of processes (default %u)\n"
	"    (postfix b means *512, k means *1KB, m means *1MB)\n",
	(UOFF_T) (TeamHVOLSZ>>20),(UOFF_T) (TeamHVOLSZ>>20),
	(unsigned long) (TeamDBUFSZ>>10),(unsigned) TeamDTEAMSZ);

    exit(1);
    /*NOTREACHED*/
}

/*
    Convert a decimal string with an optional b, k or m postfix to a
    size.  Conversion stops at the first character that is not a
    digit; only that character is looked at for the postfix.  A value
    that does not fit in 64 bits yields UINT64_MAX instead of wrapping
    around, so that the range checks of the callers reject it.
*/
local UOFF_T atos(fast char *s)
{
    fast UOFF_T	l;
    fast UOFF_T	scale;

    for (l = 0; *s >= '0' && *s <= '9'; s++)
    {
	UOFF_T digit = (UOFF_T) (*s-'0');

	if (l > (UINT64_MAX - digit) / 10)
	    return UINT64_MAX;

	l = l*10 + digit;
    }

    scale = 1;
    if (*s == 'b') scale = (UOFF_T) 1 << 9;
    if (*s == 'k') scale = (UOFF_T) 1 << 10;
    if (*s == 'm') scale = (UOFF_T) 1 << 20;

    if (l > UINT64_MAX / scale)
	return UINT64_MAX;

    return l * scale;
}

/*
    Parse the options and the optional buffer size and team size, then
    open, start and wait for the team.  The exit status is 0 only if
    the team could be run and no guy reported an error.
*/
global int main(int argc, char *(argv[]))
{
    Team		team;
    short unsigned	teamsize;
    address		bufsize;
    UOFF_T		isize;
    UOFF_T		osize;
    int			opt;

    teamsize = TeamDTEAMSZ;
    bufsize = TeamDBUFSZ;
    isize = TeamHVOLSZ;
    osize = TeamHVOLSZ;
    optind = 1;

    while ((opt = getopt(argc,argv,"vri:o:")) != -1)
    switch (opt)
    {
    when 'i':
	isize = atos(optarg);
	if (isize < TeamLVOLSZ || isize > TeamHVOLSZ)
	{
	    fprintf(stderr,"team: invalid input volume size %" PRIu64 "\n",
		isize);
	    usage();
	}

    when 'o':
	osize = atos(optarg);
	if (osize < TeamLVOLSZ || osize > TeamHVOLSZ)
	{
	    fprintf(stderr,"team: invalid output volume size %" PRIu64 "\n",
		osize);
	    usage();
	}

    when 'v':
	verbose ^= 1;

    when 'r':
	report ^= 1;

    otherwise:
	usage();
    }

    argc -= optind, argv += optind;

    if (argc != 0)
    {
	bufsize = (address) atos(argv[0]);
	if (bufsize < TeamLBUFSZ || bufsize > TeamHBUFSZ)
	{
	    fprintf(stderr,"team: invalid block size %ld\n",(long) bufsize);
	    usage();
	}
	--argc, argv++;
    }

    if (argc != 0)
    {
	/* Check the range as an int, before narrowing to a short. */
	int wanted = atoi(argv[0]);

	if (wanted < 2 || wanted > TeamHTEAMSZ)
	{
	    fprintf(stderr,"team: invalid # of processes %d\n",wanted);
	    usage();
	}
	teamsize = (short unsigned) wanted;
	--argc, argv++;
    }

    if (argc != 0)
	usage();

    if (!TeamOpen(&team,teamsize))
    {
	mesg("team: cannot setup the team with %u guys\n",
	    (unsigned) teamsize);
	return 1;
    }

    origin = time((time_t *) 0);

    /*
     * Ignore SIGPIPE in the parent as it affects the exit status
     * reporting.
     */
    signal(SIGPIPE, SIG_IGN);

    if (!TeamStart(&team,bufsize,isize,osize))
    {
	mesg("team: cannot start the team\n");
	return 1;
    }

    if (!TeamWait(&team))
    {
	mesg("team: stop remaining %u guys\n",(unsigned) team.active);

	if (!TeamStop(&team))
	{
	    mesg("team: cannot stop the team\n");
	    return 1;
	}
    }

    if (!TeamClose(&team))
    {
	mesg("team: cannot close the team\n");
	return 1;
    }

    if (guyhaderror)
    {
	mesg("team: guy had error\n");
	return 1;
    }

    return 0;
}