#!/usr/bin/env python
#
#   (C) 2001 by Argonne National Laboratory.
#       See COPYRIGHT in top-level directory.
#

"""
mpdman does NOT run as a standalone console program;
    it is only exec'd (or imported) by mpd
"""
from time import ctime
__author__ = "Ralph Butler and Rusty Lusk"
__date__ = ctime()
__version__ = "$Revision: 1.160 $"
__credits__ = ""


import sys, os, signal, socket

from types import ClassType
from re import findall, sub
from cPickle import loads
from time import sleep
from urllib import quote
from mpdlib import mpd_set_my_id, mpd_print, mpd_read_nbytes, \
                   mpd_sockpair, mpd_get_ranks_in_binary_tree, \
                   mpd_get_my_username, mpd_set_cli_app, \
                   mpd_dbg_level, mpd_handle_signal, \
                   MPDSock, MPDListenSock, MPDStreamHandler, MPDRing

# syslog and subprocess may be absent on older platforms; record availability
# and degrade gracefully rather than fail at import time.
try:
    import syslog
    syslog_module_available = 1
except:
    syslog_module_available = 0
try:
    import subprocess
    subprocess_module_available = 1
except:
    subprocess_module_available = 0


# Module-level state shared between MPDMan.run and the SIGCHLD handler
# (the handler presumably sets clientExited/clientExitStatus when the
# client process is reaped -- handler is defined elsewhere in this file).
global clientPid, clientExited, clientExitStatus, clientExitStatusSent

