1#!/usr/bin/env python
2#
3#   (C) 2001 by Argonne National Laboratory.
4#       See COPYRIGHT in top-level directory.
5#
6
7"""
8usage:
9mpiexec [-h or -help or --help]    # get this message
10mpiexec -file filename             # (or -f) filename contains XML job description
11mpiexec [global args] [local args] executable [args]
12   where global args may be
13      -l                           # line labels by MPI rank
14      -bnr                         # MPICH1 compatibility mode
15      -machinefile                 # file mapping procs to machines
16      -s <spec>                    # direct stdin to "all" or 1,2 or 2-4,6
17      -1                           # override default of trying 1st proc locally
18      -ifhn                        # network interface to use locally
19      -tv                          # run procs under totalview (must be installed)
20      -tvsu                        # totalview startup only
21      -gdb                         # run procs under gdb
22      -m                           # merge output lines (default with gdb)
23      -a                           # means assign this alias to the job
24      -ecfn                        # output_xml_exit_codes_filename
25      -recvtimeout <integer_val>   # timeout for recvs to fail (e.g. from mpd daemon)
26      -g<local arg name>           # global version of local arg (below)
27    and local args may be
28      -n <n> or -np <n>            # number of processes to start
29      -wdir <dirname>              # working directory to start in
30      -umask <umask>               # umask for remote process
31      -path <dirname>              # place to look for executables
32      -host <hostname>             # host to start on
33      -soft <spec>                 # modifier of -n value
34      -arch <arch>                 # arch type to start on (not implemented)
35      -envall                      # pass all env vars in current environment
36      -envnone                     # pass no env vars
37      -envlist <list of env var names> # pass current values of these vars
38      -env <name> <value>          # pass this value of this env var
39mpiexec [global args] [local args] executable args : [local args] executable...
40mpiexec -gdba jobid                # gdb-attach to existing jobid
41mpiexec -configfile filename       # filename contains cmd line segs as lines
42  (See User Guide for more details)
43
44Examples:
45   mpiexec -l -n 10 cpi 100
46   mpiexec -genv QPL_LICENSE 4705 -n 3 a.out
47
48   mpiexec -n 1 -host foo master : -n 4 -host mysmp slave
49"""
50from time import ctime
51__author__ = "Ralph Butler and Rusty Lusk"
52__date__ = ctime()
53__version__ = "$Revision: 1.90 $"
54__credits__ = ""
55
56import signal
57if hasattr(signal,'SIGTTIN'):
58    signal.signal(signal.SIGTTIN,signal.SIG_IGN)    # asap
59
60import sys, os, socket, re
61
62from  urllib import quote
63from  time   import time
64from  urllib import unquote
65from  mpdlib import mpd_set_my_id, mpd_get_my_username, mpd_version, mpd_print, \
66                    mpd_uncaught_except_tb, mpd_handle_signal, mpd_which, \
67                    MPDListenSock, MPDStreamHandler, MPDConClientSock, MPDParmDB
68
# Probe for the pwd module once at import time; it does not exist on every
# platform, so the rest of the program tests pwd_module_available instead of
# importing it again.  Only ImportError is treated as "not available" so that
# an interrupt or exit request during start-up is not silently swallowed.
try:
    import pwd
    pwd_module_available = 1
except ImportError:
    pwd_module_available = 0
