1#!/usr/bin/env python 2# 3# (C) 2001 by Argonne National Laboratory. 4# See COPYRIGHT in top-level directory. 5# 6 7""" 8usage: mpd [--host=<host> --port=<portnum>] [--noconsole] 9 [--trace] [--echo] [--daemon] [--bulletproof] --ncpus=<ncpus> 10 [--ifhn=<interface-hostname>] [--listenport=<listenport>] 11 [--pid=<pidfilename>] --tmpdir=<tmpdir>] [-zc] [--debug] 12 13Some long parameter names may be abbreviated to their first letters by using 14 only one hyphen and no equal sign: 15 mpd -h donner -p 4268 -n 16 is equivalent to 17 mpd --host=magpie --port=4268 --noconsole 18 19--host and --port must be specified together; they tell the new mpd where 20 to enter an existing ring; if they are omitted, the new mpd forms a 21 stand-alone ring that other mpds may enter later 22--noconsole is useful for running 2 mpds on the same machine; only one of 23 them will accept mpd commands 24--trace yields lots of traces thru mpd routines; currently too verbose 25--debug turns on some debugging prints; currently not verbose enough 26--echo causes the mpd echo its listener port by which other mpds may connect 27--daemon causes mpd to run backgrounded, with no controlling tty 28--bulletproof says to turn bulletproofing on (experimental) 29--ncpus indicates how many cpus are on the local host; used for starting processes 30--ifhn specifies an alternate interface hostname for the host this mpd is running on; 31 e.g. may be used to specify the alias for an interface other than default 32--listenport specifies a port for this mpd to listen on; by default it will 33 acquire one from the system 34--conlistenport specifies a port for this mpd to listen on for console 35 connections (only used when employing inet socket for console); by default it 36 will acquire one from the system 37--pid=filename writes the mpd pid into the specified file, or --pid alone 38 writes it into /var/run/mpd.pid 39--tmpdir=tmpdirname where mpd places temporary sockets, etc. 40-zc is a purely EXPERIMENTAL option right now used to investigate zeroconf 41 networking; it can be used to allow mpds to discover each other locally 42 using multicast DNS; its usage may change over time 43 Currently, -zc is specified like this: -zc N 44 where N specifies a 'level' in a startup set of mpds. The first mpd in a ring 45 must have 1 and it will establish a ring of one mpd. Subsequent mpds can specify 46 -zc 2 and will hook into the ring via the one at level 1. Except for level 1, new 47 mpds enter the ring via an mpd at level-1. 48 49A file named .mpd.conf file must be present in the user's home directory 50 with read and write access only for the user, and must contain at least 51 a line with MPD_SECRETWORD=<secretword> 52 53To run mpd as root, install it while root and instead of a .mpd.conf file 54use mpd.conf (no leading dot) in the /etc directory.' 55""" 56from time import ctime 57from mpdlib import mpd_version 58__author__ = "Ralph Butler and Rusty Lusk" 59__date__ = ctime() 60__version__ = "$Revision: 1.160 $" 61__version__ += " " + str(mpd_version()) 62__credits__ = "" 63 64 65import sys, os, signal, socket, stat 66 67from re import sub 68from atexit import register 69from cPickle import dumps 70from types import ClassType 71from random import seed, randrange, random 72from time import sleep 73from md5 import new as md5new 74from mpdlib import mpd_set_my_id, mpd_check_python_version, mpd_sockpair, \ 75 mpd_print, mpd_get_my_username, mpd_close_zc, \ 76 mpd_get_groups_for_username, mpd_uncaught_except_tb, \ 77 mpd_set_procedures_to_trace, mpd_trace_calls, \ 78 mpd_dbg_level, mpd_set_dbg_level, mpd_set_tmpdir, \ 79 MPDSock, MPDListenSock, MPDConListenSock, \ 80 MPDStreamHandler, MPDRing, MPDParmDB 81from mpdman import MPDMan 82 83# fix for ticket #753 where the set() builtin isn't available in python2.3 84try: 85 set 86except NameError: 87 from sets import Set as set 88 89 90try: 91 import pwd 92 pwd_module_available = 1 93except: 94 pwd_module_available = 0 95try: 96 import syslog 97 syslog_module_available = 1 98except: 99 syslog_module_available = 0 100try: 101 import subprocess 102 subprocess_module_available = 1 103except: 104 subprocess_module_available = 0 105 106 107def sigchld_handler(signum,frame): 108 done = 0 109 while not done: 110 try: 111 (pid,status) = os.waitpid(-1,os.WNOHANG) 112 if pid == 0: # no existing child process is finished 113 done = 1 114 except: # no more child processes to be waited for 115 done = 1 116 117class MPD(object): 118 def __init__(self): 119 self.myHost = socket.gethostname() 120 try: 121 hostinfo = socket.gethostbyname_ex(self.myHost) 122 self.myIfhn = hostinfo[2][0] # chgd below when I get the real value 123 except: 124 print 'mpd failed: gethostbyname_ex failed for %s' % (self.myHost) 125 sys.exit(-1) 126 def run(self): 127 if syslog_module_available: 128 syslog.openlog("mpd",0,syslog.LOG_DAEMON) 129 syslog.syslog(syslog.LOG_INFO,"mpd starting; no mpdid yet") 130 sys.excepthook = mpd_uncaught_except_tb 131 self.spawnQ = [] 132 self.spawnInProgress = 0 133 self.parmdb = MPDParmDB(orderedSources=['cmdline','xml','env','rcfile','thispgm']) 134 self.parmsToOverride = { 135 'MPD_SECRETWORD' : '', 136 'MPD_MY_IFHN' : self.myIfhn, 137 'MPD_ENTRY_IFHN' : '', 138 'MPD_ENTRY_PORT' : 0, 139 'MPD_NCPUS' : 1, 140 'MPD_LISTEN_PORT' : 0, 141 'MPD_TRACE_FLAG' : 0, 142 'MPD_CONSOLE_FLAG' : 1, 143 'MPD_ECHO_PORT_FLAG' : 0, 144 'MPD_DAEMON_FLAG' : 0, 145 'MPD_BULLETPROOF_FLAG' : 0, 146 'MPD_PID_FILENAME' : '', 147 'MPD_ZC' : 0, 148 'MPD_LOGFILE_TRUNC_SZ' : 4000000, # -1 -> don't trunc 149 'MPD_PORT_RANGE' : 0, 150 'MPD_TMPDIR' : '/tmp', 151 } 152 for (k,v) in self.parmsToOverride.items(): 153 self.parmdb[('thispgm',k)] = v 154 self.get_parms_from_cmdline() 155 self.parmdb.get_parms_from_rcfile(self.parmsToOverride,errIfMissingFile=1) 156 self.parmdb.get_parms_from_env(self.parmsToOverride) 157 self.myIfhn = self.parmdb['MPD_MY_IFHN'] # variable for convenience 158 self.myPid = os.getpid() 159 if self.parmdb['MPD_PORT_RANGE']: 160 os.environ['MPICH_PORT_RANGE'] = self.parmdb['MPD_PORT_RANGE'] 161 self.tmpdir = self.parmdb['MPD_TMPDIR'] 162 mpd_set_tmpdir(self.tmpdir) 163 self.listenSock = MPDListenSock(name='ring_listen_sock', 164 port=self.parmdb['MPD_LISTEN_PORT']) 165 self.parmdb[('thispgm','MPD_LISTEN_PORT')] = self.listenSock.sock.getsockname()[1] 166 self.myId = '%s_%d' % (self.myHost,self.parmdb['MPD_LISTEN_PORT']) 167 mpd_set_my_id(myid=self.myId) 168 self.streamHandler = MPDStreamHandler() 169 self.ring = MPDRing(streamHandler=self.streamHandler, 170 secretword=self.parmdb['MPD_SECRETWORD'], 171 listenSock=self.listenSock, 172 myIfhn=self.myIfhn, 173 entryIfhn=self.parmdb['MPD_ENTRY_IFHN'], 174 entryPort=self.parmdb['MPD_ENTRY_PORT'], 175 zcMyLevel=self.parmdb['MPD_ZC']) 176 # setup tracing if requested via args 177 if self.parmdb['MPD_TRACE_FLAG']: 178 proceduresToTrace = [] 179 import inspect 180 symbolsAndTypes = globals().items() + \ 181 inspect.getmembers(self) + \ 182 inspect.getmembers(self.ring) + \ 183 inspect.getmembers(self.streamHandler) 184 for (symbol,symtype) in symbolsAndTypes: 185 if symbol == '__init__': # a problem to trace 186 continue 187 if inspect.isfunction(symtype) or inspect.ismethod(symtype): 188 # print symbol 189 proceduresToTrace.append(symbol) 190 mpd_set_procedures_to_trace(proceduresToTrace) 191 sys.settrace(mpd_trace_calls) 192 if syslog_module_available: 193 syslog.syslog(syslog.LOG_INFO,"mpd has mpdid=%s (port=%d)" % \ 194 (self.myId,self.parmdb['MPD_LISTEN_PORT']) ) 195 vinfo = mpd_check_python_version() 196 if vinfo: 197 print "mpd: your python version must be >= 2.2 ; current version is:", vinfo 198 sys.exit(-1) 199 200 # need to close both object and underlying fd (ticket #963) 201 sys.stdin.close() 202 os.close(0) 203 204 if self.parmdb['MPD_ECHO_PORT_FLAG']: # do this before becoming a daemon 205 # print self.parmdb['MPD_LISTEN_PORT'] 206 print "mpd_port=%d" % self.parmdb['MPD_LISTEN_PORT'] 207 sys.stdout.flush() 208 ##### NEXT 2 for debugging 209 ## print >>sys.stderr, self.parmdb['MPD_LISTEN_PORT'] 210 ## sys.stderr.flush() 211 self.myRealUsername = mpd_get_my_username() 212 self.currRingSize = 1 # default 213 self.currRingNCPUs = 1 # default 214 if os.environ.has_key('MPD_CON_EXT'): 215 self.conExt = '_' + os.environ['MPD_CON_EXT'] 216 else: 217 self.conExt = '' 218 self.logFilename = self.tmpdir + '/mpd2.logfile_' + mpd_get_my_username() + self.conExt 219 if self.parmdb['MPD_PID_FILENAME']: # may overwrite it below 220 pidFile = open(self.parmdb['MPD_PID_FILENAME'],'w') 221 print >>pidFile, "%d" % (os.getpid()) 222 pidFile.close() 223 224 self.conListenSock = 0 # don't want one when I do cleanup for forked daemon procs 225 if self.parmdb['MPD_DAEMON_FLAG']: # see if I should become a daemon with no controlling tty 226 rc = os.fork() 227 if rc != 0: # parent exits; child in background 228 sys.exit(0) 229 os.setsid() # become session leader; no controlling tty 230 signal.signal(signal.SIGHUP,signal.SIG_IGN) # make sure no sighup when leader ends 231 ## leader exits; svr4: make sure do not get another controlling tty 232 rc = os.fork() 233 if rc != 0: 234 sys.exit(0) 235 if self.parmdb['MPD_PID_FILENAME']: # overwrite one above before chg usmask 236 pidFile = open(self.parmdb['MPD_PID_FILENAME'],'w') 237 print >>pidFile, "%d" % (os.getpid()) 238 pidFile.close() 239 os.chdir("/") # free up filesys for umount 240 os.umask(0) 241 try: os.unlink(self.logFilename) 242 except: pass 243 logFileFD = os.open(self.logFilename,os.O_CREAT|os.O_WRONLY|os.O_EXCL,0600) 244 self.logFile = os.fdopen(logFileFD,'w',0) 245 sys.stdout = self.logFile 246 sys.stderr = self.logFile 247 print >>sys.stdout, 'logfile for mpd with pid %d' % os.getpid() 248 sys.stdout.flush() 249 os.dup2(self.logFile.fileno(),sys.__stdout__.fileno()) 250 os.dup2(self.logFile.fileno(),sys.__stderr__.fileno()) 251 if self.parmdb['MPD_CONSOLE_FLAG']: 252 self.conListenSock = MPDConListenSock(secretword=self.parmdb['MPD_SECRETWORD']) 253 self.streamHandler.set_handler(self.conListenSock, 254 self.handle_console_connection) 255 register(self.cleanup) 256 seed() 257 self.nextJobInt = 1 258 self.activeJobs = {} 259 self.conSock = 0 260 self.allExiting = 0 # for mpdallexit (for first loop for graceful exit) 261 self.exiting = 0 # for mpdexit or mpdallexit 262 self.kvs_cntr = 0 # for mpdman 263 self.pulse_cntr = 0 264 rc = self.ring.enter_ring(lhsHandler=self.handle_lhs_input, 265 rhsHandler=self.handle_rhs_input) 266 if rc < 0: 267 mpd_print(1,"failed to enter ring") 268 sys.exit(-1) 269 self.pmi_published_names = {} 270 if hasattr(signal,'SIGCHLD'): 271 signal.signal(signal.SIGCHLD,sigchld_handler) 272 if not self.parmdb['MPD_BULLETPROOF_FLAG']: 273 # import profile ; profile.run('self.runmainloop()') 274 self.runmainloop() 275 else: 276 try: 277 from threading import Thread 278 except: 279 print '*** mpd terminating' 280 print ' bulletproof option must be able to import threading-Thread' 281 sys.exit(-1) 282 # may use SIG_IGN on all but SIGCHLD and SIGHUP (handled above) 283 while 1: 284 mpdtid = Thread(target=self.runmainloop) 285 mpdtid.start() 286 # signals must be handled in main thread; thus we permit timeout of join 287 while mpdtid.isAlive(): 288 mpdtid.join(2) # come out sometimes and handle signals 289 if self.exiting: 290 break 291 if self.conSock: 292 msgToSend = { 'cmd' : 'restarting_mpd' } 293 self.conSock.msgToSend.send_dict_msg(msgToSend) 294 self.streamHandler.del_handler(self.conSock) 295 self.conSock.close() 296 self.conSock = 0 297 def runmainloop(self): 298 # Main Loop 299 while 1: 300 if self.spawnQ and not self.spawnInProgress: 301 self.ring.rhsSock.send_dict_msg(self.spawnQ[0]) 302 self.spawnQ = self.spawnQ[1:] 303 self.spawnInProgress = 1 304 continue 305 rv = self.streamHandler.handle_active_streams(timeout=8.0) 306 if rv[0] < 0: 307 if type(rv[1]) == ClassType and rv[1] == KeyboardInterrupt: # ^C 308 sys.exit(-1) 309 if self.exiting: 310 break 311 if rv[0] == 0: 312 if self.pulse_cntr == 0 and self.ring.rhsSock: 313 self.ring.rhsSock.send_dict_msg({'cmd':'pulse'}) 314 self.pulse_cntr += 1 315 if self.pulse_cntr >= 3: 316 if self.ring.rhsSock: # rhs must have disappeared 317 self.streamHandler.del_handler(self.ring.rhsSock) 318 self.ring.rhsSock.close() 319 self.ring.rhsSock = 0 320 if self.ring.lhsSock: 321 self.streamHandler.del_handler(self.ring.lhsSock) 322 self.ring.lhsSock.close() 323 self.ring.lhsSock = 0 324 mpd_print(1,'no pulse_ack from rhs; re-entering ring') 325 rc = self.ring.reenter_ring(lhsHandler=self.handle_lhs_input, 326 rhsHandler=self.handle_rhs_input, 327 ntries=16) 328 if rc == 0: 329 mpd_print(1,'back in ring') 330 else: 331 mpd_print(1,'failed to reenter ring') 332 sys.exit(-1) 333 self.pulse_cntr = 0 334 mpd_close_zc() # only does something if we have zc 335 def usage(self): 336 print __doc__ 337 print "This version of mpd is", mpd_version() 338 sys.exit(-1) 339 def cleanup(self): 340 try: 341 mpd_print(0, "CLEANING UP" ) 342 if syslog_module_available: 343 syslog.syslog(syslog.LOG_INFO,"mpd ending mpdid=%s (inside cleanup)" % \ 344 (self.myId) ) 345 syslog.closelog() 346 if self.conListenSock: # only del if I created 347 os.unlink(self.conListenSock.conFilename) 348 except: 349 pass 350 def get_parms_from_cmdline(self): 351 global mpd_dbg_level 352 argidx = 1 353 while argidx < len(sys.argv): 354 if sys.argv[argidx] == '--help': 355 self.usage() 356 argidx += 1 357 elif sys.argv[argidx] == '-h': 358 if len(sys.argv) < 3: 359 self.usage() 360 self.parmdb[('cmdline','MPD_ENTRY_IFHN')] = sys.argv[argidx+1] 361 argidx += 2 362 elif sys.argv[argidx].startswith('--host'): 363 try: 364 entryHost = sys.argv[argidx].split('=',1)[1] 365 except: 366 print 'failed to parse --host option' 367 self.usage() 368 self.parmdb[('cmdline','MPD_ENTRY_IFHN')] = entryHost 369 argidx += 1 370 elif sys.argv[argidx] == '-p': 371 if argidx >= (len(sys.argv)-1): 372 print 'missing arg for -p' 373 sys.exit(-1) 374 if not sys.argv[argidx+1].isdigit(): 375 print 'invalid port %s ; must be numeric' % (sys.argv[argidx+1]) 376 sys.exit(-1) 377 self.parmdb[('cmdline','MPD_ENTRY_PORT')] = int(sys.argv[argidx+1]) 378 argidx += 2 379 elif sys.argv[argidx].startswith('--port'): 380 try: 381 entryPort = sys.argv[argidx].split('=',1)[1] 382 except: 383 print 'failed to parse --port option' 384 self.usage() 385 if not entryPort.isdigit(): 386 print 'invalid port %s ; must be numeric' % (entryPort) 387 sys.exit(-1) 388 self.parmdb[('cmdline','MPD_ENTRY_PORT')] = int(entryPort) 389 argidx += 1 390 elif sys.argv[argidx].startswith('--ncpus'): 391 try: 392 NCPUs = sys.argv[argidx].split('=',1)[1] 393 except: 394 print 'failed to parse --ncpus option' 395 self.usage() 396 if not NCPUs.isdigit(): 397 print 'invalid ncpus %s ; must be numeric' % (NCPUs) 398 sys.exit(-1) 399 self.parmdb[('cmdline','MPD_NCPUS')] = int(NCPUs) 400 argidx += 1 401 elif sys.argv[argidx].startswith('--pid'): 402 try: 403 splitPid = sys.argv[argidx].split('=') 404 except: 405 print 'failed to parse --pid option' 406 self.usage() 407 if len(splitPid) == 1 or not splitPid[1]: 408 pidFilename = '/var/run/mpd.pid' 409 else: 410 pidFilename = splitPid[1] 411 self.parmdb[('cmdline','MPD_PID_FILENAME')] = pidFilename 412 argidx += 1 413 elif sys.argv[argidx].startswith('--tmpdir'): 414 try: 415 splitTmpdir = sys.argv[argidx].split('=') 416 except: 417 print 'failed to parse --tmpdir option' 418 self.usage() 419 if len(splitTmpdir) == 1 or not splitTmpdir[1]: 420 tmpdirName = '/tmp' 421 else: 422 tmpdirName = splitTmpdir[1] 423 self.parmdb[('cmdline','MPD_TMPDIR')] = tmpdirName 424 argidx += 1 425 elif sys.argv[argidx].startswith('--ifhn'): 426 try: 427 ifhn = sys.argv[argidx].split('=',1)[1] 428 except: 429 print 'failed to parse --ifhn option' 430 self.usage() 431 try: 432 hostinfo = socket.gethostbyname_ex(ifhn) 433 ifhn = hostinfo[2][0] 434 except: 435 print 'mpd failed: gethostbyname_ex failed for %s' % (ifhn) 436 sys.exit(-1) 437 self.parmdb[('cmdline','MPD_MY_IFHN')] = ifhn 438 argidx += 1 439 elif sys.argv[argidx] == '-l': 440 if argidx >= (len(sys.argv)-1): 441 print 'missing arg for -l' 442 sys.exit(-1) 443 if not sys.argv[argidx+1].isdigit(): 444 print 'invalid listenport %s ; must be numeric' % (sys.argv[argidx+1]) 445 sys.exit(-1) 446 self.parmdb[('cmdline','MPD_LISTEN_PORT')] = int(sys.argv[argidx+1]) 447 argidx += 2 448 elif sys.argv[argidx].startswith('--listenport'): 449 try: 450 myListenPort = sys.argv[argidx].split('=',1)[1] 451 except: 452 print 'failed to parse --listenport option' 453 self.usage() 454 if not myListenPort.isdigit(): 455 print 'invalid listenport %s ; must be numeric' % (myListenPort) 456 sys.exit(-1) 457 self.parmdb[('cmdline','MPD_LISTEN_PORT')] = int(myListenPort) 458 argidx += 1 459 elif sys.argv[argidx] == '-hp': 460 if argidx >= (len(sys.argv)-1): 461 print 'missing arg for -hp' 462 sys.exit(-1) 463 try: 464 (entryIfhn,entryPort) = sys.argv[argidx+1].split('_') 465 except: 466 print 'invalid entry host: %s' % (sys.argv[argidx+1]) 467 sys.exit(-1) 468 if not entryPort.isdigit(): 469 print 'invalid port %s ; must be numeric' % (sys.argv[argidx+1]) 470 sys.exit(-1) 471 self.parmdb[('cmdline','MPD_ENTRY_IFHN')] = entryIfhn 472 self.parmdb[('cmdline','MPD_ENTRY_PORT')] = int(entryPort) 473 argidx += 2 474 elif sys.argv[argidx] == '-t' or sys.argv[argidx] == '--trace': 475 self.parmdb[('cmdline','MPD_TRACE_FLAG')] = 1 476 argidx += 1 477 elif sys.argv[argidx] == '--debug': 478 mpd_set_dbg_level(1) 479 argidx += 1 480 elif sys.argv[argidx] == '-n' or sys.argv[argidx] == '--noconsole': 481 self.parmdb[('cmdline','MPD_CONSOLE_FLAG')] = 0 482 argidx += 1 483 elif sys.argv[argidx] == '-e' or sys.argv[argidx] == '--echo': 484 self.parmdb[('cmdline','MPD_ECHO_PORT_FLAG')] = 1 485 argidx += 1 486 elif sys.argv[argidx] == '-d' or sys.argv[argidx] == '--daemon': 487 self.parmdb[('cmdline','MPD_DAEMON_FLAG')] = 1 488 argidx += 1 489 elif sys.argv[argidx] == '-b' or sys.argv[argidx] == '--bulletproof': 490 self.parmdb[('cmdline','MPD_BULLETPROOF_FLAG')] = 1 491 argidx += 1 492 elif sys.argv[argidx] == '-zc': 493 if argidx >= (len(sys.argv)-1): 494 print 'missing arg for -zc' 495 sys.exit(-1) 496 if not sys.argv[argidx+1].isdigit(): 497 print 'invalid arg for -zc %s ; must be numeric' % (sys.argv[argidx+1]) 498 sys.exit(-1) 499 intarg = int(sys.argv[argidx+1]) 500 if intarg < 1: 501 print 'invalid arg for -zc %s ; must be >= 1' % (sys.argv[argidx+1]) 502 sys.exit(-1) 503 self.parmdb[('cmdline','MPD_ZC')] = intarg 504 argidx += 2 505 else: 506 print 'unrecognized arg: %s' % (sys.argv[argidx]) 507 sys.exit(-1) 508 if (self.parmdb['MPD_ENTRY_IFHN'] and not self.parmdb['MPD_ENTRY_PORT']) \ 509 or (self.parmdb['MPD_ENTRY_PORT'] and not self.parmdb['MPD_ENTRY_IFHN']): 510 print 'host and port must be specified together' 511 sys.exit(-1) 512 def handle_console_connection(self,sock): 513 if not self.conSock: 514 (self.conSock,newConnAddr) = sock.accept() 515 if hasattr(socket,'AF_UNIX') and sock.family == socket.AF_UNIX: 516 line = self.conSock.recv_char_msg().rstrip() 517 if not line: # caller went away (perhaps another mpd seeing if I am here) 518 self.streamHandler.del_handler(self.conSock) 519 self.conSock.close() 520 self.conSock = 0 521 return 522 errorMsg = '' 523 try: 524 (kv1,kv2) = line.split(' ',1) # 'realusername=xxx secretword=yyy' 525 except: 526 errorMsg = 'failed to split this msg on " ": %s' % line 527 if not errorMsg: 528 try: 529 (k1,self.conSock.realUsername) = kv1.split('=',1) 530 except: 531 errorMsg = 'failed to split first kv pair on "=": %s' % line 532 if not errorMsg: 533 try: 534 (k2,secretword) = kv2.split('=',1) 535 except: 536 errorMsg = 'failed to split second kv pair on "=": %s' % line 537 if not errorMsg and k1 != 'realusername': 538 errorMsg = 'first key is not realusername' 539 if not errorMsg and k2 != 'secretword': 540 errorMsg = 'second key is not secretword' 541 if not errorMsg and os.getuid() == 0 and secretword != self.parmdb['MPD_SECRETWORD']: 542 errorMsg = 'invalid secretword to root mpd' 543 if errorMsg: 544 try: 545 self.conSock.send_dict_msg({'error_msg': errorMsg}) 546 except: 547 pass 548 self.streamHandler.del_handler(self.conSock) 549 self.conSock.close() 550 self.conSock = 0 551 return 552 self.conSock.beingChallenged = 0 553 else: 554 msg = self.conSock.recv_dict_msg() 555 if not msg: # caller went away (perhaps another mpd seeing if I am here) 556 self.streamHandler.del_handler(self.conSock) 557 self.conSock.close() 558 self.conSock = 0 559 return 560 if not msg.has_key('cmd') or msg['cmd'] != 'con_init': 561 mpd_print(1, 'console sent bad msg :%s:' % (msg) ) 562 try: # try to let console know 563 self.conSock.send_dict_msg({'cmd':'invalid_msg_received_from_you'}) 564 except: 565 pass 566 self.streamHandler.del_handler(self.conSock) 567 self.conSock.close() 568 self.conSock = 0 569 return 570 self.streamHandler.set_handler(self.conSock,self.handle_console_input) 571 self.conSock.beingChallenged = 1 572 self.conSock.name = 'console' 573 randNum = randrange(1,10000) 574 randVal = sock.secretword + str(randNum) 575 self.conSock.expectedResponse = md5new(randVal).digest() 576 self.conSock.send_dict_msg({'cmd' : 'con_challenge', 'randnum' : randNum }) 577 self.conSock.realUsername = mpd_get_my_username() 578 self.streamHandler.set_handler(self.conSock,self.handle_console_input) 579 self.conSock.name = 'console' 580 else: 581 return ## postpone it; hope the other one frees up soon 582 def handle_console_input(self,sock): 583 msg = self.conSock.recv_dict_msg() 584 if not msg: 585 mpd_print(0000, 'console has disappeared; closing it') 586 self.streamHandler.del_handler(self.conSock) 587 self.conSock.close() 588 self.conSock = 0 589 return 590 if not msg.has_key('cmd'): 591 mpd_print(1, 'console sent bad msg :%s:' % msg) 592 try: # try to let console know 593 self.conSock.send_dict_msg({ 'cmd':'invalid_msg_received_from_you' }) 594 except: 595 pass 596 self.streamHandler.del_handler(self.conSock) 597 self.conSock.close() 598 self.conSock = 0 599 return 600 if self.conSock.beingChallenged and msg['cmd'] != 'con_challenge_response': 601 mpd_print(1, 'console did not respond to con_challenge; msg=:%s:' % msg) 602 try: # try to let console know 603 self.conSock.send_dict_msg({ 'cmd':'expected_con_challenge_response' }) 604 except: 605 pass 606 self.streamHandler.del_handler(self.conSock) 607 self.conSock.close() 608 self.conSock = 0 609 return 610 if msg['cmd'] == 'con_challenge_response': 611 self.conSock.beingChallenged = 0 612 self.conSock.realUsername = msg['realusername'] 613 if not msg.has_key('response'): 614 try: # try to let console know 615 self.conSock.send_dict_msg({ 'cmd':'missing_response_in_msg' }) 616 except: 617 pass 618 self.streamHandler.del_handler(self.conSock) 619 self.conSock.close() 620 self.conSock = 0 621 return 622 elif msg['response'] != self.conSock.expectedResponse: 623 try: # try to let console know 624 self.conSock.send_dict_msg({ 'cmd':'invalid_response' }) 625 except: 626 pass 627 self.streamHandler.del_handler(self.conSock) 628 self.conSock.close() 629 self.conSock = 0 630 return 631 self.conSock.send_dict_msg({ 'cmd':'valid_response' }) 632 elif msg['cmd'] == 'mpdrun': 633 # permit anyone to run but use THEIR own username 634 # thus, override any username specified by the user 635 if self.conSock.realUsername != 'root': 636 msg['username'] = self.conSock.realUsername 637 msg['users'] = { (0,msg['nprocs']-1) : self.conSock.realUsername } 638 # 639 msg['mpdid_mpdrun_start'] = self.myId 640 msg['nstarted_on_this_loop'] = 0 641 msg['first_loop'] = 1 642 msg['ringsize'] = 0 643 msg['ring_ncpus'] = 0 644 # maps rank => hostname 645 msg['process_mapping'] = {} 646 if msg.has_key('try_1st_locally'): 647 self.do_mpdrun(msg) 648 else: 649 self.ring.rhsSock.send_dict_msg(msg) 650 # send ack after job is going 651 elif msg['cmd'] == 'get_mpdrun_values': 652 msgToSend = { 'cmd' : 'response_get_mpdrun_values', 653 'mpd_version' : mpd_version(), 654 'mpd_ifhn' : self.myIfhn } 655 self.conSock.send_dict_msg(msgToSend) 656 elif msg['cmd'] == 'mpdtrace': 657 msgToSend = { 'cmd' : 'mpdtrace_info', 658 'dest' : self.myId, 659 'id' : self.myId, 660 'ifhn' : self.myIfhn, 661 'lhsport' : '%s' % (self.ring.lhsPort), 662 'lhsifhn' : '%s' % (self.ring.lhsIfhn), 663 'rhsport' : '%s' % (self.ring.rhsPort), 664 'rhsifhn' : '%s' % (self.ring.rhsIfhn) } 665 self.ring.rhsSock.send_dict_msg(msgToSend) 666 msgToSend = { 'cmd' : 'mpdtrace_trailer', 'dest' : self.myId } 667 self.ring.rhsSock.send_dict_msg(msgToSend) 668 # do not send an ack to console now; will send trace info later 669 elif msg['cmd'] == 'mpdallexit': 670 if self.conSock.realUsername != self.myRealUsername: 671 msgToSend = { 'cmd':'invalid_username_to_make_this_request' } 672 self.conSock.send_dict_msg(msgToSend) 673 self.streamHandler.del_handler(self.conSock) 674 self.conSock.close() 675 self.conSock = 0 676 return 677 # self.allExiting = 1 # doesn't really help here 678 self.ring.rhsSock.send_dict_msg( {'cmd' : 'mpdallexit', 'src' : self.myId} ) 679 self.conSock.send_dict_msg( {'cmd' : 'mpdallexit_ack'} ) 680 elif msg['cmd'] == 'mpdexit': 681 if self.conSock.realUsername != self.myRealUsername: 682 msgToSend = { 'cmd':'invalid_username_to_make_this_request' } 683 self.conSock.send_dict_msg(msgToSend) 684 self.streamHandler.del_handler(self.conSock) 685 self.conSock.close() 686 self.conSock = 0 687 return 688 if msg['mpdid'] == 'localmpd': 689 msg['mpdid'] = self.myId 690 self.ring.rhsSock.send_dict_msg( {'cmd' : 'mpdexit', 'src' : self.myId, 691 'done' : 0, 'dest' : msg['mpdid']} ) 692 elif msg['cmd'] == 'mpdringtest': 693 msg['src'] = self.myId 694 self.ring.rhsSock.send_dict_msg(msg) 695 # do not send an ack to console now; will send ringtest info later 696 elif msg['cmd'] == 'mpdlistjobs': 697 msgToSend = { 'cmd' : 'local_mpdid', 'id' : self.myId } 698 self.conSock.send_dict_msg(msgToSend) 699 for jobid in self.activeJobs.keys(): 700 for manPid in self.activeJobs[jobid]: 701 msgToSend = { 'cmd' : 'mpdlistjobs_info', 702 'dest' : self.myId, 703 'jobid' : jobid, 704 'username' : self.activeJobs[jobid][manPid]['username'], 705 'host' : self.myHost, 706 'ifhn' : self.myIfhn, 707 'clipid' : str(self.activeJobs[jobid][manPid]['clipid']), 708 'sid' : str(manPid), # may chg to actual sid later 709 'pgm' : self.activeJobs[jobid][manPid]['pgm'], 710 'rank' : self.activeJobs[jobid][manPid]['rank'] } 711 self.conSock.send_dict_msg(msgToSend) 712 msgToSend = { 'cmd' : 'mpdlistjobs_trailer', 'dest' : self.myId } 713 self.ring.rhsSock.send_dict_msg(msgToSend) 714 # do not send an ack to console now; will send listjobs info later 715 elif msg['cmd'] == 'mpdkilljob': 716 # permit anyone to kill but use THEIR own username 717 # thus, override any username specified by the user 718 if self.conSock.realUsername != 'root': 719 msg['username'] = self.conSock.realUsername 720 msg['src'] = self.myId 721 msg['handled'] = 0 722 if msg['mpdid'] == '': 723 msg['mpdid'] = self.myId 724 self.ring.rhsSock.send_dict_msg(msg) 725 # send ack to console after I get this msg back and do the kill myself 726 elif msg['cmd'] == 'mpdsigjob': 727 # permit anyone to sig but use THEIR own username 728 # thus, override any username specified by the user 729 if self.conSock.realUsername != 'root': 730 msg['username'] = self.conSock.realUsername 731 msg['src'] = self.myId 732 msg['handled'] = 0 733 if msg['mpdid'] == '': 734 msg['mpdid'] = self.myId 735 self.ring.rhsSock.send_dict_msg(msg) 736 # send ack to console after I get this msg back 737 elif msg['cmd'] == 'verify_hosts_in_ring': 738 msgToSend = { 'cmd' : 'verify_hosts_in_ring', 'dest' : self.myId, 739 'host_list' : msg['host_list'] } 740 self.ring.rhsSock.send_dict_msg(msgToSend) 741 # do not send an ack to console now; will send trace info later 742 else: 743 msgToSend = { 'cmd' : 'invalid_msg_received_from_you' } 744 self.conSock.send_dict_msg(msgToSend) 745 badMsg = 'invalid msg received from console: %s' % (str(msg)) 746 mpd_print(1, badMsg) 747 if syslog_module_available: 748 syslog.syslog(syslog.LOG_ERR,badMsg) 749 def handle_man_input(self,sock): 750 msg = sock.recv_dict_msg() 751 if not msg: 752 for jobid in self.activeJobs.keys(): 753 deleted = 0 754 for manPid in self.activeJobs[jobid]: 755 if sock == self.activeJobs[jobid][manPid]['socktoman']: 756 mpd_print(mpd_dbg_level,\ 757 "Deleting %s %d" % (str(jobid),manPid)) 758 del self.activeJobs[jobid][manPid] 759 if len(self.activeJobs[jobid]) == 0: 760 del self.activeJobs[jobid] 761 deleted = 1 762 break 763 if deleted: 764 break 765 self.streamHandler.del_handler(sock) 766 sock.close() 767 return 768 if not msg.has_key('cmd'): 769 mpd_print(1, 'INVALID msg for man request msg=:%s:' % (msg) ) 770 msgToSend = { 'cmd' : 'invalid_msg' } 771 sock.send_dict_msg(msgToSend) 772 self.streamHandler.del_handler(sock) 773 sock.close() 774 return 775 # Who asks, and why? 776 # We have a failure that deletes the spawnerManPid from the 777 # activeJobs[jobid] variable. The temporary work-around is 778 # to ignore this request if the target process is no longer 779 # in the activeJobs table. 780 if msg['cmd'] == 'client_info': 781 jobid = msg['jobid'] 782 manPid = msg['manpid'] 783 self.activeJobs[jobid][manPid]['clipid'] = msg['clipid'] 784 if msg['spawner_manpid'] and msg['rank'] == 0: 785 if msg['spawner_mpd'] == self.myId: 786 spawnerManPid = msg['spawner_manpid'] 787 mpd_print(mpd_dbg_level,\ 788 "About to check %s:%s" % (str(jobid),str(spawnerManPid))) 789 790 if not self.activeJobs[jobid].has_key(spawnerManPid): 791 mpd_print(0,"Missing %d in %s" % (spawnerManPid,str(jobid))) 792 elif not self.activeJobs[jobid][spawnerManPid].has_key('socktoman'): 793 mpd_print(0,"Missing socktoman!") 794 else: 795 spawnerManSock = self.activeJobs[jobid][spawnerManPid]['socktoman'] 796 msgToSend = { 'cmd' : 'spawn_done_by_mpd', 'rc' : 0, 'reason' : '' } 797 spawnerManSock.send_dict_msg(msgToSend) 798 else: 799 self.ring.rhsSock.send_dict_msg(msg) 800 elif msg['cmd'] == 'spawn': 801 msg['mpdid_mpdrun_start'] = self.myId 802 msg['spawner_mpd'] = self.myId 803 msg['nstarted_on_this_loop'] = 0 804 msg['first_loop'] = 1 805 msg['jobalias'] = '' 806 msg['stdin_dest'] = '0' 807 msg['ringsize'] = 0 808 msg['ring_ncpus'] = 0 809 msg['gdb'] = 0 810 msg['gdba'] = '' 811 msg['totalview'] = 0 812 msg['ifhns'] = {} 813 # maps rank => hostname 814 msg['process_mapping'] = {} 815 self.spawnQ.append(msg) 816 elif msg['cmd'] == 'publish_name': 817 self.pmi_published_names[msg['service']] = msg['port'] 818 msgToSend = { 'cmd' : 'publish_result', 'info' : 'ok' } 819 sock.send_dict_msg(msgToSend) 820 elif msg['cmd'] == 'lookup_name': 821 if self.pmi_published_names.has_key(msg['service']): 822 msgToSend = { 'cmd' : 'lookup_result', 'info' : 'ok', 823 'port' : self.pmi_published_names[msg['service']] } 824 sock.send_dict_msg(msgToSend) 825 else: 826 msg['cmd'] = 'pmi_lookup_name' # add pmi_ 827 msg['src'] = self.myId 828 msg['port'] = 0 # invalid 829 self.ring.rhsSock.send_dict_msg(msg) 830 elif msg['cmd'] == 'unpublish_name': 831 if self.pmi_published_names.has_key(msg['service']): 832 del self.pmi_published_names[msg['service']] 833 msgToSend = { 'cmd' : 'unpublish_result', 'info' : 'ok' } 834 sock.send_dict_msg(msgToSend) 835 else: 836 msg['cmd'] = 'pmi_unpublish_name' # add pmi_ 837 msg['src'] = self.myId 838 self.ring.rhsSock.send_dict_msg(msg) 839 else: 840 mpd_print(1, 'INVALID request from man msg=:%s:' % (msg) ) 841 msgToSend = { 'cmd' : 'invalid_request' } 842 sock.send_dict_msg(msgToSend) 843 844 def calculate_process_mapping(self,mapping_dict): 845 # mapping_dict maps ranks => hostnames 846 ranks = list(mapping_dict.keys()) 847 ranks.sort() 848 849 # assign node ids based in first-come-first-serve order when iterating 850 # over the ranks in increasing order 851 next_id = 0 852 node_ids = {} 853 for rank in ranks: 854 host = mapping_dict[rank] 855 if not node_ids.has_key(host): 856 node_ids[host] = next_id 857 next_id += 1 858 859 860 # maps {node_id_A: set([rankX,rankY,...]), node_id_B:...} 861 node_to_ranks = {} 862 for rank in ranks: 863 node_id = node_ids[mapping_dict[rank]] 864 if not node_to_ranks.has_key(node_id): 865 node_to_ranks[node_id] = set([]) 866 node_to_ranks[node_id].add(rank) 867 868 # we only handle two cases for now: 869 # 1. block regular 870 # 2. round-robin regular 871 # we do handle "remainder nodes" that might not be full 872 delta = -1 873 max_ranks_per_node = 0 874 for node_id in node_to_ranks.keys(): 875 last_rank = -1 876 if len(node_to_ranks[node_id]) > max_ranks_per_node: 877 max_ranks_per_node = len(node_to_ranks[node_id]) 878 ranks = list(node_to_ranks[node_id]) 879 ranks.sort() 880 for rank in ranks: 881 if last_rank != -1: 882 if delta == -1: 883 if node_id == 0: 884 delta = rank - last_rank 885 else: 886 # irregular case detected such as {0:A,1:B,2:B} 887 mpd_print(1, "irregular case A detected") 888 return '' 889 elif (rank - last_rank) != delta: 890 # irregular such as {0:A,1:B,2:A,3:A,4:B} 891 mpd_print(1, "irregular case B detected") 892 return '' 893 last_rank = rank 894 895 # another check (case caught in ticket #905) for layouts like {0:A,1:A,2:B,3:B,4:B} 896 if len(node_to_ranks.keys()) > 1: 897 first_size = len(node_to_ranks[0]) 898 last_size = len(node_to_ranks[len(node_to_ranks.keys())-1]) 899 if (last_size > first_size): 900 mpd_print(1, "irregular case C1 detected") 901 return '' 902 in_remainder = False 903 node_ids = node_to_ranks.keys() 904 node_ids.sort() 905 for node_id in node_ids: 906 node_size = len(node_to_ranks[node_id]) 907 if not in_remainder: 908 if node_size == first_size: 909 pass # OK 910 elif node_size == last_size: 911 in_remainder = True 912 else: 913 mpd_print(1, "irregular case C2 detected") 914 return '' 915 else: # in_remainder 916 if node_size != last_size: 917 mpd_print(1, "irregular case C3 detected") 918 return '' 919 920 num_nodes = len(node_to_ranks.keys()) 921 if delta == 1: 922 return '(vector,(%d,%d,%d))' % (0,num_nodes,max_ranks_per_node) 923 else: 924 # either we are round-robin-regular (delta > 1) or there is only one 925 # process per node (delta == -1), either way results in the same 926 # mapping spec 927 return '(vector,(%d,%d,%d))' % (0,num_nodes,1) 928 929 def handle_lhs_input(self,sock): 930 msg = self.ring.lhsSock.recv_dict_msg() 931 if not msg: # lost lhs; don't worry 932 mpd_print(0, "CLOSING self.ring.lhsSock ", self.ring.lhsSock ) 933 self.streamHandler.del_handler(self.ring.lhsSock) 934 self.ring.lhsSock.close() 935 self.ring.lhsSock = 0 936 return 937 if msg['cmd'] == 'mpdrun' or msg['cmd'] == 'spawn': 938 if msg.has_key('mpdid_mpdrun_start') \ 939 and msg['mpdid_mpdrun_start'] == self.myId: 940 if msg['first_loop']: 941 self.currRingSize = msg['ringsize'] 942 self.currRingNCPUs = msg['ring_ncpus'] 943 if msg['nstarted'] == msg['nprocs']: 944 # we have started all processes in the job, tell the 945 # requester this and stop forwarding the mpdrun/spawn 946 # message around the loop 947 if msg['cmd'] == 'spawn': 948 self.spawnInProgress = 0 949 if self.conSock: 950 msgToSend = { 'cmd' : 'mpdrun_ack', 951 'ringsize' : self.currRingSize, 952 'ring_ncpus' : self.currRingNCPUs} 953 self.conSock.send_dict_msg(msgToSend) 954 # Tell all MPDs in the ring the final process mapping. In 955 # turn, they will inform all of their child mpdmans. 956 # Only do this in the case of a regular mpdrun. The spawn 957 # case it too complicated to handle this way right now. 958 if msg['cmd'] == 'mpdrun': 959 process_mapping_str = self.calculate_process_mapping(msg['process_mapping']) 960 msgToSend = { 'cmd' : 'process_mapping', 961 'jobid' : msg['jobid'], 962 'mpdid_mpdrun_start' : self.myId, 963 'process_mapping' : process_mapping_str } 964 self.ring.rhsSock.send_dict_msg(msgToSend) 965 return 966 if not msg['first_loop'] and msg['nstarted_on_this_loop'] == 0: 967 if msg.has_key('jobid'): 968 if msg['cmd'] == 'mpdrun': 969 msgToSend = { 'cmd' : 'abortjob', 'src' : self.myId, 970 'jobid' : msg['jobid'], 971 'reason' : 'some_procs_not_started' } 972 self.ring.rhsSock.send_dict_msg(msgToSend) 973 else: # spawn 974 msgToSend = { 'cmd' : 'startup_status', 'rc' : -1, 975 'reason' : 'some_procs_not_started' } 976 jobid = msg['jobid'] 977 manPid = msg['spawner_manpid'] 978 manSock = self.activeJobs[jobid][manPid]['socktoman'] 979 manSock.send_dict_msg(msgToSend) 980 if self.conSock: 981 msgToSend = { 'cmd' : 'job_failed', 982 'reason' : 'some_procs_not_started', 983 'remaining_hosts' : msg['hosts'] } 984 self.conSock.send_dict_msg(msgToSend) 985 return 986 msg['first_loop'] = 0 987 msg['nstarted_on_this_loop'] = 0 988 self.do_mpdrun(msg) 989 elif msg['cmd'] == 'process_mapping': 990 # message transmission terminates once the message has made it all 991 # the way around the loop once 992 if msg['mpdid_mpdrun_start'] != self.myId: 993 self.ring.rhsSock.send_dict_msg(msg) # forward it on around 994 995 # send to all mpdman's for the jobid embedded in the msg 996 jobid = msg['jobid'] 997 998 # there may be no entry for jobid in the activeJobs table if there 999 # weren't any processes from that job actually launched on our host 1000 if self.activeJobs.has_key(jobid): 1001 for manPid in self.activeJobs[jobid].keys(): 1002 manSock = self.activeJobs[jobid][manPid]['socktoman'] 1003 manSock.send_dict_msg(msg) 1004 elif msg['cmd'] == 'mpdtrace_info': 1005 if msg['dest'] == self.myId: 1006 if self.conSock: 1007 self.conSock.send_dict_msg(msg) 1008 else: 1009 self.ring.rhsSock.send_dict_msg(msg) 1010 elif msg['cmd'] == 'mpdtrace_trailer': 1011 if msg['dest'] == self.myId: 1012 if self.conSock: 1013 self.conSock.send_dict_msg(msg) 1014 else: 1015 msgToSend = { 'cmd' : 'mpdtrace_info', 1016 'dest' : msg['dest'], 1017 'id' : self.myId, 1018 'ifhn' : self.myIfhn, 1019 'lhsport' : '%s' % (self.ring.lhsPort), 1020 'lhsifhn' : '%s' % (self.ring.lhsIfhn), 1021 'rhsport' : '%s' % (self.ring.rhsPort), 1022 'rhsifhn' : '%s' % (self.ring.rhsIfhn) } 1023 self.ring.rhsSock.send_dict_msg(msgToSend) 1024 self.ring.rhsSock.send_dict_msg(msg) 1025 elif msg['cmd'] == 'mpdlistjobs_info': 1026 if msg['dest'] == self.myId: 1027 if self.conSock: 1028 self.conSock.send_dict_msg(msg) 1029 else: 1030 self.ring.rhsSock.send_dict_msg(msg) 1031 elif msg['cmd'] == 'mpdlistjobs_trailer': 1032 if msg['dest'] == self.myId: 1033 if self.conSock: 1034 self.conSock.send_dict_msg(msg) 1035 else: 1036 for jobid in self.activeJobs.keys(): 1037 for manPid in self.activeJobs[jobid]: 1038 msgToSend = { 'cmd' : 'mpdlistjobs_info', 1039 'dest' : msg['dest'], 1040 'jobid' : jobid, 1041 'username' : self.activeJobs[jobid][manPid]['username'], 1042 'host' : self.myHost, 1043 'ifhn' : self.myIfhn, 1044 'clipid' : str(self.activeJobs[jobid][manPid]['clipid']), 1045 'sid' : str(manPid), # may chg to actual sid later 1046 'pgm' : self.activeJobs[jobid][manPid]['pgm'], 1047 'rank' : self.activeJobs[jobid][manPid]['rank'] } 1048 self.ring.rhsSock.send_dict_msg(msgToSend) 1049 self.ring.rhsSock.send_dict_msg(msg) 1050 elif msg['cmd'] == 'mpdallexit': 1051 if self.allExiting: # already seen this once 1052 self.exiting = 1 # set flag to exit main loop 1053 self.allExiting = 1 1054 self.ring.rhsSock.send_dict_msg(msg) 1055 elif msg['cmd'] == 'mpdexit': 1056 if msg['dest'] == self.myId: 1057 msg['done'] = 1 # do this first 1058 if msg['src'] == self.myId: # may be src and dest 1059 if self.conSock: 1060 if msg['done']: 1061 self.conSock.send_dict_msg({'cmd' : 'mpdexit_ack'}) 1062 else: 1063 self.conSock.send_dict_msg({'cmd' : 'mpdexit_failed'}) 1064 else: 1065 self.ring.rhsSock.send_dict_msg(msg) 1066 if msg['dest'] == self.myId: 1067 self.exiting = 1 1068 self.ring.lhsSock.send_dict_msg( { 'cmd' : 'mpdexiting', 1069 'rhsifhn' : self.ring.rhsIfhn, 1070 'rhsport' : self.ring.rhsPort }) 1071 elif msg['cmd'] == 'mpdringtest': 1072 if msg['src'] != self.myId: 1073 self.ring.rhsSock.send_dict_msg(msg) 1074 else: 1075 numLoops = msg['numloops'] - 1 1076 if numLoops > 0: 1077 msg['numloops'] = numLoops 1078 self.ring.rhsSock.send_dict_msg(msg) 1079 else: 1080 if self.conSock: # may have closed it if user did ^C at console 1081 self.conSock.send_dict_msg({'cmd' : 'mpdringtest_done' }) 1082 elif msg['cmd'] == 'mpdsigjob': 1083 forwarded = 0 1084 if msg['handled'] and msg['src'] != self.myId: 1085 self.ring.rhsSock.send_dict_msg(msg) 1086 forwarded = 1 1087 handledHere = 0 1088 for jobid in self.activeJobs.keys(): 1089 sjobid = jobid.split(' ') # jobnum and mpdid 1090 if (sjobid[0] == msg['jobnum'] and sjobid[1] == msg['mpdid']) \ 1091 or (msg['jobalias'] and sjobid[2] == msg['jobalias']): 1092 for manPid in self.activeJobs[jobid].keys(): 1093 if self.activeJobs[jobid][manPid]['username'] == msg['username'] \ 1094 or msg['username'] == 'root': 1095 manSock = self.activeJobs[jobid][manPid]['socktoman'] 1096 manSock.send_dict_msg( { 'cmd' : 'signal_to_handle', 1097 's_or_g' : msg['s_or_g'], 1098 'sigtype' : msg['sigtype'] } ) 1099 handledHere = 1 1100 if handledHere: 1101 msg['handled'] = 1 1102 if not forwarded and msg['src'] != self.myId: 1103 self.ring.rhsSock.send_dict_msg(msg) 1104 if msg['src'] == self.myId: 1105 if self.conSock: 1106 self.conSock.send_dict_msg( {'cmd' : 'mpdsigjob_ack', 1107 'handled' : msg['handled'] } ) 1108 elif msg['cmd'] == 'mpdkilljob': 1109 forwarded = 0 1110 if msg['handled'] and msg['src'] != self.myId: 1111 self.ring.rhsSock.send_dict_msg(msg) 1112 forwarded = 1 1113 handledHere = 0 1114 for jobid in self.activeJobs.keys(): 1115 sjobid = jobid.split(' ') # jobnum and mpdid 1116 if (sjobid[0] == msg['jobnum'] and sjobid[1] == msg['mpdid']) \ 1117 or (msg['jobalias'] and sjobid[2] == msg['jobalias']): 1118 for manPid in self.activeJobs[jobid].keys(): 1119 if self.activeJobs[jobid][manPid]['username'] == msg['username'] \ 1120 or msg['username'] == 'root': 1121 try: 1122 pgrp = manPid * (-1) # neg manPid -> group 1123 os.kill(pgrp,signal.SIGKILL) 1124 cliPid = self.activeJobs[jobid][manPid]['clipid'] 1125 pgrp = cliPid * (-1) # neg Pid -> group 1126 os.kill(pgrp,signal.SIGKILL) # neg Pid -> group 1127 handledHere = 1 1128 except: 1129 pass 1130 # del self.activeJobs[jobid] ## handled when child goes away 1131 if handledHere: 1132 msg['handled'] = 1 1133 if not forwarded and msg['src'] != self.myId: 1134 self.ring.rhsSock.send_dict_msg(msg) 1135 if msg['src'] == self.myId: 1136 if self.conSock: 1137 self.conSock.send_dict_msg( {'cmd' : 'mpdkilljob_ack', 1138 'handled' : msg['handled'] } ) 1139 elif msg['cmd'] == 'abortjob': 1140 if msg['src'] != self.myId: 1141 self.ring.rhsSock.send_dict_msg(msg) 1142 for jobid in self.activeJobs.keys(): 1143 if jobid == msg['jobid']: 1144 for manPid in self.activeJobs[jobid].keys(): 1145 manSocket = self.activeJobs[jobid][manPid]['socktoman'] 1146 if manSocket: 1147 manSocket.send_dict_msg(msg) 1148 sleep(0.5) # give man a brief chance to deal with this 1149 try: 1150 pgrp = manPid * (-1) # neg manPid -> group 1151 os.kill(pgrp,signal.SIGKILL) 1152 cliPid = self.activeJobs[jobid][manPid]['clipid'] 1153 pgrp = cliPid * (-1) # neg Pid -> group 1154 os.kill(pgrp,signal.SIGKILL) # neg Pid -> group 1155 except: 1156 pass 1157 # del self.activeJobs[jobid] ## handled when child goes away 1158 elif msg['cmd'] == 'pulse': 1159 self.ring.lhsSock.send_dict_msg({'cmd':'pulse_ack'}) 1160 elif msg['cmd'] == 'verify_hosts_in_ring': 1161 while self.myIfhn in msg['host_list'] or self.myHost in msg['host_list']: 1162 if self.myIfhn in msg['host_list']: 1163 msg['host_list'].remove(self.myIfhn) 1164 elif self.myHost in msg['host_list']: 1165 msg['host_list'].remove(self.myHost) 1166 if msg['dest'] == self.myId: 1167 msgToSend = { 'cmd' : 'verify_hosts_in_ring_response', 1168 'host_list' : msg['host_list'] } 1169 self.conSock.send_dict_msg(msgToSend) 1170 else: 1171 self.ring.rhsSock.send_dict_msg(msg) 1172 elif msg['cmd'] == 'pmi_lookup_name': 1173 if msg['src'] == self.myId: 1174 if msg.has_key('port') and msg['port'] != 0: 1175 msgToSend = msg 1176 msgToSend['cmd'] = 'lookup_result' 1177 msgToSend['info'] = 'ok' 1178 else: 1179 msgToSend = { 'cmd' : 'lookup_result', 'info' : 'unknown_service', 1180 'port' : 0} 1181 jobid = msg['jobid'] 1182 manPid = msg['manpid'] 1183 manSock = self.activeJobs[jobid][manPid]['socktoman'] 1184 manSock.send_dict_msg(msgToSend) 1185 else: 1186 if self.pmi_published_names.has_key(msg['service']): 1187 msg['port'] = self.pmi_published_names[msg['service']] 1188 self.ring.rhsSock.send_dict_msg(msg) 1189 elif msg['cmd'] == 'pmi_unpublish_name': 1190 if msg['src'] == self.myId: 1191 if msg.has_key('done'): 1192 msgToSend = msg 1193 msgToSend['cmd'] = 'unpublish_result' 1194 msgToSend['info'] = 'ok' 1195 else: 1196 msgToSend = { 'cmd' : 'unpublish_result', 'info' : 'unknown_service' } 1197 jobid = msg['jobid'] 1198 manPid = msg['manpid'] 1199 manSock = self.activeJobs[jobid][manPid]['socktoman'] 1200 manSock.send_dict_msg(msgToSend) 1201 else: 1202 if self.pmi_published_names.has_key(msg['service']): 1203 del self.pmi_published_names[msg['service']] 1204 msg['done'] = 1 1205 self.ring.rhsSock.send_dict_msg(msg) 1206 elif msg['cmd'] == 'client_info': 1207 if msg['spawner_manpid'] and msg['rank'] == 0: 1208 if msg['spawner_mpd'] == self.myId: 1209 jobid = msg['jobid'] 1210 spawnerManPid = msg['spawner_manpid'] 1211 if self.activeJobs[jobid].has_key(spawnerManPid): 1212 spawnerManSock = self.activeJobs[jobid][spawnerManPid]['socktoman'] 1213 msgToSend = { 'cmd' : 'spawn_done_by_mpd', 'rc' : 0, 'reason' : '' } 1214 spawnerManSock.send_dict_msg(msgToSend) 1215 else: 1216 self.ring.rhsSock.send_dict_msg(msg) 1217 else: 1218 mpd_print(1, 'unrecognized cmd from lhs: %s' % (msg) ) 1219 1220 def handle_rhs_input(self,sock): 1221 if self.allExiting: 1222 return 1223 msg = sock.recv_dict_msg() 1224 if not msg: # lost rhs; re-knit the ring 1225 if sock == self.ring.rhsSock: 1226 needToReenter = 1 1227 else: 1228 needToReenter = 0 1229 if sock == self.ring.rhsSock and self.ring.lhsSock: 1230 self.streamHandler.del_handler(self.ring.lhsSock) 1231 self.ring.lhsSock.close() 1232 self.ring.lhsSock = 0 1233 if sock == self.ring.rhsSock and self.ring.rhsSock: 1234 self.streamHandler.del_handler(self.ring.rhsSock) 1235 self.ring.rhsSock.close() 1236 self.ring.rhsSock = 0 1237 if needToReenter: 1238 mpd_print(1,'lost rhs; re-entering ring') 1239 rc = self.ring.reenter_ring(lhsHandler=self.handle_lhs_input, 1240 rhsHandler=self.handle_rhs_input, 1241 ntries=16) 1242 if rc == 0: 1243 mpd_print(1,'back in ring') 1244 else: 1245 mpd_print(1,'failed to reenter ring') 1246 sys.exit(-1) 1247 return 1248 if msg['cmd'] == 'pulse_ack': 1249 self.pulse_cntr = 0 1250 elif msg['cmd'] == 'mpdexiting': # for mpdexit 1251 if self.ring.rhsSock: 1252 self.streamHandler.del_handler(self.ring.rhsSock) 1253 self.ring.rhsSock.close() 1254 self.ring.rhsSock = 0 1255 # connect to new rhs 1256 self.ring.rhsIfhn = msg['rhsifhn'] 1257 self.ring.rhsPort = int(msg['rhsport']) 1258 if self.ring.rhsIfhn == self.myIfhn and self.ring.rhsPort == self.parmdb['MPD_LISTEN_PORT']: 1259 rv = self.ring.connect_rhs(rhsHost=self.ring.rhsIfhn, 1260 rhsPort=self.ring.rhsPort, 1261 rhsHandler=self.handle_rhs_input, 1262 numTries=3) 1263 if rv[0] <= 0: # connect did not succeed; may try again 1264 mpd_print(1,"rhs connect failed") 1265 sys.exit(-1) 1266 return 1267 self.ring.rhsSock = MPDSock(name='rhs') 1268 self.ring.rhsSock.connect((self.ring.rhsIfhn,self.ring.rhsPort)) 1269 self.pulse_cntr = 0 1270 if not self.ring.rhsSock: 1271 mpd_print(1,'handle_rhs_input failed to obtain rhs socket') 1272 return 1273 msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'host' : self.myHost, 1274 'ifhn' : self.myIfhn, 'port' : self.parmdb['MPD_LISTEN_PORT'] } 1275 self.ring.rhsSock.send_dict_msg(msgToSend) 1276 msg = self.ring.rhsSock.recv_dict_msg() 1277 if (not msg) or \ 1278 (not msg.has_key('cmd')) or \ 1279 (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')): 1280 mpd_print(1, 'failed to recv challenge from rhs; msg=:%s:' % (msg) ) 1281 response = md5new(''.join([self.parmdb['MPD_SECRETWORD'], 1282 msg['randnum']])).digest() 1283 msgToSend = { 'cmd' : 'challenge_response', 1284 'response' : response, 1285 'host' : self.myHost, 'ifhn' : self.myIfhn, 1286 'port' : self.parmdb['MPD_LISTEN_PORT'] } 1287 self.ring.rhsSock.send_dict_msg(msgToSend) 1288 msg = self.ring.rhsSock.recv_dict_msg() 1289 if (not msg) or \ 1290 (not msg.has_key('cmd')) or \ 1291 (msg['cmd'] != 'OK_to_enter_as_lhs'): 1292 mpd_print(1, 'NOT OK to enter ring; msg=:%s:' % (msg) ) 1293 self.streamHandler.set_handler(self.ring.rhsSock,self.handle_rhs_input) 1294 else: 1295 mpd_print(1, 'unexpected from rhs; msg=:%s:' % (msg) ) 1296 return 1297 1298 def do_mpdrun(self,msg): 1299 if self.parmdb['MPD_LOGFILE_TRUNC_SZ'] >= 0: 1300 try: 1301 logSize = os.stat(self.logFilename)[stat.ST_SIZE] 1302 if logSize > self.parmdb['MPD_LOGFILE_TRUNC_SZ']: 1303 self.logFile.truncate(self.parmdb['MPD_LOGFILE_TRUNC_SZ']) 1304 except: 1305 pass 1306 1307 if msg.has_key('jobid'): 1308 jobid = msg['jobid'] 1309 else: 1310 jobid = str(self.nextJobInt) + ' ' + self.myId + ' ' + msg['jobalias'] 1311 self.nextJobInt += 1 1312 msg['jobid'] = jobid 1313 if msg['nstarted'] >= msg['nprocs']: 1314 self.ring.rhsSock.send_dict_msg(msg) # forward it on around 1315 return 1316 hosts = msg['hosts'] 1317 if self.myIfhn in hosts.values(): 1318 hostsKeys = hosts.keys() 1319 hostsKeys.sort() 1320 for ranks in hostsKeys: 1321 if hosts[ranks] == self.myIfhn: 1322 (lorank,hirank) = ranks 1323 for rank in range(lorank,hirank+1): 1324 self.run_one_cli(rank,msg) 1325 # we use myHost under the assumption that there is only 1326 # one mpd per user on a given host. The ifhn only 1327 # affects how the MPDs communicate with each other, not 1328 # which host they are on 1329 msg['process_mapping'][rank] = self.myHost 1330 msg['nstarted'] += 1 1331 msg['nstarted_on_this_loop'] += 1 1332 del msg['hosts'][ranks] 1333 elif '_any_from_pool_' in hosts.values(): 1334 hostsKeys = hosts.keys() 1335 hostsKeys.sort() 1336 for ranks in hostsKeys: 1337 if hosts[ranks] == '_any_from_pool_': 1338 (lorank,hirank) = ranks 1339 hostSpecPool = msg['host_spec_pool'] 1340 if self.myIfhn in hostSpecPool or self.myHost in hostSpecPool: 1341 self.run_one_cli(lorank,msg) 1342 msg['process_mapping'][lorank] = self.myHost 1343 msg['nstarted'] += 1 1344 msg['nstarted_on_this_loop'] += 1 1345 del msg['hosts'][ranks] 1346 if lorank < hirank: 1347 msg['hosts'][(lorank+1,hirank)] = '_any_from_pool_' 1348 break 1349 elif '_any_' in hosts.values(): 1350 done = 0 1351 while not done: 1352 hostsKeys = hosts.keys() 1353 hostsKeys.sort() 1354 for ranks in hostsKeys: 1355 if hosts[ranks] == '_any_': 1356 (lorank,hirank) = ranks 1357 self.run_one_cli(lorank,msg) 1358 msg['process_mapping'][lorank] = self.myHost 1359 msg['nstarted'] += 1 1360 msg['nstarted_on_this_loop'] += 1 1361 del msg['hosts'][ranks] 1362 if lorank < hirank: 1363 msg['hosts'][(lorank+1,hirank)] = '_any_' 1364 # self.activeJobs maps: 1365 # { jobid => { mpdman_pid => {...} } } 1366 procsHereForJob = len(self.activeJobs[jobid].keys()) 1367 if procsHereForJob >= self.parmdb['MPD_NCPUS']: 1368 break # out of for loop 1369 # if no more to start via any or enough started here 1370 if '_any_' not in hosts.values() \ 1371 or procsHereForJob >= self.parmdb['MPD_NCPUS']: 1372 done = 1 1373 if msg['first_loop']: 1374 msg['ringsize'] += 1 1375 msg['ring_ncpus'] += self.parmdb['MPD_NCPUS'] 1376 self.ring.rhsSock.send_dict_msg(msg) # forward it on around 1377 def run_one_cli(self,currRank,msg): 1378 users = msg['users'] 1379 for ranks in users.keys(): 1380 (lo,hi) = ranks 1381 if currRank >= lo and currRank <= hi: 1382 username = users[ranks] 1383 break 1384 execs = msg['execs'] 1385 for ranks in execs.keys(): 1386 (lo,hi) = ranks 1387 if currRank >= lo and currRank <= hi: 1388 pgm = execs[ranks] 1389 break 1390 paths = msg['paths'] 1391 for ranks in paths.keys(): 1392 (lo,hi) = ranks 1393 if currRank >= lo and currRank <= hi: 1394 pathForExec = paths[ranks] 1395 break 1396 args = msg['args'] 1397 for ranks in args.keys(): 1398 (lo,hi) = ranks 1399 if currRank >= lo and currRank <= hi: 1400 pgmArgs = dumps(args[ranks]) 1401 break 1402 envvars = msg['envvars'] 1403 for ranks in envvars.keys(): 1404 (lo,hi) = ranks 1405 if currRank >= lo and currRank <= hi: 1406 pgmEnvVars = dumps(envvars[ranks]) 1407 break 1408 limits = msg['limits'] 1409 for ranks in limits.keys(): 1410 (lo,hi) = ranks 1411 if currRank >= lo and currRank <= hi: 1412 pgmLimits = dumps(limits[ranks]) 1413 break 1414 cwds = msg['cwds'] 1415 for ranks in cwds.keys(): 1416 (lo,hi) = ranks 1417 if currRank >= lo and currRank <= hi: 1418 cwd = cwds[ranks] 1419 break 1420 umasks = msg['umasks'] 1421 for ranks in umasks.keys(): 1422 (lo,hi) = ranks 1423 if currRank >= lo and currRank <= hi: 1424 pgmUmask = umasks[ranks] 1425 break 1426 man_env = {} 1427 if msg['ifhns'].has_key(currRank): 1428 man_env['MPICH_INTERFACE_HOSTNAME'] = msg['ifhns'][currRank] 1429 else: 1430 man_env['MPICH_INTERFACE_HOSTNAME'] = self.myIfhn 1431 man_env.update(os.environ) # may only want to mov non-MPD_ stuff 1432 man_env['MPDMAN_MYHOST'] = self.myHost 1433 man_env['MPDMAN_MYIFHN'] = self.myIfhn 1434 man_env['MPDMAN_JOBID'] = msg['jobid'] 1435 man_env['MPDMAN_CLI_PGM'] = pgm 1436 man_env['MPDMAN_CLI_PATH'] = pathForExec 1437 man_env['MPDMAN_PGM_ARGS'] = pgmArgs 1438 man_env['MPDMAN_PGM_ENVVARS'] = pgmEnvVars 1439 man_env['MPDMAN_PGM_LIMITS'] = pgmLimits 1440 man_env['MPDMAN_CWD'] = cwd 1441 man_env['MPDMAN_UMASK'] = pgmUmask 1442 man_env['MPDMAN_SPAWNED'] = str(msg['spawned']) 1443 if msg.has_key('spawner_manpid'): 1444 man_env['MPDMAN_SPAWNER_MANPID'] = str(msg['spawner_manpid']) 1445 else: 1446 man_env['MPDMAN_SPAWNER_MANPID'] = '0' 1447 if msg.has_key('spawner_mpd'): 1448 man_env['MPDMAN_SPAWNER_MPD'] = msg['spawner_mpd'] 1449 else: 1450 man_env['MPDMAN_SPAWNER_MPD'] = '' 1451 man_env['MPDMAN_NPROCS'] = str(msg['nprocs']) 1452 man_env['MPDMAN_MPD_LISTEN_PORT'] = str(self.parmdb['MPD_LISTEN_PORT']) 1453 man_env['MPDMAN_MPD_CONF_SECRETWORD'] = self.parmdb['MPD_SECRETWORD'] 1454 man_env['MPDMAN_CONHOST'] = msg['conhost'] 1455 man_env['MPDMAN_CONIFHN'] = msg['conifhn'] 1456 man_env['MPDMAN_CONPORT'] = str(msg['conport']) 1457 man_env['MPDMAN_RANK'] = str(currRank) 1458 man_env['MPDMAN_POS_IN_RING'] = str(msg['nstarted']) 1459 man_env['MPDMAN_STDIN_DEST'] = msg['stdin_dest'] 1460 man_env['MPDMAN_TOTALVIEW'] = str(msg['totalview']) 1461 man_env['MPDMAN_GDB'] = str(msg['gdb']) 1462 man_env['MPDMAN_GDBA'] = str(msg['gdba']) # for attach to running pgm 1463 fullDirName = os.path.abspath(os.path.split(sys.argv[0])[0]) # normalize 1464 man_env['MPDMAN_FULLPATHDIR'] = fullDirName # used to find gdbdrv 1465 man_env['MPDMAN_SINGINIT_PID'] = str(msg['singinitpid']) 1466 man_env['MPDMAN_SINGINIT_PORT'] = str(msg['singinitport']) 1467 man_env['MPDMAN_LINE_LABELS_FMT'] = msg['line_labels'] 1468 if msg.has_key('rship'): 1469 man_env['MPDMAN_RSHIP'] = msg['rship'] 1470 man_env['MPDMAN_MSHIP_HOST'] = msg['mship_host'] 1471 man_env['MPDMAN_MSHIP_PORT'] = str(msg['mship_port']) 1472 if msg.has_key('doing_bnr'): 1473 man_env['MPDMAN_DOING_BNR'] = '1' 1474 else: 1475 man_env['MPDMAN_DOING_BNR'] = '0' 1476 if msg['nstarted'] == 0: 1477 manKVSTemplate = '%s_%s_%d' % \ 1478 (self.myHost,self.parmdb['MPD_LISTEN_PORT'],self.kvs_cntr) 1479 manKVSTemplate = sub('\.','_',manKVSTemplate) # chg magpie.cs to magpie_cs 1480 manKVSTemplate = sub('\-','_',manKVSTemplate) # chg node-0 to node_0 1481 self.kvs_cntr += 1 1482 msg['kvs_template'] = manKVSTemplate 1483 man_env['MPDMAN_KVS_TEMPLATE'] = msg['kvs_template'] 1484 msg['username'] = username 1485 if hasattr(os,'fork'): 1486 (manPid,toManSock) = self.launch_mpdman_via_fork(msg,man_env) 1487 if not manPid: 1488 print '**** mpd: launch_client_via_fork_exec failed; exiting' 1489 elif subprocess_module_available: 1490 (manPid,toManSock) = self.launch_mpdman_via_subprocess(msg,man_env) 1491 else: 1492 mpd_print(1,'neither fork nor subprocess is available') 1493 sys.exit(-1) 1494 jobid = msg['jobid'] 1495 if not self.activeJobs.has_key(jobid): 1496 self.activeJobs[jobid] = {} 1497 self.activeJobs[jobid][manPid] = { 'pgm' : pgm, 'rank' : currRank, 1498 'username' : username, 1499 'clipid' : -1, # until report by man 1500 'socktoman' : toManSock } 1501 mpd_print(mpd_dbg_level,"Created entry for %s %d" % (str(jobid),manPid) ) 1502 def launch_mpdman_via_fork(self,msg,man_env): 1503 man_env['MPDMAN_HOW_LAUNCHED'] = 'FORK' 1504 currRank = int(man_env['MPDMAN_RANK']) 1505 manListenSock = MPDListenSock('',0,name='tempsock') 1506 manListenPort = manListenSock.getsockname()[1] 1507 if msg['nstarted'] == 0: 1508 manEntryIfhn = '' 1509 manEntryPort = 0 1510 msg['pos0_host'] = self.myHost 1511 msg['pos0_ifhn'] = self.myIfhn 1512 msg['pos0_port'] = str(manListenPort) 1513 man_env['MPDMAN_POS0_IFHN'] = self.myIfhn 1514 man_env['MPDMAN_POS0_PORT'] = str(manListenPort) 1515 else: 1516 manEntryIfhn = msg['entry_ifhn'] 1517 manEntryPort = msg['entry_port'] 1518 man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn'] 1519 man_env['MPDMAN_POS0_PORT'] = msg['pos0_port'] 1520 man_env['MPDMAN_LHS_IFHN'] = manEntryIfhn 1521 man_env['MPDMAN_LHS_PORT'] = str(manEntryPort) 1522 man_env['MPDMAN_MY_LISTEN_FD'] = str(manListenSock.fileno()) 1523 man_env['MPDMAN_MY_LISTEN_PORT'] = str(manListenPort) 1524 mpd_print(mpd_dbg_level,"About to get sockpair for mpdman") 1525 (toManSock,toMpdSock) = mpd_sockpair() 1526 mpd_print(mpd_dbg_level,"Found sockpair (%d,%d) for mpdman" % \ 1527 (toManSock.fileno(), toMpdSock.fileno()) ) 1528 toManSock.name = 'to_man' 1529 toMpdSock.name = 'to_mpd' ## to be used by mpdman below 1530 man_env['MPDMAN_TO_MPD_FD'] = str(toMpdSock.fileno()) 1531 self.streamHandler.set_handler(toManSock,self.handle_man_input) 1532 msg['entry_host'] = self.myHost 1533 msg['entry_ifhn'] = self.myIfhn 1534 msg['entry_port'] = manListenPort 1535 maxTries = 6 1536 numTries = 0 1537 while numTries < maxTries: 1538 try: 1539 manPid = os.fork() 1540 errinfo = 0 1541 except OSError, errinfo: 1542 pass ## could check for errinfo.errno == 35 (resource unavailable) 1543 if errinfo: 1544 sleep(1) 1545 numTries += 1 1546 else: 1547 break 1548 if numTries >= maxTries: 1549 return (0,0) 1550 if manPid == 0: 1551 self.conListenSock = 0 # don't want to clean up console if I am manager 1552 self.myId = '%s_man_%d' % (self.myHost,self.myPid) 1553 mpd_set_my_id(self.myId) 1554 self.streamHandler.close_all_active_streams() 1555 os.setpgrp() 1556 os.environ = man_env 1557 if hasattr(os,'getuid') and os.getuid() == 0 and pwd_module_available: 1558 username = msg['username'] 1559 try: 1560 pwent = pwd.getpwnam(username) 1561 except: 1562 mpd_print(1,'invalid username :%s: on %s' % (username,self.myHost)) 1563 msgToSend = {'cmd' : 'job_failed', 'reason' : 'invalid_username', 1564 'username' : username, 'host' : self.myHost } 1565 self.conSock.send_dict_msg(msgToSend) 1566 return 1567 uid = pwent[2] 1568 gid = pwent[3] 1569 os.setgroups(mpd_get_groups_for_username(username)) 1570 os.setregid(gid,gid) 1571 try: 1572 os.setreuid(uid,uid) 1573 except OSError, errmsg1: 1574 try: 1575 os.setuid(uid) 1576 except OSError, errmsg2: 1577 mpd_print(1,"unable to perform setreuid or setuid") 1578 sys.exit(-1) 1579 import atexit # need to use full name of _exithandlers 1580 atexit._exithandlers = [] # un-register handlers in atexit module 1581 # import profile 1582 # print 'profiling the manager' 1583 # profile.run('mpdman()') 1584 mpdman = MPDMan() 1585 mpdman.run() 1586 sys.exit(0) # do NOT do cleanup (eliminated atexit handlers above) 1587 # After the fork, if we're the parent, close the other side of the 1588 # mpdpair sockets, as well as the listener socket 1589 manListenSock.close() 1590 toMpdSock.close() 1591 return (manPid,toManSock) 1592 def launch_mpdman_via_subprocess(self,msg,man_env): 1593 man_env['MPDMAN_HOW_LAUNCHED'] = 'SUBPROCESS' 1594 currRank = int(man_env['MPDMAN_RANK']) 1595 if msg['nstarted'] == 0: 1596 manEntryIfhn = '' 1597 manEntryPort = 0 1598 else: 1599 manEntryIfhn = msg['entry_ifhn'] 1600 manEntryPort = msg['entry_port'] 1601 man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn'] 1602 man_env['MPDMAN_POS0_PORT'] = msg['pos0_port'] 1603 man_env['MPDMAN_LHS_IFHN'] = manEntryIfhn 1604 man_env['MPDMAN_LHS_PORT'] = str(manEntryPort) 1605 tempListenSock = MPDListenSock() 1606 man_env['MPDMAN_MPD_PORT'] = str(tempListenSock.getsockname()[1]) 1607 # python_executable = '\Python24\python.exe' 1608 python_executable = 'python2.4' 1609 fullDirName = man_env['MPDMAN_FULLPATHDIR'] 1610 manCmd = os.path.join(fullDirName,'mpdman.py') 1611 runner = subprocess.Popen([python_executable,'-u',manCmd], # only one 'python' arg 1612 bufsize=0, 1613 env=man_env, 1614 close_fds=False) 1615 ### stdin=subprocess.PIPE,stdout=subprocess.PIPE, 1616 ### stderr=subprocess.PIPE) 1617 manPid = runner.pid 1618 oldTimeout = socket.getdefaulttimeout() 1619 socket.setdefaulttimeout(8) 1620 try: 1621 (toManSock,toManAddr) = tempListenSock.accept() 1622 except Exception, errmsg: 1623 toManSock = 0 1624 socket.setdefaulttimeout(oldTimeout) 1625 tempListenSock.close() 1626 if not toManSock: 1627 mpd_print(1,'failed to recv msg from launched man') 1628 return (0,0) 1629 msgFromMan = toManSock.recv_dict_msg() 1630 if not msgFromMan or not msgFromMan.has_key('man_listen_port'): 1631 toManSock.close() 1632 mpd_print(1,'invalid msg from launched man') 1633 return (0,0) 1634 manListenPort = msgFromMan['man_listen_port'] 1635 if currRank == 0: 1636 msg['pos0_host'] = self.myHost 1637 msg['pos0_ifhn'] = self.myIfhn 1638 msg['pos0_port'] = str(manListenPort) 1639 msg['entry_host'] = self.myHost 1640 msg['entry_ifhn'] = self.myIfhn 1641 msg['entry_port'] = manListenPort 1642 return (manPid,toManSock) 1643 1644# code for testing 1645if __name__ == '__main__': 1646 mpd = MPD() 1647 mpd.run() 1648