1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 * (C) 2008 by Argonne National Laboratory.
4 * See COPYRIGHT in top-level directory.
5 */
6 #include "mpi.h"
7 #include <pthread.h>
8 #include <signal.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <time.h>
13 #include <unistd.h>
14
15 #include "connectstuff.h"
16
main(int argc,char ** argv)17 int main( int argc, char ** argv ) {
18 MPI_Comm tmp, comm, startComm;
19 char * fname;
20 char * actualFname = NULL;
21 char * globalFname = NULL;
22 int totalSize, expectedRank, size, cachedRank;
23 char portName[MPI_MAX_PORT_NAME];
24 int rankToAccept = 1;
25
26 /* Debug - print out where we picked up the MPICH build from */
27 #ifdef MPICHLIBSTR
28 msg( "MPICH library taken from: %s\n", MPICHLIBSTR );
29 #endif
30
31 if( argc != 4 ) {
32 printf( "Usage: %s <fname> <totalSize> <idx-1-based>\n", argv[0] );
33 exit( 1 );
34 }
35
36 /* This is the base name of the file into which we write the port */
37 fname = argv[1];
38 /* This is the total number of processes launched */
39 totalSize = atoi( argv[2] );
40 /* Each process knows its expected rank */
41 expectedRank = atoi( argv[3] )-1;
42
43 /* Start a watchdog thread which will abort after 120 seconds, and will
44 * print stack traces using GDB every 5 seconds if you don't call
45 * strokeWatchdog() */
46 startWatchdog( 120 );
47
48 /* Print a debug header */
49 msg( "Waiting for: %d - my rank is %d\n", totalSize, expectedRank );
50
51 /* Singleton init */
52 MPI_Init( 0, 0 );
53
54 /* Duplicate from MPI_COMM_SELF the starting point */
55 MPI_Comm_dup( MPI_COMM_SELF, &startComm );
56
57
58 if( expectedRank == 0 ) {
59 /* This process opens the port, and writes the information to the file */
60 MPI_Open_port( MPI_INFO_NULL, portName );
61
62 /* Write the port to fname.<rank> so that the connecting processes can
63 * wait their turn by checking for the correct file to show up */
64 actualFname = writePortToFile( portName, "%s.%d", fname, rankToAccept++ );
65
66 /* The wrapper script I'm using checks for the existance of "fname", so
67 * create that - even though it isn't used */
68 globalFname = writePortToFile( portName, fname );
69 installExitHandler( globalFname );
70
71 comm = startComm;
72 } else {
73 char * readPort;
74 readPort = getPortFromFile( "%s.%d", fname, expectedRank );
75 strncpy( portName, readPort, MPI_MAX_PORT_NAME );
76 free( readPort );
77 msg( "Read port <%s>\n", portName );
78
79 MPI_Comm_connect( portName, MPI_INFO_NULL, 0, startComm, &comm );
80 MPI_Intercomm_merge( comm, 1, &tmp );
81 comm = tmp;
82 MPI_Comm_size( comm, &size );
83 msg( "After my first merge, size is now: %d\n", size );
84 }
85 while( size < totalSize ) {
86 /* Make sure we don't print a stack until we stall */
87 strokeWatchdog();
88
89 /* Accept the connection */
90 MPI_Comm_accept( portName, MPI_INFO_NULL, 0, comm, &tmp );
91
92 /* Merge into intracomm */
93 MPI_Intercomm_merge( tmp, 0, &comm );
94
95 /* Free the intercomm */
96 MPI_Comm_free( &tmp );
97
98 /* See where we're up to */
99 MPI_Comm_rank( comm, &cachedRank );
100 MPI_Comm_size( comm, &size );
101
102 if( expectedRank == 0 ) {
103 msg( "Up to size: %d\n", size );
104
105 /* Delete the old file, create the new one */
106 unlink( actualFname );
107 free( actualFname );
108
109 /* Allow the next rank to connect */
110 actualFname = writePortToFile( portName, "%s.%d", fname, rankToAccept++ );
111 }
112 }
113 MPI_Comm_rank( comm, &cachedRank );
114
115 msg( "All done - I got rank: %d.\n", cachedRank );
116
117 MPI_Barrier( comm );
118
119 if( expectedRank == 0 ) {
120
121 /* Cleanup on rank zero - delete some files */
122 sleep( 4 );
123 unlink( actualFname );
124 free( actualFname );
125 unlink( globalFname );
126 free( globalFname );
127
128 /* This lets my wrapper script know that we did everything correctly */
129 indicateConnectSucceeded();
130 }
131 MPI_Finalize();
132
133 return 0;
134 }
135