74
75global parmdb, nextRange, appnum, recvTimeout
76global numDoneWithIO, myExitStatus, sigOccurred, outXmlDoc, outECs
77
78
79def mpiexec():
80    global parmdb, nextRange, appnum, recvTimeout
81    global numDoneWithIO, myExitStatus, sigOccurred, outXmlDoc, outECs
82
83    import sys  # for sys.excepthook on next line
84    sys.excepthook = mpd_uncaught_except_tb
85
86    myExitStatus = 0
87    if len(sys.argv) < 2  or  sys.argv[1] == '-h'  \
88    or  sys.argv[1] == '-help'  or  sys.argv[1] == '--help':
89	usage()
90    myHost = socket.gethostname()
91    mpd_set_my_id(myid='mpiexec_%s' % (myHost) )
92    try:
93        hostinfo = socket.gethostbyname_ex(myHost)
94    except:
95        print 'mpiexec failed: gethostbyname_ex failed for %s' % (myHost)
96        sys.exit(-1)
97    myIP = hostinfo[2][0]
98
99    parmdb = MPDParmDB(orderedSources=['cmdline','xml','env','rcfile','thispgm'])
100    parmsToOverride = {
101                        'MPD_USE_ROOT_MPD'            :  0,
102                        'MPD_SECRETWORD'              :  '',
103                        'MPIEXEC_SHOW_LINE_LABELS'    :  0,
104                        'MPIEXEC_LINE_LABEL_FMT'      :  '%r',
105                        'MPIEXEC_JOB_ALIAS'           :  '',
106                        'MPIEXEC_USIZE'               :  0,
107                        'MPIEXEC_GDB'                 :  0,
108                        'MPIEXEC_IFHN'                :  '',  # use one from mpd as default
109                        'MPIEXEC_MERGE_OUTPUT'        :  0,
110                        'MPIEXEC_STDIN_DEST'          :  '0',
111                        'MPIEXEC_MACHINEFILE'         :  '',
112                        'MPIEXEC_BNR'                 :  0,
113                        'MPIEXEC_TOTALVIEW'           :  0,
114                        'MPIEXEC_TVSU'                :  0,
115                        'MPIEXEC_EXITCODES_FILENAME'  :  '',
116                        'MPIEXEC_TRY_1ST_LOCALLY'     :  1,
117                        'MPIEXEC_TIMEOUT'             :  0,
118                        'MPIEXEC_HOST_LIST'           :  [],
119                        'MPIEXEC_HOST_CHECK'          :  0,
120                        'MPIEXEC_RECV_TIMEOUT'        :  20,
121                      }
122    for (k,v) in parmsToOverride.items():
123        parmdb[('thispgm',k)] = v
124    parmdb[('thispgm','mship')] = ''
125    parmdb[('thispgm','rship')] = ''
126    parmdb[('thispgm','userpgm')] = ''
127    parmdb[('thispgm','nprocs')] = 0
128    parmdb[('thispgm','ecfn_format')] = ''
129    parmdb[('thispgm','-gdba')] = ''
130    parmdb[('thispgm','singinitpid')] = 0
131    parmdb[('thispgm','singinitport')] = 0
132    parmdb[('thispgm','ignore_rcfile')] = 0
133    parmdb[('thispgm','ignore_environ')] = 0
134    parmdb[('thispgm','inXmlFilename')] = ''
135    parmdb[('thispgm','print_parmdb_all')] = 0
136    parmdb[('thispgm','print_parmdb_def')] = 0
137
138    appnum = 0
139    nextRange = 0
140    localArgSets = { 0 : [] }
141
142    if sys.argv[1] == '-gdba':
143	if len(sys.argv) != 3:
144            print '-gdba arg must appear only with jobid'
145	    usage()
146        parmdb[('cmdline','-gdba')] = sys.argv[2]
147        parmdb[('cmdline','MPIEXEC_GDB')] = 1
148        parmdb[('cmdline','MPIEXEC_MERGE_OUTPUT')] = 1       # implied
149        parmdb[('cmdline','MPIEXEC_SHOW_LINE_LABELS')] = 1   # implied
150        parmdb[('cmdline','MPIEXEC_STDIN_DEST')]   = 'all'   # implied
151    elif sys.argv[1] == '-file'  or  sys.argv[1] == '-f':
152	if len(sys.argv) != 3:
153            print '-file (-f) arg must appear alone'
154	    usage()
155        parmdb[('cmdline','inXmlFilename')] = sys.argv[2]
156    elif sys.argv[1] == '-pmi_args':
157        parmdb[('cmdline','singinitport')] = sys.argv[2]
158        # ignoring interface name (where app is listening) and authentication key, for now
159        parmdb[('cmdline','singinitpid')]  = sys.argv[5]
160        parmdb[('cmdline','userpgm')] = 'unknown_pgmname'
161        parmdb[('cmdline','nprocs')] = 1
162        parmdb[('cmdline','MPIEXEC_TRY_1ST_LOCALLY')] = 1
163        machineFileInfo = {}
164        tempargv = [sys.argv[0],'unknown_pgmname']
165        collect_args(tempargv,localArgSets)
166    else:
167        if sys.argv[1] == '-configfile':
168	    if len(sys.argv) != 3:
169	        usage()
170            configFile = open(sys.argv[2],'r',0)
171            configLines = configFile.readlines()
172            configLines = [ x.strip() + ' : '  for x in configLines if x[0] != '#' and x.strip() != '' ]
173            tempargv = []
174            for line in configLines:
175                line = 'mpddummyarg ' + line  # gets pitched in shells that can't handle --
176                (shellIn,shellOut) = \
177                    os.popen4("/bin/sh -c 'for a in $*; do echo _$a; done' -- %s" % (line))
178                for shellLine in shellOut:
179                    if shellLine.startswith('_mpddummyarg'):
180                        continue
181                    tempargv.append(shellLine[1:].strip())    # 1: strips off the leading _
182	    tempargv = [sys.argv[0]] + tempargv[0:-1]   # strip off the last : I added
183            collect_args(tempargv,localArgSets)
184        else:
185            collect_args(sys.argv,localArgSets)
186        machineFileInfo = read_machinefile(parmdb['MPIEXEC_MACHINEFILE'])
187
188    # set some default values for mpd; others added as discovered below
189    msgToMPD = { 'cmd'            : 'mpdrun',
190                 'conhost'        : myHost,
191                 'spawned'        : 0,
192                 'nstarted'       : 0,
193                 'hosts'          : {},
194                 'execs'          : {},
195                 'users'          : {},
196                 'cwds'           : {},
197                 'umasks'         : {},
198                 'paths'          : {},
199                 'args'           : {},
200                 'limits'         : {},
201                 'envvars'        : {},
202                 'ifhns'          : {},
203               }
204
205    if parmdb['inXmlFilename']:
206        get_parms_from_xml_file(msgToMPD)  # fills in some more values of msgToMPD
207    else:
208        parmdb.get_parms_from_env(parmsToOverride)
209        parmdb.get_parms_from_rcfile(parmsToOverride)
210
211    # mostly old mpdrun below here
212    numDoneWithIO = 0
213    outXmlDoc = ''
214    outECs = ''
215    outECFile = None
216    sigOccurred = 0
217
218    recvTimeout = int(parmdb['MPIEXEC_RECV_TIMEOUT'])  # may be changed below
219
220    listenSock = MPDListenSock('',0,name='socket_to_listen_for_man')
221    listenPort = listenSock.getsockname()[1]
222    if (hasattr(os,'getuid')  and  os.getuid() == 0)  or  parmdb['MPD_USE_ROOT_MPD']:
223        fullDirName = os.path.abspath(os.path.split(sys.argv[0])[0])  # normalize
224        mpdroot = os.path.join(fullDirName,'mpdroot')
225        conSock = MPDConClientSock(mpdroot=mpdroot,secretword=parmdb['MPD_SECRETWORD'])
226    else:
227        conSock = MPDConClientSock(secretword=parmdb['MPD_SECRETWORD'])
228
229    if parmdb['MPIEXEC_HOST_CHECK']:    # if this was requested in the xml file
230        msgToSend = { 'cmd' : 'verify_hosts_in_ring',
231                      'host_list' : parmdb['MPIEXEC_HOST_LIST'] }
232        conSock.send_dict_msg(msgToSend)
233        msg = conSock.recv_dict_msg(timeout=recvTimeout)
234        if not msg:
235            mpd_print(1,'no msg recvd from mpd for verify_hosts_in_ring')
236            sys.exit(-1)
237        elif msg['cmd'] != 'verify_hosts_in_ring_response':
238            mpd_print(1,'unexpected msg from mpd :%s:' % (msg) )
239            sys.exit(-1)
240        if msg['host_list']:
241            print 'These hosts are not in the mpd ring:'
242            for host in  msg['host_list']:
243                if host[0].isdigit():
244                    print '    %s' % (host),
245                    try:
246                        print ' (%s)' % (socket.gethostbyaddr(host)[0])
247                    except:
248                        print ''
249                else:
250                    print '    %s' % (host)
251            sys.exit(-1)
252
253    msgToSend = { 'cmd' : 'get_mpdrun_values' }
254    conSock.send_dict_msg(msgToSend)
255    msg = conSock.recv_dict_msg(timeout=recvTimeout)
256    if not msg:
257        mpd_print(1, 'no msg recvd from mpd during version check')
258        sys.exit(-1)
259    elif msg['cmd'] != 'response_get_mpdrun_values':
260        mpd_print(1,'unexpected msg from mpd :%s:' % (msg) )
261        sys.exit(-1)
262    if msg['mpd_version'] != mpd_version():
263        mpd_print(1,'mpd version %s does not match mpiexec version %s' % \
264                  (msg['mpd_version'],mpd_version()) )
265        sys.exit(-1)
266
267    # if using/testing the INET CONSOLE
268    if os.environ.has_key('MPD_CON_INET_HOST_PORT'):
269        try:
270            myIfhn = socket.gethostbyname_ex(myHost)[2][0]
271        except:
272            print 'mpiexec failed: gethostbyname_ex failed for %s' % (myHost)
273            sys.exit(-1)
274        parmdb[('thispgm','MPIEXEC_IFHN')] = myIfhn
275    elif not parmdb['MPIEXEC_IFHN']:    # if user did not specify one, use mpd's
276        parmdb[('thispgm','MPIEXEC_IFHN')] = msg['mpd_ifhn']    # not really thispgm here
277
278    if parmdb['-gdba']:
279        get_vals_for_attach(parmdb,conSock,msgToMPD)
280    elif not parmdb['inXmlFilename']:
281        parmdb[('cmdline','nprocs')] = 0  # for incr later
282        for k in localArgSets.keys():
283	    handle_local_argset(localArgSets[k],machineFileInfo,msgToMPD)
284
285    if parmdb['MPIEXEC_MERGE_OUTPUT']  and  not parmdb['MPIEXEC_SHOW_LINE_LABELS']:
286        parmdb[('thispgm','MPIEXEC_SHOW_LINE_LABELS')] = 1   # causes line labels also
287
288    if parmdb['print_parmdb_all']:
289        parmdb.printall()
290    if parmdb['print_parmdb_def']:
291        parmdb.printdef()
292
293    if parmdb['mship']:
294        mshipSock = MPDListenSock('',0,name='socket_for_mship')
295        mshipPort = mshipSock.getsockname()[1]
296        mshipPid = os.fork()
297        if mshipPid == 0:
298            conSock.close()
299            os.environ['MPDCP_AM_MSHIP'] = '1'
300            os.environ['MPDCP_MSHIP_PORT'] = str(mshipPort)
301            os.environ['MPDCP_MSHIP_FD'] = str(mshipSock.fileno())
302            os.environ['MPDCP_MSHIP_NPROCS'] = str(parmdb['nprocs'])
303            try:
304                os.execvpe(parmdb['mship'],[parmdb['MPIEXEC_MSHIP']],os.environ)
305            except Exception, errmsg:
306                mpd_print(1,'execvpe failed for copgm %s; errmsg=:%s:' % \
307                          (parmdb['MPIEXEC_MSHIP'],errmsg))
308                sys.exit(-1)
309            os._exit(0);  # do NOT do cleanup
310        mshipSock.close()
311    else:
312        mshipPid = 0
313
314    # make sure to do this after nprocs has its value
315    linesPerRank = {}  # keep this a dict instead of a list
316    for i in range(parmdb['nprocs']):
317        linesPerRank[i] = []
318    # make sure to do this after nprocs has its value
319    if recvTimeout == 20:  # still the default
320        recvTimeoutMultiplier = 0.1
321        if os.environ.has_key('MPD_RECVTIMEOUT_MULTIPLIER'):
322            try:
323                recvTimeoutMultiplier = int(os.environ ['MPD_RECVTIMEOUT_MULTIPLIER'])
324            except ValueError:
325                try:
326                    recvTimeoutMultiplier = float(os.environ ['MPD_RECVTIMEOUT_MULTIPLIER'])
327                except ValueError:
328                    print 'Invalid MPD_RECVTIMEOUT_MULTIPLIER. Value must be a number.'
329                    sys.exit(-1)
330        recvTimeout = int(parmdb['nprocs']) * recvTimeoutMultiplier
331
332    if parmdb['MPIEXEC_EXITCODES_FILENAME']:
333        if parmdb['ecfn_format'] == 'xml':
334            try:
335                import xml.dom.minidom
336            except:
337                print 'you requested to save the exit codes in an xml file, but'
338                print '  I was unable to import the xml.dom.minidom module'
339                sys.exit(-1)
340            outXmlDoc = xml.dom.minidom.Document()
341            outECs = outXmlDoc.createElement('exit-codes')
342            outXmlDoc.appendChild(outECs)
343        else:
344            outECs = 'exit-codes\n'
345
346    msgToMPD['nprocs'] = parmdb['nprocs']
347    msgToMPD['limits'][(0,parmdb['nprocs']-1)]  = {}
348    msgToMPD['conport'] = listenPort
349    msgToMPD['conip'] = myIP
350    msgToMPD['conifhn'] = parmdb['MPIEXEC_IFHN']
351    if parmdb['MPIEXEC_JOB_ALIAS']:
352        msgToMPD['jobalias'] = parmdb['MPIEXEC_JOB_ALIAS']
353    else:
354        msgToMPD['jobalias'] = ''
355    if parmdb['MPIEXEC_TRY_1ST_LOCALLY']:
356        msgToMPD['try_1st_locally'] = 1
357    if parmdb['rship']:
358        msgToMPD['rship'] = parmdb['rship']
359        msgToMPD['mship_host'] = socket.gethostname()
360        msgToMPD['mship_port'] = mshipPort
361    if parmdb['MPIEXEC_BNR']:
362        msgToMPD['doing_bnr'] = 1
363    if parmdb['MPIEXEC_STDIN_DEST'] == 'all':
364        stdinDest = '0-%d' % (parmdb['nprocs']-1)
365    else:
366        stdinDest = parmdb['MPIEXEC_STDIN_DEST']
367    if parmdb['MPIEXEC_SHOW_LINE_LABELS']:
368        msgToMPD['line_labels'] = parmdb['MPIEXEC_LINE_LABEL_FMT']
369    else:
370        msgToMPD['line_labels'] = ''
371    msgToMPD['stdin_dest'] = stdinDest
372    msgToMPD['gdb'] = parmdb['MPIEXEC_GDB']
373    msgToMPD['gdba'] = parmdb['-gdba']
374    msgToMPD['totalview'] = parmdb['MPIEXEC_TOTALVIEW']
375    msgToMPD['singinitpid'] = parmdb['singinitpid']
376    msgToMPD['singinitport'] = parmdb['singinitport']
377    msgToMPD['host_spec_pool'] = parmdb['MPIEXEC_HOST_LIST']
378
379    # set sig handlers up right before we send mpdrun msg to mpd
380    if hasattr(signal,'SIGINT'):
381        signal.signal(signal.SIGINT, sig_handler)
382    if hasattr(signal,'SIGTSTP'):
383        signal.signal(signal.SIGTSTP,sig_handler)
384    if hasattr(signal,'SIGCONT'):
385        signal.signal(signal.SIGCONT,sig_handler)
386    if hasattr(signal,'SIGALRM'):
387        signal.signal(signal.SIGALRM,sig_handler)
388
389    conSock.send_dict_msg(msgToMPD)
390    msg = conSock.recv_dict_msg(timeout=recvTimeout)
391    if not msg:
392        mpd_print(1, 'no msg recvd from mpd when expecting ack of request')
393        sys.exit(-1)
394    elif msg['cmd'] == 'mpdrun_ack':
395        currRingSize = msg['ringsize']
396        currRingNCPUs = msg['ring_ncpus']
397    else:
398        if msg['cmd'] == 'already_have_a_console':
399            print 'mpd already has a console (e.g. for long ringtest); try later'
400            sys.exit(-1)
401        elif msg['cmd'] == 'job_failed':
402            if  msg['reason'] == 'some_procs_not_started':
403                print 'mpiexec: unable to start all procs; may have invalid machine names'
404                print '    remaining specified hosts:'
405                for host in msg['remaining_hosts'].values():
406                    if host != '_any_':
407                        try:
408                            print '        %s (%s)' % (host,socket.gethostbyaddr(host)[0])
409                        except:
410                            print '        %s' % (host)
411            elif  msg['reason'] == 'invalid_username':
412                print 'mpiexec: invalid username %s at host %s' % \
413                      (msg['username'],msg['host'])
414            else:
415                print 'mpiexec: job failed; reason=:%s:' % (msg['reason'])
416            sys.exit(-1)
417        else:
418            mpd_print(1, 'unexpected message from mpd: %s' % (msg) )
419            sys.exit(-1)
420    conSock.close()
421    jobTimeout = int(parmdb['MPIEXEC_TIMEOUT'])
422    if jobTimeout:
423        if hasattr(signal,'alarm'):
424            signal.alarm(jobTimeout)
425        else:
426            def timeout_function():
427                mpd_print(1,'job ending due to env var MPIEXEC_TIMEOUT=%d' % jobTimeout)
428                thread.interrupt_main()
429            try:
430                import thread, threading
431                timer = threading.Timer(jobTimeout,timeout_function)
432                timer.start()
433            except:
434                print 'unable to establish timeout for MPIEXEC_TIMEOUT'
435
436    streamHandler = MPDStreamHandler()
437
438    (manSock,addr) = listenSock.accept()
439    if not manSock:
440        mpd_print(1, 'mpiexec: failed to obtain sock from manager')
441        sys.exit(-1)
442    streamHandler.set_handler(manSock,handle_man_input,args=(streamHandler,))
443    if hasattr(os,'fork'):
444        streamHandler.set_handler(sys.stdin,handle_stdin_input,
445                                  args=(parmdb,streamHandler,manSock))
446    else:  # not using select on fd's when using subprocess module (probably M$)
447        import threading
448        def read_fd_with_func(fd,func):
449            line = 'x'
450            while line:
451                line = func(fd)
452        stdin_thread = threading.Thread(target=read_fd_with_func,
453                                        args=(sys.stdin.fileno(),handle_stdin_input))
454    # first, do handshaking with man
455    msg = manSock.recv_dict_msg()
456    if (not msg  or  not msg.has_key('cmd') or msg['cmd'] != 'man_checking_in'):
457        mpd_print(1, 'mpiexec: from man, invalid msg=:%s:' % (msg) )
458        sys.exit(-1)
459    msgToSend = { 'cmd' : 'ringsize', 'ring_ncpus' : currRingNCPUs,
460                  'ringsize' : currRingSize }
461    manSock.send_dict_msg(msgToSend)
462    msg = manSock.recv_dict_msg()
463    if (not msg  or  not msg.has_key('cmd')):
464        mpd_print(1, 'mpiexec: from man, invalid msg=:%s:' % (msg) )
465        sys.exit(-1)
466    if (msg['cmd'] == 'job_started'):
467        jobid = msg['jobid']
468        if outECs:
469            if parmdb['ecfn_format'] == 'xml':
470                outECs.setAttribute('jobid',jobid.strip())
471            else:
472                outECs += 'jobid=%s\n' % (jobid.strip())
473        # print 'mpiexec: job %s started' % (jobid)
474        if parmdb['MPIEXEC_TVSU']:
475            import mtv
476            mtv.allocate_proctable(parmdb['nprocs'])
477            # extract procinfo (rank,hostname,exec,pid) tuples from msg
478            for i in range(parmdb['nprocs']):
479                tvhost = msg['procinfo'][i][0]
480                tvpgm  = msg['procinfo'][i][1]
481                tvpid  = msg['procinfo'][i][2]
482                # print "%d %s %s %d" % (i,host,pgm,pid)
483                mtv.append_proctable_entry(tvhost,tvpgm,tvpid)
484            mtv.complete_spawn()
485            msgToSend = { 'cmd' : 'tv_ready' }
486            manSock.send_dict_msg(msgToSend)
487        elif parmdb['MPIEXEC_TOTALVIEW']:
488            tvname = 'totalview'
489            if os.environ.has_key('TOTALVIEW'):
490                tvname = os.environ['TOTALVIEW']
491            if not mpd_which(((tvname.strip()).split()[0])):
492                print 'cannot find "%s" in your $PATH:' % (tvname)
493                print '    ', os.environ['PATH']
494                sys.exit(-1)
495            import mtv
496            tv_cmd = 'dattach python ' + `os.getpid()` + '; dgo; dassign MPIR_being_debugged 1'
497            os.system(tvname + ' -e "%s" &' % (tv_cmd) )
498            mtv.wait_for_debugger()
499            mtv.allocate_proctable(parmdb['nprocs'])
500            # extract procinfo (rank,hostname,exec,pid) tuples from msg
501            for i in range(parmdb['nprocs']):
502                tvhost = msg['procinfo'][i][0]
503                tvpgm  = msg['procinfo'][i][1]
504                tvpid  = msg['procinfo'][i][2]
505                # print "%d %s %s %d" % (i,host,pgm,pid)
506                mtv.append_proctable_entry(tvhost,tvpgm,tvpid)
507            mtv.complete_spawn()
508            msgToSend = { 'cmd' : 'tv_ready' }
509            manSock.send_dict_msg(msgToSend)
510    else:
511        mpd_print(1, 'mpiexec: from man, unknown msg=:%s:' % (msg) )
512        sys.exit(-1)
513
514    (manCliStdoutSock,addr) = listenSock.accept()
515    streamHandler.set_handler(manCliStdoutSock,
516                              handle_cli_stdout_input,
517                              args=(parmdb,streamHandler,linesPerRank,))
518    (manCliStderrSock,addr) = listenSock.accept()
519    streamHandler.set_handler(manCliStderrSock,
520                              handle_cli_stderr_input,
521                              args=(streamHandler,))
522
523    # Main Loop
524    timeDelayForPrints = 2  # seconds
525    timeForPrint = time() + timeDelayForPrints   # to get started
526    numDoneWithIO = 0
527    while numDoneWithIO < 3:    # man, client stdout, and client stderr
528        if sigOccurred:
529            handle_sig_occurred(manSock)
530        rv = streamHandler.handle_active_streams(timeout=1.0)
531        if rv[0] < 0:  # will handle some sigs at top of next loop
532            pass       # may have to handle some err conditions here
533        if parmdb['MPIEXEC_MERGE_OUTPUT']:
534            if timeForPrint < time():
535                print_ready_merged_lines(1,parmdb,linesPerRank)
536                timeForPrint = time() + timeDelayForPrints
537            else:
538                print_ready_merged_lines(parmdb['nprocs'],parmdb,linesPerRank)
539
540    if parmdb['MPIEXEC_MERGE_OUTPUT']:
541        print_ready_merged_lines(1,parmdb,linesPerRank)
542    if mshipPid:
543        (donePid,status) = os.wait()    # os.waitpid(mshipPid,0)
544    if parmdb['MPIEXEC_EXITCODES_FILENAME']:
545        outECFile = open(parmdb['MPIEXEC_EXITCODES_FILENAME'],'w')
546        if parmdb['ecfn_format'] == 'xml':
547            print >>outECFile, outXmlDoc.toprettyxml(indent='   ')
548        else:
549            print >>outECFile, outECs,
550        outECFile.close()
551    return myExitStatus
552
553
def collect_args(args,localArgSets):
    """Parse the leading global options of args; split the rest into arg sets.

    args         -- argv-style list; args[0] (the program name) is skipped
    localArgSets -- dict that must already contain { 0 : [] }; filled in place
                    so that key i holds the words of the i-th ':'-separated
                    segment (local args, executable, its args), unvalidated

    Defaults for all -g<local arg> settings are first stored under the
    'cmdline' source of the module-global parmdb and then overwritten by the
    global options found.  Scanning for global options stops at the first word
    that is not one of them.  A malformed command line is reported and
    usage() is called; a failed host lookup prints a message and exits.
    """
    # option -> number of sub-args that follow it
    validGlobalArgs = { '-l' : 0, '-usize' : 1, '-gdb' : 0, '-bnr' : 0,
                        '-tv' : 0, '-tvsu' : 0,
                        '-ifhn' : 1, '-machinefile' : 1, '-s' : 1, '-1' : 0,
                        '-a' : 1, '-m' : 0, '-ecfn' : 1, '-recvtimeout' : 1,
                        '-gn' : 1, '-gnp' : 1, '-ghost' : 1, '-gpath' : 1, '-gwdir' : 1,
                        '-gsoft' : 1, '-garch' : 1, '-gexec' : 1, '-gumask' : 1,
                        '-genvall' : 0, '-genv' : 2, '-genvnone' : 0,
                        '-genvlist' : 1 }
    # options without sub-args that set one parm to a fixed value;
    # -genvall and -genvnone are the two settings of the same parm
    flagArgs     = { '-l'        : ('MPIEXEC_SHOW_LINE_LABELS',1),
                     '-m'        : ('MPIEXEC_MERGE_OUTPUT',1),
                     '-bnr'      : ('MPIEXEC_BNR',1),
                     '-tv'       : ('MPIEXEC_TOTALVIEW',1),
                     '-1'        : ('MPIEXEC_TRY_1ST_LOCALLY',0),  # reverses meaning
                     '-genvnone' : ('-genvnone',1),
                     '-genvall'  : ('-genvnone',0) }
    # options whose single sub-arg is stored unchanged under the given parm
    verbatimArgs = { '-gpath'       : '-gpath',
                     '-gwdir'       : '-gwdir',
                     '-gumask'      : '-gumask',
                     '-gsoft'       : '-gsoft',
                     '-garch'       : '-garch',
                     '-gexec'       : '-gexec',
                     '-a'           : 'MPIEXEC_JOB_ALIAS',
                     '-s'           : 'MPIEXEC_STDIN_DEST',
                     '-machinefile' : 'MPIEXEC_MACHINEFILE',
                     '-ecfn'        : 'MPIEXEC_EXITCODES_FILENAME' }
    # options whose single sub-arg must be all digits and is stored as an int
    numericArgs  = { '-gn'          : '-gn',
                     '-gnp'         : '-gn',
                     '-usize'       : 'MPIEXEC_USIZE',
                     '-recvtimeout' : 'MPIEXEC_RECV_TIMEOUT' }

    currumask = os.umask(0) ; os.umask(currumask)  # grab it and set it back
    parmdb[('cmdline','-gn')]          = 1
    parmdb[('cmdline','-ghost')]       = '_any_'
    parmdb[('cmdline','-gpath')]       = os.environ.get('PATH','')
    parmdb[('cmdline','-gwdir')]       = os.path.abspath(os.getcwd())
    parmdb[('cmdline','-gumask')]      = str(currumask)
    parmdb[('cmdline','-gsoft')]       = 0
    parmdb[('cmdline','-garch')]       = ''
    parmdb[('cmdline','-gexec')]       = ''
    parmdb[('cmdline','-genv')]        = {}
    parmdb[('cmdline','-genvlist')]    = []
    parmdb[('cmdline','-genvnone')]    = 0

    argidx = 1
    while argidx < len(args)  and  args[argidx] in validGlobalArgs:
        garg = args[argidx]
        if len(args) <= (argidx+validGlobalArgs[garg]):
            print("missing sub-arg to %s" % (garg))
            usage()
        if garg in flagArgs:
            (parm,val) = flagArgs[garg]
            parmdb[('cmdline',parm)] = val
        elif garg in verbatimArgs:
            parmdb[('cmdline',verbatimArgs[garg])] = args[argidx+1]
            if garg == '-garch':
                print('** -garch is accepted but not used')
        elif garg in numericArgs:
            if args[argidx+1].isdigit():
                parmdb[('cmdline',numericArgs[garg])] = int(args[argidx+1])
            else:
                print('argument to %s must be numeric' % (garg))
                usage()
        elif garg == '-genv':
            parmdb['-genv'][args[argidx+1]] = args[argidx+2]
        elif garg == '-genvlist':
            parmdb[('cmdline','-genvlist')] = args[argidx+1].split(',')
        elif garg == '-ghost':
            # store the host by its first IP address
            try:
                parmdb[('cmdline',garg)] = socket.gethostbyname_ex(args[argidx+1])[2][0]
            except socket.error:
                print('unable to do find info for host %s' % (args[argidx+1]))
                sys.exit(-1)
        elif garg == '-ifhn':
            parmdb[('cmdline','MPIEXEC_IFHN')] = args[argidx+1]
            # the lookup only validates the name; its result is not needed
            try:
                socket.gethostbyname_ex(args[argidx+1])
            except socket.error:
                print('mpiexec: gethostbyname_ex failed for ifhn %s' % (args[argidx+1]))
                sys.exit(-1)
        elif garg == '-gdb':
            parmdb[('cmdline','MPIEXEC_GDB')] = 1
            parmdb[('cmdline','MPIEXEC_MERGE_OUTPUT')] = 1       # implied
            parmdb[('cmdline','MPIEXEC_SHOW_LINE_LABELS')] = 1   # implied
            parmdb[('cmdline','MPIEXEC_STDIN_DEST')]   = 'all'   # implied
        elif garg == '-tvsu':
            parmdb[('cmdline','MPIEXEC_TOTALVIEW')] = 1
            parmdb[('cmdline','MPIEXEC_TVSU')] = 1
        # step over the option and its sub-args in one place, so that every
        # accepted option is guaranteed to make progress
        argidx += 1 + validGlobalArgs[garg]

    if len(args) <= argidx:
        print("mpiexec: missing arguments after global args")
        usage()
    if args[argidx] == ':':
        argidx += 1
    localArgsKey = 0
    # collect local arg sets but do not validate them until handled below
    for word in args[argidx:]:
        if word == ':':
            localArgsKey += 1
            localArgSets[localArgsKey] = []
        else:
            localArgSets[localArgsKey].append(word)
