1 /*===========================================================================*
2  * parallel.c							             *
3  *									     *
4  *	Procedures to make encoder run in parallel			     *
5  *									     *
6  * EXPORTED PROCEDURES:							     *
7  *	StartIOServer							     *
8  *	StartCombineServer						     *
9  *	StartDecodeServer						     *
10  *	SendRemoteFrame							     *
11  *	GetRemoteFrame							     *
12  *	StartMasterServer						     *
13  *	NotifyMasterDone						     *
14  *									     *
15  *===========================================================================*/
16 
17 /*
18  * Copyright (c) 1995 The Regents of the University of California.
19  * All rights reserved.
20  *
21  * Permission to use, copy, modify, and distribute this software and its
22  * documentation for any purpose, without fee, and without written agreement is
23  * hereby granted, provided that the above copyright notice and the following
24  * two paragraphs appear in all copies of this software.
25  *
26  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
27  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
28  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
29  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30  *
31  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
32  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
33  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
34  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
35  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
36  */
37 
38 /*
39  *  $Header$
40  *  $Log$
41  *  Revision 1.4  2004/04/02 15:12:40  rwcox
42  *  Cput
43  *
44  *  Revision 1.3  2003/12/23 13:50:08  rwcox
45  *  Cput
46  *
47  *  Revision 1.2  2003/12/03 14:46:14  rwcox
48  *  Cput
49  *
50  *  Revision 1.1  2001/12/17 16:11:55  rwcox
51  *  Cadd
52  *
53  *  Revision 1.9  1995/08/16 18:22:08  smoot
54  *  indents
55  *
56  *  Revision 1.8  1995/08/14 22:30:20  smoot
57  *  added safe_fork to allow us to kill kids when we die.
58  *
59  *  Revision 1.7  1995/08/07 21:46:14  smoot
60  *  spawns the same encoder as it is for combine, etc.
61  *  uses new pattern tables to determine frame types
62  *
63  *  Revision 1.6  1995/06/21 18:32:14  smoot
64  *  Defined SOMAXCONN when not (LINUX)
65  *  added binary r/w (DOsS!)
66  *  ANSIified bcopy call
67  *
68  * Revision 1.5  1995/01/19  23:09:00  eyhung
69  * Changed copyrights
70  *
71  * Revision 1.4  1994/03/15  00:27:11  keving
72  * nothing
73  *
74  * Revision 1.3  1993/12/22  19:19:01  keving
75  * nothing
76  *
77  * Revision 1.2  1993/07/22  22:23:43  keving
78  * nothing
79  *
80  * Revision 1.1  1993/06/30  20:06:09  keving
81  * nothing
82  *
83  */
84 
85 
86 /*==============*
87  * HEADER FILES *
88  *==============*/
89 
90 #include <sys/types.h>
91 #include <sys/socket.h>
92 #include <sys/times.h>
93 #include <time.h>
94 #include <netinet/in.h>
95 #include <unistd.h>
96 #include <netdb.h>
97 #include <errno.h>
98 #include <string.h>
99 #include <signal.h>
100 #include "all.h"
101 #include "param.h"
102 #include "mpeg.h"
103 #include "prototypes.h"
104 #include "parallel.h"
105 #include "readframe.h"
106 #include "fsize.h"
107 #include "combine.h"
108 #include "frames.h"
109 
110 
111 #define MAX_IO_SERVERS	10
112 #ifndef SOMAXCONN
113 #define SOMAXCONN 5
114 #endif
115 
116 /*==================*
117  * CONSTANTS        *
118  *==================*/
119 
120 #define	TERMINATE_PID_SIGNAL	SIGTERM	 /* signal used to terminate forked childs */
121 #ifndef MAXARGS
122 #define	MAXARGS		1024   /* Max Number of arguments in safe_fork command */
123 #endif
124 
125 /*==================*
126  * STATIC VARIABLES *
127  *==================*/
128 
129 static int32   diffTime;
130 static char	rsh[256];
131 static struct hostent *hostEntry = NULL;
132 static boolean	*frameDone;
133 static int	outputServerSocket;
134 static int	decodeServerSocket;
135 static boolean	parallelPerfect = FALSE;
136 static	int	current_max_forked_pid=0;
137 
138 
139 /*==================*
140  * GLOBAL VARIABLES *
141  *==================*/
142 
143 extern int yuvHeight, yuvWidth;
144 extern	time_t  timeStart, timeEnd;
145 extern char	statFileName[256];
146 extern FILE *statFile;
147 extern boolean  debugMachines;
148 extern boolean debugSockets;
149 int parallelTestFrames = 10;
150 int parallelTimeChunks = 60;
151 char *IOhostName;
152 int ioPortNumber;
153 int combinePortNumber;
154 int decodePortNumber;
155 boolean	niceProcesses = FALSE;
156 boolean	forceIalign = FALSE;
157 int	    machineNumber = -1;
158 boolean	remoteIO = FALSE;
159 boolean	separateConversion;
160 time_t	IOtime = 0;
161 extern char encoder_name[];
162 int     ClientPid[MAX_MACHINES+4];
163 
164 /*===============================*
165  * INTERNAL PROCEDURE prototypes *
166  *===============================*/
167 
168 static void	TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
169 					       int ioPortNum));
170 static void	EndIOServer _ANSI_ARGS_((void));
171 static void SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
172 static void SafeWrite _ANSI_ARGS_((int fd, char *buf, int nbyte));
173 static int  CreateListeningSocket _ANSI_ARGS_((int *portNumber));
174 static int  ConnectToSocket _ANSI_ARGS_((char *machineName, int portNum,
175 					 struct hostent **hostEnt));
176 static int safe_fork _ANSI_ARGS_((char *command));
177 void cleanup_fork _ANSI_ARGS_ ((int dummy));
178 
179 
180 /*=====================*
181  * EXPORTED PROCEDURES *
182  *=====================*/
183 
184 			/*=================*
185 			 * IO SERVER STUFF *
186 			 *=================*/
187 
188 
189 /*===========================================================================*
190  *
191  * SetIOConvert
192  *
193  *	sets the IO conversion to be separate or not.  If separate, then
194  *	some post-processing is done at slave end
195  *
196  * RETURNS:	nothing
197  *
198  * SIDE EFFECTS:    none
199  *
200  *===========================================================================*/
201 void
SetIOConvert(separate)202 SetIOConvert(separate)
203     boolean separate;
204 {
205     separateConversion = separate;
206 }
207 
208 
209 /*===========================================================================*
210  *
211  * SetParallelPerfect
212  *
213  *	If this is called, then frames will be divided up completely, and
214  *	evenly (modulo rounding) between all the processors
215  *
216  * RETURNS:	nothing
217  *
218  * SIDE EFFECTS:    Sets parallelPerfect ....
219  *
220  *===========================================================================*/
221 void
SetParallelPerfect(val)222 SetParallelPerfect(val)
223 boolean val;
224 {
225     parallelPerfect = val;
226 }
227 
228 
229 /*===========================================================================*
230  *
231  * SetRemoteShell
232  *
233  *	sets the remote shell program (usually rsh, but different on some
234  *	machines)
235  *
236  * RETURNS:	nothing
237  *
238  * SIDE EFFECTS:    none
239  *
240  *===========================================================================*/
241 void
SetRemoteShell(shell)242 SetRemoteShell(shell)
243     char *shell;
244 {
245     strcpy(rsh, shell);
246 }
247 
248 
249 /*===========================================================================*
250  *
251  * StartIOServer
252  *
253  *	start-up the IOServer with this process
254  *	handles slave requests for frames, and exits when master tells it to
255  *
256  * RETURNS:	nothing
257  *
258  * SIDE EFFECTS:    none
259  *
260  *===========================================================================*/
261 void
StartIOServer(numInputFiles,parallelHostName,portNum)262   StartIOServer(numInputFiles, parallelHostName, portNum)
263 int numInputFiles;
264 char *parallelHostName;
265 int portNum;
266 {
267   int	    ioPortNum;
268   int	    serverSocket;
269   int	    otherSock, otherSize;
270   struct sockaddr otherSocket;
271   int32   buffer[8];
272   boolean	done = FALSE;
273   int	    frameNumber;
274   MpegFrame *frame;
275   register int y;
276   int	    numBytes;
277   unsigned char   *bigBuffer;
278   unsigned char   smallBuffer[1000];
279   int	    bigBufferSize;
280   FILE    *filePtr;
281   uint32  data;
282   char    inputFileName[1024];
283   char    fileName[1024];
284 
285   bigBufferSize = 0;
286   bigBuffer = NULL;
287 
288   /* once we get IO port num, should transmit it to parallel server */
289 
290   serverSocket = CreateListeningSocket(&ioPortNum);
291 
292   if ( debugSockets ) {
293     fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);
294   }
295 
296   TransmitPortNum(parallelHostName, portNum, ioPortNum);
297 
298   otherSize = sizeof(otherSocket);
299 
300   if ( separateConversion ) {
301     SetFileType(ioConversion);	/* for reading */
302   } else {
303     SetFileType(inputConversion);
304   }
305 
306   /* now, wait until get done signal */
307   while ( ! done ) {
308     otherSock = accept(serverSocket, &otherSocket, &otherSize);
309     if ( otherSock == -1 ) {
310       fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
311       exit(1);
312     }
313 
314     SafeRead(otherSock, (char *)buffer, 4);
315     frameNumber = ntohl(buffer[0]);
316 
317     if ( frameNumber == -1 ) {
318       done = TRUE;
319     } else if ( frameNumber == -2 ) {
320       /* decoded frame to be output to disk */
321       SafeRead(otherSock, (char *)buffer, 4);
322       frameNumber = ntohl(buffer[0]);
323 
324       if ( debugSockets ) {
325 	fprintf(stdout, "INPUT SERVER:  GETTING DECODED FRAME %d\n", frameNumber);
326 	fflush(stdout);
327       }
328 
329       /* should read frame from socket, then write to disk */
330       frame = Frame_New(frameNumber, 'i');
331 
332       Frame_AllocDecoded(frame, TRUE);
333 
334       for ( y = 0; y < Fsize_y; y++ ) {
335 	SafeRead(otherSock, (char *)frame->decoded_y[y], Fsize_x);
336       }
337 
338       for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
339 	SafeRead(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
340       }
341 
342       for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
343 	SafeRead(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
344       }
345 
346       /* now output to disk */
347       WriteDecodedFrame(frame);
348 
349       Frame_Free(frame);
350     } else if ( frameNumber == -3 ) {
351       /* request for decoded frame from disk */
352       SafeRead(otherSock, (char *)buffer, 4);
353       frameNumber = ntohl(buffer[0]);
354 
355       if ( debugSockets ) {
356 	fprintf(stdout, "INPUT SERVER:  READING DECODED FRAME %d from DISK\n", frameNumber);
357 	fflush(stdout);
358       }
359 
360       /* should read frame from disk, then write to socket */
361       frame = Frame_New(frameNumber, 'i');
362 
363       Frame_AllocDecoded(frame, TRUE);
364 
365       ReadDecodedRefFrame(frame, frameNumber);
366 
367       /* now write to socket */
368       for ( y = 0; y < Fsize_y; y++ ) {
369 	SafeWrite(otherSock, (char *)frame->decoded_y[y], Fsize_x);
370       }
371 
372       for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
373 	SafeWrite(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
374       }
375 
376       for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
377 	SafeWrite(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
378       }
379 
380       Frame_Free(frame);
381     } else if ( frameNumber == -4 ) {
382       /* routing output frame from socket to disk */
383       SafeRead(otherSock, (char *)buffer, 8);
384       frameNumber = buffer[0];
385       frameNumber = ntohl(frameNumber);
386 
387       /* read in number of bytes */
388       numBytes = buffer[1];
389       numBytes = ntohl(numBytes);
390 
391       /* make sure buffer is big enough for data */
392       if ( numBytes > bigBufferSize ) {
393 	bigBufferSize = numBytes;
394 	if ( bigBuffer != NULL ) {
395 	  free(bigBuffer);
396 	}
397 
398 	bigBuffer = (unsigned char *) malloc(bigBufferSize*
399 					     sizeof(unsigned char));
400       }
401 
402       /* now read in the bytes */
403       SafeRead(otherSock, (char *) bigBuffer, numBytes);
404 
405       /* open file to output this stuff to */
406       sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
407       if ( (filePtr = fopen(fileName, "wb")) == NULL ) {
408 	fprintf(stderr, "ERROR:  Could not open output file(3):  %s\n",
409 		fileName);
410 	exit(1);
411       }
412 
413       /* now write the bytes here */
414       fwrite(bigBuffer, sizeof(char), numBytes, filePtr);
415 
416       fclose(filePtr);
417 
418       if ( debugSockets ) {
419 	fprintf(stdout, "====I/O SERVER:  WROTE FRAME %d to disk\n",
420 		frameNumber);
421 	fflush(stdout);
422       }
423     } else {
424       if ( debugSockets ) {
425 	fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber);
426 	fflush(stdout);
427       }
428 
429       /* should read in frame, then write to socket */
430       frame = Frame_New(frameNumber, 'i');
431 
432       if ( separateConversion ) {
433 	GetNthInputFileName(inputFileName, frameNumber);
434 
435 	/* do conversion and send right to the socket */
436 	filePtr = ReadIOConvert(inputFileName);
437 	do {
438 	  numBytes = fread(smallBuffer, 1, 1000, filePtr);
439 
440 	  if ( numBytes > 0 ) {
441 	    data = numBytes;
442 	    data = htonl(data);
443 	    SafeWrite(otherSock, (char *)&data, 4);
444 	    SafeWrite(otherSock, (char *)smallBuffer, numBytes);
445 	  }
446 	}
447 	while ( numBytes == 1000 );
448 
449 	if ( strcmp(ioConversion, "*") == 0 ) {
450 	  fclose(filePtr);
451 	} else {
452 	  pclose(filePtr);
453 	}
454       } else {
455 	GetNthInputFileName(inputFileName, frameNumber);
456 	ReadFrame(frame, inputFileName, inputConversion, TRUE);
457 
458 	/* should now transmit yuv values */
459 	for (y = 0; y < yuvHeight; y++) { /* Y */
460 	  SafeWrite(otherSock, (char *)frame->orig_y[y], yuvWidth);
461 	}
462 
463 	for (y = 0; y < (yuvHeight >> 1); y++) { /* U */
464 	  SafeWrite(otherSock, (char *)frame->orig_cb[y], yuvWidth >> 1);
465 	}
466 
467 	for (y = 0; y < (yuvHeight >> 1); y++) { /* V */
468 	  SafeWrite(otherSock, (char *)frame->orig_cr[y], yuvWidth >> 1);
469 	}
470 
471 	/* now, make sure we don't leave until other processor read everything */
472 
473 	SafeRead(otherSock, (char *)buffer, 4);
474 	/* should = 0 */
475       }
476 
477       if ( debugSockets ) {
478 	fprintf(stdout, "====I/O SERVER:  READ FRAME %d\n",
479 		frameNumber);
480       }
481 
482       Frame_Free(frame);
483     }
484 
485     close(otherSock);
486   }
487 
488   close(serverSocket);
489 
490   if ( debugSockets ) {
491     fprintf(stdout, "====I/O SERVER:  Shutting Down\n");
492   }
493 }
494 
495 
496 /*===========================================================================*
497  *
498  * SendRemoteFrame
499  *
500  *	called by a slave to the I/O server; sends an encoded frame
501  *	to the server to be sent to disk
502  *
503  * RETURNS:	nothing
504  *
505  * SIDE EFFECTS:    none
506  *
507  *===========================================================================*/
508 void
SendRemoteFrame(frameNumber,bb)509 SendRemoteFrame(frameNumber, bb)
510     int frameNumber;
511     BitBucket *bb;
512 {
513     int	clientSocket;
514     u_long  data;
515     int	    negativeFour = -4;
516     time_t  tempTimeStart, tempTimeEnd;
517 
518     time(&tempTimeStart);
519 
520     clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
521 
522     data = htonl(negativeFour);
523     SafeWrite(clientSocket, (char *)&data, 4);
524 
525     data = htonl(frameNumber);
526     SafeWrite(clientSocket, (char *)&data, 4);
527 
528     if ( frameNumber != -1 ) {
529 	/* send number of bytes */
530 	data = (bb->totalbits+7)>>3;
531 	data = htonl(data);
532 	SafeWrite(clientSocket, (char *)&data, 4);
533 
534 	/* now send the bytes themselves */
535 	Bitio_WriteToSocket(bb, clientSocket);
536     }
537 
538     close(clientSocket);
539 
540     time(&tempTimeEnd);
541     IOtime += (tempTimeEnd-tempTimeStart);
542 }
543 
544 
545 
546 
547 /*===========================================================================*
548  *
549  * NoteFrameDone
550  *
551  *	called by slave to the Combine server; tells it these frames are
552  *	done
553  *
554  * RETURNS:	nothing
555  *
556  * SIDE EFFECTS:    none
557  *
558  *===========================================================================*/
559 void
NoteFrameDone(frameStart,frameEnd)560 NoteFrameDone(frameStart, frameEnd)
561     int frameStart;
562     int frameEnd;
563 {
564     int	clientSocket;
565     u_long  data;
566     int	    negativeTwo = -2;
567     time_t  tempTimeStart, tempTimeEnd;
568 
569     time(&tempTimeStart);
570 
571     clientSocket = ConnectToSocket(IOhostName, combinePortNumber, &hostEntry);
572 
573     data = negativeTwo;
574     data = htonl(negativeTwo);
575     SafeWrite(clientSocket, (char *)&data, 4);
576 
577     data = htonl(frameStart);
578     SafeWrite(clientSocket, (char *)&data, 4);
579 
580     data = htonl(frameEnd);
581     SafeWrite(clientSocket, (char *)&data, 4);
582 
583     close(clientSocket);
584 
585     time(&tempTimeEnd);
586     IOtime += (tempTimeEnd-tempTimeStart);
587 }
588 
589 
590 /*===========================================================================*
591  *
592  * GetRemoteFrame
593  *
594  *	called by a slave; gets a remote frame from the I/O server
595  *
596  * RETURNS:	nothing
597  *
598  * SIDE EFFECTS:    none
599  *
600  *===========================================================================*/
601 void
GetRemoteFrame(frame,frameNumber)602   GetRemoteFrame(frame, frameNumber)
603 MpegFrame *frame;
604 int frameNumber;
605 {
606   FILE    *filePtr;
607   int	clientSocket;
608   unsigned char   smallBuffer[1000];
609   register int y;
610   int	    numBytes;
611   u_long  data;
612   char    fileName[256];
613 
614   Fsize_Note(frameNumber, yuvWidth, yuvHeight);
615 
616   if ( debugSockets ) {
617     fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
618 	    getenv("HOST"), frameNumber);
619     fflush(stdout);
620   }
621 
622   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
623 
624   data = frameNumber;
625   data = htonl(data);
626   SafeWrite(clientSocket, (char *)&data, 4);
627 
628   if ( frameNumber != -1 ) {
629     if ( separateConversion ) {
630       sprintf(fileName, "/tmp/foobar%d", machineNumber);
631       filePtr = fopen(fileName, "wb");
632 
633       /* read in stuff, SafeWrite to file, perform local conversion */
634       do {
635 	SafeRead(clientSocket, (char *)&numBytes, 4);
636 	numBytes = ntohl(numBytes);
637 
638 	SafeRead(clientSocket, (char *)smallBuffer, numBytes);
639 
640 	fwrite(smallBuffer, 1, numBytes, filePtr);
641       } while ( numBytes == 1000 );
642       fflush(filePtr);
643       fclose(filePtr);
644 
645       /* now do slave conversion */
646       ReadFrame(frame, fileName, slaveConversion, FALSE);
647     } else {
648       Frame_AllocYCC(frame);
649 
650       if ( debugSockets ) {
651 	fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
652 		getenv("HOST"), frameNumber);
653 	fflush(stdout);
654       }
655 
656       /* should now read yuv values */
657       for (y = 0; y < yuvHeight; y++) {	/* Y */
658 	SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
659       }
660 
661       for (y = 0; y < (yuvHeight >> 1); y++) { /* U */
662 	SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth>>1);
663       }
664 
665       for (y = 0; y < (yuvHeight >> 1); y++) { /* V */
666 	SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth>>1);
667       }
668     }
669   }
670 
671   data = 0;
672   data = htonl(data);
673   SafeWrite(clientSocket, (char *)&data, 4);
674 
675   close(clientSocket);
676 
677   if ( debugSockets ) {
678     fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
679 	    getenv("HOST"), frameNumber);
680     fflush(stdout);
681   }
682 }
683 
684 
685 /*===========================================================================*
686  *
687  * StartCombineServer
688  *
689  *	start-up the CombineServer with this process
690  *	handles combination of frames, and tells the
691  *	master when it's done
692  *
693  * RETURNS:	nothing
694  *
695  * SIDE EFFECTS:    none
696  *
697  *===========================================================================*/
698 void
StartCombineServer(numInputFiles,outputFileName,parallelHostName,portNum)699   StartCombineServer(numInputFiles, outputFileName, parallelHostName, portNum)
700 int numInputFiles;
701 char *outputFileName;
702 char *parallelHostName;
703 int portNum;
704 {
705   int	    combinePortNum;
706   FILE    *ofp;
707 
708   /* once we get Combine port num, should transmit it to parallel server */
709 
710   outputServerSocket = CreateListeningSocket(&combinePortNum);
711 
712   if ( debugSockets ) {
713     fprintf(stdout, "====OUTPUT USING PORT %d\n", combinePortNum);
714   }
715 
716   TransmitPortNum(parallelHostName, portNum, combinePortNum);
717 
718   frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
719   memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
720 
721   if ( (ofp = fopen(outputFileName, "wb")) == NULL ) {
722     fprintf(stderr, "ERROR:  Could not open output file!!\n");
723     fflush(stderr);
724     exit(1);
725   }
726   FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);
727 
728   if ( debugSockets ) {
729     fprintf(stdout, "====COMBINE SERVER:  Shutting Down\n");
730     fflush(stdout);
731   }
732 
733   /* tell Master server we are done */
734   TransmitPortNum(parallelHostName, portNum, combinePortNum);
735 
736   close(outputServerSocket);
737 }
738 
739 
740 /*===========================================================================*
741  *
742  * WaitForOutputFile
743  *
744  *	keep handling output events until we get the specified frame
745  *	number
746  *
747  * RETURNS:	nothing
748  *
749  * SIDE EFFECTS:    none
750  *
751  *===========================================================================*/
752 void
WaitForOutputFile(number)753   WaitForOutputFile(number)
754 int number;
755 {
756   int	    otherSock;
757   static int otherSize = sizeof(struct sockaddr);
758   struct sockaddr otherSocket;
759   int	    frameNumber;
760   int32   buffer[8];
761   int frameStart, frameEnd;
762 
763   while ( ! frameDone[number] ) {
764     otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
765     if ( otherSock == -1 ) {
766       fprintf(stderr, "ERROR:  Combine SERVER accept returned error %d\n", errno);
767       exit(1);
768     }
769 
770     SafeRead(otherSock, (char *)buffer, 4);
771     frameNumber = ntohl(buffer[0]);
772 
773     if ( frameNumber == -2 ) {
774       /* this is notification from non-remote process that a frame is done */
775 
776       SafeRead(otherSock, (char *)buffer, 8);
777       frameStart = buffer[0];
778       frameStart = ntohl(frameStart);
779       frameEnd = buffer[1];
780       frameEnd = ntohl(frameEnd);
781 
782       for ( frameNumber = frameStart; frameNumber <= frameEnd;
783 	   frameNumber++ ) {
784 	frameDone[frameNumber] = TRUE;
785       }
786     }
787 
788     close(otherSock);
789   }
790 
791   if ( debugSockets ) {
792     fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
793     fflush(stdout);
794   }
795 }
796 
797 
798 /*=====================*
799  * MASTER SERVER STUFF *
800  *=====================*/
801 
802 
803 /*===========================================================================*
804  *
805  * StartMasterServer
806  *
807  *	start the master server with this process
808  *
809  * RETURNS:	nothing
810  *
811  * SIDE EFFECTS:    none
812  *
813  *===========================================================================*/
814 void
StartMasterServer(numInputFiles,paramFile,outputFileName)815   StartMasterServer(numInputFiles, paramFile, outputFileName)
816 int numInputFiles;
817 char *paramFile;
818 char *outputFileName;
819 {
820   FILE    *filePtr;
821   register int ind, ind2;
822   int	    framesPerMachine;
823   char    command[1024];
824   char    *hostName;
825   int	    portNum;
826   int	    serverSocket;
827   boolean finished[MAX_MACHINES];
828   int	    numFinished;
829   int	    otherSock, otherSize;
830   struct sockaddr otherSocket;
831   int	    seconds;
832   int32   buffer[8];
833   int ioPortNum[MAX_IO_SERVERS];
834   int	    combinePortNum, decodePortNum;
835   int	    nextFrame;
836   int	    startFrames[MAX_MACHINES];
837   int	    numFrames[MAX_MACHINES];
838   int	    lastNumFrames[MAX_MACHINES];
839   int	    numSeconds[MAX_MACHINES];
840   float   fps[MAX_MACHINES];
841   int	    numMachinesToEstimate;
842   float   framesPerSecond;
843   float   totalFPS, localFPS;
844   int	    framesDone;
845   float   avgFPS;
846   char    niceNess[256];
847   int32   startFrame, endFrame;
848   int numInputPorts = 0;
849   int	numRemote = SOMAXCONN;
850   int totalRemote = 0;
851   time_t  startUpBegin, startUpEnd;
852   time_t  shutDownBegin, shutDownEnd;
853   float   timeChunk;
854 
855   time(&startUpBegin);
856 
857   if ( niceProcesses ) {
858     sprintf(niceNess, "nice");
859   } else {
860     niceNess[0] = '\0';
861   }
862 
863   time(&timeStart);
864 
865   PrintStartStats(-1, 0);
866 
867   /* create a server socket */
868   hostName = getenv("HOST");
869 
870   if ( hostName == NULL ) {
871     fprintf(stderr, "ERROR:  Set HOST environment variable\n");
872     exit(1);
873   }
874 
875   hostEntry = gethostbyname(hostName);
876   if ( hostEntry == NULL ) {
877     fprintf(stderr, "ERROR:  Could not find host %s in database\n",
878 	    hostName);
879     exit(1);
880   }
881 
882   hostName = hostEntry->h_name;
883 
884   serverSocket = CreateListeningSocket(&portNum);
885   if ( debugSockets ) {
886     fprintf(stdout, "---USING PORT %d\n", portNum);
887   }
888 
889   /* START COMBINE SERVER */
890   sprintf(command, "%s -max_machines %d -output_server %s %d %d %s",
891 	  encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
892   safe_fork(command);
893 
894   /* should now listen for connection from Combine server */
895   otherSize = sizeof(otherSocket);
896   otherSock = accept(serverSocket, &otherSocket, &otherSize);
897   if ( otherSock == -1 ) {
898     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
899     exit(1);
900   }
901 
902   SafeRead(otherSock, (char *)(&combinePortNum), 4);
903   combinePortNum = ntohl(combinePortNum);
904   combinePortNumber = combinePortNum;
905   close(otherSock);
906 
907   if ( debugSockets ) {
908     fprintf(stdout, "---MASTER SERVER:  Combine port number = %d\n",
909 	    combinePortNum);
910   }
911 
912   /* START DECODE SERVER if necessary */
913   if ( referenceFrame == DECODED_FRAME ) {
914     sprintf(command, "%s -max_machines %d -decode_server %s %d %d %s",
915 	    encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
916     safe_fork(command);
917 
918     /* should now listen for connection from Decode server */
919     otherSize = sizeof(otherSocket);
920     otherSock = accept(serverSocket, &otherSocket, &otherSize);
921     if ( otherSock == -1 ) {
922       fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
923       exit(1);
924     }
925 
926     SafeRead(otherSock, (char *)(&decodePortNum), 4);
927     decodePortNum = ntohl(decodePortNum);
928     close(otherSock);
929 
930     if ( debugSockets ) {
931       fprintf(stdout, "---MASTER SERVER:  Decode port number = %d\n",
932 	      decodePortNum);
933     }
934   }
935 
936   /* we are doing whole thing (if not, see above) */
937 
938   framesPerMachine = numInputFiles/numMachines;
939 
940   numFinished = 0;
941 
942   /* count number of remote machines */
943   for ( ind = 0; ind < numMachines; ind++ ) {
944     fps[ind] = -1.0;		/* illegal value as flag */
945     if ( remote[ind] ) {
946       totalRemote++;
947     }
948   }
949 
950   /* DO INITIAL TIME TESTS */
951   nextFrame = 0;
952   for ( ind = 0; ind < numMachines; ind++ ) {
953     if ( (totalRemote != 0) && (numRemote == SOMAXCONN) ) {
954       /* Create an I/O server */
955       sprintf(command, "%s -max_machines %d -io_server %s %d %s",
956 	      encoder_name, numMachines, hostName, portNum, paramFile);
957       safe_fork(command);
958 
959       /* should now listen for connection from I/O server */
960       otherSize = sizeof(otherSocket);
961       otherSock = accept(serverSocket, &otherSocket, &otherSize);
962       if ( otherSock == -1 ) {
963 	fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
964 	exit(1);
965       }
966 
967       SafeRead(otherSock, (char *)(&ioPortNum[numInputPorts]), 4);
968       ioPortNum[numInputPorts] = ntohl(ioPortNum[numInputPorts]);
969       close(otherSock);
970 
971       if ( debugSockets ) {
972 	fprintf(stdout, "---MASTER SERVER:  I/O port number = %d\n",
973 		ioPortNum[numInputPorts]);
974       }
975 
976       numInputPorts++;
977       numRemote = 0;
978     }
979 
980     finished[ind] = FALSE;
981     numSeconds[ind] = 0;
982 
983     startFrame = nextFrame;
984     if ( parallelPerfect ) {
985       endFrame = startFrame+((numInputFiles-startFrame)/
986 			     (numMachines-ind))  -1;
987 
988       if ( forceIalign ) {
989 	while (FType_Type(endFrame) != 'i') {endFrame++;}
990       }
991 
992       /* always give at least 1 frame */
993       if ( endFrame < startFrame ) {
994 	endFrame = startFrame;
995       }
996 
997       /* make sure not out of bounds */
998       if ( endFrame >= numInputFiles ) {
999 	endFrame = numInputFiles-1;
1000       }
1001     } else if ( forceIalign ) {
1002       endFrame = startFrame+framePatternLen-1;
1003       while (FType_Type(endFrame) != 'i') {endFrame++;}
1004     } else {
1005       endFrame = startFrame+parallelTestFrames-1;
1006     }
1007 
1008     if ( remote[ind] ) {
1009       sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
1010 	      rsh,
1011 	      machineName[ind], userName[ind], niceNess,
1012 	      executable[ind],
1013 	      hostName, portNum, ioPortNum[numInputPorts-1],
1014 	      combinePortNum, decodePortNum, ind,
1015 	      remote[ind],
1016 	      startFrame, endFrame,
1017 	      remoteParamFile[ind]);
1018       numRemote++;
1019       totalRemote--;
1020     } else {
1021       sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
1022 	      rsh,
1023 	      machineName[ind], userName[ind], niceNess,
1024 	      executable[ind],
1025 	      hostName, portNum, ioPortNum[numInputPorts-1],
1026 	      combinePortNum, decodePortNum, ind,
1027 	      remote[ind],
1028 	      startFrame, endFrame,
1029 	      paramFile);
1030     }
1031 
1032     if ( debugMachines ) {
1033       fprintf(stdout, "---%s:  frames %d to %d\n",
1034 	      machineName[ind],
1035 	      startFrame, endFrame);
1036     }
1037 
1038 
1039     safe_fork(command);
1040 
1041     nextFrame = endFrame+1;
1042     startFrames[ind] = startFrame;
1043     numFrames[ind] = endFrame-startFrame+1;
1044     lastNumFrames[ind] = endFrame-startFrame+1;
1045   }
1046 
1047   framesDone = 0;
1048 
1049   time(&startUpEnd);
1050 
1051   /* now, wait for other processes to finish and boss them around */
1052   while ( numFinished != numMachines ) {
1053     otherSize = sizeof(otherSocket);
1054     otherSock = accept(serverSocket, &otherSocket, &otherSize);
1055     if ( otherSock == -1 ) {
1056       fprintf(stderr, "ERROR:  MASTER SERVER 2 accept returned error %d\n", errno);
1057       exit(1);
1058     }
1059 
1060     SafeRead(otherSock, (char *)buffer, 8);
1061 
1062     ind = ntohl(buffer[0]);
1063     seconds = ntohl(buffer[1]);
1064 
1065     NoteFrameDone(startFrames[ind],
1066 		  startFrames[ind]+lastNumFrames[ind]-1);
1067 
1068     numSeconds[ind] += seconds;
1069     fps[ind] = (float)numFrames[ind]/(float)numSeconds[ind];
1070 
1071     if ( seconds != 0 )
1072       framesPerSecond = (float)lastNumFrames[ind]/(float)seconds;
1073     else
1074       framesPerSecond = (float)lastNumFrames[ind]*2.0;
1075 
1076     framesDone += lastNumFrames[ind];
1077 
1078     if ( nextFrame >= numInputFiles ) {
1079       buffer[0] = htonl(-1);
1080       buffer[1] = htonl(0);
1081       SafeWrite(otherSock, (char *)buffer, 8);
1082       numFinished++;
1083 
1084       if ( debugMachines ) {
1085 	fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  DONE\n",
1086 		machineName[ind], framesPerSecond, numFinished,
1087 		numMachines);
1088       }
1089     } else {
1090       if (numSeconds[ind] != 0) {
1091 	avgFPS = (float)numFrames[ind]/(float)numSeconds[ind];
1092       } else {
1093 	avgFPS = 0.1;		/* arbitrary small value */
1094       }
1095 
1096       startFrame = nextFrame;
1097 
1098       if ( parallelTimeChunks == -1 ) {	/* TAPER STUFF */
1099 	/* estimate time left */
1100 	/* frames left = numInputFiles-nextFrame */
1101 	totalFPS = 0.0;
1102 	numMachinesToEstimate = 0;
1103 	for ( ind2 = 0; ind2 < numMachines; ind2++ ) {
1104 	  if ( fps[ind2] < 0.0 ) {
1105 	    numMachinesToEstimate++;
1106 	  } else {
1107 	    totalFPS += fps[ind2];
1108 	  }
1109 	}
1110 
1111 	totalFPS = (float)numMachines*
1112 	  (totalFPS/(float)(numMachines-numMachinesToEstimate));
1113 
1114 	timeChunk = (float)(numInputFiles-nextFrame)/totalFPS;
1115 
1116 	fprintf(stdout, "ASSIGNING %s %.2f seconds of work\n",
1117 		machineName[ind], timeChunk);
1118 	fflush(stdout);
1119 	endFrame = nextFrame +
1120 	  (int)((float)timeChunk*avgFPS) - 1;
1121       } else {
1122 	endFrame = nextFrame +
1123 	  (int)((float)parallelTimeChunks*avgFPS) - 1;
1124       }
1125 
1126       if ( forceIalign ) {
1127 	while (FType_Type(endFrame) != 'i') {endFrame++;}
1128       }
1129 
1130       if ( endFrame < startFrame ) { /* always give at least 1 frame */
1131 	endFrame = startFrame;
1132       }
1133       if ( endFrame >= numInputFiles ) {
1134 	endFrame = numInputFiles-1;
1135       }
1136 
1137       nextFrame = endFrame+1;
1138 
1139       startFrames[ind] = startFrame;
1140       numFrames[ind] += (endFrame-startFrame+1);
1141       lastNumFrames[ind] = (endFrame-startFrame+1);
1142 
1143       if ( debugMachines ) {
1144 	fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  next:  %d to %d\n",
1145 		machineName[ind], framesPerSecond, numFinished,
1146 		numMachines, startFrame, endFrame);
1147       }
1148 
1149       buffer[0] = htonl(startFrame);
1150       buffer[1] = htonl(endFrame);
1151 
1152       SafeWrite(otherSock, (char *)buffer, 8);
1153     }
1154 
1155     close(otherSock);
1156 
1157     if ( debugMachines ) {
1158       fprintf(stdout, "---FRAMES DONE:  %d\tFARMED OUT:  %d\tLEFT:  %d\n",
1159 	      framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
1160     }
1161   }
1162 
1163   time(&shutDownBegin);
1164 
1165   /* end all input servers */
1166   IOhostName = hostName;
1167   for ( ind = 0; ind < numInputPorts; ind++ ) {
1168     ioPortNumber = ioPortNum[ind];
1169     EndIOServer();
1170   }
1171 
1172   /* now wait for CombineServer to tell us they're done */
1173   otherSize = sizeof(otherSocket);
1174   otherSock = accept(serverSocket, &otherSocket, &otherSize);
1175   if ( otherSock == -1 ) {
1176     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
1177     exit(1);
1178   }
1179 
1180   SafeRead(otherSock, (char *)buffer, 4);
1181   close(otherSock);
1182 
1183   close(serverSocket);
1184 
1185   time(&timeEnd);
1186   diffTime = (int32)(timeEnd-timeStart);
1187 
1188   time(&shutDownEnd);
1189 
1190   for ( ind2 = 0; ind2 < 2; ind2++ ) {
1191     if ( ind2 == 0 ) {
1192       filePtr = stdout;
1193     } else if ( statFile != NULL ) {
1194       filePtr = statFile;
1195     } else {
1196       continue;
1197     }
1198 
1199     fprintf(filePtr, "\n\n");
1200     fprintf(filePtr, "PARALLEL SUMMARY\n");
1201     fprintf(filePtr, "----------------\n");
1202     fprintf(filePtr, "\n");
1203     fprintf(filePtr, "START UP TIME:  %d seconds\n",
1204 	    (int)startUpEnd-(int)startUpBegin);
1205     fprintf(filePtr, "SHUT DOWN TIME:  %d seconds\n",
1206 	    (int)shutDownEnd-(int)shutDownBegin);
1207 
1208     fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
1209 	    "MACHINE");
1210     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
1211     totalFPS = 0.0;
1212     for ( ind = 0; ind < numMachines; ind++ ) {
1213       localFPS = (float)numFrames[ind]/(float)numSeconds[ind];
1214       fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
1215 	      machineName[ind], numFrames[ind], numSeconds[ind],
1216 	      localFPS, (int)((float)numInputFiles/localFPS));
1217       totalFPS += localFPS;
1218     }
1219 
1220     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
1221 
1222     fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL",
1223 	    (int)((float)numInputFiles/totalFPS),
1224 	    totalFPS);
1225     fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime,
1226 	    (float)numInputFiles/(float)diffTime);
1227 
1228     fprintf(filePtr, "\n\n");
1229   }
1230 
1231   if ( statFile != NULL ) {
1232     fclose(statFile);
1233   }
1234 }
1235 
1236 
1237 /*===========================================================================*
1238  *
1239  * NotifyMasterDone
1240  *
1241  *	called by a slave process; tells the master process it is done
1242  *
1243  * RETURNS:	nothing
1244  *
1245  * SIDE EFFECTS:    none
1246  *
1247  *===========================================================================*/
1248 boolean
NotifyMasterDone(hostName,portNum,machineNumber,seconds,frameStart,frameEnd)1249   NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
1250 		   frameEnd)
1251 char *hostName;
1252 int portNum;
1253 int machineNumber;
1254 int seconds;
1255 int *frameStart;
1256 int *frameEnd;
1257 {
1258   int	clientSocket;
1259   int32   buffer[8];
1260   time_t  tempTimeStart, tempTimeEnd;
1261 
1262   time(&tempTimeStart);
1263 
1264   clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
1265 
1266   buffer[0] = htonl(machineNumber);
1267   buffer[1] = htonl(seconds);
1268 
1269   SafeWrite(clientSocket, (char *)buffer, 8);
1270 
1271   SafeRead(clientSocket, (char *)buffer, 8);
1272   *frameStart = ntohl(buffer[0]);
1273   *frameEnd = ntohl(buffer[1]);
1274 
1275   close(clientSocket);
1276 
1277   time(&tempTimeEnd);
1278   IOtime += (tempTimeEnd-tempTimeStart);
1279 
1280   return ((*frameStart) >= 0);
1281 }
1282 
1283 
1284 /*===========================================================================*
1285  *
1286  * StartDecodeServer
1287  *
1288  *	start-up the DecodeServer with this process
1289  *	handles transfer of decoded frames to/from processes, and exits
1290  *	when master tells it to
1291  *	this is necessary only if referenceFrame == DECODED_FRAME
1292  *
1293  * RETURNS:	nothing
1294  *
1295  * SIDE EFFECTS:    none
1296  *
1297  *===========================================================================*/
1298 void
StartDecodeServer(numInputFiles,decodeFileName,parallelHostName,portNum)1299   StartDecodeServer(numInputFiles, decodeFileName, parallelHostName, portNum)
1300 int numInputFiles;
1301 char *decodeFileName;
1302 char *parallelHostName;
1303 int portNum;
1304 {
1305   int	    otherSock, otherSize;
1306   struct sockaddr otherSocket;
1307   int	    decodePortNum;
1308   int32   buffer[8];
1309   int	    frameReady;
1310   boolean *ready;
1311   int	    *waitMachine;
1312   int	    *waitPort;
1313   int	    *waitList;
1314   int	    slaveNumber;
1315   int	    slavePort;
1316   int	    waitPtr;
1317   struct hostent *nullHost = NULL;
1318   int	    clientSocket;
1319 
1320   /* should keep list of port numbers to notify when frames become ready */
1321 
1322   ready = (boolean *) calloc(numInputFiles, sizeof(boolean));
1323   waitMachine = (int *) calloc(numInputFiles, sizeof(int));
1324   waitPort = (int *) malloc(numMachines*sizeof(int));
1325   waitList = (int *) calloc(numMachines, sizeof(int));
1326 
1327   /* once we get Decode port num, should transmit it to parallel server */
1328 
1329   decodeServerSocket = CreateListeningSocket(&decodePortNum);
1330 
1331   if ( debugSockets ) {
1332     fprintf(stdout, "====DECODE USING PORT %d\n", decodePortNum);
1333   }
1334 
1335   TransmitPortNum(parallelHostName, portNum, decodePortNum);
1336 
1337   frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
1338   memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
1339 
1340   /* wait for ready signals and requests */
1341   while ( TRUE ) {
1342     otherSize = sizeof(otherSocket);
1343     otherSock = accept(decodeServerSocket, &otherSocket, &otherSize);
1344     if ( otherSock == -1 ) {
1345       fprintf(stderr, "ERROR:  DECODE SERVER accept returned error %d\n", errno);
1346       exit(1);
1347     }
1348 
1349     SafeRead(otherSock, (char *)buffer, 4);
1350     frameReady = buffer[0];
1351     frameReady = ntohl(frameReady);
1352 
1353     if ( frameReady == -2 ) {
1354       SafeRead(otherSock, (char *)buffer, 4);
1355       frameReady = buffer[0];
1356       frameReady = ntohl(frameReady);
1357 
1358       if ( debugSockets ) {
1359 	fprintf(stdout, "====DECODE SERVER:  REQUEST FOR %d\n", frameReady);
1360 	fflush(stdout);
1361       }
1362 
1363       /* now respond if it's ready yet */
1364       buffer[0] = frameDone[frameReady];
1365       buffer[0] = htonl(buffer[0]);
1366       SafeWrite(otherSock, (char *)buffer, 4);
1367 
1368       if ( ! frameDone[frameReady] ) {
1369 	/* read machine number, port number */
1370 	SafeRead(otherSock, (char *)buffer, 8);
1371 	slaveNumber = buffer[0];
1372 	slaveNumber = ntohl(slaveNumber);
1373 	slavePort = buffer[1];
1374 	slavePort = ntohl(slavePort);
1375 
1376 	if ( debugSockets ) {
1377 	  fprintf(stdout, "WAITING:  SLAVE %d, PORT %d\n",
1378 		  slaveNumber, slavePort);
1379 	}
1380 
1381 	waitPort[slaveNumber] = slavePort;
1382 	if ( waitMachine[frameReady] == 0 ) {
1383 	  waitMachine[frameReady] = slaveNumber+1;
1384 	} else {
1385 	  /* someone already waiting for this frame */
1386 	  /* follow list of waiters to the end */
1387 	  waitPtr = waitMachine[frameReady]-1;
1388 	  while ( waitList[waitPtr] != 0 ) {
1389 	    waitPtr = waitList[waitPtr]-1;
1390 	  }
1391 
1392 	  waitList[waitPtr] = slaveNumber+1;
1393 	  waitList[slaveNumber] = 0;
1394 	}
1395       }
1396     } else {
1397       frameDone[frameReady] = TRUE;
1398 
1399       if ( debugSockets ) {
1400 	fprintf(stdout, "====DECODE SERVER:  FRAME %d READY\n", frameReady);
1401 	fflush(stdout);
1402       }
1403 
1404       if ( waitMachine[frameReady] ) {
1405 	/* need to notify one or more machines it's ready */
1406 	waitPtr = waitMachine[frameReady]-1;
1407 	while ( waitPtr >= 0 ) {
1408 	  clientSocket = ConnectToSocket(machineName[waitPtr],
1409 					 waitPort[waitPtr],
1410 					 &nullHost);
1411 	  close(clientSocket);
1412 	  waitPtr = waitList[waitPtr]-1;
1413 	}
1414       }
1415     }
1416 
1417     close(otherSock);
1418   }
1419 
1420   if ( debugSockets ) {
1421     fprintf(stdout, "====DECODE SERVER:  Shutting Down\n");
1422     fflush(stdout);
1423   }
1424 
1425   /* tell Master server we are done */
1426   TransmitPortNum(parallelHostName, portNum, decodePortNum);
1427 
1428   close(decodeServerSocket);
1429 }
1430 
1431 
1432 /*=====================*
1433  * INTERNAL PROCEDURES *
1434  *=====================*/
1435 
1436 
1437 /*===========================================================================*
1438  *
1439  * TransmitPortNum
1440  *
1441  *	called by the I/O or Combine server; transmits the appropriate
1442  *	port number to the master
1443  *
1444  * RETURNS:	nothing
1445  *
1446  * SIDE EFFECTS:    none
1447  *
1448  *===========================================================================*/
1449 static void
TransmitPortNum(hostName,portNum,newPortNum)1450   TransmitPortNum(hostName, portNum, newPortNum)
1451 char *hostName;
1452 int portNum;
1453 int newPortNum;
1454 {
1455   int	clientSocket;
1456   u_long  data;
1457 
1458   clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
1459 
1460   data = htonl(newPortNum);
1461   SafeWrite(clientSocket, (char *) &data, 4);
1462 
1463   close(clientSocket);
1464 }
1465 
1466 
1467 /*===========================================================================*
1468  *
1469  * SafeRead
1470  *
1471  *	safely read from the given socket; the procedure keeps reading until
1472  *	it gets the number of bytes specified
1473  *
1474  * RETURNS:	nothing
1475  *
1476  * SIDE EFFECTS:    none
1477  *
1478  *===========================================================================*/
1479 static void
SafeRead(fd,buf,nbyte)1480   SafeRead(fd, buf, nbyte)
1481 int fd;
1482 char *buf;
1483 int nbyte;
1484 {
1485   int numRead;
1486   int result;
1487 
1488   numRead = 0;
1489 
1490   while ( numRead != nbyte ) {
1491     result = read(fd, &buf[numRead], nbyte-numRead);
1492 
1493     if ( result == -1 ) {
1494       fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
1495 	      nbyte-numRead, nbyte, errno);
1496       exit(1);
1497     }
1498     numRead += result;
1499   }
1500 }
1501 
1502 
1503 /*===========================================================================*
1504  *
1505  * SafeWrite
1506  *
1507  *	safely write to the given socket; the procedure keeps writing until
1508  *	it sends the number of bytes specified
1509  *
1510  * RETURNS:	nothing
1511  *
1512  * SIDE EFFECTS:    none
1513  *
1514  *===========================================================================*/
1515 static void
SafeWrite(fd,buf,nbyte)1516   SafeWrite(fd, buf, nbyte)
1517 int fd;
1518 char *buf;
1519 int nbyte;
1520 {
1521   int numWritten;
1522   int result;
1523 
1524   numWritten = 0;
1525 
1526   while ( numWritten != nbyte ) {
1527     result = write(fd, &buf[numWritten], nbyte-numWritten);
1528 
1529     if ( result == -1 ) {
1530       fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
1531 	      nbyte-numWritten, nbyte, errno);
1532       exit(1);
1533     }
1534     numWritten += result;
1535   }
1536 }
1537 
1538 
1539 /*===========================================================================*
1540  *
1541  * EndIOServer
1542  *
1543  *	called by the master process -- tells the I/O server to commit
1544  *	suicide
1545  *
1546  * RETURNS:	nothing
1547  *
1548  * SIDE EFFECTS:    none
1549  *
1550  *===========================================================================*/
1551 static void
EndIOServer()1552   EndIOServer()
1553 {
1554   /* send signal to IO server:  -1 as frame number */
1555   GetRemoteFrame(NULL, -1);
1556 }
1557 
1558 
1559 /*===========================================================================*
1560  *
1561  * NotifyDecodeServerReady
1562  *
1563  *	called by a slave to the Decode Server to tell it a decoded frame
1564  *	is ready and waiting
1565  *
1566  * RETURNS:	nothing
1567  *
1568  * SIDE EFFECTS:    none
1569  *
1570  *===========================================================================*/
1571 void
NotifyDecodeServerReady(id)1572   NotifyDecodeServerReady(id)
1573 int id;
1574 {
1575   int	clientSocket;
1576   u_long  data;
1577   time_t  tempTimeStart, tempTimeEnd;
1578 
1579   time(&tempTimeStart);
1580 
1581   clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
1582 
1583   data = htonl(id);
1584   SafeWrite(clientSocket, (char *)&data, 4);
1585 
1586   close(clientSocket);
1587 
1588   time(&tempTimeEnd);
1589   IOtime += (tempTimeEnd-tempTimeStart);
1590 }
1591 
1592 
1593 /*===========================================================================*
1594  *
1595  * WaitForDecodedFrame
1596  *
1597  *	blah blah blah
1598  *
1599  * RETURNS:	nothing
1600  *
1601  * SIDE EFFECTS:    none
1602  *
1603  *===========================================================================*/
1604 void
WaitForDecodedFrame(id)1605   WaitForDecodedFrame(id)
1606 int id;
1607 {
1608   int	clientSocket;
1609   u_long  data;
1610   int	    negativeTwo = -2;
1611   int     ready;
1612 
1613   /* wait for a decoded frame */
1614   if ( debugSockets ) {
1615     fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);
1616   }
1617 
1618   clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
1619 
1620   /* first, tell DecodeServer we're waiting for this frame */
1621   data = negativeTwo;
1622   data = htonl(negativeTwo);
1623   SafeWrite(clientSocket, (char *)&data, 4);
1624 
1625   data = htonl(id);
1626   SafeWrite(clientSocket, (char *)&data, 4);
1627 
1628   SafeRead(clientSocket, (char *)&data, 4);
1629   ready = data;
1630   ready = ntohl(ready);
1631 
1632   if ( ! ready ) {
1633     int	    waitSocket;
1634     int	    waitPort;
1635     int	    otherSock, otherSize;
1636     struct sockaddr otherSocket;
1637 
1638     /* it's not ready; set up a connection and wait for decode server */
1639     waitSocket = CreateListeningSocket(&waitPort);
1640 
1641     /* tell decode server where we are */
1642     data = machineNumber;
1643     data = ntohl(data);
1644     SafeWrite(clientSocket, (char *)&data, 4);
1645 
1646     data = waitPort;
1647     data = ntohl(data);
1648     SafeWrite(clientSocket, (char *)&data, 4);
1649 
1650     close(clientSocket);
1651 
1652     if ( debugSockets ) {
1653       fprintf(stdout, "SLAVE:  WAITING ON SOCKET %d\n", waitPort);
1654       fflush(stdout);
1655     }
1656 
1657     otherSize = sizeof(otherSocket);
1658     otherSock = accept(waitSocket, &otherSocket, &otherSize);
1659     if ( otherSock == -1 ) {
1660       fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
1661       exit(1);
1662     }
1663 
1664     /* should we verify this is decode server? */
1665     /* for now, we won't */
1666 
1667     close(otherSock);
1668 
1669     close(waitSocket);
1670   } else {
1671     close(clientSocket);
1672   }
1673 
1674   if ( debugSockets ) {
1675     fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id);
1676   }
1677 }
1678 
1679 
1680 /*===========================================================================*
1681  *
1682  * CreateListeningSocket
1683  *
1684  *	create a socket, using the first unused port number we can find
1685  *
1686  * RETURNS:	the socket; portNumber is modified appropriately
1687  *
1688  * SIDE EFFECTS:    none
1689  *
1690  *===========================================================================*/
1691 static int
CreateListeningSocket(portNumber)1692   CreateListeningSocket(portNumber)
1693 int *portNumber;
1694 {
1695   int	    resultSocket;
1696   u_short tempShort;
1697   int	    result;
1698   struct sockaddr_in	nameEntry;
1699 
1700   resultSocket = socket(AF_INET, SOCK_STREAM, 0);
1701   if ( resultSocket == -1 ) {
1702     fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
1703     exit(1);
1704   }
1705 
1706   memset((char *) &nameEntry, 0, sizeof(nameEntry));
1707   nameEntry.sin_family = AF_INET;
1708 
1709   /* find a port number that isn't used */
1710   (*portNumber) = 2048;
1711   do {
1712     (*portNumber)++;
1713     tempShort = (*portNumber);
1714     nameEntry.sin_port = htons(tempShort);
1715     result = bind(resultSocket, (struct sockaddr *) &nameEntry,
1716 		  sizeof(struct sockaddr));
1717   }
1718   while ( result == -1 );
1719 
1720   /* would really like to wait for 1+numMachines machines, but this is max
1721    * allowable, unfortunately
1722    */
1723   result = listen(resultSocket, SOMAXCONN);
1724   if ( result == -1 ) {
1725     fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
1726     exit(1);
1727   }
1728 
1729   return resultSocket;
1730 }
1731 
1732 
1733 /*===========================================================================*
1734  *
1735  * ConnectToSocket
1736  *
1737  *	creates a socket and connects it to the specified socket
1738  *	hostEnt either is the host entry, or is NULL and needs to be
1739  *	found by using machineName
1740  *
1741  * RETURNS:	the socket
1742  *
1743  * SIDE EFFECTS:    none
1744  *
1745  *===========================================================================*/
1746 static int
ConnectToSocket(machineName,portNum,hostEnt)1747   ConnectToSocket(machineName, portNum, hostEnt)
1748 char *machineName;
1749 int	portNum;
1750 struct hostent **hostEnt;
1751 {
1752   int	resultSocket;
1753   int	    result;
1754   u_short	    tempShort;
1755   struct sockaddr_in  nameEntry;
1756 
1757   if ( (*hostEnt) == NULL ) {
1758     (*hostEnt) = gethostbyname(machineName);
1759     if ( (*hostEnt) == NULL ) {
1760       fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
1761 	      machineName);
1762       exit(1);
1763     }
1764   }
1765 
1766   resultSocket = socket(AF_INET, SOCK_STREAM, 0);
1767   if ( resultSocket == -1 ) {
1768     fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
1769     exit(1);
1770   }
1771 
1772   nameEntry.sin_family = AF_INET;
1773   memset((void *) nameEntry.sin_zero, 0, 8);
1774   memcpy((void *) &(nameEntry.sin_addr.s_addr),
1775 	 (void *) (*hostEnt)->h_addr_list[0],
1776 	 (size_t) (*hostEnt)->h_length);
1777   tempShort = portNum;
1778   nameEntry.sin_port = htons(tempShort);
1779 
1780   result = connect(resultSocket, (struct sockaddr *) &nameEntry,
1781 		   sizeof(struct sockaddr));
1782   if ( result == -1 ) {
1783     fprintf(stderr, "ERROR:  connect (ConnectToSocket, port %d) from machine %s returned error %d\n",
1784 	    portNum, getenv("HOST"), errno);
1785     exit(1);
1786   }
1787 
1788   return resultSocket;
1789 }
1790 
1791 
1792 /*===========================================================================*
1793  *
1794  * SendDecodedFrame
1795  *
1796  *  Send the frame to the decode server.
1797  *
1798  * RETURNS:	nothing
1799  *
1800  * SIDE EFFECTS:    none
1801  *
1802  *===========================================================================*/
1803 void
SendDecodedFrame(frame)1804   SendDecodedFrame(frame)
1805 MpegFrame *frame;
1806 {
1807   int	clientSocket;
1808   register int y;
1809   int	    negativeTwo = -2;
1810   uint32  data;
1811 
1812   /* send to IOServer */
1813   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
1814 
1815   data = negativeTwo;
1816   data = htonl(data);
1817   SafeWrite(clientSocket, (char *)&data, 4);
1818 
1819   data = frame->id;
1820   data = htonl(data);
1821   SafeWrite(clientSocket, (char *)&data, 4);
1822 
1823   for ( y = 0; y < Fsize_y; y++ ) {
1824     SafeWrite(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
1825   }
1826 
1827   for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
1828     SafeWrite(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
1829   }
1830 
1831   for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
1832     SafeWrite(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
1833   }
1834 
1835   close(clientSocket);
1836 }
1837 
1838 
1839 /*===========================================================================*
1840  *
1841  * GetRemoteDecodedFrame
1842  *
1843  *  get the decoded frame from the decode server.
1844  *
1845  * RETURNS:	nothing
1846  *
1847  * SIDE EFFECTS:
1848  *
1849  *===========================================================================*/
1850 void
GetRemoteDecodedRefFrame(frame,frameNumber)1851   GetRemoteDecodedRefFrame(frame, frameNumber)
1852 MpegFrame *frame;
1853 int frameNumber;
1854 {
1855   int	clientSocket;
1856   register int y;
1857   int	    negativeThree = -3;
1858   uint32  data;
1859 
1860   /* send to IOServer */
1861   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
1862 
1863   /* ask IOServer for decoded frame */
1864   data = negativeThree;
1865   data = htonl(data);
1866   SafeWrite(clientSocket, (char *)&data, 4);
1867 
1868   data = frame->id;
1869   data = htonl(data);
1870   SafeWrite(clientSocket, (char *)&data, 4);
1871 
1872   for ( y = 0; y < Fsize_y; y++ ) {
1873     SafeRead(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
1874   }
1875 
1876   for (y = 0; y < (Fsize_y >> 1); y++) { /* U */
1877     SafeRead(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
1878   }
1879 
1880   for (y = 0; y < (Fsize_y >> 1); y++) { /* V */
1881     SafeRead(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
1882   }
1883 
1884   close(clientSocket);
1885 
1886 }
1887 
1888 
1889 /*********
1890   routines handling forks, execs, PIDs and signals
1891   save, system-style forks
1892   apian@ise.fhg.de
1893   *******/
1894 
1895 
1896 /*===========================================================================*
1897  *
1898  * cleanup_fork
1899  *
1900  *  Kill all the children, to be used when we get killed
1901  *
1902  * RETURNS:	nothing
1903  *
1904  * SIDE EFFECTS:   kills other processes
1905  *
1906  *===========================================================================*/
cleanup_fork(dummy)1907 void cleanup_fork( dummy )			/* try to kill all child processes */
1908      int dummy;
1909 {
1910   register int i;
1911   for (i = 0;  i < current_max_forked_pid;  ++i ) {
1912 
1913 #ifdef DEBUG_FORK
1914     fprintf(stderr, "cleanup_fork: killing PID %d\n", ClientPid[i]);
1915 #endif
1916 
1917     if (kill(ClientPid[i], TERMINATE_PID_SIGNAL)) {
1918       fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n",
1919 	      ClientPid[i], errno);
1920     }
1921   }
1922 }
1923 
1924 /*===========================================================================*
1925  *
1926  * safe_fork
1927  *
1928  *  fork a command
1929  *
1930  * RETURNS:     success/failure
1931  *
1932  * SIDE EFFECTS:   Fork the command, and save to PID so you can kil it later!
1933  *
1934  *===========================================================================*/
safe_fork(command)1935 static int safe_fork(command)		/* fork child process and remember its PID */
1936      char *command;
1937 {
1938   static int init=0;
1939   char *argis[MAXARGS];
1940   register int i=1;
1941 
1942   if (!(argis[0] = strtok(command, " \t"))) return(0); /* tokenize */
1943   while ((argis[i] = strtok(NULL, " \t")) && i < MAXARGS) ++i;
1944   argis[i] = NULL;
1945 
1946 #ifdef DEBUG_FORK
1947   {register int i=0;
1948    fprintf(stderr, "Command %s becomes:\n", command);
1949    while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} }
1950 #endif
1951 
1952   if (!init) {			/* register clean-up routine */
1953     signal (SIGQUIT, cleanup_fork);
1954     signal (SIGTERM, cleanup_fork);
1955     signal (SIGINT , cleanup_fork);
1956     init=1;
1957   }
1958 
1959   if (-1 == (ClientPid[current_max_forked_pid] = fork()) )  {
1960     perror("safe_fork: fork failed ");
1961     return(-1);
1962   }
1963   if( !ClientPid[current_max_forked_pid]) { /* we are in child process */
1964     execvp(argis[0], argis );
1965     perror("safe_fork child: exec failed ");
1966     exit(1);
1967   }
1968 #ifdef DEBUG_FORK
1969   fprintf(stderr, "parallel: forked PID=%d\n", ClientPid[current_max_forked_pid]);
1970 #endif
1971   current_max_forked_pid++;
1972   return(0);
1973 }
1974