class MPDMan(object):
    """Per-client-process manager.

    One MPDMan sits between an mpd daemon and a single client (MPI) process,
    forming a ring with the other managers of the same job.  It relays PMI
    requests, stdio, and signals.  All configuration arrives via MPDMAN_*
    environment variables set by the parent mpd.
    """
    def __init__(self):
        pass
    def run(self):
        """Main entry: configure from the environment, build the manager
        ring and stdio tree, launch the client, then service streams until
        the end-barrier completes."""
        global clientPid, clientExited, clientExitStatus, clientExitStatusSent
        clientExited = 0
        clientExitStatusSent = 0
        if hasattr(signal,'SIGCHLD'):
            signal.signal(signal.SIGCHLD,sigchld_handler)
        # Identity and position within the manager ring, all from env vars.
        self.myHost = os.environ['MPDMAN_MYHOST']
        self.myIfhn = os.environ['MPDMAN_MYIFHN']
        self.myRank = int(os.environ['MPDMAN_RANK'])
        self.posInRing = int(os.environ['MPDMAN_POS_IN_RING'])
        self.myId = self.myHost + '_mpdman_' + str(self.myRank)
        self.spawned = int(os.environ['MPDMAN_SPAWNED'])
        self.spawnInProgress = 0
        if self.spawned:
            self.myId = self.myId + '_s'
            # Note that in the spawned process case, this id for the mpdman
            # will not be unique (it needs something like the world number
            # or the pid of the mpdman process itself)
        mpd_set_my_id(myid=self.myId)
        self.clientPgm = os.environ['MPDMAN_CLI_PGM']
        mpd_set_cli_app(self.clientPgm)
        try:
            os.chdir(os.environ['MPDMAN_CWD'])
        except Exception, errmsg:
            # chdir failure is tolerated; errmsg is rebuilt but not printed
            errmsg = '%s: invalid dir: %s' % (self.myId,os.environ['MPDMAN_CWD'])
            # print errmsg    ## may syslog it in some cases ?
        # Two launch modes: FORK inherits already-open fds from the parent
        # mpd; SUBPROCESS creates fresh sockets and connects back to the mpd.
        if os.environ['MPDMAN_HOW_LAUNCHED'] == 'FORK':
            self.listenRingPort = int(os.environ['MPDMAN_MY_LISTEN_PORT'])
            listenRingFD = int(os.environ['MPDMAN_MY_LISTEN_FD'])    # closed in loop below
            self.listenRingSock = socket.fromfd(listenRingFD,socket.AF_INET,socket.SOCK_STREAM)
            self.listenRingSock = MPDSock(sock=self.listenRingSock)
            mpdFD = int(os.environ['MPDMAN_TO_MPD_FD'])    # closed in loop below
            self.mpdSock = socket.fromfd(mpdFD,socket.AF_INET,socket.SOCK_STREAM)
            self.mpdSock = MPDSock(sock=self.mpdSock)
        elif os.environ['MPDMAN_HOW_LAUNCHED'] == 'SUBPROCESS':
            self.listenRingSock = MPDListenSock()
            self.listenRingPort = self.listenRingSock.getsockname()[1]
            mpdPort = int(os.environ['MPDMAN_MPD_PORT'])
            self.mpdSock = MPDSock()
            self.mpdSock.connect((self.myIfhn,mpdPort))
            # tell the mpd where this manager listens for ring connections
            self.mpdSock.send_dict_msg( {'man_listen_port' : self.listenRingPort} )
        else:
            mpd_print(1,'I cannot figure out how I was launched')
            sys.exit(-1)
        # position-0 manager's address; the last ring member connects here
        self.pos0Ifhn = os.environ['MPDMAN_POS0_IFHN']
        self.pos0Port = int(os.environ['MPDMAN_POS0_PORT'])
        # close unused fds before I grab any more
        # NOTE: this will also close syslog's fd inherited from mpd; re-opened below
        try:     max_fds = os.sysconf('SC_OPEN_MAX')
        except:  max_fds = 1024
        # FIXME This snippet causes problems on Fedora Core 12.  FC12's python
        # opens a file object to /etc/abrt/pyhook.conf.  Closing the fd out from
        # under the higher level object causes problems at exit time when the
        # higher level object is garbage collected.  See MPICH2 ticket #902 for
        # more information.
        # (disabled -- see FIXME above) mass close of inherited fds
        #for fd in range(3,max_fds):
        #    if fd == self.mpdSock.fileno() or fd == self.listenRingSock.fileno():
        #        continue
        #    try:    os.close(fd)
        #    except: pass
        if syslog_module_available:
            syslog.openlog("mpdman",0,syslog.LOG_DAEMON)
            syslog.syslog(syslog.LOG_INFO,"mpdman starting new log; %s" % (self.myId) )
        # umask arrives as a string in hex ('0x..'), octal ('0..'), or decimal
        self.umask = os.environ['MPDMAN_UMASK']
        if self.umask.startswith('0x'):
            self.umask = int(self.umask,16)
        elif self.umask.startswith('0'):
            self.umask = int(self.umask,8)
        else:
            self.umask = int(self.umask)
        self.oldumask = os.umask(self.umask)
        # pickled (cPickle) structures passed down from mpd
        self.clientPgmArgs = loads(os.environ['MPDMAN_PGM_ARGS'])
        self.clientPgmEnv = loads(os.environ['MPDMAN_PGM_ENVVARS'])
        self.clientPgmLimits = loads(os.environ['MPDMAN_PGM_LIMITS'])
        self.jobid = os.environ['MPDMAN_JOBID']
        self.nprocs = int(os.environ['MPDMAN_NPROCS'])
        self.mpdPort = int(os.environ['MPDMAN_MPD_LISTEN_PORT'])
        self.mpdConfPasswd = os.environ['MPDMAN_MPD_CONF_SECRETWORD']
        os.environ['MPDMAN_MPD_CONF_SECRETWORD'] = ''    ## do NOT pass it on to clients
        self.kvs_template_from_env = os.environ['MPDMAN_KVS_TEMPLATE']
        # console (mpiexec) location, and my left-hand ring neighbor
        self.conIfhn = os.environ['MPDMAN_CONIFHN']
        self.conPort = int(os.environ['MPDMAN_CONPORT'])
        self.lhsIfhn = os.environ['MPDMAN_LHS_IFHN']
        self.lhsPort = int(os.environ['MPDMAN_LHS_PORT'])
        self.stdinDest = os.environ['MPDMAN_STDIN_DEST']
        self.totalview = int(os.environ['MPDMAN_TOTALVIEW'])
        self.gdb = int(os.environ['MPDMAN_GDB'])
        self.gdba = os.environ['MPDMAN_GDBA']
        self.lineLabelFmt = os.environ['MPDMAN_LINE_LABELS_FMT']
        # flags: next chunk of client stdout/stderr begins a fresh line,
        # so it should be prefixed with a line label
        self.startStdoutLineLabel = 1
        self.startStderrLineLabel = 1
        self.singinitPID = int(os.environ['MPDMAN_SINGINIT_PID'])
        self.singinitPORT = int(os.environ['MPDMAN_SINGINIT_PORT'])
        self.doingBNR = int(os.environ['MPDMAN_DOING_BNR'])
        # listener for non-ring traffic (stdio-tree children, spawned man0s)
        self.listenNonRingSock = MPDListenSock('',0,name='nonring_listen_sock')
        self.listenNonRingPort = self.listenNonRingSock.getsockname()[1]
        self.streamHandler = MPDStreamHandler()
        self.streamHandler.set_handler(self.mpdSock,self.handle_mpd_input)
        self.streamHandler.set_handler(self.listenNonRingSock,
                                       self.handle_nonring_connection)

        # set up pmi stuff early in case I was spawned
        self.universeSize = -1
        self.appnum = -1
        self.pmiVersion = 1
        self.pmiSubversion = 1
        self.KVSs = {}    # kvsname -> {key: value}
        if self.singinitPID:
            # self.kvsname_template = 'singinit_kvs_'
            self.kvsname_template = 'singinit_kvs_' + str(os.getpid())
        else:
            self.kvsname_template = 'kvs_' + self.kvs_template_from_env + '_'
        self.default_kvsname = self.kvsname_template + '0'
        # kvsnames travel inside space-delimited PMI messages; sanitize
        # hostname-derived characters
        self.default_kvsname = sub('\.','_',self.default_kvsname)    # magpie.cs to magpie_cs
        self.default_kvsname = sub('\-','_',self.default_kvsname)    # chg node-0 to node_0
        self.KVSs[self.default_kvsname] = {}
        # build the client's environment, filtering manager-only vars
        cli_env = {}
        cli_env['MPICH_INTERFACE_HOSTNAME'] = os.environ['MPICH_INTERFACE_HOSTNAME']
        cli_env['MPICH_INTERFACE_HOSTNAME_R%d' % self.myRank] = os.environ['MPICH_INTERFACE_HOSTNAME']
        for k in self.clientPgmEnv.keys():
            if k.startswith('MPI_APPNUM'):
                self.appnum = self.clientPgmEnv[k]    # don't put in application env
            elif k.startswith('MPICH_INTERFACE_HOSTNAME'):
                continue    ## already put it in above
            else:
                cli_env[k] = self.clientPgmEnv[k]
        self.kvs_next_id = 1
        self.jobEndingEarly = 0
        self.pmiCollectiveJob = 0
        self.spawnedCnt = 0
        self.pmiSock = 0    # obtained later
        # assemble the manager ring; pos 0 accepts, others connect (below)
        self.ring = MPDRing(listenSock=self.listenRingSock,
                            streamHandler=self.streamHandler,
                            myIfhn=self.myIfhn)
        if self.nprocs == 1:
            # degenerate one-member ring: I am my own lhs and rhs
            self.ring.create_single_mem_ring(ifhn=self.myIfhn,
                                             port=self.listenRingPort,
                                             lhsHandler=self.handle_lhs_input,
                                             rhsHandler=self.handle_rhs_input)
        else:
            if self.posInRing == 0:    # one 'end'
                self.ring.accept_rhs(rhsHandler=self.handle_rhs_input)
                self.ring.accept_lhs(lhsHandler=self.handle_lhs_input)
            elif self.posInRing == (self.nprocs-1):    # the other 'end'
                # last member: connect to my lhs, then close the ring by
                # connecting my rhs back to position 0
                rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn,
                                           lhsPort=self.lhsPort,
                                           lhsHandler=self.handle_lhs_input,
                                           numTries=8)
                if rv[0] <= 0:
                    mpd_print(1,"lhs connect failed")
                    sys.exit(-1)
                self.rhsIfhn = self.pos0Ifhn
                self.rhsPort = self.pos0Port
                rv = self.ring.connect_rhs(rhsIfhn=self.rhsIfhn,
                                           rhsPort=self.rhsPort,
                                           rhsHandler=self.handle_rhs_input,
                                           numTries=8)
                if rv[0] <= 0:    # connect did not succeed; may try again
                    mpd_print(1,"rhs connect failed")
                    sys.exit(-1)
            else:    # ring members 'in the middle'
                rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn,
                                           lhsPort=self.lhsPort,
                                           lhsHandler=self.handle_lhs_input,
                                           numTries=8)
                if rv[0] <= 0:
                    mpd_print(1,"lhs connect failed")
                    sys.exit(-1)
                self.ring.accept_rhs(rhsHandler=self.handle_rhs_input)

        # rank 0 talks directly to the console (mpiexec or the spawning man)
        if self.myRank == 0:
            self.conSock = MPDSock(name='to_console')
            self.conSock.connect((self.conIfhn,self.conPort))
            self.streamHandler.set_handler(self.conSock,self.handle_console_input)
            if self.spawned:
                # handshake with the parent manager: announce, receive the
                # preput KVS values, then the ring size
                msgToSend = { 'cmd' : 'spawned_man0_is_up',
                              'spawned_id' : os.environ['MPDMAN_SPAWNED'] }
                self.conSock.send_dict_msg(msgToSend)
                msg = self.conSock.recv_dict_msg()
                # If there is a failure in the connection, this
                # receive will fail and if not handled, cause mpdman
                # to fail.  For now, we just check on a empty or unexpected
                # message
                if not msg or msg['cmd'] != 'preput_info_for_child':
                    mpd_print(1,'invalid msg from parent :%s:' % msg)
                    sys.exit(-1)
                try:
                    for k in msg['kvs'].keys():
                        self.KVSs[self.default_kvsname][k] = msg['kvs'][k]
                except:
                    mpd_print(1,'failed to insert preput_info')
                    sys.exit(-1)
                msg = self.conSock.recv_dict_msg()
                if not msg or not msg.has_key('cmd') or msg['cmd'] != 'ringsize':
                    mpd_print(1,'spawned: bad msg from con; got: %s' % (msg) )
                    sys.exit(-1)
                self.universeSize = msg['ring_ncpus']
                # if the rshSock is closed, we'll get an AttributeError
                # exception about 'int' has no attribute 'send_dict_msg'
                # FIXME: Does every use of a sock on which send_dict_msg
                # is used need an "if xxxx.rhsSock:" test first?
                # Is there an else for those cases?
                self.ring.rhsSock.send_dict_msg(msg)    # forward it on
            else:
                # normal (non-spawned) job: check in with mpiexec and get
                # the ring size, which seeds MPI_UNIVERSE_SIZE
                msgToSend = { 'cmd' : 'man_checking_in' }
                self.conSock.send_dict_msg(msgToSend)
                msg = self.conSock.recv_dict_msg()
                if not msg or not msg.has_key('cmd') or msg['cmd'] != 'ringsize':
                    mpd_print(1,'invalid msg from con; expected ringsize got: %s' % (msg) )
                    sys.exit(-1)
                if self.clientPgmEnv.has_key('MPI_UNIVERSE_SIZE'):
                    self.universeSize = int(self.clientPgmEnv['MPI_UNIVERSE_SIZE'])
                else:
                    self.universeSize = msg['ring_ncpus']
                self.ring.rhsSock.send_dict_msg(msg)
            ## NOTE: if you spawn a non-MPI job, it may not send this msg
            ## in which case the pgm will hang; the reason for this is that
            ## mpich2 does an Accept after the PMI_Spawn_multiple and a non-mpi
            ## pgm will never do the expected Connect.
            # rank 0's stdout/stderr go straight to the console over two
            # dedicated sockets (roots of the stdio trees)
            self.stdoutToConSock = MPDSock(name='stdout_to_console')
            self.stdoutToConSock.connect((self.conIfhn,self.conPort))
            if self.spawned:
                msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : self.myRank }
                self.stdoutToConSock.send_dict_msg(msgToSend)
            self.stderrToConSock = MPDSock(name='stderr_to_console')
            self.stderrToConSock.connect((self.conIfhn,self.conPort))
            if self.spawned:
                msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : self.myRank }
                self.stderrToConSock.send_dict_msg(msgToSend)
        else:
            self.conSock = 0
        if self.myRank == 0:
            self.parentStdoutSock = self.stdoutToConSock
            self.parentStderrSock = self.stderrToConSock
        else:
            # non-zero ranks get their stdio-tree parent sockets later,
            # via an 'info_from_parent_in_tree' ring message
            self.parentStdoutSock = 0
            self.parentStderrSock = 0
            # the ringsize msg circulated by rank 0 arrives from my lhs
            msg = self.ring.lhsSock.recv_dict_msg()    # recv msg containing ringsize
            if not msg or not msg.has_key('cmd') or msg['cmd'] != 'ringsize':
                mpd_print(1,'invalid msg from lhs; expecting ringsize got: %s' % (msg) )
                sys.exit(-1)
            if self.myRank != 0:
                self.ring.rhsSock.send_dict_msg(msg)
            if self.clientPgmEnv.has_key('MPI_UNIVERSE_SIZE'):
                self.universeSize = int(self.clientPgmEnv['MPI_UNIVERSE_SIZE'])
            else:
                self.universeSize = msg['ring_ncpus']
        if self.doingBNR:
            # BNR mode: manager and client share a pre-made socketpair
            (self.pmiSock,self.cliBNRSock) = mpd_sockpair()
            self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
            cli_env['MAN_MSGS_FD'] = str(self.cliBNRSock.fileno())    ## BNR
        self.numDone = 0
        self.numWithIO = 2    # stdout and stderr so far
        self.numConndWithIO = 2
        # FIXME: This is the old singleton approach, which didn't allow
        # for more than one process to be a singleton
        if self.singinitPORT:
            # singleton-init: the client already exists and listens on
            # singinitPORT; connect to it rather than accepting
            self.pmiListenSock = 0
            self.pmiSock = MPDSock(name='pmi')
            self.pmiSock.connect((self.myIfhn,self.singinitPORT))
            self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
            self.pmiSock.send_char_msg('cmd=singinit authtype=none\n')
            line = self.pmiSock.recv_char_msg()    # client's singinit reply (unchecked)
            charMsg = 'cmd=singinit_info rc=0 versionok=yes stdio=yes kvsname=%s\n' % (self.default_kvsname)
            self.pmiSock.send_char_msg(charMsg)

            # three more connections to the singleton client carry its
            # stdin / stdout / stderr; only the raw fds are kept
            sock_write_cli_stdin = MPDSock(name='write_cli_stdin')
            sock_write_cli_stdin.connect((self.myIfhn,self.singinitPORT))
            self.fd_write_cli_stdin = sock_write_cli_stdin.fileno()

            sock_read_cli_stdout = MPDSock(name='read_cli_stdout')
            sock_read_cli_stdout.connect((self.myIfhn,self.singinitPORT))
            self.fd_read_cli_stdout = sock_read_cli_stdout.fileno()

            sock_read_cli_stderr = MPDSock(name='read_cli_stderr')
            sock_read_cli_stderr.connect((self.myIfhn,self.singinitPORT))
            self.fd_read_cli_stderr = sock_read_cli_stderr.fileno()
        else:
            # normal path: listen and let the client connect for PMI (and BNR)
            self.cliListenSock = MPDListenSock('',0,name='cli_listen_sock')    ## BNR
            self.cliListenPort = self.cliListenSock.getsockname()[1]    ## BNR
            self.pmiListenSock = MPDListenSock('',0,name='pmi_listen_sock')
            self.pmiListenPort = self.pmiListenSock.getsockname()[1]
        self.subproc = 0    # default; use fork instead of subprocess
        if self.singinitPID:
            clientPid = self.singinitPID    # client pre-exists; nothing to launch
        else:
            # standard PMI environment for the client process
            cli_env['PATH'] = os.environ['MPDMAN_CLI_PATH']
            cli_env['PMI_PORT'] = '%s:%s' % (self.myIfhn,self.pmiListenPort)
            cli_env['PMI_SIZE'] = str(self.nprocs)
            cli_env['PMI_RANK'] = str(self.myRank)
            cli_env['PMI_DEBUG'] = str(0)
            cli_env['PMI_TOTALVIEW'] = str(self.totalview)
            if self.spawned:
                cli_env['PMI_SPAWNED'] = '1'
            else:
                cli_env['PMI_SPAWNED'] = '0'
            if self.doingBNR:
                cli_env['MPD_TVDEBUG'] = str(0)    ## BNR
                cli_env['MPD_JID'] = os.environ['MPDMAN_JOBID']    ## BNR
                cli_env['MPD_JSIZE'] = str(self.nprocs)    ## BNR
                cli_env['MPD_JRANK'] = str(self.myRank)    ## BNR
                cli_env['CLIENT_LISTENER_FD'] = str(self.cliListenSock.fileno())    ## BNR
            if hasattr(os,'fork'):
                # pipes carry the client's stdio through this manager
                (self.fd_read_cli_stdin, self.fd_write_cli_stdin ) = os.pipe()
                (self.fd_read_cli_stdout,self.fd_write_cli_stdout) = os.pipe()
                (self.fd_read_cli_stderr,self.fd_write_cli_stderr) = os.pipe()
                # handshake pair holds the client until 'jobgo_loop_2' (the
                # manager end sends 'go' then)
                (self.handshake_sock_man_end,self.handshake_sock_cli_end) = mpd_sockpair()
                clientPid = self.launch_client_via_fork_exec(cli_env)
                if clientPid < 0:
                    print '**** mpdman: launch_client_via_fork_exec failed; exiting'
                    sys.exit(-1)
                elif clientPid > 0:    # parent: keep only the manager end
                    self.handshake_sock_cli_end.close()
                else:    # 0
                    self.handshake_sock_man_end.close()
            elif subprocess_module_available:
                clientPid = self.launch_client_via_subprocess(cli_env)    # may chg self.subproc
            else:
                mpd_print(1,'neither fork nor subprocess is available')
                sys.exit(-1)
        # if not initially a recvr of stdin (e.g. gdb) then give immediate eof to client
        if not in_stdinRcvrs(self.myRank,self.stdinDest):
            if self.subproc:    # must close subproc's file (not just the fd)
                self.subproc.stdin.close()
            else:
                os.close(self.fd_write_cli_stdin)
        if self.doingBNR:
            self.cliBNRSock.close()    # client end of the BNR sockpair; manager keeps pmiSock
        # report the launched client to my mpd
        msgToSend = { 'cmd' : 'client_info', 'jobid' : self.jobid, 'clipid' : clientPid,
                      'manpid' : os.getpid(), 'rank' : self.myRank,
                      'spawner_manpid' : int(os.environ['MPDMAN_SPAWNER_MANPID']),
                      'spawner_mpd' : os.environ['MPDMAN_SPAWNER_MPD'] }
        self.mpdSock.send_dict_msg(msgToSend)

        if not self.subproc:
            self.streamHandler.set_handler(self.fd_read_cli_stdout,
                                           self.handle_cli_stdout_input)
            self.streamHandler.set_handler(self.fd_read_cli_stderr,
                                           self.handle_cli_stderr_input)
        self.waitPids = [clientPid]

        if self.pmiListenSock:
            self.streamHandler.set_handler(self.pmiListenSock,self.handle_pmi_connection)

        # begin setup of stdio tree
        (parent,lchild,rchild) = mpd_get_ranks_in_binary_tree(self.myRank,self.nprocs)
        self.spawnedChildSocks = []
        self.childrenStdoutTreeSocks = []
        self.childrenStderrTreeSocks = []
        # tell each child (via the ring) where to connect back for stdio
        if lchild >= 0:
            self.numWithIO += 2    # stdout and stderr from child
            msgToSend = { 'cmd' : 'info_from_parent_in_tree',
                          'to_rank' : str(lchild),
                          'parent_ifhn' : self.myIfhn,
                          'parent_port' : self.listenNonRingPort }
            self.ring.rhsSock.send_dict_msg(msgToSend)
        if rchild >= 0:
            self.numWithIO += 2    # stdout and stderr from child
            msgToSend = { 'cmd' : 'info_from_parent_in_tree',
                          'to_rank' : str(rchild),
                          'parent_ifhn' : self.myIfhn,
                          'parent_port' : self.listenNonRingPort }
            self.ring.rhsSock.send_dict_msg(msgToSend)

        # optionally fork a remote-shell helper ("copgm") alongside the client
        if os.environ.has_key('MPDMAN_RSHIP'):
            rship = os.environ['MPDMAN_RSHIP']
            # (rshipSock,rshipPort) = mpd_get_inet_listen_sock('',0)
            rshipPid = os.fork()
            if rshipPid == 0:
                os.environ['MPDCP_MSHIP_HOST'] = os.environ['MPDMAN_MSHIP_HOST']
                os.environ['MPDCP_MSHIP_PORT'] = os.environ['MPDMAN_MSHIP_PORT']
                os.environ['MPDCP_MSHIP_NPROCS'] = str(self.nprocs)
                os.environ['MPDCP_CLI_PID'] = str(clientPid)
                try:
                    os.execvpe(rship,[rship],os.environ)
                except Exception, errmsg:
                    # make sure my error msgs get to console
                    os.dup2(self.parentStdoutSock.fileno(),1)    # closes fd 1 (stdout) if open
                    os.dup2(self.parentStderrSock.fileno(),2)    # closes fd 2 (stderr) if open
                    mpd_print(1,'execvpe failed for copgm %s; errmsg=:%s:' % (rship,errmsg) )
                    sys.exit(-1)
                sys.exit(0)
            # rshipSock.close()
            self.waitPids.append(rshipPid)

        if not self.spawned:
            # receive the final process mapping from our MPD overlords
            msg = self.mpdSock.recv_dict_msg(timeout=-1)

            # a few defensive checks now to make sure that the various parts of the
            # code are all on the same page
            if not msg.has_key('cmd') or msg['cmd'] != 'process_mapping':
                mpd_print(1,'expected cmd="process_mapping", got cmd="%s" instead' % (msg.get('cmd','**not_present**')))
                sys.exit(-1)
            if msg['jobid'] != self.jobid:
                mpd_print(1,'expected jobid="%s", got jobid="%s" instead' % (self.jobid,msg['jobid']))
                sys.exit(-1)
            if not msg.has_key('process_mapping'):
                mpd_print(1,'expected msg to contain a process_mapping key')
                sys.exit(-1)
            self.KVSs[self.default_kvsname]['PMI_process_mapping'] = msg['process_mapping']

        self.tvReady = 0
        self.pmiBarrierInRecvd = 0
        self.holdingPMIBarrierLoop1 = 0
        if self.myRank == 0:
            # rank 0 originates both the jobgo and end-barrier ring loops
            self.holdingEndBarrierLoop1 = 1
            self.holdingJobgoLoop1 = { 'cmd' : 'jobgo_loop_1', 'procinfo' : [] }
        else:
            self.holdingEndBarrierLoop1 = 0
            self.holdingJobgoLoop1 = 0
        self.jobStarted = 0
        self.endBarrierDone = 0
        # Main Loop
        while not self.endBarrierDone:
            # with a singleton or subprocess client there is no SIGCHLD
            # bookkeeping; treat all-io-drained as client exit
            if self.numDone >= self.numWithIO and (self.singinitPID or self.subproc):
                clientExited = 1
                clientExitStatus = 0
            # release the held jobgo msg once all io children have connected
            if self.holdingJobgoLoop1 and self.numConndWithIO >= self.numWithIO:
                msgToSend = self.holdingJobgoLoop1
                self.ring.rhsSock.send_dict_msg(msgToSend)
                self.holdingJobgoLoop1 = 0
            rv = self.streamHandler.handle_active_streams(timeout=5.0)
            if rv[0] < 0:
                if type(rv[1]) == ClassType and rv[1] == KeyboardInterrupt:    # ^C
                    sys.exit(-1)
            if clientExited:
                if self.jobStarted and not clientExitStatusSent:
                    msgToSend = { 'cmd' : 'client_exit_status', 'man_id' : self.myId,
                                  'cli_status' : clientExitStatus, 'cli_host' : self.myHost,
                                  'cli_ifhn' : self.myIfhn, 'cli_pid' : clientPid,
                                  'cli_rank' : self.myRank }
                    # rank 0 reports directly to the console; others pass it
                    # around the ring
                    if self.myRank == 0:
                        if self.conSock:
                            try:
                                self.conSock.send_dict_msg(msgToSend)
                            except:
                                pass
                    else:
                        if self.ring.rhsSock:
                            self.ring.rhsSock.send_dict_msg(msgToSend)
                    clientExitStatusSent = 1
                if self.holdingEndBarrierLoop1 and self.numDone >= self.numWithIO:
                    self.holdingEndBarrierLoop1 = 0
                    msgToSend = {'cmd' : 'end_barrier_loop_1'}
                    self.ring.rhsSock.send_dict_msg(msgToSend)
        mpd_print(0000, "out of loop")
        # may want to wait for waitPids here
    def handle_nonring_connection(self,sock):
        """Accept a connection on the non-ring listener: a stdio-tree child
        checking in, or a spawned job's man0 handshaking for preput/ringsize."""
        (tempSock,tempConnAddr) = self.listenNonRingSock.accept()
        msg = tempSock.recv_dict_msg()
        if msg and msg.has_key('cmd'):
            if msg['cmd'] == 'child_in_stdout_tree':
                self.streamHandler.set_handler(tempSock,self.handle_child_stdout_tree_input)
                self.childrenStdoutTreeSocks.append(tempSock)
                self.numConndWithIO += 1
            elif msg['cmd'] == 'child_in_stderr_tree':
                self.streamHandler.set_handler(tempSock,self.handle_child_stderr_tree_input)
                self.childrenStderrTreeSocks.append(tempSock)
                self.numConndWithIO += 1
            elif msg['cmd'] == 'spawned_man0_is_up':
                self.streamHandler.set_handler(tempSock,self.handle_spawned_child_input)
                self.spawnedChildSocks.append(tempSock)
                tempID = msg['spawned_id']
                spawnedKVSname = 'mpdman_kvs_for_spawned_' + tempID
                # send the preput KVS collected at spawn time, then ringsize
                msgToSend = { 'cmd' : 'preput_info_for_child',
                              'kvs' : self.KVSs[spawnedKVSname] }
                tempSock.send_dict_msg(msgToSend)
                msgToSend = { 'cmd' : 'ringsize', 'ring_ncpus' : self.universeSize }
                tempSock.send_dict_msg(msgToSend)
        else:
            mpd_print(1, 'unknown msg recvd on listenNonRingSock :%s:' % (msg) )
    def handle_lhs_input(self,sock):
        """Dispatch a ring message arriving from my left-hand neighbor.

        Most messages circulate the full ring: each manager acts on them
        and forwards to its rhs until they return to their origin (matched
        by rank, 'src' id, or rank 0 for loop-style barriers)."""
        msg = self.ring.lhsSock.recv_dict_msg()
        if not msg:
            mpd_print(0000, 'lhs died' )
            self.streamHandler.del_handler(self.ring.lhsSock)
            self.ring.lhsSock.close()
        elif msg['cmd'] == 'jobgo_loop_1':
            # first lap of job-start barrier, accumulating procinfo for tv
            if self.myRank == 0:
                if self.totalview:
                    msg['procinfo'].insert(0,(socket.gethostname(),self.clientPgm,clientPid))
                # let console pgm proceed
                msgToSend = { 'cmd' : 'job_started', 'jobid' : self.jobid,
                              'procinfo' : msg['procinfo'] }
                self.conSock.send_dict_msg(msgToSend,errprint=0)
                msgToSend = { 'cmd' : 'jobgo_loop_2' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
            else:
                if self.totalview:
                    msg['procinfo'].append((socket.gethostname(),self.clientPgm,clientPid))
                if self.numConndWithIO >= self.numWithIO:
                    self.ring.rhsSock.send_dict_msg(msg)    # forward it on
                else:
                    self.holdingJobgoLoop1 = msg    # forward later, from main loop
        elif msg['cmd'] == 'jobgo_loop_2':
            # second lap: everyone releases their client via the handshake sock
            if self.myRank != 0:
                self.ring.rhsSock.send_dict_msg(msg)    # forward it on
            if not self.singinitPID:
                self.handshake_sock_man_end.send_char_msg('go\n')
                self.handshake_sock_man_end.close()
            self.jobStarted = 1
        elif msg['cmd'] == 'info_from_parent_in_tree':
            # my stdio-tree parent's address; connect both stdio sockets back
            if int(msg['to_rank']) == self.myRank:
                self.parentIfhn = msg['parent_ifhn']
                self.parentPort = msg['parent_port']
                self.parentStdoutSock = MPDSock(name='stdout_ro_parent')
                self.parentStdoutSock.connect((self.parentIfhn,self.parentPort))
                msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : self.myRank }
                self.parentStdoutSock.send_dict_msg(msgToSend)
                self.parentStderrSock = MPDSock(name='stderr_ro_parent')
                self.parentStderrSock.connect((self.parentIfhn,self.parentPort))
                msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : self.myRank }
                self.parentStderrSock.send_dict_msg(msgToSend)
            else:
                self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'end_barrier_loop_1':
            if self.myRank == 0:
                msgToSend = { 'cmd' : 'end_barrier_loop_2' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
            else:
                if self.numDone >= self.numWithIO:
                    if self.ring.rhsSock:
                        self.ring.rhsSock.send_dict_msg(msg)
                else:
                    self.holdingEndBarrierLoop1 = 1    # forward when my io drains
        elif msg['cmd'] == 'end_barrier_loop_2':
            self.endBarrierDone = 1    # terminates the main loop
            if self.myRank != 0:
                self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'pmi_barrier_loop_1':
            if self.myRank == 0:
                # loop_1 completed the ring; start loop_2 and release my client
                msgToSend = { 'cmd' : 'pmi_barrier_loop_2' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
                if self.doingBNR:    ## BNR
                    pmiMsgToSend = 'cmd=client_bnr_fence_out\n'
                    self.pmiSock.send_char_msg(pmiMsgToSend)
                    sleep(0.1)    # minor pause before intr
                    os.kill(clientPid,signal.SIGUSR1)
                else:
                    if self.pmiSock:
                        pmiMsgToSend = 'cmd=barrier_out\n'
                        self.pmiSock.send_char_msg(pmiMsgToSend)
            else:
                self.holdingPMIBarrierLoop1 = 1
                if self.pmiBarrierInRecvd:    # my client already entered the barrier
                    self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'pmi_barrier_loop_2':
            self.pmiBarrierInRecvd = 0
            self.holdingPMIBarrierLoop1 = 0
            # NOTE(review): nesting reconstructed from flattened text; rank 0
            # already released its client during loop_1 handling above
            if self.myRank != 0:
                self.ring.rhsSock.send_dict_msg(msg)
                if self.doingBNR:    ## BNR
                    pmiMsgToSend = 'cmd=client_bnr_fence_out\n'
                    self.pmiSock.send_char_msg(pmiMsgToSend)
                    sleep(0.1)    # minor pause before intr
                    os.kill(clientPid,signal.SIGUSR1)
                else:
                    if self.pmiSock:
                        pmiMsgToSend = 'cmd=barrier_out\n'
                        self.pmiSock.send_char_msg(pmiMsgToSend)
        elif msg['cmd'] == 'pmi_get':
            if msg['from_rank'] == self.myRank:
                # made it all the way around without a hit -> not found
                if self.pmiSock:    # may have disappeared in early shutdown
                    pmiMsgToSend = 'cmd=get_result rc=-1 key="%s"\n' % msg['key']
                    self.pmiSock.send_char_msg(pmiMsgToSend)
            else:
                key = msg['key']
                kvsname = msg['kvsname']
                if self.KVSs.has_key(kvsname) and self.KVSs[kvsname].has_key(key):
                    # I have the value; send it around to the requester
                    value = self.KVSs[kvsname][key]
                    msgToSend = { 'cmd' : 'response_to_pmi_get', 'key' : key,
                                  'kvsname' : kvsname, 'value' : value,
                                  'to_rank' : msg['from_rank'] }
                    self.ring.rhsSock.send_dict_msg(msgToSend)
                else:
                    self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'pmi_getbyidx':
            # each hop merges its default KVS into the msg; the requester
            # ends up with the union and answers its client from that
            if msg['from_rank'] == self.myRank:
                if self.pmiSock:    # may have disappeared in early shutdown
                    self.KVSs[self.default_kvsname].update(msg['kvs'])
                    if self.KVSs[self.default_kvsname].keys():
                        key = self.KVSs[self.default_kvsname].keys()[0]
                        val = self.KVSs[self.default_kvsname][key]
                        pmiMsgToSend = 'cmd=getbyidx_results rc=0 nextidx=1 key=%s val=%s\n' % \
                                       (key,val)
                    else:
                        pmiMsgToSend = 'cmd=getbyidx_results rc=-2 reason=no_more_keyvals\n'
                    self.pmiSock.send_char_msg(pmiMsgToSend)
            else:
                msg['kvs'].update(self.KVSs[self.default_kvsname])
                self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'response_to_pmi_get':
            # [goodell@ 2009-05-05] The next few lines add caching in to the kvs
            # gets to improve lookup performance and reduce MPI_Init times.
644 # Note that this doesn't handle consistency correctly if PMI is ever 645 # changed to permit overwriting keyvals. 646 if msg['kvsname'] not in self.KVSs.keys(): 647 self.KVSs[msg['kvsname']] = dict() 648 self.KVSs[msg['kvsname']][msg['key']] = msg['value'] 649 650 if msg['to_rank'] == self.myRank: 651 if self.pmiSock: # may have disappeared in early shutdown 652 pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (msg['value']) 653 self.pmiSock.send_char_msg(pmiMsgToSend) 654 else: 655 self.ring.rhsSock.send_dict_msg(msg) 656 elif msg['cmd'] == 'signal': 657 if msg['signo'] == 'SIGINT': 658 if not self.gdb: 659 self.jobEndingEarly = 1 660 for s in self.spawnedChildSocks: 661 s.send_dict_msg(msg) 662 if self.myRank != 0: 663 if self.ring.rhsSock: # still alive ? 664 self.ring.rhsSock.send_dict_msg(msg) 665 if self.gdb: 666 os.kill(clientPid,signal.SIGINT) 667 else: 668 try: 669 pgrp = clientPid * (-1) # neg Pid -> group 670 os.kill(pgrp,signal.SIGKILL) # may be reaped by sighandler 671 except: 672 pass 673 elif msg['signo'] == 'SIGKILL': 674 self.jobEndingEarly = 1 675 for s in self.spawnedChildSocks: 676 s.send_dict_msg(msg) 677 if self.myRank != 0: 678 if self.ring.rhsSock: # still alive ? 
                        self.ring.rhsSock.send_dict_msg(msg)
                if self.gdb:
                    os.kill(clientPid,signal.SIGUSR1)    # tell gdb driver to kill all
                else:
                    try:
                        pgrp = clientPid * (-1)    # neg Pid -> group
                        os.kill(pgrp,signal.SIGKILL)    # may be reaped by sighandler
                    except:
                        pass
            elif msg['signo'] == 'SIGTSTP':
                # suspend: forward around the ring until back at originator
                if msg['dest'] != self.myId:
                    self.ring.rhsSock.send_dict_msg(msg)
                try:
                    pgrp = clientPid * (-1)    # neg Pid -> group
                    os.kill(pgrp,signal.SIGTSTP)    # may be reaped by sighandler
                except:
                    pass
            elif msg['signo'] == 'SIGCONT':
                if msg['dest'] != self.myId:
                    self.ring.rhsSock.send_dict_msg(msg)
                try:
                    pgrp = clientPid * (-1)    # neg Pid -> group
                    os.kill(pgrp,signal.SIGCONT)    # may be reaped by sighandler
                except:
                    pass
        elif msg['cmd'] == 'client_exit_status':
            # relay exit reports toward rank 0, which hands them to the console
            if self.myRank == 0:
                if self.conSock:
                    self.conSock.send_dict_msg(msg,errprint=0)
            else:
                if self.ring.rhsSock:
                    self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'collective_abort':
            self.jobEndingEarly = 1
            if msg['src'] != self.myId:
                if self.ring.rhsSock:    # still alive ?
                    self.ring.rhsSock.send_dict_msg(msg)
            if self.conSock:
                msgToSend = { 'cmd' : 'job_aborted_early', 'jobid' : self.jobid,
                              'rank' : msg['rank'],
                              'exit_status' : msg['exit_status'] }
                self.conSock.send_dict_msg(msgToSend,errprint=0)
            # kill my whole client process group as part of the abort
            try:
                pgrp = clientPid * (-1)    # neg Pid -> group
                os.kill(pgrp,signal.SIGKILL)    # may be reaped by sighandler
            except:
                pass
        elif msg['cmd'] == 'startup_status':
            if msg['rc'] < 0:
                # a peer failed to start; give up on my client too
                self.jobEndingEarly = 1
                try:
                    pgrp = clientPid * (-1)    # neg Pid -> group
                    os.kill(pgrp,signal.SIGKILL)    # may be reaped by sighandler
                except:
                    pass
            ##### RMB if msg['src'] == self.myId:
            if self.myRank == 0:
                if self.conSock:
                    self.conSock.send_dict_msg(msg,errprint=0)
            else:
                if msg['src'] != self.myId and self.ring.rhsSock:    # rhs still alive ?
                    self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'stdin_from_user':
            # stdin fans out around the ring; the originating manager
            # (src == myId) already fed its own client, so act only on
            # forwarded copies
            if msg['src'] != self.myId:
                self.ring.rhsSock.send_dict_msg(msg)
                if in_stdinRcvrs(self.myRank,self.stdinDest):
                    if msg.has_key('eof'):
                        if self.subproc:    # must close subproc's file (not just the fd)
                            self.subproc.stdin.close()
                        else:
                            os.close(self.fd_write_cli_stdin)
                    else:
                        os.write(self.fd_write_cli_stdin,msg['line'])
        elif msg['cmd'] == 'stdin_dest':
            # update which ranks receive stdin (mpiexec -s style redirection)
            if msg['src'] != self.myId:
                self.stdinDest = msg['stdin_procs']
                self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'interrupt_peer_with_msg':    ## BNR
            if int(msg['torank']) == self.myRank:
                if self.pmiSock:    # may have disappeared in early shutdown
                    pmiMsgToSend = '%s\n' % (msg['msg'])
                    self.pmiSock.send_char_msg(pmiMsgToSend)
                    sleep(0.1)    # minor pause before intr
                    os.kill(clientPid,signal.SIGUSR1)
            else:
                self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'tv_ready':
            # totalview attached; every manager notifies its own client
            self.tvReady = 1
            if self.myRank != 0:
                msg['src'] = self.myId
                self.ring.rhsSock.send_dict_msg(msg)
            if self.pmiSock:    # should be valid sock if running tv
                pmiMsgToSend = 'cmd=tv_ready\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
        else:
            mpd_print(1, 'unexpected msg recvd on lhsSock :%s:' % msg )

    def handle_rhs_input(self,sock):
        """Handle traffic from my right-hand neighbor; in practice the only
        event is the rhs closing, which we treat as 'rhs died'."""
        msg = self.ring.rhsSock.recv_dict_msg()    #### NOT USING msg; should I ?
        mpd_print(0000, 'rhs died' )
        self.streamHandler.del_handler(self.ring.rhsSock)
        self.ring.rhsSock.close()
        self.ring.rhsSock = 0    # 0 (not None) is this file's "no sock" marker
    def handle_cli_stdout_input(self,sock):
        """Read a chunk of the client's stdout; forward it (optionally
        line-labeled) to the stdio-tree parent, or tear down on EOF."""
        line = mpd_read_nbytes(sock,1024)    # sock is self.fd_read_cli_stdout
        if not line:
            # EOF from the client's stdout
            if self.subproc:    # must close subproc's file (not just the fd)
                self.subproc.stdout.close()
            else:
                self.streamHandler.del_handler(self.fd_read_cli_stdout)
                os.close(self.fd_read_cli_stdout)
            self.numDone += 1
            # once all local+child streams are drained, release parent socks
            if self.numDone >= self.numWithIO:
                if self.parentStdoutSock:
                    self.parentStdoutSock.close()
                    self.parentStdoutSock = 0
                if self.parentStderrSock:
                    self.parentStderrSock.close()
                    self.parentStderrSock = 0
        else:
            if self.parentStdoutSock:
                if self.lineLabelFmt:
                    # prefix each completed output line with its label;
                    # startStdoutLineLabel tracks whether the previous chunk
                    # ended mid-line
                    lineLabel = self.create_line_label(self.lineLabelFmt,self.spawned)
                    splitLine = line.split('\n',1024)
                    if self.startStdoutLineLabel:
                        line = lineLabel
                    else:
                        line = ''
                    if splitLine[-1] == '':
                        # chunk ended exactly at a newline
                        self.startStdoutLineLabel = 1
                        del splitLine[-1]
                    else:
                        self.startStdoutLineLabel = 0
                    for s in splitLine[0:-1]:
                        line = line + s + '\n' + lineLabel
                    line = line + splitLine[-1]
                    if self.startStdoutLineLabel:
                        line = line + '\n'
                self.parentStdoutSock.send_char_msg(line,errprint=0)
        return line
    def handle_cli_stderr_input(self,sock):
        """Stderr twin of handle_cli_stdout_input."""
        line = mpd_read_nbytes(sock,1024)    # sock is self.fd_read_cli_stderr
        if not line:
            if self.subproc:    # must close subproc's file (not just the fd)
                self.subproc.stderr.close()
            else:
                self.streamHandler.del_handler(self.fd_read_cli_stderr)
                os.close(self.fd_read_cli_stderr)
            self.numDone += 1
            if self.numDone >= self.numWithIO:
                if self.parentStdoutSock:
                    self.parentStdoutSock.close()
                    self.parentStdoutSock = 0
                if self.parentStderrSock:
                    self.parentStderrSock.close()
                    self.parentStderrSock = 0
        else:
            if self.parentStderrSock:
                if self.lineLabelFmt:
838 lineLabel = self.create_line_label(self.lineLabelFmt,self.spawned) 839 splitLine = line.split('\n',1024) 840 if self.startStderrLineLabel: 841 line = lineLabel 842 else: 843 line = '' 844 if splitLine[-1] == '': 845 self.startStderrLineLabel = 1 846 del splitLine[-1] 847 else: 848 self.startStderrLineLabel = 0 849 for s in splitLine[0:-1]: 850 line = line + s + '\n' + lineLabel 851 line = line + splitLine[-1] 852 if self.startStderrLineLabel: 853 line = line + '\n' 854 self.parentStderrSock.send_char_msg(line,errprint=0) 855 return line 856 def handle_child_stdout_tree_input(self,sock): 857 if self.lineLabelFmt: 858 line = sock.recv_one_line() 859 else: 860 line = sock.recv(1024) 861 if not line: 862 self.streamHandler.del_handler(sock) 863 sock.close() 864 self.numDone += 1 865 if self.numDone >= self.numWithIO: 866 if self.parentStdoutSock: 867 self.parentStdoutSock.close() 868 self.parentStdoutSock = 0 869 if self.parentStderrSock: 870 self.parentStderrSock.close() 871 self.parentStderrSock = 0 872 else: 873 if self.parentStdoutSock: 874 self.parentStdoutSock.send_char_msg(line,errprint=0) 875 # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) ) 876 def handle_child_stderr_tree_input(self,sock): 877 if self.lineLabelFmt: 878 line = sock.recv_one_line() 879 else: 880 line = sock.recv(1024) 881 if not line: 882 self.streamHandler.del_handler(sock) 883 sock.close() 884 self.numDone += 1 885 if self.numDone >= self.numWithIO: 886 if self.parentStdoutSock: 887 self.parentStdoutSock.close() 888 self.parentStdoutSock = 0 889 if self.parentStderrSock: 890 self.parentStderrSock.close() 891 self.parentStderrSock = 0 892 else: 893 if self.parentStderrSock: 894 self.parentStderrSock.send_char_msg(line,errprint=0) 895 # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) ) 896 def handle_spawned_child_input(self,sock): 897 msg = sock.recv_dict_msg() 898 if not msg: 899 self.streamHandler.del_handler(sock) 900 self.spawnedChildSocks.remove(sock) 
901 sock.close() 902 elif msg['cmd'] == 'job_started': 903 pass 904 elif msg['cmd'] == 'client_exit_status': 905 if self.myRank == 0: 906 if self.conSock: 907 self.conSock.send_dict_msg(msg,errprint=0) 908 else: 909 if self.ring.rhsSock: 910 self.ring.rhsSock.send_dict_msg(msg) 911 elif msg['cmd'] == 'job_aborted_early': 912 if self.conSock: 913 msgToSend = { 'cmd' : 'job_aborted_early', 'jobid' : msg['jobid'], 914 'rank' : msg['rank'], 915 'exit_status' : msg['exit_status'] } 916 self.conSock.send_dict_msg(msgToSend,errprint=0) 917 elif msg['cmd'] == 'startup_status': 918 # remember this rc to put in spawn_result 919 self.spawnInProgress['errcodes'][msg['rank']] = msg['rc'] 920 if None not in self.spawnInProgress['errcodes']: # if all errcodes are now filled in 921 # send pmi msg to spawner 922 strerrcodes = '' # put errcodes in str format for pmi msg 923 for ec in self.spawnInProgress['errcodes']: 924 strerrcodes = strerrcodes + str(ec) + ',' 925 strerrcodes = strerrcodes[:-1] 926 if self.pmiSock: # may have disappeared in early shutdown 927 # may want to make rc < 0 if any errcode is < 0 928 pmiMsgToSend = 'cmd=spawn_result rc=0 errcodes=%s\n' % (strerrcodes) 929 self.pmiSock.send_char_msg(pmiMsgToSend) 930 self.spawnInProgress = 0 931 else: 932 mpd_print(1, "unrecognized msg from spawned child :%s:" % msg ) 933 def handle_pmi_connection(self,sock): 934 if self.pmiSock: # already have one 935 pmiMsgToSend = 'cmd=you_already_have_an_open_pmi_conn_to_me\n' 936 self.pmiSock.send_char_msg(pmiMsgToSend) 937 self.streamHandler.del_handler(self.pmiSock) 938 self.pmiSock.close() 939 self.pmiSock = 0 940 errmsg = "mpdman: invalid attempt to open 2 simultaneous pmi connections\n" + \ 941 " client=%s cwd=%s" % (self.clientPgm,os.environ['MPDMAN_CWD']) 942 print errmsg ; sys.stdout.flush() 943 clientExitStatus = 137 # assume kill -9 below 944 msgToSend = { 'cmd' : 'collective_abort', 945 'src' : self.myId, 'rank' : self.myRank, 946 'exit_status' : clientExitStatus } 947 
            self.ring.rhsSock.send_dict_msg(msgToSend)
            return
        (self.pmiSock,tempConnAddr) = self.pmiListenSock.accept()
        # the following lines are commented out so that we can support a process
        # that runs 2 MPI pgms in tandem (e.g. mpish at ANL)
        ##### del socksToSelect[pmiListenSock]
        ##### pmiListenSock.close()
        if not self.pmiSock:
            mpd_print(1,"failed accept for pmi connection from client")
            sys.exit(-1)
        self.pmiSock.name = 'pmi'
        self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
        if self.tvReady:
            pmiMsgToSend = 'cmd=tv_ready\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
    def handle_pmi_input(self,sock):
        """Dispatch one PMI wire-protocol message from the local client.

        Handles the PMI v1 command set (init, kvs ops, barrier, spawn,
        name publishing, finalize) plus the older BNR commands.  A lost
        connection while a collective job is up triggers a collective
        abort around the ring and kills the client's process group.
        """
        global clientPid, clientExited, clientExitStatus, clientExitStatusSent
        if self.spawnInProgress:
            # defer all PMI traffic until the pending spawn completes
            return
        line = self.pmiSock.recv_char_msg()
        if not line:
            # EOF from client's PMI socket
            self.streamHandler.del_handler(self.pmiSock)
            self.pmiSock.close()
            self.pmiSock = 0
            if self.pmiCollectiveJob:
                if self.ring.rhsSock:    # still alive ?
                    if not self.jobEndingEarly:  # if I did not already know this
                        if not clientExited:
                            clientExitStatus = 137  # assume kill -9 below
                        msgToSend = { 'cmd' : 'collective_abort',
                                      'src' : self.myId, 'rank' : self.myRank,
                                      'exit_status' : clientExitStatus }
                        self.ring.rhsSock.send_dict_msg(msgToSend)
                try:
                    pgrp = clientPid * (-1)    # neg Pid -> group
                    os.kill(pgrp,signal.SIGKILL)    # may be reaped by sighandler
                except:
                    pass
            return
        if line.startswith('mcmd='):
            # multi-line command: accumulate key=val lines until 'endcmd'
            parsedMsg = {}
            line = line.rstrip()
            splitLine = line.split('=',1)
            parsedMsg['cmd'] = splitLine[1]
            line = ''
            while not line.startswith('endcmd'):
                line = self.pmiSock.recv_char_msg()
                if not line.startswith('endcmd'):
                    line = line.rstrip()
                    splitLine = line.split('=',1)
                    parsedMsg[splitLine[0]] = splitLine[1]
        else:
            parsedMsg = parse_pmi_msg(line)
        if not parsedMsg.has_key('cmd'):
            pmiMsgToSend = 'cmd=unparseable_msg rc=-1\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
            return
        # startup_status may be sent here from new process BEFORE starting client
        if parsedMsg['cmd'] == 'startup_status':
            msgToSend = { 'cmd' : 'startup_status', 'src' : self.myId,
                          'rc' : parsedMsg['rc'],
                          'jobid' : self.jobid, 'rank' : self.myRank,
                          'exec' : parsedMsg['exec'], 'reason' : parsedMsg['reason'] }
            if self.ring.rhsSock:
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'init':
            self.pmiCollectiveJob = 1
            version = int(parsedMsg['pmi_version'])
            subversion = int(parsedMsg['pmi_subversion'])
            # accept exact major version and any subversion we are >= to
            if self.pmiVersion == version  and  self.pmiSubversion >= subversion:
                rc = 0
            else:
                rc = -1
            pmiMsgToSend = 'cmd=response_to_init pmi_version=%d pmi_subversion=%d rc=%d\n' % \
                           (self.pmiVersion,self.pmiSubversion,rc)
            self.pmiSock.send_char_msg(pmiMsgToSend)
            msgToSend = { 'cmd' : 'startup_status', 'src' : self.myId, 'rc' : 0,
                          'jobid' : self.jobid, 'rank' : self.myRank,
                          'exec' : '', 'reason' : '' }
            self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'get_my_kvsname':
            pmiMsgToSend = 'cmd=my_kvsname kvsname=%s\n' % (self.default_kvsname)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_maxes':
            pmiMsgToSend = 'cmd=maxes kvsname_max=4096 ' + \
                           'keylen_max=4096 vallen_max=4096\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_universe_size':
            pmiMsgToSend = 'cmd=universe_size size=%s\n' % (self.universeSize)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'get_appnum':
            pmiMsgToSend = 'cmd=appnum appnum=%s\n' % (self.appnum)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'publish_name':
            # name publishing is brokered by the local mpd
            msgToSend = { 'cmd' : 'publish_name',
                          'service' : parsedMsg['service'],
                          'port' : parsedMsg['port'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'unpublish_name':
            msgToSend = { 'cmd' : 'unpublish_name',
                          'service' : parsedMsg['service'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'lookup_name':
            msgToSend = { 'cmd' : 'lookup_name',
                          'service' : parsedMsg['service'],
                          'jobid' : self.jobid,
                          'manpid' : os.getpid() }
            self.mpdSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'create_kvs':
            new_kvsname = self.kvsname_template + str(self.kvs_next_id)
            self.KVSs[new_kvsname] = {}
            self.kvs_next_id += 1
            pmiMsgToSend = 'cmd=newkvs kvsname=%s\n' % (new_kvsname)
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'destroy_kvs':
            kvsname = parsedMsg['kvsname']
            try:
                del self.KVSs[kvsname]
                pmiMsgToSend = 'cmd=kvs_destroyed rc=0\n'
            except:
                pmiMsgToSend = 'cmd=kvs_destroyed rc=-1\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'put':
            kvsname = parsedMsg['kvsname']
            key = parsedMsg['key']
            value = parsedMsg['value']
            try:
                self.KVSs[kvsname][key] = value
                pmiMsgToSend = 'cmd=put_result rc=0\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
            except Exception, errmsg:
                pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'barrier_in':
            # rank 0 kicks off the first barrier loop around the ring
            self.pmiBarrierInRecvd = 1
            if self.myRank == 0  or  self.holdingPMIBarrierLoop1:
                msgToSend = { 'cmd' : 'pmi_barrier_loop_1' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'get':
            key = parsedMsg['key']
            kvsname = parsedMsg['kvsname']
            if self.KVSs.has_key(kvsname)  and  self.KVSs[kvsname].has_key(key):
                value = self.KVSs[kvsname][key]
                pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (value)
                self.pmiSock.send_char_msg(pmiMsgToSend)
            else:
                # not local; circulate a lookup around the ring
                msgToSend = { 'cmd' : 'pmi_get', 'key' : key,
                              'kvsname' : kvsname, 'from_rank' : self.myRank }
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'getbyidx':
            kvsname = parsedMsg['kvsname']
            idx = int(parsedMsg['idx'])
            if idx == 0:
                # first index: gather the full kvs from around the ring
                msgToSend = { 'cmd' : 'pmi_getbyidx', 'kvsname' : kvsname,
                              'from_rank' : self.myRank,
                              'kvs' : self.KVSs[self.default_kvsname] }
                self.ring.rhsSock.send_dict_msg(msgToSend)
            else:
                if len(self.KVSs[self.default_kvsname].keys()) > idx:
                    key = self.KVSs[self.default_kvsname].keys()[idx]
                    val = self.KVSs[self.default_kvsname][key]
                    nextidx = idx + 1
                    pmiMsgToSend = 'cmd=getbyidx_results rc=0 nextidx=%d key=%s val=%s\n' % \
                                   (nextidx,key,val)
                else:
                    pmiMsgToSend = 'cmd=getbyidx_results rc=-2 reason=no_more_keyvals\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'spawn':
            ## This code really is handling PMI_Spawn_multiple.  It translates a
            ## sequence of separate spawn messages into a single message to send
            ## to the mpd.  It keeps track by the "totspawns" and "spawnssofar"
            ## parameters in the incoming message.  The first message has
            ## "spawnssofar" set to 1.
            ##
            ## This proc may produce stdout and stderr; do this early so I
            ## won't exit before child sets up its conns with me.
            ## NOTE: if you spawn a non-MPI job, it may not send these msgs
            ## in which case adding 2 to numWithIO will cause the pgm to hang.
            totspawns = int(parsedMsg['totspawns'])
            spawnssofar = int(parsedMsg['spawnssofar'])
            if spawnssofar == 1:    # this is the first of possibly several spawn msgs
                self.numWithIO += 2
                self.tpsf = 0           # total processes spawned so far
                self.spawnExecs = {}    # part of MPI_Spawn_multiple args
                self.spawnHosts = {}    # comes from info
                self.spawnUsers = {}    # always the current user
                self.spawnCwds = {}     # could come from info, but doesn't yet
                self.spawnUmasks = {}   # could come from info, but doesn't yet
                self.spawnPaths = {}    # could come from info, but doesn't yet
                self.spawnEnvvars = {}  # whole environment from mpiexec, plus appnum
                self.spawnLimits = {}
                self.spawnArgs = {}
            self.spawnNprocs = int(parsedMsg['nprocs'])    # num procs in this spawn
            pmiInfo = {}
            for i in range(0,int(parsedMsg['info_num'])):
                info_key = parsedMsg['info_key_%d' % i]
                info_val = parsedMsg['info_val_%d' % i]
                pmiInfo[info_key] = info_val

            # each spawn-segment's settings are keyed by its rank range
            if pmiInfo.has_key('host'):
                try:
                    toIfhn = socket.gethostbyname_ex(pmiInfo['host'])[2][0]
                    self.spawnHosts[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = toIfhn
                except:
                    mpd_print(1, "unable to obtain host info for :%s:" % (pmiInfo['host']))
                    pmiMsgToSend = 'cmd=spawn_result rc=-2 status=unknown_host\n'
                    self.pmiSock.send_char_msg(pmiMsgToSend)
                    return
            else:
                self.spawnHosts[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = '_any_'
            if pmiInfo.has_key('path'):
                self.spawnPaths[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['path']
            else:
                self.spawnPaths[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_CLI_PATH']
            if pmiInfo.has_key('wdir'):
                self.spawnCwds[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['wdir']
            else:
                self.spawnCwds[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_CWD']
            if pmiInfo.has_key('umask'):
                self.spawnUmasks[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['umask']
            else:
                self.spawnUmasks[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_UMASK']
            self.spawnExecs[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = parsedMsg['execname']
            self.spawnUsers[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = mpd_get_my_username()
            self.spawnEnv = {}
            self.spawnEnv.update(os.environ)
            self.spawnEnv['MPI_APPNUM'] = str(spawnssofar-1)
            self.spawnEnvvars[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = self.spawnEnv
            self.spawnLimits[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = {}  # not implemented yet
            ##### args[(tpsf,tpsf+spawnNprocs-1) = [ parsedMsg['args'] ]
            ##### args[(tpsf,tpsf+spawnNprocs-1) = [ 'AA', 'BB', 'CC' ]
            cliArgs = []
            cliArgcnt = int(parsedMsg['argcnt'])
            for i in range(1,cliArgcnt+1):    # start at 1
                cliArgs.append(parsedMsg['arg%d' % i])
            self.spawnArgs[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = cliArgs
            self.tpsf += self.spawnNprocs

            if totspawns == spawnssofar:    # This is the last in the spawn sequence
                self.spawnedCnt += 1    # non-zero to use for creating kvsname in msg below
                msgToSend = { 'cmd'          : 'spawn',
                              'conhost'      : self.myHost,
                              'conifhn'      : self.myIfhn,
                              'conport'      : self.listenNonRingPort,
                              'spawned'      : self.spawnedCnt,
                              'jobid'        : self.jobid,
                              'nstarted'     : 0,
                              'nprocs'       : self.tpsf,
                              'hosts'        : self.spawnHosts,
                              'execs'        : self.spawnExecs,
                              'users'        : self.spawnUsers,
                              'cwds'         : self.spawnCwds,
                              'umasks'       : self.spawnUmasks,
                              'paths'        : self.spawnPaths,
                              'args'         : self.spawnArgs,
                              'envvars'      : self.spawnEnvvars,
                              'limits'       : self.spawnLimits,
                              'singinitpid'  : 0,
                              'singinitport' : 0,
                            }
                msgToSend['line_labels'] = self.lineLabelFmt
                msgToSend['spawner_manpid'] = os.getpid()
                self.mpdSock.send_dict_msg(msgToSend)
                self.spawnInProgress = parsedMsg
                self.spawnInProgress['errcodes'] = [None] * self.tpsf  # one for each spawn
                # I could send the preput_info along but will keep it here
                # and let the spawnee call me up and ask for it; he will
                # call me anyway since I am his parent in the tree.  So, I
                # will create a KVS to hold the info until he calls
                self.spawnedKVSname = 'mpdman_kvs_for_spawned_' + str(self.spawnedCnt)
                self.KVSs[self.spawnedKVSname] = {}
                preput_num = int(parsedMsg['preput_num'])
                for i in range(0,preput_num):
                    preput_key = parsedMsg['preput_key_%d' % i]
                    preput_val = parsedMsg['preput_val_%d' % i]
                    self.KVSs[self.spawnedKVSname][preput_key] = preput_val
        elif parsedMsg['cmd'] == 'finalize':
            # the following lines are present to support a process that runs
            # 2 MPI pgms in tandem (e.g. mpish at ANL)
            self.KVSs = {}
            self.KVSs[self.default_kvsname] = {}
            self.kvs_next_id = 1
            self.jobEndingEarly = 0
            self.pmiCollectiveJob = 0
            self.spawnedCnt = 0
            pmiMsgToSend = 'cmd=finalize_ack\n'
            self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'client_bnr_fence_in':    ## BNR
            self.pmiBarrierInRecvd = 1
            if self.myRank == 0  or  self.holdingPMIBarrierLoop1:
                msgToSend = { 'cmd' : 'pmi_barrier_loop_1' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'client_bnr_put':    ## BNR
            key = parsedMsg['attr']
            value = parsedMsg['val']
            try:
                self.KVSs[self.default_kvsname][key] = value
                pmiMsgToSend = 'cmd=put_result rc=0\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
            except Exception, errmsg:
                pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif parsedMsg['cmd'] == 'client_bnr_get':    ## BNR
            key = parsedMsg['attr']
            if self.KVSs[self.default_kvsname].has_key(key):
                value = self.KVSs[self.default_kvsname][key]
                pmiMsgToSend = 'cmd=client_bnr_get_output rc=0 val=%s\n' % (value)
                self.pmiSock.send_char_msg(pmiMsgToSend)
            else:
                # NOTE(review): 'kvsname' here is whatever an earlier command
                # left bound; a bare client_bnr_get miss would hit a NameError
                # -- confirm against callers before changing
                msgToSend = { 'cmd' : 'bnr_get', 'key' : key,
                              'kvsname' : kvsname, 'from_rank' : self.myRank }
                self.ring.rhsSock.send_dict_msg(msgToSend)
        elif parsedMsg['cmd'] == 'client_ready':    ## BNR
            ## continue to wait for accepting_signals
            pass
        elif parsedMsg['cmd'] == 'accepting_signals':    ## BNR
            ## handle it like a barrier_in ??
            self.pmiBarrierInRecvd = 1
            self.doingBNR = 1    ## BNR  # set again is OK
        elif parsedMsg['cmd'] == 'interrupt_peer_with_msg':    ## BNR
            self.ring.rhsSock.send_dict_msg(parsedMsg)
        else:
            mpd_print(1, "unrecognized pmi msg :%s:" % line )
    def handle_console_input(self,sock):
        """Handle dict messages from the console (mpiexec) connection.

        A dropped console kills the job: forward SIGKILL around the ring and
        to the local client's process group.  Otherwise dispatch signal
        forwarding, stdin distribution, stdin-destination updates, and
        totalview-ready notifications.
        """
        msg = self.conSock.recv_dict_msg()
        if not msg:
            # console went away: tear down local conns and kill everything
            if self.conSock:
                self.streamHandler.del_handler(self.conSock)
                self.conSock.close()
                self.conSock = 0
            if self.parentStdoutSock:
                self.streamHandler.del_handler(self.parentStdoutSock)
                self.parentStdoutSock.close()
                self.parentStdoutSock = 0
            if self.parentStderrSock:
                self.streamHandler.del_handler(self.parentStderrSock)
                self.parentStderrSock.close()
                self.parentStderrSock = 0
            if self.ring.rhsSock:
                msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGKILL' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
            try:
                pgrp = clientPid * (-1)    # neg Pid -> group
                os.kill(pgrp,signal.SIGKILL)    # may be reaped by sighandler
            except:
                pass
        elif msg['cmd'] == 'signal':
            if msg['signo'] == 'SIGINT':
                self.ring.rhsSock.send_dict_msg(msg)
                for s in self.spawnedChildSocks:
                    s.send_dict_msg(msg)
                if self.gdb:
                    os.kill(clientPid,signal.SIGINT)
                else:
                    # NOTE(review): non-gdb SIGINT escalates to SIGKILL of the
                    # whole process group here -- confirm intended
                    try:
                        pgrp = clientPid * (-1)    # neg Pid -> group
                        os.kill(pgrp,signal.SIGKILL)    # may be reaped by sighandler
                    except:
                        pass
            elif msg['signo'] == 'SIGKILL':
                try:
                    self.ring.rhsSock.send_dict_msg(msg)
                except:
                    pass
                for s in self.spawnedChildSocks:
                    try:
                        s.send_dict_msg(msg)
                    except:
                        pass
                if self.gdb:
                    os.kill(clientPid,signal.SIGUSR1)    # tell gdb driver to kill all
                else:
                    try:
                        pgrp = clientPid * (-1)    # neg Pid -> group
                        os.kill(pgrp,signal.SIGKILL)    # may be reaped by sighandler
                    except:
                        pass
            elif msg['signo'] == 'SIGTSTP':
                msg['dest'] = self.myId
                self.ring.rhsSock.send_dict_msg(msg)
                try:
                    pgrp = clientPid * (-1)    # neg Pid -> group
                    os.kill(pgrp,signal.SIGTSTP)    # may be reaped by sighandler
                except:
                    pass
            elif msg['signo'] == 'SIGCONT':
                msg['dest'] = self.myId
                self.ring.rhsSock.send_dict_msg(msg)
                try:
                    pgrp = clientPid * (-1)    # neg Pid -> group
                    os.kill(pgrp,signal.SIGCONT)    # may be reaped by sighandler
                except:
                    pass
        elif msg['cmd'] == 'stdin_from_user':
            msg['src'] = self.myId
            if self.ring.rhsSock:
                # Only send to rhs if that sock is open
                self.ring.rhsSock.send_dict_msg(msg)
            if in_stdinRcvrs(self.myRank,self.stdinDest):
                try:
                    if msg.has_key('eof'):
                        if self.subproc:    # must close subproc's file (not just the fd)
                            self.subproc.stdin.close()
                        else:
                            os.close(self.fd_write_cli_stdin)
                    else:
                        os.write(self.fd_write_cli_stdin,msg['line'])
                except:
                    mpd_print(1, 'cannot send stdin to client')
        elif msg['cmd'] == 'stdin_dest':
            self.stdinDest = msg['stdin_procs']
            msg['src'] = self.myId
            if self.ring.rhsSock:
                # Only send to rhs if that sock is open
                self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'tv_ready':
            self.tvReady = 1
            msg['src'] = self.myId
            self.ring.rhsSock.send_dict_msg(msg)
            if self.pmiSock:    # should be valid sock if running tv
                pmiMsgToSend = 'cmd=tv_ready\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
        else:
            mpd_print(1, 'unexpected msg recvd on conSock :%s:' % msg )
    def handle_mpd_input(self,sock):
        """Handle dict messages from the local mpd daemon.

        A lost mpd connection aborts the job (notify console, SIGKILL our
        whole process group, exit).  Otherwise dispatch abort notices,
        startup status, signal requests, and name-service results.
        """
        msg = self.mpdSock.recv_dict_msg()
        mpd_print(0000, 'msg recvd on mpdSock :%s:' % msg )
        if not msg:
            if self.conSock:
                msgToSend = { 'cmd' : 'job_aborted', 'reason' : 'mpd disappeared',
                              'jobid' : self.jobid }
                self.conSock.send_dict_msg(msgToSend,errprint=0)
                self.streamHandler.del_handler(self.conSock)
                self.conSock.close()
                self.conSock = 0
            try:
                os.kill(0,signal.SIGKILL)    # pid 0 -> all in my process group
            except:
                pass
            sys.exit(0)
        if msg['cmd'] == 'abortjob':
            mpd_print(1, "job aborted by mpd; reason=%s" % (msg['reason']))
        elif msg['cmd'] == 'startup_status':    # probably some hosts not found
            if self.pmiSock:    # may have disappeared in early shutdown
                pmiMsgToSend = 'cmd=spawn_result rc=-1 errcodes='' reason=%s\n' % (msg['reason'])
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif msg['cmd'] == 'signal_to_handle'  and  msg.has_key('sigtype'):
            if msg['sigtype'].isdigit():
                signum = int(msg['sigtype'])
            else:
                # builds e.g. 'signum = signal.SIGUSR1'; % binds before +
                exec('signum = %s' % 'signal.SIG' + msg['sigtype'])
            try:
                if msg['s_or_g'] == 's':    # single (not entire group)
                    pgrp = clientPid          # just client process
                else:
                    pgrp = clientPid * (-1)   # neg Pid -> process group
                os.kill(pgrp,signum)
            except Exception, errmsg:
                mpd_print(1, 'invalid signal (%d) from mpd' % (signum) )
        elif msg['cmd'] == 'publish_result':
            if self.pmiSock:
                pmiMsgToSend = 'cmd=publish_result info=%s\n' % (msg['info'])
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif msg['cmd'] == 'unpublish_result':
            if self.pmiSock:
                pmiMsgToSend = 'cmd=unpublish_result info=%s\n' % (msg['info'])
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif msg['cmd'] == 'lookup_result':
            if self.pmiSock:
                pmiMsgToSend = 'cmd=lookup_result info=%s port=%s\n' % \
                               (msg['info'],msg['port'])
                self.pmiSock.send_char_msg(pmiMsgToSend)
        elif msg['cmd'] == 'spawn_done_by_mpd':
            pass
        else:
            mpd_print(1, 'invalid msg recvd on mpdSock :%s:' % msg )
    def launch_client_via_fork_exec(self,cli_env):
        """Fork and exec the client program.

        Retries fork up to 6 times on failure (returns -1 if it never
        succeeds).  In the child: detach from manager sockets, start a new
        process group, wire the stdin/stdout/stderr pipes onto fds 0/1/2,
        wait for the 'go' handshake, apply resource limits, then exec the
        client (optionally under the gdb driver).  Exec failures are
        reported back over a fresh PMI connection as startup_status.
        Returns the child's pid in the parent.
        """
        maxTries = 6
        numTries = 0
        while numTries < maxTries:
            try:
                cliPid = os.fork()
                errinfo = 0
            except OSError, errinfo:
                pass    ## could check for errinfo.errno == 35 (resource unavailable)
            if errinfo:
                sleep(1)
                numTries += 1
            else:
                break
        if numTries >= maxTries:
            ## print '**** mpdman: fork failed for launching client'
            return -1
        if cliPid == 0:    # child process
            mpd_set_my_id(socket.gethostname() + '_man_before_exec_client_' + `os.getpid()`)
            self.ring.lhsSock.close()
            self.ring.rhsSock.close()
            self.listenRingSock.close()
            if self.conSock:
                self.streamHandler.del_handler(self.conSock)
                self.conSock.close()
                self.conSock = 0
            self.pmiListenSock.close()
            os.setpgrp()    # put client in its own process group for group kills

            os.close(self.fd_write_cli_stdin)
            os.dup2(self.fd_read_cli_stdin,0)    # closes fd 0 (stdin) if open

            # to simply print on the mpd's tty:
            #     comment out the next lines
            os.close(self.fd_read_cli_stdout)
            os.dup2(self.fd_write_cli_stdout,1)    # closes fd 1 (stdout) if open
            os.close(self.fd_write_cli_stdout)
            os.close(self.fd_read_cli_stderr)
            os.dup2(self.fd_write_cli_stderr,2)    # closes fd 2 (stderr) if open
            os.close(self.fd_write_cli_stderr)

            msg = self.handshake_sock_cli_end.recv_char_msg()
            if not msg.startswith('go'):
                mpd_print(1,'%s: invalid go msg from man :%s:' % (self.myId,msg) )
                sys.exit(-1)
            self.handshake_sock_cli_end.close()

            self.clientPgmArgs = [self.clientPgm] + self.clientPgmArgs
            errmsg = set_limits(self.clientPgmLimits)
            if errmsg:
                # report limit failure to the manager over a fresh pmi conn
                self.pmiSock = MPDSock(name='pmi')
                self.pmiSock.connect((self.myIfhn,self.pmiListenPort))
                reason = quote(str(errmsg))
                pmiMsgToSend = 'cmd=startup_status rc=-1 reason=%s exec=%s\n' % \
                               (reason,self.clientPgm)
                self.pmiSock.send_char_msg(pmiMsgToSend)
                sys.exit(0)
            try:
                mpd_print(0000, 'execing clientPgm=:%s:' % (self.clientPgm) )
                if self.gdb:
                    fullDirName = os.environ['MPDMAN_FULLPATHDIR']
                    gdbdrv = os.path.join(fullDirName,'mpdgdbdrv.py')
                    if not os.access(gdbdrv,os.X_OK):
                        print 'mpdman: cannot execute mpdgdbdrv %s' % gdbdrv
                        sys.exit(0);
                    if self.gdba:
                        self.clientPgmArgs.insert(0,'-attach')
                    # assumes the program name is always prepended for the
                    # gdb driver -- TODO confirm placement relative to gdba
                    self.clientPgmArgs.insert(0,self.clientPgm)
                    os.execvpe(gdbdrv,self.clientPgmArgs,cli_env)    # client
                else:
                    os.environ['PATH'] = cli_env['PATH']
                    os.execvpe(self.clientPgm,self.clientPgmArgs,cli_env)    # client
            except Exception, errmsg:
                # print '%s: could not run %s; probably executable file not found' % \
                #       (self.myId,clientPgm)
                self.pmiSock = MPDSock(name='pmi')
                self.pmiSock.connect((self.myIfhn,self.pmiListenPort))
                reason = quote(str(errmsg))
                pmiMsgToSend = 'cmd=startup_status rc=-1 reason=%s exec=%s\n' % \
                               (reason,self.clientPgm)
                self.pmiSock.send_char_msg(pmiMsgToSend)
                sys.exit(0)
            sys.exit(0)
        # parent: close the child's ends of the stdio pipes
        if not self.singinitPORT:
            os.close(self.fd_read_cli_stdin)
            os.close(self.fd_write_cli_stdout)
            os.close(self.fd_write_cli_stderr)
        self.cliListenSock.close()
        return cliPid
    def launch_client_via_subprocess(self,cli_env):
        """Launch the client through subprocess.Popen (e.g. on Windows).

        Runs mpdwrapcli.py under the python interpreter with piped stdio;
        reader threads pump the child's stdout/stderr through the regular
        handlers.  Returns the child's pid.
        """
        import threading
        def read_fd_with_func(fd,func):
            # pump fd through func until EOF (func returns '' at EOF)
            line = 'x'
            while line:
                line = func(fd)
        tempListenSock = MPDListenSock()
        tempListenPort = tempListenSock.getsockname()[1]
        # python_executable = '\Python24\python.exe'
        python_executable = 'python2.4'
        fullDirName = os.environ['MPDMAN_FULLPATHDIR']
        mpdwrapcli = os.path.join(fullDirName,'mpdwrapcli.py')
        wrapCmdAndArgs = [ mpdwrapcli, str(tempListenPort),
                           self.clientPgm, self.clientPgm ] + self.clientPgmArgs
        cli_env.update(os.environ)    ###### RMB: MAY NEED VARS OTHER THAN PATH ?????
        self.subproc = subprocess.Popen([python_executable,'-u'] + wrapCmdAndArgs,
                                        bufsize=0,env=cli_env,close_fds=False,
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
        self.fd_write_cli_stdin = self.subproc.stdin.fileno()
        # pump child's stdout/stderr through the normal handlers in threads
        stdout_thread = threading.Thread(target=read_fd_with_func,
                                         args=(self.subproc.stdout.fileno(),
                                               self.handle_cli_stdout_input))
        stdout_thread.start()
        stderr_thread = threading.Thread(target=read_fd_with_func,
                                         args=(self.subproc.stderr.fileno(),
                                               self.handle_cli_stderr_input))
        stderr_thread.start()
        (self.handshake_sock_man_end,tempAddr) = tempListenSock.accept()
        cliPid = self.subproc.pid
        ## an mpd_print wreaks havoc here; simple prints are OK (probably a stack issue)
        # mpd_print(0000,"CLIPID=%d" % cliPid)
        # print "CLIPID=%d" % cliPid ; sys.stdout.flush()
        return cliPid
    def create_line_label(self,line_label_fmt,spawned):
        """Build the stdout/stderr line-label prefix from a printf-like fmt.

        %r expands to this process's rank, %h to its host; other chars are
        copied through.  A non-zero 'spawned' (a spawn counter) is appended
        after a comma; the label always ends with ': '.
        """
        lineLabel = ''    # default is no label
        if line_label_fmt:
            i = 0
            while i < len(line_label_fmt):
                if line_label_fmt[i] == '%':
                    fmtchar = line_label_fmt[i+1]
                    i += 2
                    if fmtchar == 'r':
                        lineLabel += str(self.myRank)
                    elif fmtchar == 'h':
                        lineLabel += self.myHost
                else:
                    lineLabel += line_label_fmt[i]
                    i += 1
            if spawned:
                lineLabel += ',' + str(spawned) + ': '    # spawned is actually a count
            else:
                lineLabel += ': '
        return lineLabel

def in_stdinRcvrs(myRank,stdinDest):
    """Return 1 if myRank is in the stdin destination spec, else 0.

    stdinDest is a comma-separated list of ranks and inclusive ranges,
    e.g. '0,2-4,7'.
    """
    s1 = stdinDest.split(',')
    for s in s1:
        s2 = s.split('-')
        if len(s2) == 1:    # a single rank
            if myRank == int(s2[0]):
                return 1
        else:               # an inclusive lo-hi range
            if myRank >= int(s2[0])  and  myRank <= int(s2[1]):
                return 1
    return 0


def parse_pmi_msg(msg):
    """Parse a single-line PMI message of 'key=value' tokens into a dict.

    Returns {} (after printing a note) if any token fails to parse.
    """
    parsed_msg = {}
    try:
        sm = findall(r'\S+',msg)
        for e in sm:
            se = e.split('=')
            parsed_msg[se[0]] = se[1]
    except:
        print 'unable to parse pmi msg :%s:' % msg
        parsed_msg = {}
    return parsed_msg

def set_limits(limits):
    """Apply rlimits from a {name: value} dict before exec'ing the client.

    Each limit is set with soft == hard.  Returns 0 on success or an
    error message/exception on failure (names were validated at mpdrun).
    """
    try:
        import resource
    except:
        return 'unable to import resource module to set limits'
    for limtype in limits.keys():
        limit = int(limits[limtype])
        try:
            if limtype == 'core':
                resource.setrlimit(resource.RLIMIT_CORE,(limit,limit))
            elif limtype == 'cpu':
                resource.setrlimit(resource.RLIMIT_CPU,(limit,limit))
            elif limtype == 'fsize':
                resource.setrlimit(resource.RLIMIT_FSIZE,(limit,limit))
            elif limtype == 'data':
                resource.setrlimit(resource.RLIMIT_DATA,(limit,limit))
            elif limtype == 'stack':
                resource.setrlimit(resource.RLIMIT_STACK,(limit,limit))
            elif limtype == 'rss':
                resource.setrlimit(resource.RLIMIT_RSS,(limit,limit))
            elif limtype == 'nproc':
                resource.setrlimit(resource.RLIMIT_NPROC,(limit,limit))
            elif limtype == 'nofile':
                resource.setrlimit(resource.RLIMIT_NOFILE,(limit,limit))
            elif limtype == 'ofile':
                resource.setrlimit(resource.RLIMIT_OFILE,(limit,limit))
            elif limtype == 'memloc':
                resource.setrlimit(resource.RLIMIT_MEMLOCK,(limit,limit))
            elif limtype == 'as':
                resource.setrlimit(resource.RLIMIT_AS,(limit,limit))
            elif limtype == 'vmem':
                resource.setrlimit(resource.RLIMIT_VMEM,(limit,limit))
            else:
                raise NameError, 'invalid resource name: %s' % limtype  # validated at mpdrun
        except (NameError,ImportError), errmsg:
            return errmsg
    return 0

def sigchld_handler(signum,frame):
    """SIGCHLD handler: reap finished children without blocking.

    Records the client's exit status in the module globals when the reaped
    pid is the client, and forwards the signal event to mpdlib.
    """
    global clientPid, clientExited, clientExitStatus, clientExitStatusSent
    done = 0
    while not done:
        try:
            (pid,status) = os.waitpid(-1,os.WNOHANG)
            if pid == 0:    # no existing child process is finished
                done = 1
            if pid == clientPid:
                clientExited = 1
                clientExitStatus = status
            mpd_handle_signal(signum,0)
        except:    # no more children to reap (or waitpid failed)
            done = 1


if __name__ == '__main__':
    if not os.environ.has_key('MPDMAN_CLI_PGM'):    # assume invoked from keyboard
        print __doc__
        sys.exit(-1)
    mpdman = MPDMan()
    mpdman.run()