702
703def handle_local_argset(argset,machineFileInfo,msgToMPD):
704    global parmdb, nextRange, appnum, recvTimeout
705    validLocalArgs  = { '-n' : 1, '-np' : 1, '-host' : 1, '-path' : 1, '-wdir' : 1,
706                        '-soft' : 1, '-arch' : 1, '-umask' : 1,
707			'-envall' : 0, '-env' : 2, '-envnone' : 0, '-envlist' : 1 }
708    host   = parmdb['-ghost']
709    wdir   = parmdb['-gwdir']
710    wumask = parmdb['-gumask']
711    wpath  = parmdb['-gpath']
712    nProcs = parmdb['-gn']
713    usize  = parmdb['MPIEXEC_USIZE']
714    gexec  = parmdb['-gexec']
715    softness =  parmdb['-gsoft']
716    if parmdb['-genvnone']:
717        envall = 0
718    else:
719        envall = 1
720    localEnvlist = []
721    localEnv  = {}
722
723    argidx = 0
724    while argidx < len(argset):
725        if argset[argidx] not in validLocalArgs:
726            if argset[argidx][0] == '-':
727                print 'invalid "local" arg: %s' % argset[argidx]
728                usage()
729            break                       # since now at executable
730        if parmdb['MPIEXEC_MACHINEFILE']:
731            if argset[argidx] == '-host'  or  argset[argidx] == ['-ghost']:
732                print '-host (or -ghost) and -machinefile are incompatible'
733                sys.exit(-1)
734        if argset[argidx] == '-n' or argset[argidx] == '-np':
735            if len(argset) < (argidx+2):
736                print '** missing arg to -n'
737                usage()
738            nProcs = argset[argidx+1]
739            if not nProcs.isdigit():
740                print '** non-numeric arg to -n: %s' % nProcs
741                usage()
742            nProcs = int(nProcs)
743            argidx += 2
744        elif argset[argidx] == '-host':
745            if len(argset) < (argidx+2):
746                print '** missing arg to -host'
747                usage()
748            try:
749                host = socket.gethostbyname_ex(argset[argidx+1])[2][0]
750            except:
751                print 'unable to do find info for host %s' % (argset[argidx+1])
752                sys.exit(-1)
753            argidx += 2
754        elif argset[argidx] == '-path':
755            if len(argset) < (argidx+2):
756                print '** missing arg to -path'
757                usage()
758            wpath = argset[argidx+1]
759            argidx += 2
760        elif argset[argidx] == '-wdir':
761            if len(argset) < (argidx+2):
762                print '** missing arg to -wdir'
763                usage()
764            wdir = argset[argidx+1]
765            argidx += 2
766        elif argset[argidx] == '-umask':
767            if len(argset) < (argidx+2):
768                print '** missing arg to -umask'
769                usage()
770            wumask = argset[argidx+1]
771            argidx += 2
772        elif argset[argidx] == '-soft':
773            if len(argset) < (argidx+2):
774                print '** missing arg to -soft'
775                usage()
776            softness = argset[argidx+1]
777            argidx += 2
778        elif argset[argidx] == '-arch':
779            if len(argset) < (argidx+2):
780                print '** missing arg to -arch'
781                usage()
782            print '** -arch is accepted but not used'
783            argidx += 2
784        elif argset[argidx] == '-envall':
785            envall = 1
786            argidx += 1
787        elif argset[argidx] == '-envnone':
788            envall = 0
789            argidx += 1
790        elif argset[argidx] == '-envlist':
791            localEnvlist = argset[argidx+1].split(',')
792            argidx += 2
793        elif argset[argidx] == '-env':
794            if len(argset) < (argidx+3):
795                print '** missing arg to -env'
796                usage()
797            var = argset[argidx+1]
798            val = argset[argidx+2]
799            localEnv[var] = val
800            argidx += 3
801        else:
802            print 'unknown "local" option: %s' % argset[argidx]
803            usage()
804
805    if softness:
806        nProcs = adjust_nprocs(nProcs,softness)
807
808    cmdAndArgs = []
809    if argidx < len(argset):
810        while argidx < len(argset):
811            cmdAndArgs.append(argset[argidx])
812            argidx += 1
813    else:
814        if gexec:
815            cmdAndArgs = [gexec]
816    if not cmdAndArgs:
817        print 'no cmd specified'
818        usage()
819
820    argsetLoRange = nextRange
821    argsetHiRange = nextRange + nProcs - 1
822    loRange = argsetLoRange
823    hiRange = argsetHiRange
824
825    defaultHostForArgset = host
826    while loRange <= argsetHiRange:
827        host = defaultHostForArgset
828        if machineFileInfo:
829            if len(machineFileInfo) <= hiRange:
830                print 'too few entries in machinefile'
831                sys.exit(-1)
832            host = machineFileInfo[loRange]['host']
833            ifhn = machineFileInfo[loRange]['ifhn']
834            if ifhn:
835                msgToMPD['ifhns'][loRange] = ifhn
836            for i in range(loRange+1,hiRange+1):
837                if machineFileInfo[i]['host'] != host  or  machineFileInfo[i]['ifhn'] != ifhn:
838                    hiRange = i - 1
839                    break
840
841        asRange = (loRange,hiRange)  # this argset range as a tuple
842
843        msgToMPD['users'][asRange]  = mpd_get_my_username()
844        msgToMPD['execs'][asRange]  = cmdAndArgs[0]
845        msgToMPD['paths'][asRange]  = wpath
846        msgToMPD['cwds'][asRange]   = wdir
847        msgToMPD['umasks'][asRange] = wumask
848        msgToMPD['args'][asRange]   = cmdAndArgs[1:]
849        if host.startswith('_any_'):
850            msgToMPD['hosts'][(loRange,hiRange)] = host
851        else:
852            try:
853                msgToMPD['hosts'][asRange] = socket.gethostbyname_ex(host)[2][0]
854            except:
855                print 'unable to do find info for host %s' % (host)
856                sys.exit(-1)
857
858        envToSend = {}
859        if envall:
860            for envvar in os.environ.keys():
861                envToSend[envvar] = os.environ[envvar]
862        for envvar in parmdb['-genvlist']:
863            if not os.environ.has_key(envvar):
864                print '%s in envlist does not exist in your env' % (envvar)
865                sys.exit(-1)
866            envToSend[envvar] = os.environ[envvar]
867        for envvar in localEnvlist:
868            if not os.environ.has_key(envvar):
869                print '%s in envlist does not exist in your env' % (envvar)
870                sys.exit(-1)
871            envToSend[envvar] = os.environ[envvar]
872        for envvar in parmdb['-genv'].keys():
873            envToSend[envvar] = parmdb['-genv'][envvar]
874        for envvar in localEnv.keys():
875            envToSend[envvar] = localEnv[envvar]
876        if usize:
877            envToSend['MPI_UNIVERSE_SIZE'] = str(usize)
878        envToSend['MPI_APPNUM'] = str(appnum)
879        msgToMPD['envvars'][(loRange,hiRange)] = envToSend
880
881        loRange = hiRange + 1
882        hiRange = argsetHiRange  # again
883
884    appnum += 1
885    nextRange += nProcs
886    parmdb[('cmdline','nprocs')] = parmdb['nprocs'] + nProcs
887
888# Adjust nProcs (called maxprocs in the Standard) according to soft:
889# Our interpretation is that we need the largest number <= nProcs that is
890# consistent with the list of possible values described by soft.  I.e.
891# if the user says
892#
893#   mpiexec -n 10 -soft 5 a.out
894#
895# we adjust the 10 down to 5.  This may not be what was intended in the Standard,
896# but it seems to be what it says.
897
def adjust_nprocs(nProcs,softness):
    """
    Return the largest process count <= nProcs that softness permits.

    softness is the operand of -soft: a comma-separated list whose items
    are  a  (a single value),  a:b  (a, a+1, ..., b)  or  a:b:c  (a, a+c,
    a+2c, ... without passing b; c may be negative to count down to b).
    Every item of the list is considered, and only values in the range
    0..nProcs qualify.

    If an item is malformed (wrong number of fields, a non-integer field,
    or a zero stride), or if no value qualifies, a message is printed and
    usage() is called, which exits.
    """
    # NOTE: the single-argument print(...) form used here behaves exactly
    # like the print statement under Python 2.
    allowed = []
    for triple in softness.split(','):      # triple is a or a:b or a:b:c
        try:
            nums = [ int(field) for field in triple.split(':') ]
        except ValueError:
            nums = []                       # reported as invalid below
        if len(nums) == 1:
            values = nums
        elif len(nums) == 2:
            values = range(nums[0],nums[1]+1)
        elif len(nums) == 3  and  nums[2] > 0:
            values = range(nums[0],nums[1]+1,nums[2])
        elif len(nums) == 3  and  nums[2] < 0:
            # counting down: b itself must stay reachable
            values = range(nums[0],nums[1]-1,nums[2])
        else:
            print('invalid subargument to -soft: %s' % (softness))
            print('should be a or a:b or a:b:c')
            usage()
            values = []
        for i in values:
            if i <= nProcs and i >= 0:
                allowed.append(i)

    # decide only after every item has been examined
    if len(allowed) == 0:
        print('-soft argument %s allows no valid number of processes' % (softness))
        usage()
    else:
        return max(allowed)
