1 #if HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 /* $Header: /tmp/hpctools/ga/tcgmsg/ipcv4.0/pbegin.c,v 1.20 2005-02-22 18:47:02 manoj Exp $ */
6
7 #include <stdio.h>
8 #include <signal.h>
9 #include <unistd.h>
10 #ifdef SEQUENT
11 #include <strings.h>
12 #else
13 #include <string.h>
14 #endif
15 #include <sys/types.h>
16 #include <sys/time.h>
17 #if defined(CONVEX) && defined(HPUX)
18 #include <sys/cnx_types.h>
19 #endif
20 #if defined(SUN) || defined(ALLIANT) || defined(ENCORE) || defined(SEQUENT) \
21 || defined(CONVEX) || defined(AIX) || defined(NEXT) \
22 || defined(LINUX)
23 #include <sys/wait.h>
24 #endif
25
26 #if defined(SHMEM) || defined(SYSV)
27 # if (defined(SGI_N32) || defined(SGITFP))
28 # define PARTIALSPIN
29 # else
30 # define NOSPIN
31 # endif
32 #endif
33
34 #if defined(SOLARIS)
35 /* See notes below on processor binding */
36 /*#include <sys/processor.h>*/
37 /*#include <sys/procset.h>*/
38 #endif
39
40 #include "cluster.h"
41 #include "sndrcv.h"
42 #include "sndrcvP.h"
43 #include "signals.h"
44 #include "tcgsockets.h"
45
46 #if defined(SHMEM) || defined(SYSV)
47 #include "tcgshmem.h"
48 #include "sema.h"
49 #endif
50
51 #ifdef EVENTLOG
52 #include "evlog.h"
53 #endif
54
55 extern void exit();
56 extern void InitClusInfoNotParallel();
57 extern int WaitAll(long nchild);
58
59 #if defined(ALLIANT) || defined(ENCORE) || defined(SEQUENT) || \
60 defined(CONVEX) || defined(ARDENT) || defined(ULTRIX) || defined(AIX) || \
61 defined(NEXT) || defined(DECOSF)
62 extern char *strdup();
63 #endif
64
65 #define max(A, B) ( (A) > (B) ? (A) : (B) )
66 #define min(A, B) ( (A) < (B) ? (A) : (B) )
67
68 #if defined(ULTRIX) || defined(SGI) || defined(NEXT) || defined(HPUX) || \
69 defined(KSR) || defined(DECOSF)
70 extern void *malloc();
71 #else
72 #include <stdlib.h>
73 #endif
74
75 #ifdef IPSC
76 #define bzero(A,N) memset((A), 0, (N))
77 #endif
78
/* Set to 1 by tcgi_pbegin and reset to 0 by PEND_. */
static int SR_initialized = 0;

