1#!/usr/bin/env python 2# 3# (C) 2001 by Argonne National Laboratory. 4# See COPYRIGHT in top-level directory. 5# 6 7""" 8usage: 9mpiexec [-h or -help or --help] # get this message 10mpiexec -file filename # (or -f) filename contains XML job description 11mpiexec [global args] [local args] executable [args] 12 where global args may be 13 -l # line labels by MPI rank 14 -bnr # MPICH1 compatibility mode 15 -machinefile # file mapping procs to machines 16 -s <spec> # direct stdin to "all" or 1,2 or 2-4,6 17 -1 # override default of trying 1st proc locally 18 -ifhn # network interface to use locally 19 -tv # run procs under totalview (must be installed) 20 -tvsu # totalview startup only 21 -gdb # run procs under gdb 22 -m # merge output lines (default with gdb) 23 -a # means assign this alias to the job 24 -ecfn # output_xml_exit_codes_filename 25 -recvtimeout <integer_val> # timeout for recvs to fail (e.g. from mpd daemon) 26 -g<local arg name> # global version of local arg (below) 27 and local args may be 28 -n <n> or -np <n> # number of processes to start 29 -wdir <dirname> # working directory to start in 30 -umask <umask> # umask for remote process 31 -path <dirname> # place to look for executables 32 -host <hostname> # host to start on 33 -soft <spec> # modifier of -n value 34 -arch <arch> # arch type to start on (not implemented) 35 -envall # pass all env vars in current environment 36 -envnone # pass no env vars 37 -envlist <list of env var names> # pass current values of these vars 38 -env <name> <value> # pass this value of this env var 39mpiexec [global args] [local args] executable args : [local args] executable... 
mpiexec -gdba jobid                      # gdb-attach to existing jobid
mpiexec -configfile filename             # filename contains cmd line segs as lines
  (See User Guide for more details)

Examples:
   mpiexec -l -n 10 cpi 100
   mpiexec -genv QPL_LICENSE 4705 -n 3 a.out

   mpiexec -n 1 -host foo master : -n 4 -host mysmp slave