930
931
def read_machinefile(machineFilename):
    """
    Read a machinefile and return a dict indexed by process number.

    Returns None when machineFilename is empty or None.  Blank lines and
    lines starting with '#' are skipped.  Every other line is
        host[:nprocs] [ifhn=<interface hostname>]
    and contributes nprocs (default 1) consecutive entries of the form
        { 'host' : host, 'nprocs' : nprocs, 'ifhn' : ifhn-or-'' }

    Prints a message and exits with sys.exit(-1) if the file cannot be
    opened or a line is malformed (non-numeric nprocs, a word that is not
    key=value, or an unrecognized key).
    """
    # NOTE: the single-argument print(...) form used here behaves exactly
    # like the print statement under Python 2.
    if not machineFilename:
        return None
    try:
        machineFile = open(machineFilename,'r')
    except (IOError, OSError):
        print('** unable to open machinefile')
        sys.exit(-1)
    procID = 0
    machineFileInfo = {}
    try:                                # make sure the file gets closed
        for line in machineFile:
            line = line.strip()
            if not line  or  line[0] == '#':
                continue
            splitLine = line.split()    # whitespace-separated words
            host = splitLine[0]
            if ':' in host:
                (host,nprocs) = host.split(':',1)
                try:
                    nprocs = int(nprocs)
                except ValueError:
                    print('invalid process count in machinefile: %s' % splitLine[0])
                    sys.exit(-1)
            else:
                nprocs = 1
            kvps = {'ifhn' : ''}
            for kv in splitLine[1:]:
                if '=' not in kv:
                    print('malformed key=value pair in machinefile: %s' % kv)
                    sys.exit(-1)
                (k,v) = kv.split('=',1)
                if k == 'ifhn':  # interface hostname
                    kvps[k] = v
                else:  # may be other kv pairs later
                    print('unrecognized key in machinefile: %s' % k)
                    sys.exit(-1)
            for i in range(procID,procID+nprocs):
                machineFileInfo[i] = { 'host' : host, 'nprocs' : nprocs }
                machineFileInfo[i].update(kvps)
            procID += nprocs
    finally:
        machineFile.close()
    return machineFileInfo
