1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 
5 /* $Header: /tmp/hpctools/ga/tcgmsg/ipcv4.0/pbegin.c,v 1.20 2005-02-22 18:47:02 manoj Exp $ */
6 
7 #include <stdio.h>
8 #include <signal.h>
9 #include <unistd.h>
10 #ifdef SEQUENT
11 #include <strings.h>
12 #else
13 #include <string.h>
14 #endif
15 #include <sys/types.h>
16 #include <sys/time.h>
17 #if defined(CONVEX) && defined(HPUX)
18 #include <sys/cnx_types.h>
19 #endif
20 #if defined(SUN) || defined(ALLIANT) || defined(ENCORE) || defined(SEQUENT) \
21                  || defined(CONVEX)  || defined(AIX)    || defined(NEXT) \
22                  || defined(LINUX)
23 #include <sys/wait.h>
24 #endif
25 
26 #if defined(SHMEM) || defined(SYSV)
27 #   if (defined(SGI_N32) || defined(SGITFP))
28 #       define PARTIALSPIN
29 #   else
30 #       define NOSPIN
31 #   endif
32 #endif
33 
34 #if defined(SOLARIS)
35 /* See notes below on processor binding */
36 /*#include <sys/processor.h>*/
37 /*#include <sys/procset.h>*/
38 #endif
39 
40 #include "cluster.h"
41 #include "sndrcv.h"
42 #include "sndrcvP.h"
43 #include "signals.h"
44 #include "tcgsockets.h"
45 
46 #if defined(SHMEM) || defined(SYSV)
47 #include "tcgshmem.h"
48 #include "sema.h"
49 #endif
50 
51 #ifdef EVENTLOG
52 #include "evlog.h"
53 #endif
54 
55 extern void exit();
56 extern void InitClusInfoNotParallel();
57 extern int WaitAll(long nchild);
58 
59 #if defined(ALLIANT) || defined(ENCORE) || defined(SEQUENT) || \
60     defined(CONVEX)  || defined(ARDENT) || defined(ULTRIX) || defined(AIX) || \
61     defined(NEXT)    || defined(DECOSF)
62 extern char *strdup();
63 #endif
64 
/* Two-argument maximum and minimum.  These are plain macros: whichever
   argument is selected is evaluated twice, so do not pass expressions
   with side effects. */
#define max(A, B) ( (B) < (A) ? (A) : (B) )
#define min(A, B) ( (B) > (A) ? (A) : (B) )
67 
68 #if defined(ULTRIX) || defined(SGI) || defined(NEXT) || defined(HPUX) || \
69     defined(KSR)    || defined(DECOSF)
70 extern void *malloc();
71 #else
72 #include <stdlib.h>
73 #endif
74 
75 #ifdef IPSC
76 #define bzero(A,N) memset((A), 0, (N))
77 #endif
78 
/* Initialization flag for this file: set to 1 on entry to tcgi_pbegin()
   and cleared again by PEND_(). */
static int SR_initialized = 0;