/*
  Report whether the message-passing layer is currently initialized.

  Returns the current value of SR_initialized, widened to long.
*/
long TCGREADY_()
{
    long ready = (long) SR_initialized;

    return ready;
}
84
85
ConnectAll()86 static void ConnectAll()
87 {
88 long j, k, clus1, clus2, node1, node2, nslave1, nslave2;
89
90 for (clus1=1; clus1 < SR_n_clus; clus1++) {
91
92 node1 = SR_clus_info[clus1].masterid;
93 nslave1 = SR_clus_info[clus1].nslave;
94
95 for (clus2=0; clus2 < clus1; clus2++) {
96
97 node2 = SR_clus_info[clus2].masterid;
98
99 RemoteConnect(node1, node2, SR_n_proc); /* connect masters */
100
101 #if defined(SHMEM) || defined(SYSV)
102
103 nslave2 = SR_clus_info[clus2].nslave;
104
105 for (j=1; j<nslave1; j++) {
106 RemoteConnect(node1+j, node2, node1);
107 for (k=1; k<nslave2; k++)
108 RemoteConnect(node1+j, node2+k, node2);
109 }
110 for (k=1; k<nslave2; k++)
111 RemoteConnect(node1, node2+k, node2);
112
113 #endif
114 }
115 }
116
117 /* Connect local slaves to master soley for next value service */
118
119 for (clus1=0; clus1 < SR_n_clus; clus1++)
120 for (j=1; j<SR_clus_info[clus1].nslave; j++)
121 RemoteConnect(SR_n_proc,
122 SR_clus_info[clus1].masterid + j,
123 SR_clus_info[clus1].masterid);
124 }
125
/*
  Print the argument vector to stdout, one "argv[i] = value" line per
  entry, then flush stdout.

  argc  number of entries of argv to print
  argv  argument strings; a NULL entry is printed as "(null)" instead
        of being handed to %s, which would be undefined behaviour
*/
static void PrintArgs(int argc, char **argv)
{
    int i;

    for (i = 0; i < argc; i++) {
        const char *text = (argv[i] != NULL) ? argv[i] : "(null)";

        (void) printf("argv[%d] = %s\n", i, text);
    }

    (void) fflush(stdout);
}
137
138
tcgi_pbegin(argc,argv)139 void tcgi_pbegin(argc, argv)
140 int argc;
141 char **argv;
142 /*
143 first thing to call on entering program
144
145 if the argument list contains
146 '-master hostname port nclus nproc clusid procid'
147 then this is running in parallel, else we are just running as
148 a single process.
149 */
150 {
151 long i, masterid, len_pgrp, lenbuf, lenmes, sync=1;
152 long nodesel, nodefrom, type, me, nslave, status;
153 char *masterhostname, *cport;
154 char *procgrp;
155 #ifdef EVENTLOG
156 long start=MTIME_();
157 char *eventfile;
158 #endif
159 #if defined(SHMEM) || defined(SYSV)
160 long *flags;
161 #endif
162
163 if(SR_initialized)Error("TCGMSG initialized already???",-1);
164 else SR_initialized=1;
165
166 if (DEBUG_) {
167 (void) printf("In pbegin .. print the arguments\n");
168 PrintArgs(argc, argv);
169 (void) fflush(stdout);
170 }
171
172 /* First initialize the globals as if only one process */
173
174 if (DEBUG_) {
175 (void) printf("pbegin: InitGlobal\n");
176 (void) fflush(stdout);
177 }
178 InitGlobal();
179
180 /* Set up handler for SIGINT and SIGCHLD */
181
182 TrapSigint();
183 TrapSigchld();
184
185 /* If '-master host port' is not present return, else extract the
186 master's hostname and port number */
187
188 if (DEBUG_) {
189 (void) printf("pbegin: look for -master in arglist\n");
190 (void) fflush(stdout);
191 }
192
193 for (i=1; i<argc; i++)
194 if (strcmp(argv[i],"-master") == 0) {
195 if ( (i+6) >= argc )
196 Error("pbegin: -master present but not other arguments",
197 (long) argc);
198 break;
199 }
200
201 if ( (i+6) >= argc ) {
202 SR_parallel = FALSE;
203 InitClusInfoNotParallel();
204 SR_n_clus=1;
205 return;
206 }
207 else
208 SR_parallel = TRUE;
209
210 if (DEBUG_) {
211 (void) printf("pbegin: assign argument values\n");
212 (void) fflush(stdout);
213 }
214
215 masterhostname = strdup(argv[i+1]);
216 cport = strdup(argv[i+2]);
217 SR_n_clus = atoi(argv[i+3]);
218 SR_n_proc = atoi(argv[i+4]);
219 SR_clus_id = atoi(argv[i+5]);
220 SR_proc_id = atoi(argv[i+6]);
221
222 /* Check out some of this info */
223
224 if ((SR_n_clus >= MAX_CLUSTER) || (SR_n_clus < 1))
225 Error("pbegin: invalid no. of clusters", SR_n_clus);
226 if ((SR_n_proc >= MAX_PROCESS) || (SR_n_proc < 1))
227 Error("pbegin: invalid no. of processes", SR_n_proc);
228 if ((SR_clus_id >= SR_n_clus) || (SR_clus_id < 0))
229 Error("pbegin: invalid cluster id", SR_clus_id);
230 if ((SR_proc_id >= SR_n_proc) || (SR_proc_id < 0))
231 Error("pbegin: invalid process id", SR_proc_id);
232
233 /* Close all files we don't need. Process 0 keeps stdin/out/err.
234 All others only stdout/err. */
235
236 if (SR_clus_id != 0)
237 (void) fclose(stdin);
238 #ifdef SPARC64_GP
239 for (i=3; i<62; i++)
240 #else
241 for (i=3; i<64; i++)
242 #endif
243 (void) close((int) i);
244
245 /* Connect to the master process which will have process id
246 equal to the number of processes */
247
248 if (DEBUG_) {
249 (void) printf("pbegin: %ld CreateSocketAndConnect\n",NODEID_());
250 (void) fflush(stdout);
251 }
252 masterid = SR_n_proc;
253 SR_proc_info[SR_n_proc].sock = CreateSocketAndConnect(masterhostname,
254 cport);
255
256 /* Now we have initialized this info we should be able to use the
257 standard interface routines rather than accessing the SR variables
258 directly */
259
260 /* Get the procgrp from the master process
261
262 Note that byteordering and word length start to be an issue. */
263
264 if (DEBUG_) {
265 (void) printf("pbegin: %ld get len_pgrp\n",NODEID_());
266 (void) fflush(stdout);
267 }
268 type = TYPE_BEGIN | MSGINT;
269 lenbuf = sizeof(long);
270 nodesel = masterid;
271 RCV_(&type, (char *) &len_pgrp, &lenbuf, &lenmes, &nodesel, &nodefrom,
272 &sync);
273 if (DEBUG_) {
274 (void) printf("len_pgrp = %ld\n",len_pgrp); (void) fflush(stdout);
275 }
276 if ( (procgrp = malloc((unsigned) len_pgrp)) == (char *) NULL )
277 Error("pbegin: failed to allocate procgrp",len_pgrp);
278
279 if (DEBUG_) {
280 (void) printf("pbegin: %ld get progcrp len=%ld\n",NODEID_(),len_pgrp);
281 (void) fflush(stdout);
282 }
283 type = TYPE_BEGIN | MSGCHR;
284 RCV_(&type, procgrp, &len_pgrp, &lenmes, &nodesel, &nodefrom, &sync);
285 if (DEBUG_) {
286 (void) printf("procgrp:\n%55s...\n",procgrp); (void) fflush(stdout);
287 (void) fflush(stdout);
288 }
289
290 /* Parse the procgrp to fill out SR_clus_info ... it also again works out
291 SR_n_clus and SR_n_proc ... ugh */
292
293 InitClusInfo(procgrp, masterhostname);
294
295 if (DEBUG_) {
296 PrintClusInfo();
297 (void) fflush(stdout);
298 }
299
300 /* Change to desired working directory ... forked processes
301 will inherit it */
302
303 if(chdir(SR_clus_info[SR_clus_id].workdir) != 0)
304 Error("pbegin: failed to switch to work directory", (long) -1);
305
306 if (DEBUG_) {
307 printf("%2ld: pbegin: changed to working directory %s\n",
308 NODEID_(), SR_clus_info[SR_clus_id].workdir);
309 (void) fflush(stdout);
310 }
311
312 /* If we have more than 1 process in this cluster we have to
313 create the shared memory and semaphores and fork the processes
314 partitioning out the resources */
315
316 SR_using_shmem = 0;
317 #if defined(SHMEM) || defined(SYSV)
318 me = NODEID_();
319 nslave = SR_clus_info[SR_clus_id].nslave;
320 if (nslave > 1) {
321 SR_proc_info[me].shmem_size = nslave*SHMEM_BUF_SIZE +
322 (nslave+1)*sizeof(long);
323 SR_proc_info[me].shmem_size =
324 ((SR_proc_info[me].shmem_size - 1)/4096)*4096 + 4096;
325 if (DEBUG_) {
326 (void) printf("pbegin: %ld allocate shmem, nslave=%ld\n",
327 NODEID_(), nslave);
328 (void) fflush(stdout);
329 }
330 SR_using_shmem = 1;
331 SR_proc_info[me].shmem = CreateSharedRegion(&SR_proc_info[me].shmem_id,
332 &SR_proc_info[me].shmem_size);
333 if (DEBUG_) {
334 (void) printf("pbegin: %ld allocate sema, nslave=%ld\n",
335 NODEID_(), nslave);
336 (void) fflush(stdout);
337 }
338
339 flags = (long *) (SR_proc_info[me].shmem + nslave*SHMEM_BUF_SIZE);
340
341 (void) bzero(SR_proc_info[me].shmem, SR_proc_info[me].shmem_size);
342
343 for (i=0; i<nslave; i++) {
344 ((MessageHeader *)
345 (SR_proc_info[me].shmem + i * SHMEM_BUF_SIZE))->nodeto = -1;
346 flags[i] = FALSE;
347 }
348
349 #if defined(NOSPIN)
350 SR_proc_info[me].semid = SemSetCreate((long) 3*nslave, (long) 0);
351 #else
352 #ifdef KSR_NATIVE
353 /* Bind myself to a processor */
354 KSR_BindProcess(0);
355 if (DEBUG_) {
356 (void) printf("pbegin: bound master process\n");
357 (void) fflush(stdout);
358 }
359 #endif
360 #endif
361
362 #if defined(SOLARIS)
363 /* If there fewer processes than processors it appears beneficial
364 to bind processes to processors. It also appears useful to
365 leave the lowest numbered processors free (???).
366 BUT ... this code is not general enough since the configured
367 processors are not necessarily numbered consecutively and
368 we also need to add logic to determine the list of processors
369 that have not already been bound to a process.
370
371 Need to also modify the code below for binding slaves and enable
372 the include of processor.h and procset.h */
373
374 /* printf("binding master process %d to processor %d\n", getpid(), 31-0);
375 if (processor_bind(P_PID, P_MYID, 31-0, (void *) NULL))
376 printf("binding to %d failed\n", 31-0); */
377 #endif
378
379
380 for (i=1; i<nslave; i++) {
381 if (DEBUG_) {
382 (void) printf("pbegin: %ld fork process, i=%ld\n", NODEID_(), nslave);
383 (void) fflush(stdout);
384 }
385 #if defined(CONVEX) && defined(HPUX)
386 status=i/8; /* on SPP-1200 there are eight processors per hypernode */
387 status = cnx_sc_fork(CNX_INHERIT_SC,status);
388 #else
389 status = fork();
390 #endif
391 if (status < 0)
392 Error("pbegin: error forking process",status);
393 else if (status == 0) {
394
395 /* Child process */
396 me = SR_proc_id += i; /* change process id */
397
398 #ifdef KSR_NATIVE
399 /* Bind myself to a processor */
400 KSR_BindProcess(me);
401 if (DEBUG_) {
402 (void) printf("pbegin: bound slave process %ld\n", NODEID_());
403 (void) fflush(stdout);
404 }
405 #endif
406
407 #if defined(SOLARIS)
408 /*printf("binding slave process %d to processor %d\n", getpid(), 31-i);
409 if (processor_bind(P_PID, P_MYID, 31-i, (void *) NULL))
410 printf("binding to %d failed\n", 31-i); */
411 #endif
412
413 /* Tidy up files */
414 if (SR_clus_id == 0) /* if not 0 is shut already */
415 (void) fclose(stdin);
416 (void) close(SR_proc_info[SR_n_proc].sock);
417 SR_proc_info[SR_n_proc].sock = -1; /* eliminate connection */
418 break;
419 }
420 else if (status > 0)
421 SR_pids[SR_numchild++] = status;
422 }
423
424 masterid = SR_clus_info[SR_clus_id].masterid;
425
426 for (i=masterid; i<(masterid+nslave); i++) {
427 long slaveid = i - masterid;
428 SR_proc_info[i].slaveid = slaveid;
429 SR_proc_info[i].local = 1;
430 SR_proc_info[i].sock = -1;
431 SR_proc_info[i].shmem = SR_proc_info[masterid].shmem;
432 SR_proc_info[i].shmem_size = SR_proc_info[masterid].shmem_size;
433 SR_proc_info[i].shmem_id = SR_proc_info[masterid].shmem_id;
434 #ifndef KSR_NATIVE
435 SR_proc_info[i].header = (MessageHeader *)
436 (SR_proc_info[i].shmem + slaveid * SHMEM_BUF_SIZE);
437 /* SR_proc_info[i].header->nodeto = -1; */
438 SR_proc_info[i].buffer = ((char *) SR_proc_info[i].header) +
439 sizeof(MessageHeader) + (sizeof(MessageHeader) % 8);
440 SR_proc_info[i].buflen = SHMEM_BUF_SIZE - sizeof(MessageHeader) -
441 (sizeof(MessageHeader) % 8);
442 #ifdef NOSPIN
443 SR_proc_info[i].semid = SR_proc_info[masterid].semid;
444 SR_proc_info[i].sem_pend = 3*slaveid;
445 SR_proc_info[i].sem_read = 3*slaveid + 1;
446 SR_proc_info[i].sem_written = 3*slaveid + 2;
447 #else
448 SR_proc_info[i].semid = -1;
449 #endif
450 SR_proc_info[i].buffer_full = flags + slaveid;
451 /* *SR_proc_info[i].buffer_full = FALSE;*/
452 #endif
453 }
454
455 #ifdef KSR_NATIVE
456 /* Map the data structures onto the shared memory */
457 KSR_MapBufferSpace(masterid, nslave);
458 if (DEBUG_) {
459 (void) printf("pbegin: %2ld: Mapped buffer space\n", NODEID_());
460 (void) fflush(stdout);
461 }
462 #else
463 /* Post read semaphore to make sends partially asynchronous */
464
465 #ifdef NOSPIN
466 SemPost(SR_proc_info[me].semid, SR_proc_info[me].sem_read);
467 #endif
468 #endif
469
470 #ifdef KSR_NATIVE
471 /* Initialize the buffer space data structures */
472 KSR_InitBufferSpace();
473 if (DEBUG_) {
474 (void) printf("pbegin: %2ld: Initialized buffer space\n", NODEID_());
475 (void) fflush(stdout);
476 }
477 #endif
478
479 }
480
481 #else
482 if (SR_clus_info[SR_clus_id].nslave != 1)
483 Error("pbegin: no shared memory on this host ... nslave=1 only",
484 SR_clus_info[SR_clus_id].nslave);
485 #endif
486
487 /* Now have to connect everyone together */
488
489 ConnectAll();
490
491 /* If we are only using sockets we can block in select when waiting for a message */
492 SR_nsock = 0;
493 for (i=0; i<(SR_n_proc+1); i++) {
494 if (SR_proc_info[i].sock >= 0) {
495 SR_socks[SR_nsock] = SR_proc_info[i].sock;
496 SR_socks_proc[SR_nsock] = i;
497 SR_nsock++;
498 }
499 }
500 /* Synchronize timers before returning to application
501 or logging any events */
502
503 (void) TCGTIME_();
504 type = TYPE_CLOCK_SYNCH;
505 SYNCH_(&type);
506 MtimeReset();
507
508 /* If logging events make the file events.<nodeid> */
509
510 #ifdef EVENTLOG
511 if (eventfile=malloc((unsigned) 32)) {
512 (void) sprintf(eventfile, "events.%03ld", NODEID_());
513 evlog(EVKEY_ENABLE, EVKEY_FILENAME, eventfile,
514 EVKEY_BEGIN, EVENT_PROCESS,
515 EVKEY_STR_INT, "Startup used (cs)", (int) (MTIME_()-start),
516 EVKEY_STR_INT, "No. of processes", (int) NNODES_(),
517 EVKEY_DISABLE,
518 EVKEY_LAST_ARG);
519 (void) free(eventfile);
520 SYNCH_(&type);
521 }
522 #endif
523
524 if (DEBUG_) {
525 printf("pbegin: %2ld: Returning to application\n",NODEID_());
526 fflush(stdout);
527 }
528 }
529
PEND_()530 void PEND_()
531 /*
532 Call this to tidy up after parallel section.
533 The cluster master is responsible for tidying up any shared
534 memory/semaphore resources. Everyone else can just quit.
535
536 Woops ... everyone should return so that FORTRAN can tidy up
537 after itself.
538 */
539 {
540 long me = NODEID_();
541 long masterid = SR_clus_info[SR_clus_id].masterid;
542 long nslave = SR_clus_info[SR_clus_id].nslave;
543 long zero = 0;
544 long status;
545 #ifdef EVENTLOG
546 long start=MTIME_();
547 #endif
548
549 SR_initialized = 0;
550 if (!SR_parallel) return;
551
552 (void) signal(SIGCHLD, SIG_DFL); /* Death of children now OK */
553 (void) NXTVAL_(&zero); /* Send termination flag to nxtval server */
554
555 if (me != masterid)
556 status = 0;
557 else {
558 status = WaitAll(nslave-1); /* Wait for demise of children */
559 #if defined(SHMEM) || defined(SYSV)
560 if (nslave > 1) {
561 #if defined(NOSPIN)
562 (void) SemSetDestroyAll(); /* Ex the semaphores and shmem */
563 #endif
564 (void) DeleteSharedRegion(SR_proc_info[me].shmem_id);
565 }
566 #endif
567 }
568
569 ShutdownAll(); /* Close sockets for machines with static kernel */
570
571
572 /* If logging events log end of process and dump trace */
573 #ifdef EVENTLOG
574 evlog(EVKEY_ENABLE,
575 EVKEY_END, EVENT_PROCESS,
576 EVKEY_STR_INT, "Time (cs) waiting to finish", (int) (MTIME_()-start),
577 EVKEY_DUMP,
578 EVKEY_LAST_ARG);
579 #endif
580 /* Return to calling program unless we had an error */
581
582 if (status)
583 exit((int) status);
584 }
585
586
/*
  Alternate entry point that receives the argument count and the
  argument vector by address; both are dereferenced and handed on to
  tcgi_pbegin.
*/
void tcgi_alt_pbegin(int *argc, char **argv[])
{
    int count = *argc;
    char **vector = *argv;

    tcgi_pbegin(count, vector);
}
591
592