966
967def handle_man_input(sock,streamHandler):
968    global numDoneWithIO, myExitStatus
969    global outXmlDoc, outECs
970    msg = sock.recv_dict_msg()
971    if not msg:
972        streamHandler.del_handler(sock)
973        numDoneWithIO += 1
974    elif not msg.has_key('cmd'):
975        mpd_print(1,'mpiexec: from man, invalid msg=:%s:' % (msg) )
976        sys.exit(-1)
977    elif msg['cmd'] == 'startup_status':
978        if msg['rc'] != 0:
979            # print 'rank %d (%s) in job %s failed to find executable %s' % \
980                  # ( msg['rank'], msg['src'], msg['jobid'], msg['exec'] )
981            host = msg['src'].split('_')[0]
982            reason = unquote(msg['reason'])
983            print 'problem with execution of %s  on  %s:  %s ' % \
984                  (msg['exec'],host,reason)
985            # don't stop ; keep going until all top-level mans finish
986    elif msg['cmd'] == 'job_aborted_early':
987        print 'rank %d in job %s caused collective abort of all ranks' % \
988              ( msg['rank'], msg['jobid'] )
989        status = msg['exit_status']
990        if hasattr(os,'WIFSIGNALED')  and  os.WIFSIGNALED(status):
991            if status > myExitStatus:
992                myExitStatus = status
993            killed_status = status & 0x007f  # AND off core flag
994            print '  exit status of rank %d: killed by signal %d ' % \
995                  (msg['rank'],killed_status)
996        elif hasattr(os,'WEXITSTATUS'):
997            exit_status = os.WEXITSTATUS(status)
998            if exit_status > myExitStatus:
999                myExitStatus = exit_status
1000            print '  exit status of rank %d: return code %d ' % \
1001                  (msg['rank'],exit_status)
1002        else:
1003            myExitStatus = 0
1004    elif msg['cmd'] == 'job_aborted':
1005        print 'job aborted; reason = %s' % (msg['reason'])
1006    elif msg['cmd'] == 'client_exit_status':
1007        if outECs:
1008            if parmdb['ecfn_format'] == 'xml':
1009                outXmlProc = outXmlDoc.createElement('exit-code')
1010                outECs.appendChild(outXmlProc)
1011                outXmlProc.setAttribute('rank',str(msg['cli_rank']))
1012                outXmlProc.setAttribute('status',str(msg['cli_status']))
1013                outXmlProc.setAttribute('pid',str(msg['cli_pid']))
1014                outXmlProc.setAttribute('host',msg['cli_host'])  # cli_ifhn is also avail
1015            else:
1016                outECs += 'rank=%d status=%d pid=%d host=%s\n' % \
1017                          (msg['cli_rank'],msg['cli_status'],msg['cli_pid'],msg['cli_host'])
1018
1019        # print "exit info: rank=%d  host=%s  pid=%d  status=%d" % \
1020              # (msg['cli_rank'],msg['cli_host'],
1021               # msg['cli_pid'],msg['cli_status'])
1022        status = msg['cli_status']
1023        if hasattr(os,'WIFSIGNALED')  and  os.WIFSIGNALED(status):
1024            if status > myExitStatus:
1025                myExitStatus = status
1026            killed_status = status & 0x007f  # AND off core flag
1027            # print 'exit status of rank %d: killed by signal %d ' % \
1028            #        (msg['cli_rank'],killed_status)
1029        elif hasattr(os,'WEXITSTATUS'):
1030            exit_status = os.WEXITSTATUS(status)
1031            if exit_status > myExitStatus:
1032                myExitStatus = exit_status
1033            # print 'exit status of rank %d: return code %d ' % \
1034            #       (msg['cli_rank'],exit_status)
1035        else:
1036            myExitStatus = 0
1037    else:
1038        print 'unrecognized msg from manager :%s:' % msg
1039
1040def handle_cli_stdout_input(sock,parmdb,streamHandler,linesPerRank):
1041    global numDoneWithIO
1042    if parmdb['MPIEXEC_MERGE_OUTPUT']:
1043        line = sock.recv_one_line()
1044        if not line:
1045            streamHandler.del_handler(sock)
1046            numDoneWithIO += 1
1047        else:
1048            if parmdb['MPIEXEC_GDB']:
1049                line = line.replace('(gdb)\n','(gdb) ')
1050            try:
1051                (rank,rest) = line.split(':',1)
1052                rank = int(rank)
1053                linesPerRank[rank].append(rest)
1054            except:
1055                print line
1056            print_ready_merged_lines(parmdb['nprocs'],parmdb,linesPerRank)
1057    else:
1058        msg = sock.recv(1024)
1059        if not msg:
1060            streamHandler.del_handler(sock)
1061            numDoneWithIO += 1
1062        else:
1063            sys.stdout.write(msg)
1064            sys.stdout.flush()
1065
def handle_cli_stderr_input(sock,streamHandler):
    """
    Copy up to 1024 bytes from a client stderr socket to our stderr.

    Receiving nothing removes the socket's handler and bumps the global
    numDoneWithIO instead.
    """
    global numDoneWithIO
    chunk = sock.recv(1024)
    if chunk:
        sys.stderr.write(chunk)
        sys.stderr.flush()
        return
    streamHandler.del_handler(sock)
    numDoneWithIO += 1