/*
  Report whether TCGMSG has been initialized.

  Returns the current value of SR_initialized widened to long, i.e.
  non-zero once tcgi_pbegin() has been entered and zero before that
  or after PEND_() has been called.
*/
long TCGREADY_(void)
{
  return (long) SR_initialized;
}
84 
85 
ConnectAll()86 static void ConnectAll()
87 {
88   long j, k, clus1, clus2, node1, node2, nslave1, nslave2;
89 
90   for (clus1=1; clus1 < SR_n_clus; clus1++) {
91 
92     node1 = SR_clus_info[clus1].masterid;
93     nslave1 = SR_clus_info[clus1].nslave;
94 
95     for (clus2=0; clus2 < clus1; clus2++) {
96 
97       node2 = SR_clus_info[clus2].masterid;
98 
99       RemoteConnect(node1, node2, SR_n_proc);  /* connect masters */
100 
101 #if defined(SHMEM) || defined(SYSV)
102 
103       nslave2 = SR_clus_info[clus2].nslave;
104 
105       for (j=1; j<nslave1; j++) {
106 	RemoteConnect(node1+j, node2, node1);
107 	for (k=1; k<nslave2; k++)
108 	  RemoteConnect(node1+j, node2+k, node2);
109       }
110       for (k=1; k<nslave2; k++)
111 	RemoteConnect(node1, node2+k, node2);
112 
113 #endif
114     }
115   }
116 
117   /* Connect local slaves to master soley for next value service */
118 
119   for (clus1=0; clus1 < SR_n_clus; clus1++)
120     for (j=1; j<SR_clus_info[clus1].nslave; j++)
121       RemoteConnect(SR_n_proc,
122 		    SR_clus_info[clus1].masterid + j,
123 		    SR_clus_info[clus1].masterid);
124 }
125 
/*
  Debugging aid: print the argument vector to stdout, one
  "argv[i] = value" line per entry, then flush stdout.

  argc  number of entries of argv to print
  argv  the argument strings; an entry that is NULL is printed as
        "(null)" rather than being handed to %s (undefined behaviour)
*/
static void PrintArgs(int argc, char **argv)
{
  int i;

  for (i=0; i<argc; i++)
    (void) printf("argv[%d] = %s\n", i,
                  (argv[i] != NULL) ? argv[i] : "(null)");

  (void) fflush(stdout);
}
137 
138 
tcgi_pbegin(argc,argv)139 void tcgi_pbegin(argc, argv)
140       int argc;
141       char **argv;
142 /*
143   first thing to call on entering program
144 
145   if the argument list contains
146     '-master hostname port nclus nproc clusid procid'
147   then this is running in parallel, else we are just running as
148   a single process.
149 */
150 {
151   long i, masterid, len_pgrp, lenbuf, lenmes, sync=1;
152   long nodesel, nodefrom, type, me, nslave, status;
153   char *masterhostname, *cport;
154   char *procgrp;
155 #ifdef EVENTLOG
156   long start=MTIME_();
157   char *eventfile;
158 #endif
159 #if defined(SHMEM) || defined(SYSV)
160   long *flags;
161 #endif
162 
163   if(SR_initialized)Error("TCGMSG initialized already???",-1);
164   else SR_initialized=1;
165 
166   if (DEBUG_) {
167 	(void) printf("In pbegin .. print the arguments\n");
168         PrintArgs(argc, argv);
169 	(void) fflush(stdout);
170   }
171 
172   /* First initialize the globals as if only one process */
173 
174   if (DEBUG_) {
175 	(void) printf("pbegin: InitGlobal\n");
176 	(void) fflush(stdout);
177   }
178   InitGlobal();
179 
180   /* Set up handler for SIGINT and SIGCHLD */
181 
182   TrapSigint();
183   TrapSigchld();
184 
185   /* If '-master host port' is not present return, else extract the
186      master's hostname and port number */
187 
188   if (DEBUG_) {
189 	(void) printf("pbegin: look for -master in arglist\n");
190 	(void) fflush(stdout);
191   }
192 
193   for (i=1; i<argc; i++)
194     if (strcmp(argv[i],"-master") == 0) {
195       if ( (i+6) >= argc )
196 	Error("pbegin: -master present but not other arguments",
197               (long) argc);
198       break;
199     }
200 
201   if ( (i+6) >= argc ) {
202     SR_parallel = FALSE;
203     InitClusInfoNotParallel();
204     SR_n_clus=1;
205     return;
206   }
207   else
208     SR_parallel = TRUE;
209 
210   if (DEBUG_) {
211 	(void) printf("pbegin: assign argument values\n");
212 	(void) fflush(stdout);
213   }
214 
215   masterhostname = strdup(argv[i+1]);
216   cport = strdup(argv[i+2]);
217   SR_n_clus = atoi(argv[i+3]);
218   SR_n_proc = atoi(argv[i+4]);
219   SR_clus_id = atoi(argv[i+5]);
220   SR_proc_id = atoi(argv[i+6]);
221 
222   /* Check out some of this info */
223 
224   if ((SR_n_clus >= MAX_CLUSTER) || (SR_n_clus < 1))
225     Error("pbegin: invalid no. of clusters", SR_n_clus);
226   if ((SR_n_proc >= MAX_PROCESS) || (SR_n_proc < 1))
227     Error("pbegin: invalid no. of processes", SR_n_proc);
228   if ((SR_clus_id >= SR_n_clus) || (SR_clus_id < 0))
229     Error("pbegin: invalid cluster id", SR_clus_id);
230   if ((SR_proc_id >= SR_n_proc) || (SR_proc_id < 0))
231     Error("pbegin: invalid process id", SR_proc_id);
232 
233   /* Close all files we don't need. Process 0 keeps stdin/out/err.
234      All others only stdout/err. */
235 
236   if (SR_clus_id != 0)
237     (void) fclose(stdin);
238 #ifdef SPARC64_GP
239   for (i=3; i<62; i++)
240 #else
241   for (i=3; i<64; i++)
242 #endif
243     (void) close((int) i);
244 
245   /* Connect to the master process which will have process id
246      equal to the number of processes */
247 
248   if (DEBUG_) {
249 	(void) printf("pbegin: %ld CreateSocketAndConnect\n",NODEID_());
250 	(void) fflush(stdout);
251   }
252   masterid = SR_n_proc;
253   SR_proc_info[SR_n_proc].sock = CreateSocketAndConnect(masterhostname,
254 							 cport);
255 
256   /* Now we have initialized this info we should be able to use the
257      standard interface routines rather than accessing the SR variables
258      directly */
259 
260   /* Get the procgrp from the master process
261 
262      Note that byteordering and word length start to be an issue. */
263 
264   if (DEBUG_) {
265 	(void) printf("pbegin: %ld get len_pgrp\n",NODEID_());
266 	(void) fflush(stdout);
267   }
268   type  = TYPE_BEGIN | MSGINT;
269   lenbuf = sizeof(long);
270   nodesel = masterid;
271   RCV_(&type, (char *) &len_pgrp, &lenbuf, &lenmes, &nodesel, &nodefrom,
272                                                               &sync);
273   if (DEBUG_) {
274     (void) printf("len_pgrp = %ld\n",len_pgrp); (void) fflush(stdout);
275   }
276   if ( (procgrp = malloc((unsigned) len_pgrp)) == (char *) NULL )
277     Error("pbegin: failed to allocate procgrp",len_pgrp);
278 
279   if (DEBUG_) {
280 	(void) printf("pbegin: %ld get progcrp len=%ld\n",NODEID_(),len_pgrp);
281 	(void) fflush(stdout);
282   }
283   type = TYPE_BEGIN | MSGCHR;
284   RCV_(&type, procgrp, &len_pgrp, &lenmes, &nodesel, &nodefrom, &sync);
285   if (DEBUG_) {
286     (void) printf("procgrp:\n%55s...\n",procgrp); (void) fflush(stdout);
287     (void) fflush(stdout);
288   }
289 
290   /* Parse the procgrp to fill out SR_clus_info ... it also again works out
291      SR_n_clus and SR_n_proc ... ugh */
292 
293   InitClusInfo(procgrp, masterhostname);
294 
295   if (DEBUG_) {
296     PrintClusInfo();
297     (void) fflush(stdout);
298   }
299 
300   /* Change to desired working directory ... forked processes
301      will inherit it */
302 
303   if(chdir(SR_clus_info[SR_clus_id].workdir) != 0)
304     Error("pbegin: failed to switch to work directory", (long) -1);
305 
306   if (DEBUG_) {
307     printf("%2ld: pbegin: changed to working directory %s\n",
308 	   NODEID_(), SR_clus_info[SR_clus_id].workdir);
309     (void) fflush(stdout);
310    }
311 
312   /* If we have more than 1 process in this cluster we have to
313      create the shared memory and semaphores and fork the processes
314      partitioning out the resources */
315 
316   SR_using_shmem = 0;
317 #if defined(SHMEM) || defined(SYSV)
318   me = NODEID_();
319   nslave = SR_clus_info[SR_clus_id].nslave;
320   if (nslave > 1) {
321     SR_proc_info[me].shmem_size = nslave*SHMEM_BUF_SIZE +
322       (nslave+1)*sizeof(long);
323     SR_proc_info[me].shmem_size =
324       ((SR_proc_info[me].shmem_size - 1)/4096)*4096 + 4096;
325     if (DEBUG_) {
326       (void) printf("pbegin: %ld allocate shmem, nslave=%ld\n",
327 		    NODEID_(), nslave);
328       (void) fflush(stdout);
329     }
330     SR_using_shmem = 1;
331     SR_proc_info[me].shmem = CreateSharedRegion(&SR_proc_info[me].shmem_id,
332 						&SR_proc_info[me].shmem_size);
333     if (DEBUG_) {
334 	(void) printf("pbegin: %ld allocate sema, nslave=%ld\n",
335 			NODEID_(), nslave);
336 	(void) fflush(stdout);
337     }
338 
339     flags = (long *) (SR_proc_info[me].shmem + nslave*SHMEM_BUF_SIZE);
340 
341     (void) bzero(SR_proc_info[me].shmem, SR_proc_info[me].shmem_size);
342 
343     for (i=0; i<nslave; i++) {
344       ((MessageHeader *)
345        (SR_proc_info[me].shmem + i * SHMEM_BUF_SIZE))->nodeto = -1;
346       flags[i] = FALSE;
347     }
348 
349 #if defined(NOSPIN)
350     SR_proc_info[me].semid = SemSetCreate((long) 3*nslave, (long) 0);
351 #else
352 #ifdef KSR_NATIVE
353     /* Bind myself to a processor */
354     KSR_BindProcess(0);
355     if (DEBUG_) {
356       (void) printf("pbegin: bound master process\n");
357       (void) fflush(stdout);
358     }
359 #endif
360 #endif
361 
362 #if defined(SOLARIS)
363     /* If there fewer processes than processors it appears beneficial
364        to bind processes to processors.  It also appears useful to
365        leave the lowest numbered processors free (???).
366        BUT ... this code is not general enough since the configured
367        processors are not necessarily numbered consecutively and
368        we also need to add logic to determine the list of processors
369        that have not already been bound to a process.
370 
371        Need to also modify the code below for binding slaves and enable
372        the include of processor.h and procset.h */
373 
374     /* printf("binding master process %d to processor %d\n", getpid(), 31-0);
375     if (processor_bind(P_PID, P_MYID, 31-0, (void *) NULL))
376     printf("binding to %d failed\n", 31-0); */
377 #endif
378 
379 
380     for (i=1; i<nslave; i++) {
381       if (DEBUG_) {
382   	(void) printf("pbegin: %ld fork process, i=%ld\n", NODEID_(), nslave);
383   	(void) fflush(stdout);
384       }
385 #if   defined(CONVEX) && defined(HPUX)
386       status=i/8; /* on SPP-1200 there are eight processors per hypernode */
387       status = cnx_sc_fork(CNX_INHERIT_SC,status);
388 #else
389       status = fork();
390 #endif
391       if (status < 0)
392         Error("pbegin: error forking process",status);
393       else if (status == 0) {
394 
395 	/* Child process */
396 	me = SR_proc_id += i;               /* change process id */
397 
398 #ifdef KSR_NATIVE
399 	/* Bind myself to a processor */
400 	KSR_BindProcess(me);
401 	if (DEBUG_) {
402 	  (void) printf("pbegin: bound slave process %ld\n", NODEID_());
403 	  (void) fflush(stdout);
404 	}
405 #endif
406 
407 #if defined(SOLARIS)
408 	/*printf("binding slave process %d to processor %d\n", getpid(), 31-i);
409 	if (processor_bind(P_PID, P_MYID, 31-i, (void *) NULL))
410 	printf("binding to %d failed\n", 31-i); */
411 #endif
412 
413 	/* Tidy up files */
414 	if (SR_clus_id == 0)                /* if not 0 is shut already */
415 	  (void) fclose(stdin);
416 	(void) close(SR_proc_info[SR_n_proc].sock);
417 	SR_proc_info[SR_n_proc].sock = -1;  /* eliminate connection */
418 	break;
419       }
420       else if (status > 0)
421 	SR_pids[SR_numchild++] = status;
422     }
423 
424     masterid = SR_clus_info[SR_clus_id].masterid;
425 
426     for (i=masterid; i<(masterid+nslave); i++) {
427       long slaveid = i - masterid;
428       SR_proc_info[i].slaveid = slaveid;
429       SR_proc_info[i].local = 1;
430       SR_proc_info[i].sock = -1;
431       SR_proc_info[i].shmem = SR_proc_info[masterid].shmem;
432       SR_proc_info[i].shmem_size = SR_proc_info[masterid].shmem_size;
433       SR_proc_info[i].shmem_id = SR_proc_info[masterid].shmem_id;
434 #ifndef KSR_NATIVE
435       SR_proc_info[i].header = (MessageHeader *)
436 	(SR_proc_info[i].shmem + slaveid * SHMEM_BUF_SIZE);
437 /*      SR_proc_info[i].header->nodeto = -1; */
438       SR_proc_info[i].buffer = ((char *) SR_proc_info[i].header) +
439 	sizeof(MessageHeader) + (sizeof(MessageHeader) % 8);
440       SR_proc_info[i].buflen = SHMEM_BUF_SIZE - sizeof(MessageHeader) -
441 	(sizeof(MessageHeader) % 8);
442 #ifdef NOSPIN
443       SR_proc_info[i].semid = SR_proc_info[masterid].semid;
444       SR_proc_info[i].sem_pend = 3*slaveid;
445       SR_proc_info[i].sem_read = 3*slaveid + 1;
446       SR_proc_info[i].sem_written = 3*slaveid + 2;
447 #else
448       SR_proc_info[i].semid = -1;
449 #endif
450       SR_proc_info[i].buffer_full = flags + slaveid;
451 /*      *SR_proc_info[i].buffer_full = FALSE;*/
452 #endif
453     }
454 
455 #ifdef KSR_NATIVE
456     /* Map the data structures onto the shared memory */
457     KSR_MapBufferSpace(masterid, nslave);
458     if (DEBUG_) {
459       (void) printf("pbegin: %2ld: Mapped buffer space\n", NODEID_());
460       (void) fflush(stdout);
461     }
462 #else
463     /* Post read semaphore to make sends partially asynchronous */
464 
465 #ifdef NOSPIN
466     SemPost(SR_proc_info[me].semid, SR_proc_info[me].sem_read);
467 #endif
468 #endif
469 
470 #ifdef KSR_NATIVE
471     /* Initialize the buffer space data structures */
472     KSR_InitBufferSpace();
473     if (DEBUG_) {
474       (void) printf("pbegin: %2ld: Initialized buffer space\n", NODEID_());
475       (void) fflush(stdout);
476     }
477 #endif
478 
479   }
480 
481 #else
482   if (SR_clus_info[SR_clus_id].nslave != 1)
483     Error("pbegin: no shared memory on this host ... nslave=1 only",
484 	  SR_clus_info[SR_clus_id].nslave);
485 #endif
486 
487   /* Now have to connect everyone together */
488 
489   ConnectAll();
490 
491   /* If we are only using sockets we can block in select when waiting for a message */
492   SR_nsock = 0;
493   for (i=0; i<(SR_n_proc+1); i++) {
494     if (SR_proc_info[i].sock >= 0) {
495       SR_socks[SR_nsock] = SR_proc_info[i].sock;
496       SR_socks_proc[SR_nsock] = i;
497       SR_nsock++;
498     }
499   }
500   /* Synchronize timers before returning to application
501      or logging any events */
502 
503   (void) TCGTIME_();
504   type = TYPE_CLOCK_SYNCH;
505   SYNCH_(&type);
506   MtimeReset();
507 
508   /* If logging events make the file events.<nodeid> */
509 
510 #ifdef EVENTLOG
511   if (eventfile=malloc((unsigned) 32)) {
512     (void) sprintf(eventfile, "events.%03ld", NODEID_());
513     evlog(EVKEY_ENABLE, EVKEY_FILENAME, eventfile,
514 	  EVKEY_BEGIN, EVENT_PROCESS,
515 	  EVKEY_STR_INT, "Startup used (cs)", (int) (MTIME_()-start),
516 	  EVKEY_STR_INT, "No. of processes", (int) NNODES_(),
517 	  EVKEY_DISABLE,
518 	  EVKEY_LAST_ARG);
519     (void) free(eventfile);
520     SYNCH_(&type);
521   }
522 #endif
523 
524   if (DEBUG_) {
525     printf("pbegin: %2ld: Returning to application\n",NODEID_());
526     fflush(stdout);
527   }
528 }
529 
PEND_()530 void PEND_()
531 /*
532   Call this to tidy up after parallel section.
533   The cluster master is responsible for tidying up any shared
534   memory/semaphore resources. Everyone else can just quit.
535 
536   Woops ... everyone should return so that FORTRAN can tidy up
537   after itself.
538 */
539 {
540   long me = NODEID_();
541   long masterid = SR_clus_info[SR_clus_id].masterid;
542   long nslave = SR_clus_info[SR_clus_id].nslave;
543   long zero = 0;
544   long status;
545 #ifdef EVENTLOG
546   long start=MTIME_();
547 #endif
548 
549   SR_initialized = 0;
550   if (!SR_parallel) return;
551 
552   (void) signal(SIGCHLD, SIG_DFL); /* Death of children now OK */
553   (void) NXTVAL_(&zero);  /* Send termination flag to nxtval server */
554 
555   if (me != masterid)
556     status = 0;
557   else {
558     status = WaitAll(nslave-1);       /* Wait for demise of children */
559 #if defined(SHMEM) || defined(SYSV)
560     if (nslave > 1) {
561 #if defined(NOSPIN)
562       (void) SemSetDestroyAll();      /* Ex the semaphores and shmem */
563 #endif
564       (void) DeleteSharedRegion(SR_proc_info[me].shmem_id);
565     }
566 #endif
567   }
568 
569   ShutdownAll();    /* Close sockets for machines with static kernel */
570 
571 
572   /* If logging events log end of process and dump trace */
573 #ifdef EVENTLOG
574   evlog(EVKEY_ENABLE,
575 	EVKEY_END, EVENT_PROCESS,
576 	EVKEY_STR_INT, "Time (cs) waiting to finish", (int) (MTIME_()-start),
577 	EVKEY_DUMP,
578 	EVKEY_LAST_ARG);
579 #endif
580   /* Return to calling program unless we had an error */
581 
582   if (status)
583     exit((int) status);
584 }
585 
586 
/*
  Alternative entry point that takes the argument count and argument
  vector by address.  Both pointers are dereferenced once and the
  values are handed to tcgi_pbegin(); nothing is written back through
  either pointer here.
*/
void tcgi_alt_pbegin(int *argc, char **argv[])
{
  int nargs = *argc;
  char **args = *argv;

  tcgi_pbegin(nargs, args);
}
591 
592