"""
from time import ctime
__author__ = "Ralph Butler and Rusty Lusk"
__date__ = ctime()
__version__ = "$Revision: 1.90 $"
__credits__ = ""

import signal
if hasattr(signal,'SIGTTIN'):
    signal.signal(signal.SIGTTIN,signal.SIG_IGN)    # asap

import sys, os, socket, re

from urllib import quote
from time import time
from urllib import unquote
from mpdlib import mpd_set_my_id, mpd_get_my_username, mpd_version, mpd_print, \
                   mpd_uncaught_except_tb, mpd_handle_signal, mpd_which, \
                   MPDListenSock, MPDStreamHandler, MPDConClientSock, MPDParmDB

try:
    import pwd
    pwd_module_available = 1
except:
    pwd_module_available = 0

global parmdb, nextRange, appnum, recvTimeout
global numDoneWithIO, myExitStatus, sigOccurred, outXmlDoc, outECs


def mpiexec():
    """Top-level driver for mpiexec.

    Parses the command line (or an XML/config file), builds the 'mpdrun'
    request dict, sends it to the local mpd console socket, then loops
    handling manager/client stdout/stderr traffic until all three I/O
    streams are closed.  Returns the worst exit status seen among the
    started processes (accumulated in the global myExitStatus by the
    stream handlers).
    """
    global parmdb, nextRange, appnum, recvTimeout
    global numDoneWithIO, myExitStatus, sigOccurred, outXmlDoc, outECs

    import sys    # for sys.excepthook on next line
    sys.excepthook = mpd_uncaught_except_tb

    myExitStatus = 0
    if len(sys.argv) < 2 or sys.argv[1] == '-h' \
    or sys.argv[1] == '-help' or sys.argv[1] == '--help':
        usage()
    myHost = socket.gethostname()
    mpd_set_my_id(myid='mpiexec_%s' % (myHost) )
    try:
        hostinfo = socket.gethostbyname_ex(myHost)
    except:
        print 'mpiexec failed: gethostbyname_ex failed for %s' % (myHost)
        sys.exit(-1)
    myIP = hostinfo[2][0]

    # parm db consults sources in this precedence order; 'thispgm' values
    # below are the built-in defaults (lowest priority)
    parmdb = MPDParmDB(orderedSources=['cmdline','xml','env','rcfile','thispgm'])
    parmsToOverride = {
                        'MPD_USE_ROOT_MPD'           : 0,
                        'MPD_SECRETWORD'             : '',
                        'MPIEXEC_SHOW_LINE_LABELS'   : 0,
                        'MPIEXEC_LINE_LABEL_FMT'     : '%r',
                        'MPIEXEC_JOB_ALIAS'          : '',
                        'MPIEXEC_USIZE'              : 0,
                        'MPIEXEC_GDB'                : 0,
                        'MPIEXEC_IFHN'               : '',    # use one from mpd as default
                        'MPIEXEC_MERGE_OUTPUT'       : 0,
                        'MPIEXEC_STDIN_DEST'         : '0',
                        'MPIEXEC_MACHINEFILE'        : '',
                        'MPIEXEC_BNR'                : 0,
                        'MPIEXEC_TOTALVIEW'          : 0,
                        'MPIEXEC_TVSU'               : 0,
                        'MPIEXEC_EXITCODES_FILENAME' : '',
                        'MPIEXEC_TRY_1ST_LOCALLY'    : 1,
                        'MPIEXEC_TIMEOUT'            : 0,
                        'MPIEXEC_HOST_LIST'          : [],
                        'MPIEXEC_HOST_CHECK'         : 0,
                        'MPIEXEC_RECV_TIMEOUT'       : 20,
                      }
    for (k,v) in parmsToOverride.items():
        parmdb[('thispgm',k)] = v
    parmdb[('thispgm','mship')] = ''
    parmdb[('thispgm','rship')] = ''
    parmdb[('thispgm','userpgm')] = ''
    parmdb[('thispgm','nprocs')] = 0
    parmdb[('thispgm','ecfn_format')] = ''
    parmdb[('thispgm','-gdba')] = ''
    parmdb[('thispgm','singinitpid')] = 0
    parmdb[('thispgm','singinitport')] = 0
    parmdb[('thispgm','ignore_rcfile')] = 0
    parmdb[('thispgm','ignore_environ')] = 0
    parmdb[('thispgm','inXmlFilename')] = ''
    parmdb[('thispgm','print_parmdb_all')] = 0
    parmdb[('thispgm','print_parmdb_def')] = 0

    appnum = 0
    nextRange = 0
    localArgSets = { 0 : [] }

    # first arg may select a special mode (-gdba/-file/-pmi_args/-configfile);
    # otherwise argv is parsed as global args + ':'-separated local arg sets
    if sys.argv[1] == '-gdba':
        if len(sys.argv) != 3:
            print '-gdba arg must appear only with jobid'
            usage()
        parmdb[('cmdline','-gdba')] = sys.argv[2]
        parmdb[('cmdline','MPIEXEC_GDB')] = 1
        parmdb[('cmdline','MPIEXEC_MERGE_OUTPUT')] = 1    # implied
        parmdb[('cmdline','MPIEXEC_SHOW_LINE_LABELS')] = 1    # implied
        parmdb[('cmdline','MPIEXEC_STDIN_DEST')] = 'all'    # implied
    elif sys.argv[1] == '-file' or sys.argv[1] == '-f':
        if len(sys.argv) != 3:
            print '-file (-f) arg must appear alone'
            usage()
        parmdb[('cmdline','inXmlFilename')] = sys.argv[2]
    elif sys.argv[1] == '-pmi_args':
        # singleton-init mode: an already-running process is asking to join
        parmdb[('cmdline','singinitport')] = sys.argv[2]
        # ignoring interface name (where app is listening) and authentication key, for now
        parmdb[('cmdline','singinitpid')] = sys.argv[5]
        parmdb[('cmdline','userpgm')] = 'unknown_pgmname'
        parmdb[('cmdline','nprocs')] = 1
        parmdb[('cmdline','MPIEXEC_TRY_1ST_LOCALLY')] = 1
        machineFileInfo = {}
        tempargv = [sys.argv[0],'unknown_pgmname']
        collect_args(tempargv,localArgSets)
    else:
        if sys.argv[1] == '-configfile':
            if len(sys.argv) != 3:
                usage()
            configFile = open(sys.argv[2],'r',0)
            configLines = configFile.readlines()
            configLines = [ x.strip() + ' : ' for x in configLines if x[0] != '#' and x.strip() != '' ]
            tempargv = []
            for line in configLines:
                # let /bin/sh do the word-splitting/quoting for each line
                line = 'mpddummyarg ' + line    # gets pitched in shells that can't handle --
                (shellIn,shellOut) = \
                    os.popen4("/bin/sh -c 'for a in $*; do echo _$a; done' -- %s" % (line))
                for shellLine in shellOut:
                    if shellLine.startswith('_mpddummyarg'):
                        continue
                    tempargv.append(shellLine[1:].strip())    # 1: strips off the leading _
            tempargv = [sys.argv[0]] + tempargv[0:-1]    # strip off the last : I added
            collect_args(tempargv,localArgSets)
        else:
            collect_args(sys.argv,localArgSets)
        machineFileInfo = read_machinefile(parmdb['MPIEXEC_MACHINEFILE'])

    # set some default values for mpd; others added as discovered below
    msgToMPD = { 'cmd'      : 'mpdrun',
                 'conhost'  : myHost,
                 'spawned'  : 0,
                 'nstarted' : 0,
                 'hosts'    : {},
                 'execs'    : {},
                 'users'    : {},
                 'cwds'     : {},
                 'umasks'   : {},
                 'paths'    : {},
                 'args'     : {},
                 'limits'   : {},
                 'envvars'  : {},
                 'ifhns'    : {},
               }

    if parmdb['inXmlFilename']:
        get_parms_from_xml_file(msgToMPD)    # fills in some more values of msgToMPD
    else:
        parmdb.get_parms_from_env(parmsToOverride)
        parmdb.get_parms_from_rcfile(parmsToOverride)

    # mostly old mpdrun below here
    numDoneWithIO = 0
    outXmlDoc = ''
    outECs = ''
    outECFile = None
    sigOccurred = 0

    recvTimeout = int(parmdb['MPIEXEC_RECV_TIMEOUT'])    # may be changed below

    # listen sock on an ephemeral port; the manager calls back on it
    listenSock = MPDListenSock('',0,name='socket_to_listen_for_man')
    listenPort = listenSock.getsockname()[1]
    # root (or explicit request) goes through the setuid mpdroot helper
    if (hasattr(os,'getuid') and os.getuid() == 0) or parmdb['MPD_USE_ROOT_MPD']:
        fullDirName = os.path.abspath(os.path.split(sys.argv[0])[0])    # normalize
        mpdroot = os.path.join(fullDirName,'mpdroot')
        conSock = MPDConClientSock(mpdroot=mpdroot,secretword=parmdb['MPD_SECRETWORD'])
    else:
        conSock = MPDConClientSock(secretword=parmdb['MPD_SECRETWORD'])

    if parmdb['MPIEXEC_HOST_CHECK']:    # if this was requested in the xml file
        msgToSend = { 'cmd' : 'verify_hosts_in_ring',
                      'host_list' : parmdb['MPIEXEC_HOST_LIST'] }
        conSock.send_dict_msg(msgToSend)
        msg = conSock.recv_dict_msg(timeout=recvTimeout)
        if not msg:
            mpd_print(1,'no msg recvd from mpd for verify_hosts_in_ring')
            sys.exit(-1)
        elif msg['cmd'] != 'verify_hosts_in_ring_response':
            mpd_print(1,'unexpected msg from mpd :%s:' % (msg) )
            sys.exit(-1)
        if msg['host_list']:
            print 'These hosts are not in the mpd ring:'
            for host in msg['host_list']:
                if host[0].isdigit():    # looks like an IP; try reverse lookup too
                    print '    %s' % (host),
                    try:
                        print ' (%s)' % (socket.gethostbyaddr(host)[0])
                    except:
                        print ''
                else:
                    print '    %s' % (host)
            sys.exit(-1)

    # version handshake with the local mpd
    msgToSend = { 'cmd' : 'get_mpdrun_values' }
    conSock.send_dict_msg(msgToSend)
    msg = conSock.recv_dict_msg(timeout=recvTimeout)
    if not msg:
        mpd_print(1,'no msg recvd from mpd during version check')
        sys.exit(-1)
    elif msg['cmd'] != 'response_get_mpdrun_values':
        mpd_print(1,'unexpected msg from mpd :%s:' % (msg) )
        sys.exit(-1)
    if msg['mpd_version'] != mpd_version():
        mpd_print(1,'mpd version %s does not match mpiexec version %s' % \
                  (msg['mpd_version'],mpd_version()) )
        sys.exit(-1)

    # if using/testing the INET CONSOLE
    if os.environ.has_key('MPD_CON_INET_HOST_PORT'):
        try:
            myIfhn = socket.gethostbyname_ex(myHost)[2][0]
        except:
            print 'mpiexec failed: gethostbyname_ex failed for %s' % (myHost)
            sys.exit(-1)
        parmdb[('thispgm','MPIEXEC_IFHN')] = myIfhn
    elif not parmdb['MPIEXEC_IFHN']:    # if user did not specify one, use mpd's
        parmdb[('thispgm','MPIEXEC_IFHN')] = msg['mpd_ifhn']    # not really thispgm here

    if parmdb['-gdba']:
        get_vals_for_attach(parmdb,conSock,msgToMPD)
    elif not parmdb['inXmlFilename']:
        parmdb[('cmdline','nprocs')] = 0    # for incr later
        for k in localArgSets.keys():
            handle_local_argset(localArgSets[k],machineFileInfo,msgToMPD)

    if parmdb['MPIEXEC_MERGE_OUTPUT'] and not parmdb['MPIEXEC_SHOW_LINE_LABELS']:
        parmdb[('thispgm','MPIEXEC_SHOW_LINE_LABELS')] = 1    # causes line labels also

    if parmdb['print_parmdb_all']:
        parmdb.printall()
    if parmdb['print_parmdb_def']:
        parmdb.printdef()

    if parmdb['mship']:
        # fork the master-ship copgm; child execs it, parent keeps going
        mshipSock = MPDListenSock('',0,name='socket_for_mship')
        mshipPort = mshipSock.getsockname()[1]
        mshipPid = os.fork()
        if mshipPid == 0:
            conSock.close()
            os.environ['MPDCP_AM_MSHIP'] = '1'
            os.environ['MPDCP_MSHIP_PORT'] = str(mshipPort)
            os.environ['MPDCP_MSHIP_FD'] = str(mshipSock.fileno())
            os.environ['MPDCP_MSHIP_NPROCS'] = str(parmdb['nprocs'])
            try:
                # NOTE(review): 'MPIEXEC_MSHIP' is never seeded into parmdb above;
                # presumably supplied by the xml/rcfile sources -- verify
                os.execvpe(parmdb['mship'],[parmdb['MPIEXEC_MSHIP']],os.environ)
            except Exception, errmsg:
                mpd_print(1,'execvpe failed for copgm %s; errmsg=:%s:' % \
                          (parmdb['MPIEXEC_MSHIP'],errmsg))
                sys.exit(-1)
            os._exit(0);    # do NOT do cleanup
        mshipSock.close()
    else:
        mshipPid = 0

    # make sure to do this after nprocs has its value
    linesPerRank = {}    # keep this a dict instead of a list
    for i in range(parmdb['nprocs']):
        linesPerRank[i] = []
    # make sure to do this after nprocs has its value
    if recvTimeout == 20:    # still the default
        recvTimeoutMultiplier = 0.1
        if os.environ.has_key('MPD_RECVTIMEOUT_MULTIPLIER'):
            try:
                recvTimeoutMultiplier = int(os.environ['MPD_RECVTIMEOUT_MULTIPLIER'])
            except ValueError:
                try:
                    recvTimeoutMultiplier = float(os.environ['MPD_RECVTIMEOUT_MULTIPLIER'])
                except ValueError:
                    print 'Invalid MPD_RECVTIMEOUT_MULTIPLIER. Value must be a number.'
                    sys.exit(-1)
        recvTimeout = int(parmdb['nprocs']) * recvTimeoutMultiplier

    if parmdb['MPIEXEC_EXITCODES_FILENAME']:
        if parmdb['ecfn_format'] == 'xml':
            try:
                import xml.dom.minidom
            except:
                print 'you requested to save the exit codes in an xml file, but'
                print '    I was unable to import the xml.dom.minidom module'
                sys.exit(-1)
            outXmlDoc = xml.dom.minidom.Document()
            outECs = outXmlDoc.createElement('exit-codes')
            outXmlDoc.appendChild(outECs)
        else:
            outECs = 'exit-codes\n'

    # finish populating the mpdrun request
    msgToMPD['nprocs'] = parmdb['nprocs']
    msgToMPD['limits'][(0,parmdb['nprocs']-1)] = {}
    msgToMPD['conport'] = listenPort
    msgToMPD['conip'] = myIP
    msgToMPD['conifhn'] = parmdb['MPIEXEC_IFHN']
    if parmdb['MPIEXEC_JOB_ALIAS']:
        msgToMPD['jobalias'] = parmdb['MPIEXEC_JOB_ALIAS']
    else:
        msgToMPD['jobalias'] = ''
    if parmdb['MPIEXEC_TRY_1ST_LOCALLY']:
        msgToMPD['try_1st_locally'] = 1
    if parmdb['rship']:
        msgToMPD['rship'] = parmdb['rship']
        msgToMPD['mship_host'] = socket.gethostname()
        msgToMPD['mship_port'] = mshipPort
    if parmdb['MPIEXEC_BNR']:
        msgToMPD['doing_bnr'] = 1
    if parmdb['MPIEXEC_STDIN_DEST'] == 'all':
        stdinDest = '0-%d' % (parmdb['nprocs']-1)
    else:
        stdinDest = parmdb['MPIEXEC_STDIN_DEST']
    if parmdb['MPIEXEC_SHOW_LINE_LABELS']:
        msgToMPD['line_labels'] = parmdb['MPIEXEC_LINE_LABEL_FMT']
    else:
        msgToMPD['line_labels'] = ''
    msgToMPD['stdin_dest'] = stdinDest
    msgToMPD['gdb'] = parmdb['MPIEXEC_GDB']
    msgToMPD['gdba'] = parmdb['-gdba']
    msgToMPD['totalview'] = parmdb['MPIEXEC_TOTALVIEW']
    msgToMPD['singinitpid'] = parmdb['singinitpid']
    msgToMPD['singinitport'] = parmdb['singinitport']
    msgToMPD['host_spec_pool'] = parmdb['MPIEXEC_HOST_LIST']

    # set sig handlers up right before we send mpdrun msg to mpd
    if hasattr(signal,'SIGINT'):
        signal.signal(signal.SIGINT, sig_handler)
    if hasattr(signal,'SIGTSTP'):
        signal.signal(signal.SIGTSTP,sig_handler)
    if hasattr(signal,'SIGCONT'):
        signal.signal(signal.SIGCONT,sig_handler)
    if hasattr(signal,'SIGALRM'):
        signal.signal(signal.SIGALRM,sig_handler)

    conSock.send_dict_msg(msgToMPD)
    msg = conSock.recv_dict_msg(timeout=recvTimeout)
    if not msg:
        mpd_print(1,'no msg recvd from mpd when expecting ack of request')
        sys.exit(-1)
    elif msg['cmd'] == 'mpdrun_ack':
        currRingSize = msg['ringsize']
        currRingNCPUs = msg['ring_ncpus']
    else:
        if msg['cmd'] == 'already_have_a_console':
            print 'mpd already has a console (e.g. for long ringtest); try later'
            sys.exit(-1)
        elif msg['cmd'] == 'job_failed':
            if msg['reason'] == 'some_procs_not_started':
                print 'mpiexec: unable to start all procs; may have invalid machine names'
                print '    remaining specified hosts:'
                for host in msg['remaining_hosts'].values():
                    if host != '_any_':
                        try:
                            print '        %s (%s)' % (host,socket.gethostbyaddr(host)[0])
                        except:
                            print '        %s' % (host)
            elif msg['reason'] == 'invalid_username':
                print 'mpiexec: invalid username %s at host %s' % \
                      (msg['username'],msg['host'])
            else:
                print 'mpiexec: job failed; reason=:%s:' % (msg['reason'])
            sys.exit(-1)
        else:
            mpd_print(1,'unexpected message from mpd: %s' % (msg) )
            sys.exit(-1)
    conSock.close()
    jobTimeout = int(parmdb['MPIEXEC_TIMEOUT'])
    if jobTimeout:
        if hasattr(signal,'alarm'):
            signal.alarm(jobTimeout)
        else:
            # no SIGALRM (e.g. Windows): emulate with a timer thread
            def timeout_function():
                mpd_print(1,'job ending due to env var MPIEXEC_TIMEOUT=%d' % jobTimeout)
                thread.interrupt_main()
            try:
                import thread, threading
                timer = threading.Timer(jobTimeout,timeout_function)
                timer.start()
            except:
                print 'unable to establish timeout for MPIEXEC_TIMEOUT'

    streamHandler = MPDStreamHandler()

    # the manager connects back three times: control, client stdout, client stderr
    (manSock,addr) = listenSock.accept()
    if not manSock:
        mpd_print(1,'mpiexec: failed to obtain sock from manager')
        sys.exit(-1)
    streamHandler.set_handler(manSock,handle_man_input,args=(streamHandler,))
    if hasattr(os,'fork'):
        streamHandler.set_handler(sys.stdin,handle_stdin_input,
                                  args=(parmdb,streamHandler,manSock))
    else:    # not using select on fd's when using subprocess module (probably M$)
        import threading
        def read_fd_with_func(fd,func):
            line = 'x'
            while line:
                line = func(fd)
        stdin_thread = threading.Thread(target=read_fd_with_func,
                                        args=(sys.stdin.fileno(),handle_stdin_input))
    # first, do handshaking with man
    msg = manSock.recv_dict_msg()
    if (not msg or not msg.has_key('cmd') or msg['cmd'] != 'man_checking_in'):
        mpd_print(1,'mpiexec: from man, invalid msg=:%s:' % (msg) )
        sys.exit(-1)
    msgToSend = { 'cmd' : 'ringsize', 'ring_ncpus' : currRingNCPUs,
                  'ringsize' : currRingSize }
    manSock.send_dict_msg(msgToSend)
    msg = manSock.recv_dict_msg()
    if (not msg or not msg.has_key('cmd')):
        mpd_print(1,'mpiexec: from man, invalid msg=:%s:' % (msg) )
        sys.exit(-1)
    if (msg['cmd'] == 'job_started'):
        jobid = msg['jobid']
        if outECs:
            if parmdb['ecfn_format'] == 'xml':
                outECs.setAttribute('jobid',jobid.strip())
            else:
                outECs += 'jobid=%s\n' % (jobid.strip())
        # print 'mpiexec: job %s started' % (jobid)
        if parmdb['MPIEXEC_TVSU']:
            # totalview startup-only: just feed it the process table
            import mtv
            mtv.allocate_proctable(parmdb['nprocs'])
            # extract procinfo (rank,hostname,exec,pid) tuples from msg
            for i in range(parmdb['nprocs']):
                tvhost = msg['procinfo'][i][0]
                tvpgm = msg['procinfo'][i][1]
                tvpid = msg['procinfo'][i][2]
                # print "%d %s %s %d" % (i,host,pgm,pid)
                mtv.append_proctable_entry(tvhost,tvpgm,tvpid)
            mtv.complete_spawn()
            msgToSend = { 'cmd' : 'tv_ready' }
            manSock.send_dict_msg(msgToSend)
        elif parmdb['MPIEXEC_TOTALVIEW']:
            tvname = 'totalview'
            if os.environ.has_key('TOTALVIEW'):
                tvname = os.environ['TOTALVIEW']
            if not mpd_which(((tvname.strip()).split()[0])):
                print 'cannot find "%s" in your $PATH:' % (tvname)
                print '    ', os.environ['PATH']
                sys.exit(-1)
            import mtv
            # have totalview attach to this mpiexec process itself
            tv_cmd = 'dattach python ' + `os.getpid()` + '; dgo; dassign MPIR_being_debugged 1'
            os.system(tvname + ' -e "%s" &' % (tv_cmd) )
            mtv.wait_for_debugger()
            mtv.allocate_proctable(parmdb['nprocs'])
            # extract procinfo (rank,hostname,exec,pid) tuples from msg
            for i in range(parmdb['nprocs']):
                tvhost = msg['procinfo'][i][0]
                tvpgm = msg['procinfo'][i][1]
                tvpid = msg['procinfo'][i][2]
                # print "%d %s %s %d" % (i,host,pgm,pid)
                mtv.append_proctable_entry(tvhost,tvpgm,tvpid)
            mtv.complete_spawn()
            msgToSend = { 'cmd' : 'tv_ready' }
            manSock.send_dict_msg(msgToSend)
    else:
        mpd_print(1,'mpiexec: from man, unknown msg=:%s:' % (msg) )
        sys.exit(-1)

    (manCliStdoutSock,addr) = listenSock.accept()
    streamHandler.set_handler(manCliStdoutSock,
                              handle_cli_stdout_input,
                              args=(parmdb,streamHandler,linesPerRank,))
    (manCliStderrSock,addr) = listenSock.accept()
    streamHandler.set_handler(manCliStderrSock,
                              handle_cli_stderr_input,
                              args=(streamHandler,))

    # Main Loop
    timeDelayForPrints = 2    # seconds
    timeForPrint = time() + timeDelayForPrints    # to get started
    numDoneWithIO = 0
    while numDoneWithIO < 3:    # man, client stdout, and client stderr
        if sigOccurred:
            handle_sig_occurred(manSock)
        rv = streamHandler.handle_active_streams(timeout=1.0)
        if rv[0] < 0:    # will handle some sigs at top of next loop
            pass    # may have to handle some err conditions here
        if parmdb['MPIEXEC_MERGE_OUTPUT']:
            if timeForPrint < time():
                # flush merged output periodically even if few ranks reported
                print_ready_merged_lines(1,parmdb,linesPerRank)
                timeForPrint = time() + timeDelayForPrints
            else:
                print_ready_merged_lines(parmdb['nprocs'],parmdb,linesPerRank)

    if parmdb['MPIEXEC_MERGE_OUTPUT']:
        print_ready_merged_lines(1,parmdb,linesPerRank)
    if mshipPid:
        (donePid,status) = os.wait()    # os.waitpid(mshipPid,0)
    if parmdb['MPIEXEC_EXITCODES_FILENAME']:
        outECFile = open(parmdb['MPIEXEC_EXITCODES_FILENAME'],'w')
        if parmdb['ecfn_format'] == 'xml':
            print >>outECFile, outXmlDoc.toprettyxml(indent='   ')
        else:
            print >>outECFile, outECs,
        outECFile.close()
    return myExitStatus


def collect_args(args,localArgSets):
    """Parse the leading global args of an mpiexec command line into parmdb
    and split the remainder into ':'-separated local arg sets stored in
    localArgSets (key 0 is the first set).  Local sets are validated later
    by handle_local_argset().
    """
    validGlobalArgs = { '-l' : 0, '-usize' : 1, '-gdb' : 0, '-bnr' : 0,
                        '-tv' : 0, '-tvsu' : 0,
                        '-ifhn' : 1, '-machinefile' : 1, '-s' : 1, '-1' : 0,
                        '-a' : 1, '-m' : 0, '-ecfn' : 1, '-recvtimeout' : 1,
                        '-gn' : 1, '-gnp' : 1, '-ghost' : 1, '-gpath' : 1, '-gwdir' : 1,
                        '-gsoft' : 1, '-garch' : 1, '-gexec' : 1, '-gumask' : 1,
                        '-genvall' : 0, '-genv' : 2, '-genvnone' : 0,
                        '-genvlist' : 1 }
    currumask = os.umask(0) ; os.umask(currumask)    # grab it and set it back
    parmdb[('cmdline','-gn')] = 1
    parmdb[('cmdline','-ghost')] = '_any_'
    if os.environ.has_key('PATH'):
        parmdb[('cmdline','-gpath')] = os.environ['PATH']
    else:
        parmdb[('cmdline','-gpath')] = ''
    parmdb[('cmdline','-gwdir')] = os.path.abspath(os.getcwd())
    parmdb[('cmdline','-gumask')] = str(currumask)
    parmdb[('cmdline','-gsoft')] = 0
    parmdb[('cmdline','-garch')] = ''
    parmdb[('cmdline','-gexec')] = ''
    parmdb[('cmdline','-genv')] = {}
    parmdb[('cmdline','-genvlist')] = []
    parmdb[('cmdline','-genvnone')] = 0
    argidx = 1
    while argidx < len(args) and args[argidx] in validGlobalArgs.keys():
        garg = args[argidx]
        # generic check that enough sub-args follow this global arg
        if len(args) <= (argidx+validGlobalArgs[garg]):
            print "missing sub-arg to %s" % (garg)
            usage()
        if garg == '-genv':
            parmdb['-genv'][args[argidx+1]] = args[argidx+2]
            argidx += 3
        elif garg == '-gn' or garg == '-gnp':
            if args[argidx+1].isdigit():
                parmdb[('cmdline','-gn')] = int(args[argidx+1])
            else:
                print 'argument to %s must be numeric' % (garg)
                usage()
            argidx += 2
        elif garg == '-ghost':
            try:
                parmdb[('cmdline',garg)] = socket.gethostbyname_ex(args[argidx+1])[2][0]
            except:
                print 'unable to do find info for host %s' % (args[argidx+1])
                sys.exit(-1)
            argidx += 2
        elif garg == '-gpath':
            parmdb[('cmdline','-gpath')] = args[argidx+1]
            argidx += 2
        elif garg == '-gwdir':
            parmdb[('cmdline','-gwdir')] = args[argidx+1]
            argidx += 2
        elif garg == '-gumask':
            parmdb[('cmdline','-gumask')] = args[argidx+1]
            argidx += 2
        elif garg == '-gsoft':
            parmdb[('cmdline','-gsoft')] = args[argidx+1]
            argidx += 2
        elif garg == '-garch':
            parmdb[('cmdline','-garch')] = args[argidx+1]
            argidx += 2
            print '** -garch is accepted but not used'
        elif garg == '-gexec':
            parmdb[('cmdline','-gexec')] = args[argidx+1]
            argidx += 2
        elif garg == '-genv':
            # NOTE(review): unreachable -- '-genv' is consumed by the first
            # branch of this if/elif chain above
            parmdb[('cmdline','-genv')] = args[argidx+1]
            argidx += 2
        elif garg == '-genvlist':
            parmdb[('cmdline','-genvlist')] = args[argidx+1].split(',')
            argidx += 2
        elif garg == '-genvnone':
            # NOTE(review): -genvnone is declared 0-arg in validGlobalArgs but
            # this stores the NEXT token (truthy in practice) and advances by 1;
            # likely meant: parmdb[('cmdline','-genvnone')] = 1
            parmdb[('cmdline','-genvnone')] = args[argidx+1]
            argidx += 1
        elif garg == '-l':
            parmdb[('cmdline','MPIEXEC_SHOW_LINE_LABELS')] = 1
            argidx += 1
        elif garg == '-a':
            parmdb[('cmdline','MPIEXEC_JOB_ALIAS')] = args[argidx+1]
            argidx += 2
        elif garg == '-usize':
            if args[argidx+1].isdigit():
                parmdb[('cmdline','MPIEXEC_USIZE')] = int(args[argidx+1])
            else:
                print 'argument to %s must be numeric' % (garg)
                usage()
            argidx += 2
        elif garg == '-recvtimeout':
            if args[argidx+1].isdigit():
                parmdb[('cmdline','MPIEXEC_RECV_TIMEOUT')] = int(args[argidx+1])
            else:
                print 'argument to %s must be numeric' % (garg)
                usage()
            argidx += 2
        elif garg == '-gdb':
            parmdb[('cmdline','MPIEXEC_GDB')] = 1
            argidx += 1
            parmdb[('cmdline','MPIEXEC_MERGE_OUTPUT')] = 1    # implied
            parmdb[('cmdline','MPIEXEC_SHOW_LINE_LABELS')] = 1    # implied
            parmdb[('cmdline','MPIEXEC_STDIN_DEST')] = 'all'    # implied
        elif garg == '-ifhn':
            parmdb[('cmdline','MPIEXEC_IFHN')] = args[argidx+1]
            argidx += 2
            try:
                hostinfo = socket.gethostbyname_ex(parmdb['MPIEXEC_IFHN'])
            except:
                print 'mpiexec: gethostbyname_ex failed for ifhn %s' % (parmdb['MPIEXEC_IFHN'])
                sys.exit(-1)
        elif garg == '-m':
            parmdb[('cmdline','MPIEXEC_MERGE_OUTPUT')] = 1
            argidx += 1
        elif garg == '-s':
            parmdb[('cmdline','MPIEXEC_STDIN_DEST')] = args[argidx+1]
            argidx += 2
        elif garg == '-machinefile':
            parmdb[('cmdline','MPIEXEC_MACHINEFILE')] = args[argidx+1]
            argidx += 2
        elif garg == '-bnr':
            parmdb[('cmdline','MPIEXEC_BNR')] = 1
            argidx += 1
        elif garg == '-tv':
            parmdb[('cmdline','MPIEXEC_TOTALVIEW')] = 1
            argidx += 1
        elif garg == '-tvsu':
            parmdb[('cmdline','MPIEXEC_TOTALVIEW')] = 1
            parmdb[('cmdline','MPIEXEC_TVSU')] = 1
            argidx += 1
        elif garg == '-ecfn':
            parmdb[('cmdline','MPIEXEC_EXITCODES_FILENAME')] = args[argidx+1]
            argidx += 2
        elif garg == '-1':
            parmdb[('cmdline','MPIEXEC_TRY_1ST_LOCALLY')] = 0    # reverses meaning
            argidx += 1
    if len(args) <= argidx:
        print "mpiexec: missing arguments after global args"
        usage()
    if args[argidx] == ':':
        argidx += 1
    localArgsKey = 0
    # collect local arg sets but do not validate them until handled below
    while argidx < len(args):
        if args[argidx] == ':':
            localArgsKey += 1
            localArgSets[localArgsKey] = []
        else:
            localArgSets[localArgsKey].append(args[argidx])
        argidx += 1

def handle_local_argset(argset,machineFileInfo,msgToMPD):
    """Process one ':'-separated local arg set (ending in executable + args).

    Starts from the global (-g*) defaults in parmdb, applies the set's local
    overrides, then records per-rank-range entries (hosts, execs, cwds, env,
    ...) into msgToMPD and advances the global appnum/nextRange counters.
    """
    global parmdb, nextRange, appnum, recvTimeout
    validLocalArgs = { '-n' : 1, '-np' : 1, '-host' : 1, '-path' : 1, '-wdir' : 1,
                       '-soft' : 1, '-arch' : 1, '-umask' : 1,
                       '-envall' : 0, '-env' : 2, '-envnone' : 0, '-envlist' : 1 }
    host = parmdb['-ghost']
    wdir = parmdb['-gwdir']
    wumask = parmdb['-gumask']
    wpath = parmdb['-gpath']
    nProcs = parmdb['-gn']
    usize = parmdb['MPIEXEC_USIZE']
    gexec = parmdb['-gexec']
    softness = parmdb['-gsoft']
    if parmdb['-genvnone']:
        envall = 0
    else:
        envall = 1
    localEnvlist = []
    localEnv = {}

    argidx = 0
    while argidx < len(argset):
        if argset[argidx] not in validLocalArgs:
            if argset[argidx][0] == '-':
                print 'invalid "local" arg: %s' % argset[argidx]
                usage()
            break    # since now at executable
        if parmdb['MPIEXEC_MACHINEFILE']:
            # NOTE(review): comparing against the LIST ['-ghost'] is always
            # False; likely meant the string '-ghost' (which can't appear in a
            # local argset anyway)
            if argset[argidx] == '-host' or argset[argidx] == ['-ghost']:
                print '-host (or -ghost) and -machinefile are incompatible'
                sys.exit(-1)
        if argset[argidx] == '-n' or argset[argidx] == '-np':
            if len(argset) < (argidx+2):
                print '** missing arg to -n'
                usage()
            nProcs = argset[argidx+1]
            if not nProcs.isdigit():
                print '** non-numeric arg to -n: %s' % nProcs
                usage()
            nProcs = int(nProcs)
            argidx += 2
        elif argset[argidx] == '-host':
            if len(argset) < (argidx+2):
                print '** missing arg to -host'
                usage()
            try:
                host = socket.gethostbyname_ex(argset[argidx+1])[2][0]
            except:
                print 'unable to do find info for host %s' % (argset[argidx+1])
                sys.exit(-1)
            argidx += 2
        elif argset[argidx] == '-path':
            if len(argset) < (argidx+2):
                print '** missing arg to -path'
                usage()
            wpath = argset[argidx+1]
            argidx += 2
        elif argset[argidx] == '-wdir':
            if len(argset) < (argidx+2):
                print '** missing arg to -wdir'
                usage()
            wdir = argset[argidx+1]
            argidx += 2
        elif argset[argidx] == '-umask':
            if len(argset) < (argidx+2):
                print '** missing arg to -umask'
                usage()
            wumask = argset[argidx+1]
            argidx += 2
        elif argset[argidx] == '-soft':
            if len(argset) < (argidx+2):
                print '** missing arg to -soft'
                usage()
            softness = argset[argidx+1]
            argidx += 2
        elif argset[argidx] == '-arch':
            if len(argset) < (argidx+2):
                print '** missing arg to -arch'
                usage()
            print '** -arch is accepted but not used'
            argidx += 2
        elif argset[argidx] == '-envall':
            envall = 1
            argidx += 1
        elif argset[argidx] == '-envnone':
            envall = 0
            argidx += 1
        elif argset[argidx] == '-envlist':
            localEnvlist = argset[argidx+1].split(',')
            argidx += 2
        elif argset[argidx] == '-env':
            if len(argset) < (argidx+3):
                print '** missing arg to -env'
                usage()
            var = argset[argidx+1]
            val = argset[argidx+2]
            localEnv[var] = val
            argidx += 3
        else:
            print 'unknown "local" option: %s' % argset[argidx]
            usage()

    if softness:
        nProcs = adjust_nprocs(nProcs,softness)

    # everything after the options is the executable and its args
    cmdAndArgs = []
    if argidx < len(argset):
        while argidx < len(argset):
            cmdAndArgs.append(argset[argidx])
            argidx += 1
    else:
        if gexec:
            cmdAndArgs = [gexec]
    if not cmdAndArgs:
        print 'no cmd specified'
        usage()

    argsetLoRange = nextRange
    argsetHiRange = nextRange + nProcs - 1
    loRange = argsetLoRange
    hiRange = argsetHiRange

    # with a machinefile, the argset's rank range may be split into several
    # sub-ranges, one per contiguous (host,ifhn) run
    defaultHostForArgset = host
    while loRange <= argsetHiRange:
        host = defaultHostForArgset
        if machineFileInfo:
            if len(machineFileInfo) <= hiRange:
                print 'too few entries in machinefile'
                sys.exit(-1)
            host = machineFileInfo[loRange]['host']
            ifhn = machineFileInfo[loRange]['ifhn']
            if ifhn:
                msgToMPD['ifhns'][loRange] = ifhn
            for i in range(loRange+1,hiRange+1):
                if machineFileInfo[i]['host'] != host or machineFileInfo[i]['ifhn'] != ifhn:
                    hiRange = i - 1    # shrink sub-range to this run
                    break

        asRange = (loRange,hiRange)    # this argset range as a tuple

        msgToMPD['users'][asRange] = mpd_get_my_username()
        msgToMPD['execs'][asRange] = cmdAndArgs[0]
        msgToMPD['paths'][asRange] = wpath
        msgToMPD['cwds'][asRange] = wdir
        msgToMPD['umasks'][asRange] = wumask
        msgToMPD['args'][asRange] = cmdAndArgs[1:]
        if host.startswith('_any_'):
            msgToMPD['hosts'][(loRange,hiRange)] = host
        else:
            try:
                msgToMPD['hosts'][asRange] = socket.gethostbyname_ex(host)[2][0]
            except:
                print 'unable to do find info for host %s' % (host)
                sys.exit(-1)

        # build the env dict for this sub-range; later sources override earlier
        envToSend = {}
        if envall:
            for envvar in os.environ.keys():
                envToSend[envvar] = os.environ[envvar]
        for envvar in parmdb['-genvlist']:
            if not os.environ.has_key(envvar):
                print '%s in envlist does not exist in your env' % (envvar)
                sys.exit(-1)
            envToSend[envvar] = os.environ[envvar]
        for envvar in localEnvlist:
            if not os.environ.has_key(envvar):
                print '%s in envlist does not exist in your env' % (envvar)
                sys.exit(-1)
            envToSend[envvar] = os.environ[envvar]
        for envvar in parmdb['-genv'].keys():
            envToSend[envvar] = parmdb['-genv'][envvar]
        for envvar in localEnv.keys():
            envToSend[envvar] = localEnv[envvar]
        if usize:
            envToSend['MPI_UNIVERSE_SIZE'] = str(usize)
        envToSend['MPI_APPNUM'] = str(appnum)
        msgToMPD['envvars'][(loRange,hiRange)] = envToSend

        loRange = hiRange + 1
        hiRange = argsetHiRange    # again

    appnum += 1
    nextRange += nProcs
    parmdb[('cmdline','nprocs')] = parmdb['nprocs'] + nProcs

# Adjust nProcs (called maxprocs in the Standard) according to soft:
# Our interpretation is that we need the largest number <= nProcs that is
# consistent with the list of possible values described by soft.  I.e.
# if the user says
#
#     mpiexec -n 10 -soft 5 a.out
#
# we adjust the 10 down to 5.  This may not be what was intended in the Standard,
# but it seems to be what it says.
def adjust_nprocs(nProcs,softness):
    """Apply an MPI 'soft' spec to a requested process count.

    softness is a comma-separated list of items of the form a, a:b, or
    a:b:c (start:end[:stride]).  Returns the largest value <= nProcs that
    the spec allows; calls usage() if the spec is malformed or allows no
    valid count.
    """
    biglist = []
    list1 = softness.split(',')
    for triple in list1:    # triple is a or a:b or a:b:c
        thingy = triple.split(':')
        try:
            if len(thingy) == 1:
                a = int(thingy[0])
                if a <= nProcs and a >= 0:
                    biglist.append(a)
            elif len(thingy) == 2:
                a = int(thingy[0])
                b = int(thingy[1])
                for i in range(a,b+1):
                    if i <= nProcs and i >= 0:
                        biglist.append(i)
            elif len(thingy) == 3:
                a = int(thingy[0])
                b = int(thingy[1])
                c = int(thingy[2])
                for i in range(a,b+1,c):
                    if i <= nProcs and i >= 0:
                        biglist.append(i)
            else:
                print('invalid subargument to -soft: %s' % (softness))
                print('should be a or a:b or a:b:c')
                usage()
        except ValueError:
            # non-numeric a/b/c, or a zero stride (range() rejects step 0);
            # previously this escaped as an uncaught traceback
            print('invalid subargument to -soft: %s' % (softness))
            print('should be a or a:b or a:b:c')
            usage()

    if len(biglist) == 0:
        print('-soft argument %s allows no valid number of processes' % (softness))
        usage()
    else:
        return max(biglist)


def read_machinefile(machineFilename):
    """Read a machinefile and map ranks to hosts.

    Each non-blank, non-'#' line is: host[:nprocs] [ifhn=<interface-host>].
    Returns None when no filename is given; otherwise a dict
    rank -> {'host','nprocs','ifhn'} with ranks assigned in file order.
    Exits with a message on any unreadable or malformed input.
    """
    if not machineFilename:
        return None
    try:
        machineFile = open(machineFilename,'r')
    except:
        print('** unable to open machinefile')
        sys.exit(-1)
    procID = 0
    machineFileInfo = {}
    try:
        for line in machineFile:
            line = line.strip()
            if not line or line[0] == '#':
                continue
            splitLine = re.split(r'\s+',line)
            host = splitLine[0]
            if ':' in host:    # host:nprocs means this host gets several ranks
                (host,nprocs) = host.split(':',1)
                nprocs = int(nprocs)
            else:
                nprocs = 1
            kvps = {'ifhn' : ''}
            for kv in splitLine[1:]:
                if '=' not in kv:
                    # previously an uncaught ValueError from the unpack below
                    print('invalid key=value pair in machinefile: %s' % (kv))
                    sys.exit(-1)
                (k,v) = kv.split('=',1)
                if k == 'ifhn':    # interface hostname
                    kvps[k] = v
                else:    # may be other kv pairs later
                    print('unrecognized key in machinefile: %s' % (k))
                    sys.exit(-1)
            for i in range(procID,procID+nprocs):
                machineFileInfo[i] = { 'host' : host, 'nprocs' : nprocs }
                machineFileInfo[i].update(kvps)
            procID += nprocs
    finally:
        machineFile.close()    # original leaked the open file handle
    return machineFileInfo

def handle_man_input(sock,streamHandler):
    global numDoneWithIO, myExitStatus
    global outXmlDoc, outECs
    msg = sock.recv_dict_msg()
if not msg: 972 streamHandler.del_handler(sock) 973 numDoneWithIO += 1 974 elif not msg.has_key('cmd'): 975 mpd_print(1,'mpiexec: from man, invalid msg=:%s:' % (msg) ) 976 sys.exit(-1) 977 elif msg['cmd'] == 'startup_status': 978 if msg['rc'] != 0: 979 # print 'rank %d (%s) in job %s failed to find executable %s' % \ 980 # ( msg['rank'], msg['src'], msg['jobid'], msg['exec'] ) 981 host = msg['src'].split('_')[0] 982 reason = unquote(msg['reason']) 983 print 'problem with execution of %s on %s: %s ' % \ 984 (msg['exec'],host,reason) 985 # don't stop ; keep going until all top-level mans finish 986 elif msg['cmd'] == 'job_aborted_early': 987 print 'rank %d in job %s caused collective abort of all ranks' % \ 988 ( msg['rank'], msg['jobid'] ) 989 status = msg['exit_status'] 990 if hasattr(os,'WIFSIGNALED') and os.WIFSIGNALED(status): 991 if status > myExitStatus: 992 myExitStatus = status 993 killed_status = status & 0x007f # AND off core flag 994 print ' exit status of rank %d: killed by signal %d ' % \ 995 (msg['rank'],killed_status) 996 elif hasattr(os,'WEXITSTATUS'): 997 exit_status = os.WEXITSTATUS(status) 998 if exit_status > myExitStatus: 999 myExitStatus = exit_status 1000 print ' exit status of rank %d: return code %d ' % \ 1001 (msg['rank'],exit_status) 1002 else: 1003 myExitStatus = 0 1004 elif msg['cmd'] == 'job_aborted': 1005 print 'job aborted; reason = %s' % (msg['reason']) 1006 elif msg['cmd'] == 'client_exit_status': 1007 if outECs: 1008 if parmdb['ecfn_format'] == 'xml': 1009 outXmlProc = outXmlDoc.createElement('exit-code') 1010 outECs.appendChild(outXmlProc) 1011 outXmlProc.setAttribute('rank',str(msg['cli_rank'])) 1012 outXmlProc.setAttribute('status',str(msg['cli_status'])) 1013 outXmlProc.setAttribute('pid',str(msg['cli_pid'])) 1014 outXmlProc.setAttribute('host',msg['cli_host']) # cli_ifhn is also avail 1015 else: 1016 outECs += 'rank=%d status=%d pid=%d host=%s\n' % \ 1017 (msg['cli_rank'],msg['cli_status'],msg['cli_pid'],msg['cli_host']) 1018 
1019 # print "exit info: rank=%d host=%s pid=%d status=%d" % \ 1020 # (msg['cli_rank'],msg['cli_host'], 1021 # msg['cli_pid'],msg['cli_status']) 1022 status = msg['cli_status'] 1023 if hasattr(os,'WIFSIGNALED') and os.WIFSIGNALED(status): 1024 if status > myExitStatus: 1025 myExitStatus = status 1026 killed_status = status & 0x007f # AND off core flag 1027 # print 'exit status of rank %d: killed by signal %d ' % \ 1028 # (msg['cli_rank'],killed_status) 1029 elif hasattr(os,'WEXITSTATUS'): 1030 exit_status = os.WEXITSTATUS(status) 1031 if exit_status > myExitStatus: 1032 myExitStatus = exit_status 1033 # print 'exit status of rank %d: return code %d ' % \ 1034 # (msg['cli_rank'],exit_status) 1035 else: 1036 myExitStatus = 0 1037 else: 1038 print 'unrecognized msg from manager :%s:' % msg 1039 1040def handle_cli_stdout_input(sock,parmdb,streamHandler,linesPerRank): 1041 global numDoneWithIO 1042 if parmdb['MPIEXEC_MERGE_OUTPUT']: 1043 line = sock.recv_one_line() 1044 if not line: 1045 streamHandler.del_handler(sock) 1046 numDoneWithIO += 1 1047 else: 1048 if parmdb['MPIEXEC_GDB']: 1049 line = line.replace('(gdb)\n','(gdb) ') 1050 try: 1051 (rank,rest) = line.split(':',1) 1052 rank = int(rank) 1053 linesPerRank[rank].append(rest) 1054 except: 1055 print line 1056 print_ready_merged_lines(parmdb['nprocs'],parmdb,linesPerRank) 1057 else: 1058 msg = sock.recv(1024) 1059 if not msg: 1060 streamHandler.del_handler(sock) 1061 numDoneWithIO += 1 1062 else: 1063 sys.stdout.write(msg) 1064 sys.stdout.flush() 1065 1066def handle_cli_stderr_input(sock,streamHandler): 1067 global numDoneWithIO 1068 msg = sock.recv(1024) 1069 if not msg: 1070 streamHandler.del_handler(sock) 1071 numDoneWithIO += 1 1072 else: 1073 sys.stderr.write(msg) 1074 sys.stderr.flush() 1075 1076# NOTE: stdin is supposed to be slow, low-volume. We read it all here (as it 1077# appears on the fd) and send it immediately to the receivers. 
If the user 1078# redirects a "large" file (perhaps as small as 5k) into us, we will send it 1079# all out right away. This can cause things to hang on the remote (recvr) side. 1080# We do not wait to read here until the recvrs read because there may be several 1081# recvrs and they may read at different speeds/times. 1082def handle_stdin_input(stdin_stream,parmdb,streamHandler,manSock): 1083 line = '' 1084 try: 1085 line = stdin_stream.readline() 1086 except IOError, errinfo: 1087 sys.stdin.flush() # probably does nothing 1088 # print "I/O err on stdin:", errinfo 1089 mpd_print(1,'stdin problem; if pgm is run in background, redirect from /dev/null') 1090 mpd_print(1,' e.g.: mpiexec -n 4 a.out < /dev/null &') 1091 else: 1092 gdbFlag = parmdb['MPIEXEC_GDB'] 1093 if line: # not EOF 1094 msgToSend = { 'cmd' : 'stdin_from_user', 'line' : line } # default 1095 if gdbFlag and line.startswith('z'): 1096 line = line.rstrip() 1097 if len(line) < 3: # just a 'z' 1098 line += ' 0-%d' % (parmdb['nprocs']-1) 1099 s1 = line[2:].rstrip().split(',') 1100 for s in s1: 1101 s2 = s.split('-') 1102 for i in s2: 1103 if not i.isdigit(): 1104 print 'invalid arg to z :%s:' % i 1105 continue 1106 msgToSend = { 'cmd' : 'stdin_dest', 'stdin_procs' : line[2:] } 1107 sys.stdout.softspace = 0 1108 print '%s: (gdb) ' % (line[2:]), 1109 elif gdbFlag and line.startswith('q'): 1110 msgToSend = { 'cmd' : 'stdin_dest', 1111 'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) } 1112 if manSock: 1113 manSock.send_dict_msg(msgToSend) 1114 msgToSend = { 'cmd' : 'stdin_from_user','line' : 'q\n' } 1115 elif gdbFlag and line.startswith('^'): 1116 msgToSend = { 'cmd' : 'stdin_dest', 1117 'stdin_procs' : '0-%d' % (parmdb['nprocs']-1) } 1118 if manSock: 1119 manSock.send_dict_msg(msgToSend) 1120 msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' } 1121 if manSock: 1122 manSock.send_dict_msg(msgToSend) 1123 else: 1124 streamHandler.del_handler(sys.stdin) 1125 sys.stdin.close() 1126 if manSock: 1127 msgToSend = { 
'cmd' : 'stdin_from_user', 'eof' : '' } 1128 manSock.send_dict_msg(msgToSend) 1129 return line 1130 1131def handle_sig_occurred(manSock): 1132 global sigOccurred 1133 if sigOccurred == signal.SIGINT: 1134 if manSock: 1135 msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGINT' } 1136 manSock.send_dict_msg(msgToSend) 1137 manSock.close() 1138 sys.exit(-1) 1139 elif sigOccurred == signal.SIGALRM: 1140 if manSock: 1141 msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGKILL' } 1142 manSock.send_dict_msg(msgToSend) 1143 manSock.close() 1144 mpd_print(1,'job ending due to env var MPIEXEC_TIMEOUT=%s' % \ 1145 os.environ['MPIEXEC_TIMEOUT']) 1146 sys.exit(-1) 1147 elif sigOccurred == signal.SIGTSTP: 1148 sigOccurred = 0 # do this before kill below 1149 if manSock: 1150 msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGTSTP' } 1151 manSock.send_dict_msg(msgToSend) 1152 signal.signal(signal.SIGTSTP,signal.SIG_DFL) # stop myself 1153 os.kill(os.getpid(),signal.SIGTSTP) 1154 signal.signal(signal.SIGTSTP,sig_handler) # restore this handler 1155 elif sigOccurred == signal.SIGCONT: 1156 sigOccurred = 0 # do it before handling 1157 if manSock: 1158 msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGCONT' } 1159 manSock.send_dict_msg(msgToSend) 1160 1161def sig_handler(signum,frame): 1162 global sigOccurred 1163 sigOccurred = signum 1164 mpd_handle_signal(signum,frame) 1165 1166def format_sorted_ranks(ranks): 1167 all = [] 1168 one = [] 1169 prevRank = -999 1170 for i in range(len(ranks)): 1171 if i != 0 and ranks[i] != (prevRank+1): 1172 all.append(one) 1173 one = [] 1174 one.append(ranks[i]) 1175 if i == (len(ranks)-1): 1176 all.append(one) 1177 prevRank = ranks[i] 1178 pline = '' 1179 for i in range(len(all)): 1180 if len(all[i]) > 1: 1181 pline += '%d-%d' % (all[i][0],all[i][-1]) 1182 else: 1183 pline += '%d' % (all[i][0]) 1184 if i != (len(all)-1): 1185 pline += ',' 1186 return pline 1187 1188def print_ready_merged_lines(minRanks,parmdb,linesPerRank): 1189 printFlag = 1 # default to get started 
1190 while printFlag: 1191 printFlag = 0 1192 for r1 in range(parmdb['nprocs']): 1193 if not linesPerRank[r1]: 1194 continue 1195 sortedRanks = [] 1196 lineToPrint = linesPerRank[r1][0] 1197 for r2 in range(parmdb['nprocs']): 1198 if linesPerRank[r2] and linesPerRank[r2][0] == lineToPrint: # myself also 1199 sortedRanks.append(r2) 1200 if len(sortedRanks) >= minRanks: 1201 fsr = format_sorted_ranks(sortedRanks) 1202 sys.stdout.softspace = 0 1203 print '%s: %s' % (fsr,lineToPrint), 1204 for r2 in sortedRanks: 1205 linesPerRank[r2] = linesPerRank[r2][1:] 1206 printFlag = 1 1207 sys.stdout.flush() 1208 1209def get_parms_from_xml_file(msgToMPD): 1210 global parmdb 1211 try: 1212 import xml.dom.minidom 1213 except: 1214 print 'you requested to parse an xml file, but' 1215 print ' I was unable to import the xml.dom.minidom module' 1216 sys.exit(-1) 1217 known_rlimit_types = ['core','cpu','fsize','data','stack','rss', 1218 'nproc','nofile','ofile','memlock','as','vmem'] 1219 try: 1220 inXmlFilename = parmdb['inXmlFilename'] 1221 parmsXMLFile = open(inXmlFilename,'r') 1222 except: 1223 print 'could not open job xml specification file %s' % (inXmlFilename) 1224 sys.exit(-1) 1225 fileContents = parmsXMLFile.read() 1226 try: 1227 parsedXML = xml.dom.minidom.parseString(fileContents) 1228 except: 1229 print "mpiexec failed parsing xml file (perhaps from mpiexec); here is the content:" 1230 print fileContents 1231 sys.exit(-1) 1232 if parsedXML.documentElement.tagName != 'create-process-group': 1233 print 'expecting create-process-group; got unrecognized doctype: %s' % \ 1234 (parsedXML.documentElement.tagName) 1235 sys.exit(-1) 1236 cpg = parsedXML.getElementsByTagName('create-process-group')[0] 1237 if cpg.hasAttribute('totalprocs'): 1238 parmdb[('xml','nprocs')] = int(cpg.getAttribute('totalprocs')) 1239 else: 1240 print '** totalprocs not specified in %s' % inXmlFilename 1241 sys.exit(-1) 1242 if cpg.hasAttribute('try_1st_locally'): 1243 
parmdb[('xml','MPIEXEC_TRY_1ST_LOCALLY')] = int(cpg.getAttribute('try_1st_locally')) 1244 if cpg.hasAttribute('output') and cpg.getAttribute('output') == 'label': 1245 parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1 1246 if cpg.hasAttribute('pgid'): # our jobalias 1247 parmdb[('xml','MPIEXEC_JOB_ALIAS')] = cpg.getAttribute('pgid') 1248 if cpg.hasAttribute('stdin_dest'): 1249 parmdb[('xml','MPIEXEC_STDIN_DEST')] = cpg.getAttribute('stdin_dest') 1250 if cpg.hasAttribute('doing_bnr'): 1251 parmdb[('xml','MPIEXEC_BNR')] = int(cpg.getAttribute('doing_bnr')) 1252 if cpg.hasAttribute('ifhn'): 1253 parmdb[('xml','MPIEXEC_IFHN')] = cpg.getAttribute('ifhn') 1254 if cpg.hasAttribute('exit_codes_filename'): 1255 parmdb[('xml','MPIEXEC_EXITCODES_FILENAME')] = cpg.getAttribute('exit_codes_filename') 1256 parmdb[('xml','ecfn_format')] = 'xml' 1257 if cpg.hasAttribute('gdb'): 1258 gdbFlag = int(cpg.getAttribute('gdb')) 1259 if gdbFlag: 1260 parmdb[('xml','MPIEXEC_GDB')] = 1 1261 parmdb[('xml','MPIEXEC_MERGE_OUTPUT')] = 1 # implied 1262 parmdb[('xml','MPIEXEC_SHOW_LINE_LABELS')] = 1 # implied 1263 parmdb[('xml','MPIEXEC_STDIN_DEST')] = 'all' # implied 1264 if cpg.hasAttribute('use_root_pm'): 1265 parmdb[('xml','MPD_USE_ROOT_MPD')] = int(cpg.getAttribute('use_root_pm')) 1266 if cpg.hasAttribute('tv'): 1267 parmdb[('xml','MPIEXEC_TOTALVIEW')] = int(cpg.getAttribute('tv')) 1268 hostSpec = cpg.getElementsByTagName('host-spec') 1269 if hostSpec: 1270 hostList = [] 1271 for node in hostSpec[0].childNodes: 1272 node = node.data.strip() 1273 hostnames = re.findall(r'\S+',node) 1274 for hostname in hostnames: 1275 if hostname: # some may be the empty string 1276 try: 1277 ipaddr = socket.gethostbyname_ex(hostname)[2][0] 1278 except: 1279 print 'unable to determine IP info for host %s' % (hostname) 1280 sys.exit(-1) 1281 hostList.append(ipaddr) 1282 parmdb[('xml','MPIEXEC_HOST_LIST')] = hostList 1283 if hostSpec and hostSpec[0].hasAttribute('check'): 1284 hostSpecMode = 
hostSpec[0].getAttribute('check') 1285 if hostSpecMode == 'yes': 1286 parmdb[('xml','MPIEXEC_HOST_CHECK')] = 1 1287 covered = [0] * parmdb['nprocs'] 1288 procSpec = cpg.getElementsByTagName('process-spec') 1289 if not procSpec: 1290 print 'No process-spec specified' 1291 usage() 1292 for p in procSpec: 1293 if p.hasAttribute('range'): 1294 therange = p.getAttribute('range') 1295 splitRange = therange.split('-') 1296 if len(splitRange) == 1: 1297 loRange = int(splitRange[0]) 1298 hiRange = loRange 1299 else: 1300 (loRange,hiRange) = (int(splitRange[0]),int(splitRange[1])) 1301 else: 1302 (loRange,hiRange) = (0,parmdb['nprocs']-1) 1303 for i in xrange(loRange,hiRange+1): 1304 nprocs = parmdb['nprocs'] 1305 if i >= nprocs: 1306 print '*** exiting; rank %d is greater than nprocs' % (nprocs) 1307 sys.exit(-1) 1308 if covered[i]: 1309 print '*** exiting; rank %d is doubly used in proc specs' % (nprocs) 1310 sys.exit(-1) 1311 covered[i] = 1 1312 if p.hasAttribute('exec'): 1313 msgToMPD['execs'][(loRange,hiRange)] = p.getAttribute('exec') 1314 else: 1315 print '*** exiting; range %d-%d has no exec' % (loRange,hiRange) 1316 sys.exit(-1) 1317 if p.hasAttribute('user'): 1318 username = p.getAttribute('user') 1319 if pwd_module_available: 1320 try: 1321 pwent = pwd.getpwnam(username) 1322 except: 1323 print username, 'is an invalid username' 1324 sys.exit(-1) 1325 if username == mpd_get_my_username() \ 1326 or (hasattr(os,'getuid') and os.getuid() == 0): 1327 msgToMPD['users'][(loRange,hiRange)] = p.getAttribute('user') 1328 else: 1329 print username, 'username does not match yours and you are not root' 1330 sys.exit(-1) 1331 else: 1332 msgToMPD['users'][(loRange,hiRange)] = mpd_get_my_username() 1333 if p.hasAttribute('cwd'): 1334 msgToMPD['cwds'][(loRange,hiRange)] = p.getAttribute('cwd') 1335 else: 1336 msgToMPD['cwds'][(loRange,hiRange)] = os.path.abspath(os.getcwd()) 1337 if p.hasAttribute('umask'): 1338 msgToMPD['umasks'][(loRange,hiRange)] = p.getAttribute('umask') 1339 
else: 1340 currumask = os.umask(0) ; os.umask(currumask) 1341 msgToMPD['umasks'][(loRange,hiRange)] = str(currumask) 1342 if p.hasAttribute('path'): 1343 msgToMPD['paths'][(loRange,hiRange)] = p.getAttribute('path') 1344 else: 1345 msgToMPD['paths'][(loRange,hiRange)] = os.environ['PATH'] 1346 if p.hasAttribute('host'): 1347 host = p.getAttribute('host') 1348 if host.startswith('_any_'): 1349 msgToMPD['hosts'][(loRange,hiRange)] = host 1350 else: 1351 try: 1352 msgToMPD['hosts'][(loRange,hiRange)] = socket.gethostbyname_ex(host)[2][0] 1353 except: 1354 print 'unable to do find info for host %s' % (host) 1355 sys.exit(-1) 1356 else: 1357 if hostSpec and hostList: 1358 msgToMPD['hosts'][(loRange,hiRange)] = '_any_from_pool_' 1359 else: 1360 msgToMPD['hosts'][(loRange,hiRange)] = '_any_' 1361 argDict = {} 1362 argList = p.getElementsByTagName('arg') 1363 for argElem in argList: 1364 argDict[int(argElem.getAttribute('idx'))] = argElem.getAttribute('value') 1365 argVals = [0] * len(argList) 1366 for i in argDict.keys(): 1367 argVals[i-1] = unquote(argDict[i]) 1368 msgToMPD['args'][(loRange,hiRange)] = argVals 1369 limitDict = {} 1370 limitList = p.getElementsByTagName('limit') 1371 for limitElem in limitList: 1372 typ = limitElem.getAttribute('type') 1373 if typ in known_rlimit_types: 1374 limitDict[typ] = limitElem.getAttribute('value') 1375 else: 1376 print 'mpiexec: invalid type in limit: %s' % (typ) 1377 sys.exit(-1) 1378 msgToMPD['limits'][(loRange,hiRange)] = limitDict 1379 envVals = {} 1380 envVarList = p.getElementsByTagName('env') 1381 for envVarElem in envVarList: 1382 envkey = envVarElem.getAttribute('name') 1383 envval = unquote(envVarElem.getAttribute('value')) 1384 envVals[envkey] = envval 1385 msgToMPD['envvars'][(loRange,hiRange)] = envVals 1386 for i in range(len(covered)): 1387 if not covered[i]: 1388 print '*** exiting; %d procs are requested, but proc %d is not described' % \ 1389 (parmdb['nprocs'],i) 1390 sys.exit(-1) 1391 1392def 
get_vals_for_attach(parmdb,conSock,msgToMPD): 1393 global recvTimeout 1394 sjobid = parmdb['-gdba'].split('@') # jobnum and originating host 1395 msgToSend = { 'cmd' : 'mpdlistjobs' } 1396 conSock.send_dict_msg(msgToSend) 1397 msg = conSock.recv_dict_msg(timeout=recvTimeout) 1398 if not msg: 1399 mpd_print(1,'no msg recvd from mpd before timeout') 1400 sys.exit(-1) 1401 if msg['cmd'] != 'local_mpdid': # get full id of local mpd for filters later 1402 mpd_print(1,'did not recv local_mpdid msg from local mpd; recvd: %s' % msg) 1403 sys.exit(-1) 1404 else: 1405 if len(sjobid) == 1: 1406 sjobid.append(msg['id']) 1407 got_info = 0 1408 while 1: 1409 msg = conSock.recv_dict_msg() 1410 if not msg.has_key('cmd'): 1411 mpd_print(1,'invalid message from mpd :%s:' % (msg)) 1412 sys.exit(-1) 1413 if msg['cmd'] == 'mpdlistjobs_info': 1414 got_info = 1 1415 smjobid = msg['jobid'].split(' ') # jobnum, mpdid, and alias (if present) 1416 if sjobid[0] == smjobid[0] and sjobid[1] == smjobid[1]: # jobnum and mpdid 1417 rank = int(msg['rank']) 1418 msgToMPD['users'][(rank,rank)] = msg['username'] 1419 msgToMPD['hosts'][(rank,rank)] = msg['ifhn'] 1420 msgToMPD['execs'][(rank,rank)] = msg['pgm'] 1421 msgToMPD['cwds'][(rank,rank)] = os.path.abspath(os.getcwd()) 1422 msgToMPD['paths'][(rank,rank)] = os.environ['PATH'] 1423 msgToMPD['args'][(rank,rank)] = [msg['clipid']] 1424 msgToMPD['envvars'][(rank,rank)] = {} 1425 msgToMPD['limits'][(rank,rank)] = {} 1426 currumask = os.umask(0) ; os.umask(currumask) # grab it and set it back 1427 msgToMPD['umasks'][(rank,rank)] = str(currumask) 1428 elif msg['cmd'] == 'mpdlistjobs_trailer': 1429 if not got_info: 1430 print 'no info on this jobid; probably invalid' 1431 sys.exit(-1) 1432 break 1433 else: 1434 print 'invaild msg from mpd :%s:' % (msg) 1435 sys.exit(-1) 1436 parmdb[('thispgm','nprocs')] = len(msgToMPD['execs'].keys()) # all dicts are same len 1437 1438 1439def usage(): 1440 print __doc__ 1441 sys.exit(-1) 1442 1443 1444if __name__ == 
'__main__': 1445 try: 1446 mpiexec() 1447 except SystemExit, errExitStatus: # bounced to here by sys.exit inside mpiexec() 1448 myExitStatus = errExitStatus 1449 sys.exit(myExitStatus) 1450