1075
1076# NOTE: stdin is supposed to be slow, low-volume.  We read it all here (as it
1077# appears on the fd) and send it immediately to the receivers.  If the user
1078# redirects a "large" file (perhaps as small as 5k) into us, we will send it
1079# all out right away.  This can cause things to hang on the remote (recvr) side.
1080# We do not wait to read here until the recvrs read because there may be several
1081# recvrs and they may read at different speeds/times.
1082def handle_stdin_input(stdin_stream,parmdb,streamHandler,manSock):
1083    line  = ''
1084    try:
1085        line = stdin_stream.readline()
1086    except IOError, errinfo:
1087        sys.stdin.flush()  # probably does nothing
1088        # print "I/O err on stdin:", errinfo
1089        mpd_print(1,'stdin problem; if pgm is run in background, redirect from /dev/null')
1090        mpd_print(1,'    e.g.: mpiexec -n 4 a.out < /dev/null &')
1091    else:
1092        gdbFlag = parmdb['MPIEXEC_GDB']
1093        if line:    # not EOF
1094            msgToSend = { 'cmd' : 'stdin_from_user', 'line' : line } # default
1095            if gdbFlag and line.startswith('z'):
1096                line = line.rstrip()
1097                if len(line) < 3:    # just a 'z'
1098                    line += ' 0-%d' % (parmdb['nprocs']-1)
1099                s1 = line[2:].rstrip().split(',')
1100                for s in s1:
1101                    s2 = s.split('-')
1102                    for i in s2:
1103                        if not i.isdigit():
1104                            print 'invalid arg to z :%s:' % i
1105                            continue
1106                msgToSend = { 'cmd' : 'stdin_dest', 'stdin_procs' : line[2:] }
1107                sys.stdout.softspace = 0
1108                print '%s:  (gdb) ' % (line[2:]),
1109            elif gdbFlag and line.startswith('q'):
1110                msgToSend = { 'cmd' : 'stdin_dest',
1111                              'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) }
1112                if manSock:
1113                    manSock.send_dict_msg(msgToSend)
1114                msgToSend = { 'cmd' : 'stdin_from_user','line' : 'q\n' }
1115            elif gdbFlag and line.startswith('^'):
1116                msgToSend = { 'cmd' : 'stdin_dest',
1117                              'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) }
1118                if manSock:
1119                    manSock.send_dict_msg(msgToSend)
1120                msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' }
1121            if manSock:
1122                manSock.send_dict_msg(msgToSend)
1123        else:
1124            streamHandler.del_handler(sys.stdin)
1125            sys.stdin.close()
1126            if manSock:
1127                msgToSend = { 'cmd' : 'stdin_from_user', 'eof' : '' }
1128                manSock.send_dict_msg(msgToSend)
1129    return line
1130
def handle_sig_occurred(manSock):
    """
    Act on the signal number recorded in the global sigOccurred.

    SIGINT  - forward SIGINT to the manager, close the socket, exit(-1)
    SIGALRM - forward SIGKILL, close the socket, report the
              MPIEXEC_TIMEOUT value, exit(-1)
    SIGTSTP - clear sigOccurred, forward SIGTSTP, then stop this process
              by re-raising SIGTSTP with the default action; sig_handler
              is re-installed afterwards
    SIGCONT - clear sigOccurred and forward SIGCONT
    Forwarding (and closing) is skipped when manSock is false.
    """
    global sigOccurred
    if sigOccurred == signal.SIGINT:
        if manSock:
            manSock.send_dict_msg({ 'cmd' : 'signal', 'signo' : 'SIGINT' })
            manSock.close()
        sys.exit(-1)
    elif sigOccurred == signal.SIGALRM:
        if manSock:
            manSock.send_dict_msg({ 'cmd' : 'signal', 'signo' : 'SIGKILL' })
            manSock.close()
        timeoutVal = os.environ['MPIEXEC_TIMEOUT']
        mpd_print(1,'job ending due to env var MPIEXEC_TIMEOUT=%s' % timeoutVal)
        sys.exit(-1)
    elif sigOccurred == signal.SIGTSTP:
        sigOccurred = 0  # do this before kill below
        if manSock:
            manSock.send_dict_msg({ 'cmd' : 'signal', 'signo' : 'SIGTSTP' })
        signal.signal(signal.SIGTSTP,signal.SIG_DFL)      # stop myself
        os.kill(os.getpid(),signal.SIGTSTP)
        signal.signal(signal.SIGTSTP,sig_handler)  # restore this handler
    elif sigOccurred == signal.SIGCONT:
        sigOccurred = 0  # do it before handling
        if manSock:
            manSock.send_dict_msg({ 'cmd' : 'signal', 'signo' : 'SIGCONT' })
1160
def sig_handler(signum,frame):
    """
    Signal handler: note the signal number in the global sigOccurred (acted
    on later by handle_sig_occurred), then hand signum and frame on to
    mpd_handle_signal.
    """
    global sigOccurred
    sigOccurred = signum
    mpd_handle_signal(signum,frame)
1165
def format_sorted_ranks(ranks):
    """
    Return a compact string for a sequence of ranks, writing each run of
    consecutive values as first-last and joining the pieces with commas,
    e.g. [0,1,2,5,7,8] gives '0-2,5,7-8'.  An empty sequence gives ''.
    """
    runs = []       # [first,last] for each run of consecutive ranks
    for rank in ranks:
        if runs  and  rank == runs[-1][1] + 1:
            runs[-1][1] = rank          # extends the current run
        else:
            runs.append([rank,rank])    # starts a new run
    pieces = []
    for (first,last) in runs:
        if first != last:
            pieces.append('%d-%d' % (first,last))
        else:
            pieces.append('%d' % (first))
    return ','.join(pieces)
1187
1188def print_ready_merged_lines(minRanks,parmdb,linesPerRank):
1189    printFlag = 1  # default to get started
1190    while printFlag:
1191        printFlag = 0
1192        for r1 in range(parmdb['nprocs']):
1193            if not linesPerRank[r1]:
1194                continue
1195            sortedRanks = []
1196            lineToPrint = linesPerRank[r1][0]
1197            for r2 in range(parmdb['nprocs']):
1198                if linesPerRank[r2] and linesPerRank[r2][0] == lineToPrint: # myself also
1199                    sortedRanks.append(r2)
1200            if len(sortedRanks) >= minRanks:
1201                fsr = format_sorted_ranks(sortedRanks)
1202                sys.stdout.softspace = 0
1203                print '%s: %s' % (fsr,lineToPrint),
1204                for r2 in sortedRanks:
1205                    linesPerRank[r2] = linesPerRank[r2][1:]
1206                printFlag = 1
1207    sys.stdout.flush()
1208
1209def get_parms_from_xml_file(msgToMPD):
1210    global parmdb
1211    try:
1212        import xml.dom.minidom
1213    except:
1214        print 'you requested to parse an xml file, but'
1215        print '  I was unable to import the xml.dom.minidom module'
1216        sys.exit(-1)
1217    known_rlimit_types = ['core','cpu','fsize','data','stack','rss',
1218                          'nproc','nofile','ofile','memlock','as','vmem']
1219    try:
1220        inXmlFilename = parmdb['inXmlFilename']
1221        parmsXMLFile = open(inXmlFilename,'r')
1222    except:
1223        print 'could not open job xml specification file %s' % (inXmlFilename)
1224        sys.exit(-1)
1225    fileContents = parmsXMLFile.read()
1226    try:
1227        parsedXML = xml.dom.minidom.parseString(fileContents)
1228    except:
1229        print "mpiexec failed parsing xml file (perhaps from mpiexec); here is the content:"
1230        print fileContents
1231        sys.exit(-1)
1232    if parsedXML.documentElement.tagName != 'create-process-group':
1233        print 'expecting create-process-group; got unrecognized doctype: %s' % \
1234              (parsedXML.documentElement.tagName)
1235        sys.exit(-1)
1236    cpg = parsedXML.getElementsByTagName('create-process-group')[0]
1237    if cpg.hasAttribute('totalprocs'):
1238        parmdb[('xml','nprocs')] = int(cpg.getAttribute('totalprocs'))
1239    else:
1240        print '** totalprocs not specified in %s' % inXmlFilename
1241        sys.exit(-1)
1242    if cpg.hasAttribute('try_1st_locally'):
1243        parmdb[('xml','MPIEXEC_TRY_1ST_LOCALLY')] = int(cpg.getAttribute('try_1st_locally'))
1244    if cpg.hasAttribute('output')  and  cpg.getAttribute('output') == 'label':
1245        parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1
1246    if cpg.hasAttribute('pgid'):    # our jobalias
1247        parmdb[('xml','MPIEXEC_JOB_ALIAS')] = cpg.getAttribute('pgid')
1248    if cpg.hasAttribute('stdin_dest'):
1249        parmdb[('xml','MPIEXEC_STDIN_DEST')] = cpg.getAttribute('stdin_dest')
1250    if cpg.hasAttribute('doing_bnr'):
1251        parmdb[('xml','MPIEXEC_BNR')] = int(cpg.getAttribute('doing_bnr'))
1252    if cpg.hasAttribute('ifhn'):
1253        parmdb[('xml','MPIEXEC_IFHN')] = cpg.getAttribute('ifhn')
1254    if cpg.hasAttribute('exit_codes_filename'):
1255        parmdb[('xml','MPIEXEC_EXITCODES_FILENAME')] = cpg.getAttribute('exit_codes_filename')
1256        parmdb[('xml','ecfn_format')] = 'xml'
1257    if cpg.hasAttribute('gdb'):
1258        gdbFlag = int(cpg.getAttribute('gdb'))
1259        if gdbFlag:
1260            parmdb[('xml','MPIEXEC_GDB')]     = 1
1261            parmdb[('xml','MPIEXEC_MERGE_OUTPUT')] = 1       # implied
1262            parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1   # implied
1263            parmdb[('xml','MPIEXEC_STDIN_DEST')]   = 'all'   # implied
1264    if cpg.hasAttribute('use_root_pm'):
1265        parmdb[('xml','MPD_USE_ROOT_MPD')] = int(cpg.getAttribute('use_root_pm'))
1266    if cpg.hasAttribute('tv'):
1267        parmdb[('xml','MPIEXEC_TOTALVIEW')] = int(cpg.getAttribute('tv'))
1268    hostSpec = cpg.getElementsByTagName('host-spec')
1269    if hostSpec:
1270        hostList = []
1271        for node in hostSpec[0].childNodes:
1272            node = node.data.strip()
1273            hostnames = re.findall(r'\S+',node)
1274            for hostname in hostnames:
1275                if hostname:    # some may be the empty string
1276                    try:
1277                        ipaddr = socket.gethostbyname_ex(hostname)[2][0]
1278                    except:
1279                        print 'unable to determine IP info for host %s' % (hostname)
1280                        sys.exit(-1)
1281                    hostList.append(ipaddr)
1282        parmdb[('xml','MPIEXEC_HOST_LIST')] = hostList
1283    if hostSpec and hostSpec[0].hasAttribute('check'):
1284        hostSpecMode = hostSpec[0].getAttribute('check')
1285        if hostSpecMode == 'yes':
1286            parmdb[('xml','MPIEXEC_HOST_CHECK')] = 1
1287    covered = [0] * parmdb['nprocs']
1288    procSpec = cpg.getElementsByTagName('process-spec')
1289    if not procSpec:
1290        print 'No process-spec specified'
1291        usage()
1292    for p in procSpec:
1293        if p.hasAttribute('range'):
1294            therange = p.getAttribute('range')
1295            splitRange = therange.split('-')
1296            if len(splitRange) == 1:
1297                loRange = int(splitRange[0])
1298                hiRange = loRange
1299            else:
1300                (loRange,hiRange) = (int(splitRange[0]),int(splitRange[1]))
1301        else:
1302            (loRange,hiRange) = (0,parmdb['nprocs']-1)
1303        for i in xrange(loRange,hiRange+1):
1304            nprocs = parmdb['nprocs']
1305            if i >= nprocs:
1306                print '*** exiting; rank %d is greater than nprocs' % (nprocs)
1307                sys.exit(-1)
1308            if covered[i]:
1309                print '*** exiting; rank %d is doubly used in proc specs' % (nprocs)
1310                sys.exit(-1)
1311            covered[i] = 1
1312        if p.hasAttribute('exec'):
1313            msgToMPD['execs'][(loRange,hiRange)] = p.getAttribute('exec')
1314        else:
1315            print '*** exiting; range %d-%d has no exec' % (loRange,hiRange)
1316            sys.exit(-1)
1317        if p.hasAttribute('user'):
1318            username = p.getAttribute('user')
1319            if pwd_module_available:
1320                try:
1321                    pwent = pwd.getpwnam(username)
1322                except:
1323                    print username, 'is an invalid username'
1324                    sys.exit(-1)
1325            if username == mpd_get_my_username()  \
1326            or (hasattr(os,'getuid') and os.getuid() == 0):
1327                msgToMPD['users'][(loRange,hiRange)] = p.getAttribute('user')
1328            else:
1329                print username, 'username does not match yours and you are not root'
1330                sys.exit(-1)
1331        else:
1332            msgToMPD['users'][(loRange,hiRange)] = mpd_get_my_username()
1333        if p.hasAttribute('cwd'):
1334            msgToMPD['cwds'][(loRange,hiRange)] = p.getAttribute('cwd')
1335        else:
1336            msgToMPD['cwds'][(loRange,hiRange)] = os.path.abspath(os.getcwd())
1337        if p.hasAttribute('umask'):
1338            msgToMPD['umasks'][(loRange,hiRange)] = p.getAttribute('umask')
1339        else:
1340            currumask = os.umask(0) ; os.umask(currumask)
1341            msgToMPD['umasks'][(loRange,hiRange)] = str(currumask)
1342        if p.hasAttribute('path'):
1343            msgToMPD['paths'][(loRange,hiRange)] = p.getAttribute('path')
1344        else:
1345            msgToMPD['paths'][(loRange,hiRange)] = os.environ['PATH']
1346        if p.hasAttribute('host'):
1347            host = p.getAttribute('host')
1348            if host.startswith('_any_'):
1349                msgToMPD['hosts'][(loRange,hiRange)] = host
1350            else:
1351                try:
1352                    msgToMPD['hosts'][(loRange,hiRange)] = socket.gethostbyname_ex(host)[2][0]
1353                except:
1354                    print 'unable to do find info for host %s' % (host)
1355                    sys.exit(-1)
1356        else:
1357            if hostSpec  and  hostList:
1358                msgToMPD['hosts'][(loRange,hiRange)] = '_any_from_pool_'
1359            else:
1360                msgToMPD['hosts'][(loRange,hiRange)] = '_any_'
1361        argDict = {}
1362        argList = p.getElementsByTagName('arg')
1363        for argElem in argList:
1364            argDict[int(argElem.getAttribute('idx'))] = argElem.getAttribute('value')
1365        argVals = [0] * len(argList)
1366        for i in argDict.keys():
1367            argVals[i-1] = unquote(argDict[i])
1368        msgToMPD['args'][(loRange,hiRange)] = argVals
1369        limitDict = {}
1370        limitList = p.getElementsByTagName('limit')
1371        for limitElem in limitList:
1372            typ = limitElem.getAttribute('type')
1373            if typ in known_rlimit_types:
1374                limitDict[typ] = limitElem.getAttribute('value')
1375            else:
1376                print 'mpiexec: invalid type in limit: %s' % (typ)
1377                sys.exit(-1)
1378        msgToMPD['limits'][(loRange,hiRange)] = limitDict
1379        envVals = {}
1380        envVarList = p.getElementsByTagName('env')
1381        for envVarElem in envVarList:
1382            envkey = envVarElem.getAttribute('name')
1383            envval = unquote(envVarElem.getAttribute('value'))
1384            envVals[envkey] = envval
1385        msgToMPD['envvars'][(loRange,hiRange)] = envVals
1386    for i in range(len(covered)):
1387        if not covered[i]:
1388            print '*** exiting; %d procs are requested, but proc %d is not described' % \
1389                  (parmdb['nprocs'],i)
1390            sys.exit(-1)
1391
1392def get_vals_for_attach(parmdb,conSock,msgToMPD):
1393    global recvTimeout
1394    sjobid = parmdb['-gdba'].split('@')    # jobnum and originating host
1395    msgToSend = { 'cmd' : 'mpdlistjobs' }
1396    conSock.send_dict_msg(msgToSend)
1397    msg = conSock.recv_dict_msg(timeout=recvTimeout)
1398    if not msg:
1399        mpd_print(1,'no msg recvd from mpd before timeout')
1400        sys.exit(-1)
1401    if msg['cmd'] != 'local_mpdid':     # get full id of local mpd for filters later
1402        mpd_print(1,'did not recv local_mpdid msg from local mpd; recvd: %s' % msg)
1403        sys.exit(-1)
1404    else:
1405        if len(sjobid) == 1:
1406            sjobid.append(msg['id'])
1407    got_info = 0
1408    while 1:
1409        msg = conSock.recv_dict_msg()
1410        if not msg.has_key('cmd'):
1411            mpd_print(1,'invalid message from mpd :%s:' % (msg))
1412            sys.exit(-1)
1413        if msg['cmd'] == 'mpdlistjobs_info':
1414            got_info = 1
1415            smjobid = msg['jobid'].split('  ')  # jobnum, mpdid, and alias (if present)
1416            if sjobid[0] == smjobid[0]  and  sjobid[1] == smjobid[1]:  # jobnum and mpdid
1417                rank = int(msg['rank'])
1418                msgToMPD['users'][(rank,rank)]   = msg['username']
1419                msgToMPD['hosts'][(rank,rank)]   = msg['ifhn']
1420                msgToMPD['execs'][(rank,rank)]   = msg['pgm']
1421                msgToMPD['cwds'][(rank,rank)]    = os.path.abspath(os.getcwd())
1422                msgToMPD['paths'][(rank,rank)]   = os.environ['PATH']
1423                msgToMPD['args'][(rank,rank)]    = [msg['clipid']]
1424                msgToMPD['envvars'][(rank,rank)] = {}
1425                msgToMPD['limits'][(rank,rank)]  = {}
1426                currumask = os.umask(0) ; os.umask(currumask)  # grab it and set it back
1427                msgToMPD['umasks'][(rank,rank)]  = str(currumask)
1428        elif  msg['cmd'] == 'mpdlistjobs_trailer':
1429            if not got_info:
1430                print 'no info on this jobid; probably invalid'
1431                sys.exit(-1)
1432            break
1433        else:
1434            print 'invaild msg from mpd :%s:' % (msg)
1435            sys.exit(-1)
1436    parmdb[('thispgm','nprocs')] = len(msgToMPD['execs'].keys())  # all dicts are same len
1437
1438
def usage():
    """Print the module docstring (the usage message) and exit with status -1."""
    print(__doc__)      # one argument in parentheses: same output as the statement form
    sys.exit(-1)
1442
1443
1444if __name__ == '__main__':
1445    try:
1446        mpiexec()
1447    except SystemExit, errExitStatus:  # bounced to here by sys.exit inside mpiexec()
1448        myExitStatus = errExitStatus
1449    sys.exit(myExitStatus)
1450