1#!/usr/bin/env python
2#
3#   (C) 2001 by Argonne National Laboratory.
4#       See COPYRIGHT in top-level directory.
5#
6
7"""
usage: mpd [--host=<host> --port=<portnum>] [--noconsole]
           [--trace] [--echo] [--daemon] [--bulletproof] [--ncpus=<ncpus>]
           [--ifhn=<interface-hostname>] [--listenport=<listenport>]
           [--pid=<pidfilename>] [--tmpdir=<tmpdir>] [-zc <level>] [--debug]
12
13Some long parameter names may be abbreviated to their first letters by using
14  only one hyphen and no equal sign:
15     mpd -h donner -p 4268 -n
16  is equivalent to
     mpd --host=donner --port=4268 --noconsole
18
19--host and --port must be specified together; they tell the new mpd where
20  to enter an existing ring;  if they are omitted, the new mpd forms a
21  stand-alone ring that other mpds may enter later
22--noconsole is useful for running 2 mpds on the same machine; only one of
23  them will accept mpd commands
24--trace yields lots of traces thru mpd routines; currently too verbose
25--debug turns on some debugging prints; currently not verbose enough
--echo causes the mpd to echo its listener port, by which other mpds may connect
27--daemon causes mpd to run backgrounded, with no controlling tty
28--bulletproof says to turn bulletproofing on (experimental)
29--ncpus indicates how many cpus are on the local host; used for starting processes
30--ifhn specifies an alternate interface hostname for the host this mpd is running on;
31  e.g. may be used to specify the alias for an interface other than default
32--listenport specifies a port for this mpd to listen on; by default it will
33  acquire one from the system
34--conlistenport specifies a port for this mpd to listen on for console
35  connections (only used when employing inet socket for console); by default it
36  will acquire one from the system
37--pid=filename writes the mpd pid into the specified file, or --pid alone
38  writes it into /var/run/mpd.pid
39--tmpdir=tmpdirname where mpd places temporary sockets, etc.
40-zc is a purely EXPERIMENTAL option right now used to investigate zeroconf
41  networking; it can be used to allow mpds to discover each other locally
42  using multicast DNS; its usage may change over time
43  Currently, -zc is specified like this:  -zc N
44  where N specifies a 'level' in a startup set of mpds.  The first mpd in a ring
45  must have 1 and it will establish a ring of one mpd.  Subsequent mpds can specify
46  -zc 2 and will hook into the ring via the one at level 1.  Except for level 1, new
47  mpds enter the ring via an mpd at level-1.
48
A file named .mpd.conf must be present in the user's home directory
50  with read and write access only for the user, and must contain at least
51  a line with MPD_SECRETWORD=<secretword>
52
53To run mpd as root, install it while root and instead of a .mpd.conf file
use mpd.conf (no leading dot) in the /etc directory.
55"""
56from  time    import  ctime
57from  mpdlib  import  mpd_version
58__author__ = "Ralph Butler and Rusty Lusk"
59__date__ = ctime()
60__version__ = "$Revision: 1.160 $"
61__version__ += "  " + str(mpd_version())
62__credits__ = ""
63
64
65import sys, os, signal, socket, stat
66
67from  re          import  sub
68from  atexit      import  register
69from  cPickle     import  dumps
70from  types       import  ClassType
71from  random      import  seed, randrange, random
72from  time        import  sleep
73from  md5         import  new as md5new
74from  mpdlib      import  mpd_set_my_id, mpd_check_python_version, mpd_sockpair, \
75                          mpd_print, mpd_get_my_username, mpd_close_zc, \
76                          mpd_get_groups_for_username, mpd_uncaught_except_tb, \
77                          mpd_set_procedures_to_trace, mpd_trace_calls, \
78                          mpd_dbg_level, mpd_set_dbg_level, mpd_set_tmpdir, \
79                          MPDSock, MPDListenSock, MPDConListenSock, \
80                          MPDStreamHandler, MPDRing, MPDParmDB
81from  mpdman      import  MPDMan
82
83# fix for ticket #753 where the set() builtin isn't available in python2.3
84try:
85    set
86except NameError:
87    from sets import Set as set
88
89
# Probe for optional standard-library modules.  Each probe leaves behind a
# 1/0 flag recording whether the import succeeded; code that needs the module
# (e.g. the syslog calls in MPD.run and MPD.cleanup) tests the flag first.
# Only ImportError means "module not available"; anything else (such as a
# KeyboardInterrupt during start-up) is allowed to propagate.
try:
    import pwd
    pwd_module_available = 1
except ImportError:
    pwd_module_available = 0
try:
    import  syslog
    syslog_module_available = 1
except ImportError:
    syslog_module_available = 0
try:
    import  subprocess
    subprocess_module_available = 1
except ImportError:
    subprocess_module_available = 0
105
106
def sigchld_handler(signum,frame):
    """
    SIGCHLD handler: reap every child process that has already terminated.

    signum and frame are the standard signal-handler arguments; neither is
    used.  waitpid is called with WNOHANG in a loop so that all finished
    children are collected in one invocation without ever blocking.
    """
    while 1:
        try:
            (pid,status) = os.waitpid(-1,os.WNOHANG)
        except OSError:    # no more child processes to be waited for
            break
        if pid == 0:    # no existing child process is finished
            break
116
117class MPD(object):
118    def __init__(self):
119        self.myHost = socket.gethostname()
120        try:
121            hostinfo = socket.gethostbyname_ex(self.myHost)
122            self.myIfhn = hostinfo[2][0]    # chgd below when I get the real value
123        except:
124            print 'mpd failed: gethostbyname_ex failed for %s' % (self.myHost)
125            sys.exit(-1)
126    def run(self):
127        if syslog_module_available:
128            syslog.openlog("mpd",0,syslog.LOG_DAEMON)
129            syslog.syslog(syslog.LOG_INFO,"mpd starting; no mpdid yet")
130        sys.excepthook = mpd_uncaught_except_tb
131        self.spawnQ = []
132        self.spawnInProgress = 0
133        self.parmdb = MPDParmDB(orderedSources=['cmdline','xml','env','rcfile','thispgm'])
134        self.parmsToOverride = {
135                                 'MPD_SECRETWORD'       :  '',
136                                 'MPD_MY_IFHN'          :  self.myIfhn,
137                                 'MPD_ENTRY_IFHN'       :  '',
138                                 'MPD_ENTRY_PORT'       :  0,
139                                 'MPD_NCPUS'            :  1,
140                                 'MPD_LISTEN_PORT'      :  0,
141                                 'MPD_TRACE_FLAG'       :  0,
142                                 'MPD_CONSOLE_FLAG'     :  1,
143                                 'MPD_ECHO_PORT_FLAG'   :  0,
144                                 'MPD_DAEMON_FLAG'      :  0,
145                                 'MPD_BULLETPROOF_FLAG' :  0,
146                                 'MPD_PID_FILENAME'     :  '',
147                                 'MPD_ZC'               :  0,
148                                 'MPD_LOGFILE_TRUNC_SZ' :  4000000,  # -1 -> don't trunc
149                                 'MPD_PORT_RANGE'       :  0,
150                                 'MPD_TMPDIR'           :  '/tmp',
151                               }
152        for (k,v) in self.parmsToOverride.items():
153            self.parmdb[('thispgm',k)] = v
154        self.get_parms_from_cmdline()
155        self.parmdb.get_parms_from_rcfile(self.parmsToOverride,errIfMissingFile=1)
156        self.parmdb.get_parms_from_env(self.parmsToOverride)
157        self.myIfhn = self.parmdb['MPD_MY_IFHN']    # variable for convenience
158        self.myPid = os.getpid()
159        if self.parmdb['MPD_PORT_RANGE']:
160            os.environ['MPICH_PORT_RANGE'] = self.parmdb['MPD_PORT_RANGE']
161        self.tmpdir = self.parmdb['MPD_TMPDIR']
162        mpd_set_tmpdir(self.tmpdir)
163        self.listenSock = MPDListenSock(name='ring_listen_sock',
164                                        port=self.parmdb['MPD_LISTEN_PORT'])
165        self.parmdb[('thispgm','MPD_LISTEN_PORT')] = self.listenSock.sock.getsockname()[1]
166        self.myId = '%s_%d' % (self.myHost,self.parmdb['MPD_LISTEN_PORT'])
167        mpd_set_my_id(myid=self.myId)
168        self.streamHandler = MPDStreamHandler()
169        self.ring = MPDRing(streamHandler=self.streamHandler,
170                            secretword=self.parmdb['MPD_SECRETWORD'],
171                            listenSock=self.listenSock,
172                            myIfhn=self.myIfhn,
173                            entryIfhn=self.parmdb['MPD_ENTRY_IFHN'],
174                            entryPort=self.parmdb['MPD_ENTRY_PORT'],
175                            zcMyLevel=self.parmdb['MPD_ZC'])
176        # setup tracing if requested via args
177        if self.parmdb['MPD_TRACE_FLAG']:
178            proceduresToTrace = []
179            import inspect
180            symbolsAndTypes = globals().items() + \
181                              inspect.getmembers(self) + \
182                              inspect.getmembers(self.ring) + \
183                              inspect.getmembers(self.streamHandler)
184            for (symbol,symtype) in symbolsAndTypes:
185                if symbol == '__init__':  # a problem to trace
186                    continue
187                if inspect.isfunction(symtype)  or  inspect.ismethod(symtype):
188                    # print symbol
189                    proceduresToTrace.append(symbol)
190            mpd_set_procedures_to_trace(proceduresToTrace)
191            sys.settrace(mpd_trace_calls)
192        if syslog_module_available:
193            syslog.syslog(syslog.LOG_INFO,"mpd has mpdid=%s (port=%d)" % \
194                          (self.myId,self.parmdb['MPD_LISTEN_PORT']) )
195        vinfo = mpd_check_python_version()
196        if vinfo:
197            print "mpd: your python version must be >= 2.2 ; current version is:", vinfo
198            sys.exit(-1)
199
200        # need to close both object and underlying fd (ticket #963)
201        sys.stdin.close()
202        os.close(0)
203
204        if self.parmdb['MPD_ECHO_PORT_FLAG']:    # do this before becoming a daemon
205            # print self.parmdb['MPD_LISTEN_PORT']
206            print "mpd_port=%d" % self.parmdb['MPD_LISTEN_PORT']
207            sys.stdout.flush()
208            ##### NEXT 2 for debugging
209            ## print >>sys.stderr, self.parmdb['MPD_LISTEN_PORT']
210            ## sys.stderr.flush()
211        self.myRealUsername = mpd_get_my_username()
212        self.currRingSize = 1    # default
213        self.currRingNCPUs = 1   # default
214        if os.environ.has_key('MPD_CON_EXT'):
215            self.conExt = '_'  + os.environ['MPD_CON_EXT']
216        else:
217            self.conExt = ''
218        self.logFilename = self.tmpdir + '/mpd2.logfile_' + mpd_get_my_username() + self.conExt
219        if self.parmdb['MPD_PID_FILENAME']:  # may overwrite it below
220            pidFile = open(self.parmdb['MPD_PID_FILENAME'],'w')
221            print >>pidFile, "%d" % (os.getpid())
222            pidFile.close()
223
224        self.conListenSock = 0    # don't want one when I do cleanup for forked daemon procs
225        if self.parmdb['MPD_DAEMON_FLAG']:      # see if I should become a daemon with no controlling tty
226            rc = os.fork()
227            if rc != 0:   # parent exits; child in background
228                sys.exit(0)
229            os.setsid()  # become session leader; no controlling tty
230            signal.signal(signal.SIGHUP,signal.SIG_IGN)  # make sure no sighup when leader ends
231            ## leader exits; svr4: make sure do not get another controlling tty
232            rc = os.fork()
233            if rc != 0:
234                sys.exit(0)
235            if self.parmdb['MPD_PID_FILENAME']:  # overwrite one above before chg usmask
236                pidFile = open(self.parmdb['MPD_PID_FILENAME'],'w')
237                print >>pidFile, "%d" % (os.getpid())
238                pidFile.close()
239            os.chdir("/")  # free up filesys for umount
240            os.umask(0)
241            try:    os.unlink(self.logFilename)
242            except: pass
243            logFileFD = os.open(self.logFilename,os.O_CREAT|os.O_WRONLY|os.O_EXCL,0600)
244            self.logFile = os.fdopen(logFileFD,'w',0)
245            sys.stdout = self.logFile
246            sys.stderr = self.logFile
247            print >>sys.stdout, 'logfile for mpd with pid %d' % os.getpid()
248            sys.stdout.flush()
249            os.dup2(self.logFile.fileno(),sys.__stdout__.fileno())
250            os.dup2(self.logFile.fileno(),sys.__stderr__.fileno())
251        if self.parmdb['MPD_CONSOLE_FLAG']:
252            self.conListenSock = MPDConListenSock(secretword=self.parmdb['MPD_SECRETWORD'])
253            self.streamHandler.set_handler(self.conListenSock,
254                                           self.handle_console_connection)
255        register(self.cleanup)
256        seed()
257        self.nextJobInt    = 1
258        self.activeJobs    = {}
259        self.conSock       = 0
260        self.allExiting    = 0    # for mpdallexit (for first loop for graceful exit)
261        self.exiting       = 0    # for mpdexit or mpdallexit
262        self.kvs_cntr      = 0    # for mpdman
263        self.pulse_cntr    = 0
264        rc = self.ring.enter_ring(lhsHandler=self.handle_lhs_input,
265                                  rhsHandler=self.handle_rhs_input)
266        if rc < 0:
267            mpd_print(1,"failed to enter ring")
268            sys.exit(-1)
269        self.pmi_published_names = {}
270        if hasattr(signal,'SIGCHLD'):
271            signal.signal(signal.SIGCHLD,sigchld_handler)
272        if not self.parmdb['MPD_BULLETPROOF_FLAG']:
273            #    import profile ; profile.run('self.runmainloop()')
274            self.runmainloop()
275        else:
276            try:
277                from threading import Thread
278            except:
279                print '*** mpd terminating'
280                print '    bulletproof option must be able to import threading-Thread'
281                sys.exit(-1)
282            # may use SIG_IGN on all but SIGCHLD and SIGHUP (handled above)
283            while 1:
284                mpdtid = Thread(target=self.runmainloop)
285                mpdtid.start()
286                # signals must be handled in main thread; thus we permit timeout of join
287                while mpdtid.isAlive():
288                    mpdtid.join(2)   # come out sometimes and handle signals
289                if self.exiting:
290                    break
291                if self.conSock:
292                    msgToSend = { 'cmd' : 'restarting_mpd' }
293                    self.conSock.msgToSend.send_dict_msg(msgToSend)
294                    self.streamHandler.del_handler(self.conSock)
295                    self.conSock.close()
296                    self.conSock = 0
297    def runmainloop(self):
298        # Main Loop
299        while 1:
300            if self.spawnQ  and  not self.spawnInProgress:
301                self.ring.rhsSock.send_dict_msg(self.spawnQ[0])
302                self.spawnQ = self.spawnQ[1:]
303                self.spawnInProgress = 1
304                continue
305            rv = self.streamHandler.handle_active_streams(timeout=8.0)
306            if rv[0] < 0:
307                if type(rv[1]) == ClassType  and  rv[1] == KeyboardInterrupt: # ^C
308                    sys.exit(-1)
309            if self.exiting:
310                break
311            if rv[0] == 0:
312                if self.pulse_cntr == 0  and  self.ring.rhsSock:
313                    self.ring.rhsSock.send_dict_msg({'cmd':'pulse'})
314                self.pulse_cntr += 1
315            if self.pulse_cntr >= 3:
316                if self.ring.rhsSock:  # rhs must have disappeared
317                    self.streamHandler.del_handler(self.ring.rhsSock)
318                    self.ring.rhsSock.close()
319                    self.ring.rhsSock = 0
320                if self.ring.lhsSock:
321                    self.streamHandler.del_handler(self.ring.lhsSock)
322                    self.ring.lhsSock.close()
323                    self.ring.lhsSock = 0
324                mpd_print(1,'no pulse_ack from rhs; re-entering ring')
325                rc = self.ring.reenter_ring(lhsHandler=self.handle_lhs_input,
326                                            rhsHandler=self.handle_rhs_input,
327                                            ntries=16)
328                if rc == 0:
329                    mpd_print(1,'back in ring')
330		else:
331                    mpd_print(1,'failed to reenter ring')
332                    sys.exit(-1)
333                self.pulse_cntr = 0
334        mpd_close_zc()  # only does something if we have zc
    def usage(self):
        """Print the module's usage text and the mpd version, then exit with status -1."""
        print __doc__
        print "This version of mpd is", mpd_version()
        sys.exit(-1)
    def cleanup(self):
        """
        Best-effort exit hook: log the shutdown and remove the console socket file.

        Every failure is deliberately swallowed so that this hook itself
        never raises.
        """
        try:
            mpd_print(0, "CLEANING UP" )
            if syslog_module_available:
                syslog.syslog(syslog.LOG_INFO,"mpd ending mpdid=%s (inside cleanup)" % \
                              (self.myId) )
                syslog.closelog()
            if self.conListenSock:    # only del if I created
                os.unlink(self.conListenSock.conFilename)
        except:
            pass
350    def get_parms_from_cmdline(self):
351        global mpd_dbg_level
352        argidx = 1
353        while argidx < len(sys.argv):
354            if sys.argv[argidx] == '--help':
355                self.usage()
356                argidx += 1
357            elif sys.argv[argidx] == '-h':
358                if len(sys.argv) < 3:
359                    self.usage()
360                self.parmdb[('cmdline','MPD_ENTRY_IFHN')] = sys.argv[argidx+1]
361                argidx += 2
362            elif sys.argv[argidx].startswith('--host'):
363                try:
364                    entryHost = sys.argv[argidx].split('=',1)[1]
365                except:
366                    print 'failed to parse --host option'
367                    self.usage()
368                self.parmdb[('cmdline','MPD_ENTRY_IFHN')] = entryHost
369                argidx += 1
370            elif sys.argv[argidx] == '-p':
371                if argidx >= (len(sys.argv)-1):
372                    print 'missing arg for -p'
373                    sys.exit(-1)
374                if not sys.argv[argidx+1].isdigit():
375                    print 'invalid port %s ; must be numeric' % (sys.argv[argidx+1])
376                    sys.exit(-1)
377                self.parmdb[('cmdline','MPD_ENTRY_PORT')] = int(sys.argv[argidx+1])
378                argidx += 2
379            elif sys.argv[argidx].startswith('--port'):
380                try:
381                    entryPort = sys.argv[argidx].split('=',1)[1]
382                except:
383                    print 'failed to parse --port option'
384                    self.usage()
385                if not entryPort.isdigit():
386                    print 'invalid port %s ; must be numeric' % (entryPort)
387                    sys.exit(-1)
388                self.parmdb[('cmdline','MPD_ENTRY_PORT')] = int(entryPort)
389                argidx += 1
390            elif sys.argv[argidx].startswith('--ncpus'):
391                try:
392                    NCPUs = sys.argv[argidx].split('=',1)[1]
393                except:
394                    print 'failed to parse --ncpus option'
395                    self.usage()
396                if not NCPUs.isdigit():
397                    print 'invalid ncpus %s ; must be numeric' % (NCPUs)
398                    sys.exit(-1)
399                self.parmdb[('cmdline','MPD_NCPUS')] = int(NCPUs)
400                argidx += 1
401            elif sys.argv[argidx].startswith('--pid'):
402                try:
403                    splitPid = sys.argv[argidx].split('=')
404                except:
405                    print 'failed to parse --pid option'
406                    self.usage()
407                if len(splitPid) == 1  or  not splitPid[1]:
408                    pidFilename = '/var/run/mpd.pid'
409                else:
410                    pidFilename = splitPid[1]
411                self.parmdb[('cmdline','MPD_PID_FILENAME')] = pidFilename
412                argidx += 1
413            elif sys.argv[argidx].startswith('--tmpdir'):
414                try:
415                    splitTmpdir = sys.argv[argidx].split('=')
416                except:
417                    print 'failed to parse --tmpdir option'
418                    self.usage()
419                if len(splitTmpdir) == 1  or  not splitTmpdir[1]:
420                    tmpdirName = '/tmp'
421                else:
422                    tmpdirName = splitTmpdir[1]
423                self.parmdb[('cmdline','MPD_TMPDIR')] = tmpdirName
424                argidx += 1
425            elif sys.argv[argidx].startswith('--ifhn'):
426                try:
427                    ifhn = sys.argv[argidx].split('=',1)[1]
428                except:
429                    print 'failed to parse --ifhn option'
430                    self.usage()
431                try:
432                    hostinfo = socket.gethostbyname_ex(ifhn)
433                    ifhn = hostinfo[2][0]
434                except:
435                    print 'mpd failed: gethostbyname_ex failed for %s' % (ifhn)
436                    sys.exit(-1)
437                self.parmdb[('cmdline','MPD_MY_IFHN')] = ifhn
438                argidx += 1
439            elif sys.argv[argidx] == '-l':
440                if argidx >= (len(sys.argv)-1):
441                    print 'missing arg for -l'
442                    sys.exit(-1)
443                if not sys.argv[argidx+1].isdigit():
444                    print 'invalid listenport %s ; must be numeric' % (sys.argv[argidx+1])
445                    sys.exit(-1)
446                self.parmdb[('cmdline','MPD_LISTEN_PORT')] = int(sys.argv[argidx+1])
447                argidx += 2
448            elif sys.argv[argidx].startswith('--listenport'):
449                try:
450                    myListenPort = sys.argv[argidx].split('=',1)[1]
451                except:
452                    print 'failed to parse --listenport option'
453                    self.usage()
454                if not myListenPort.isdigit():
455                    print 'invalid listenport %s ; must be numeric' % (myListenPort)
456                    sys.exit(-1)
457                self.parmdb[('cmdline','MPD_LISTEN_PORT')] = int(myListenPort)
458                argidx += 1
459            elif sys.argv[argidx] == '-hp':
460                if argidx >= (len(sys.argv)-1):
461                    print 'missing arg for -hp'
462                    sys.exit(-1)
463                try:
464                    (entryIfhn,entryPort) = sys.argv[argidx+1].split('_')
465                except:
466                    print 'invalid entry host: %s' % (sys.argv[argidx+1])
467                    sys.exit(-1)
468                if not entryPort.isdigit():
469                    print 'invalid port %s ; must be numeric' % (sys.argv[argidx+1])
470                    sys.exit(-1)
471                self.parmdb[('cmdline','MPD_ENTRY_IFHN')] = entryIfhn
472                self.parmdb[('cmdline','MPD_ENTRY_PORT')] = int(entryPort)
473                argidx += 2
474            elif sys.argv[argidx] == '-t'  or  sys.argv[argidx] == '--trace':
475                self.parmdb[('cmdline','MPD_TRACE_FLAG')] = 1
476                argidx += 1
477            elif sys.argv[argidx] == '--debug':
478                mpd_set_dbg_level(1)
479                argidx += 1
480            elif sys.argv[argidx] == '-n'  or  sys.argv[argidx] == '--noconsole':
481                self.parmdb[('cmdline','MPD_CONSOLE_FLAG')] = 0
482                argidx += 1
483            elif sys.argv[argidx] == '-e'  or  sys.argv[argidx] == '--echo':
484                self.parmdb[('cmdline','MPD_ECHO_PORT_FLAG')] = 1
485                argidx += 1
486            elif sys.argv[argidx] == '-d'  or  sys.argv[argidx] == '--daemon':
487                self.parmdb[('cmdline','MPD_DAEMON_FLAG')] = 1
488                argidx += 1
489            elif sys.argv[argidx] == '-b'  or  sys.argv[argidx] == '--bulletproof':
490                self.parmdb[('cmdline','MPD_BULLETPROOF_FLAG')] = 1
491                argidx += 1
492            elif sys.argv[argidx] == '-zc':
493                if argidx >= (len(sys.argv)-1):
494                    print 'missing arg for -zc'
495                    sys.exit(-1)
496                if not sys.argv[argidx+1].isdigit():
497                    print 'invalid arg for -zc %s ; must be numeric' % (sys.argv[argidx+1])
498                    sys.exit(-1)
499                intarg = int(sys.argv[argidx+1])
500                if intarg < 1:
501                    print 'invalid arg for -zc %s ; must be >= 1' % (sys.argv[argidx+1])
502                    sys.exit(-1)
503                self.parmdb[('cmdline','MPD_ZC')] = intarg
504                argidx += 2
505            else:
506                print 'unrecognized arg: %s' % (sys.argv[argidx])
507                sys.exit(-1)
508        if (self.parmdb['MPD_ENTRY_IFHN']  and  not self.parmdb['MPD_ENTRY_PORT']) \
509        or (self.parmdb['MPD_ENTRY_PORT']  and  not self.parmdb['MPD_ENTRY_IFHN']):
510            print 'host and port must be specified together'
511            sys.exit(-1)
512    def handle_console_connection(self,sock):
513        if not self.conSock:
514            (self.conSock,newConnAddr) = sock.accept()
515            if hasattr(socket,'AF_UNIX')  and  sock.family == socket.AF_UNIX:
516                line = self.conSock.recv_char_msg().rstrip()
517                if not line:  # caller went away (perhaps another mpd seeing if I am here)
518                    self.streamHandler.del_handler(self.conSock)
519                    self.conSock.close()
520                    self.conSock = 0
521                    return
522                errorMsg = ''
523                try:
524                    (kv1,kv2) = line.split(' ',1)  # 'realusername=xxx secretword=yyy'
525                except:
526                    errorMsg = 'failed to split this msg on " ": %s' % line
527                if not errorMsg:
528                    try:
529                        (k1,self.conSock.realUsername) = kv1.split('=',1)
530                    except:
531                        errorMsg = 'failed to split first kv pair on "=": %s' % line
532                if not errorMsg:
533                    try:
534                        (k2,secretword) = kv2.split('=',1)
535                    except:
536                        errorMsg = 'failed to split second kv pair on "=": %s' % line
537                if not errorMsg  and  k1 != 'realusername':
538                    errorMsg = 'first key is not realusername'
539                if not errorMsg  and  k2 != 'secretword':
540                    errorMsg = 'second key is not secretword'
541                if not errorMsg  and  os.getuid() == 0  and  secretword != self.parmdb['MPD_SECRETWORD']:
542                    errorMsg = 'invalid secretword to root mpd'
543                if errorMsg:
544                    try:
545                        self.conSock.send_dict_msg({'error_msg': errorMsg})
546                    except:
547                        pass
548                    self.streamHandler.del_handler(self.conSock)
549                    self.conSock.close()
550                    self.conSock = 0
551                    return
552                self.conSock.beingChallenged = 0
553            else:
554                msg = self.conSock.recv_dict_msg()
555                if not msg:    # caller went away (perhaps another mpd seeing if I am here)
556                    self.streamHandler.del_handler(self.conSock)
557                    self.conSock.close()
558                    self.conSock = 0
559                    return
560                if not msg.has_key('cmd')  or  msg['cmd'] != 'con_init':
561                    mpd_print(1, 'console sent bad msg :%s:' % (msg) )
562                    try:  # try to let console know
563                        self.conSock.send_dict_msg({'cmd':'invalid_msg_received_from_you'})
564                    except:
565                        pass
566                    self.streamHandler.del_handler(self.conSock)
567                    self.conSock.close()
568                    self.conSock = 0
569                    return
570                self.streamHandler.set_handler(self.conSock,self.handle_console_input)
571                self.conSock.beingChallenged = 1
572                self.conSock.name = 'console'
573                randNum = randrange(1,10000)
574                randVal = sock.secretword + str(randNum)
575                self.conSock.expectedResponse = md5new(randVal).digest()
576                self.conSock.send_dict_msg({'cmd' : 'con_challenge', 'randnum' : randNum })
577                self.conSock.realUsername = mpd_get_my_username()
578            self.streamHandler.set_handler(self.conSock,self.handle_console_input)
579            self.conSock.name = 'console'
580        else:
581            return  ## postpone it; hope the other one frees up soon
582    def handle_console_input(self,sock):
583        msg = self.conSock.recv_dict_msg()
584        if not msg:
585            mpd_print(0000, 'console has disappeared; closing it')
586            self.streamHandler.del_handler(self.conSock)
587            self.conSock.close()
588            self.conSock = 0
589            return
590        if not msg.has_key('cmd'):
591            mpd_print(1, 'console sent bad msg :%s:' % msg)
592            try:  # try to let console know
593                self.conSock.send_dict_msg({ 'cmd':'invalid_msg_received_from_you' })
594            except:
595                pass
596            self.streamHandler.del_handler(self.conSock)
597            self.conSock.close()
598            self.conSock = 0
599            return
600        if self.conSock.beingChallenged  and  msg['cmd'] != 'con_challenge_response':
601            mpd_print(1, 'console did not respond to con_challenge; msg=:%s:' % msg)
602            try:  # try to let console know
603                self.conSock.send_dict_msg({ 'cmd':'expected_con_challenge_response' })
604            except:
605                pass
606            self.streamHandler.del_handler(self.conSock)
607            self.conSock.close()
608            self.conSock = 0
609            return
610        if msg['cmd'] == 'con_challenge_response':
611            self.conSock.beingChallenged = 0
612            self.conSock.realUsername = msg['realusername']
613            if not msg.has_key('response'):
614                try:  # try to let console know
615                    self.conSock.send_dict_msg({ 'cmd':'missing_response_in_msg' })
616                except:
617                    pass
618                self.streamHandler.del_handler(self.conSock)
619                self.conSock.close()
620                self.conSock = 0
621                return
622            elif msg['response'] != self.conSock.expectedResponse:
623                try:  # try to let console know
624                    self.conSock.send_dict_msg({ 'cmd':'invalid_response' })
625                except:
626                    pass
627                self.streamHandler.del_handler(self.conSock)
628                self.conSock.close()
629                self.conSock = 0
630                return
631            self.conSock.send_dict_msg({ 'cmd':'valid_response' })
632        elif msg['cmd'] == 'mpdrun':
633            # permit anyone to run but use THEIR own username
634            #   thus, override any username specified by the user
635            if self.conSock.realUsername != 'root':
636                msg['username'] = self.conSock.realUsername
637                msg['users'] = { (0,msg['nprocs']-1) : self.conSock.realUsername }
638            #
639            msg['mpdid_mpdrun_start'] = self.myId
640            msg['nstarted_on_this_loop'] = 0
641            msg['first_loop'] = 1
642            msg['ringsize'] = 0
643            msg['ring_ncpus'] = 0
644            # maps rank => hostname
645            msg['process_mapping'] = {}
646            if msg.has_key('try_1st_locally'):
647                self.do_mpdrun(msg)
648            else:
649                self.ring.rhsSock.send_dict_msg(msg)
650            # send ack after job is going
651        elif msg['cmd'] == 'get_mpdrun_values':
652            msgToSend = { 'cmd' : 'response_get_mpdrun_values',
653	                  'mpd_version' : mpd_version(),
654	                  'mpd_ifhn' : self.myIfhn }
655            self.conSock.send_dict_msg(msgToSend)
656        elif msg['cmd'] == 'mpdtrace':
657            msgToSend = { 'cmd'     : 'mpdtrace_info',
658                          'dest'    : self.myId,
659                          'id'      : self.myId,
660                          'ifhn'    : self.myIfhn,
661                          'lhsport' : '%s' % (self.ring.lhsPort),
662                          'lhsifhn' : '%s' % (self.ring.lhsIfhn),
663                          'rhsport' : '%s' % (self.ring.rhsPort),
664                          'rhsifhn' : '%s' % (self.ring.rhsIfhn) }
665            self.ring.rhsSock.send_dict_msg(msgToSend)
666            msgToSend = { 'cmd'  : 'mpdtrace_trailer', 'dest' : self.myId }
667            self.ring.rhsSock.send_dict_msg(msgToSend)
668            # do not send an ack to console now; will send trace info later
669        elif msg['cmd'] == 'mpdallexit':
670            if self.conSock.realUsername != self.myRealUsername:
671                msgToSend = { 'cmd':'invalid_username_to_make_this_request' }
672                self.conSock.send_dict_msg(msgToSend)
673                self.streamHandler.del_handler(self.conSock)
674                self.conSock.close()
675                self.conSock = 0
676                return
677            # self.allExiting = 1  # doesn't really help here
678            self.ring.rhsSock.send_dict_msg( {'cmd' : 'mpdallexit', 'src' : self.myId} )
679            self.conSock.send_dict_msg( {'cmd' : 'mpdallexit_ack'} )
680        elif msg['cmd'] == 'mpdexit':
681            if self.conSock.realUsername != self.myRealUsername:
682                msgToSend = { 'cmd':'invalid_username_to_make_this_request' }
683                self.conSock.send_dict_msg(msgToSend)
684                self.streamHandler.del_handler(self.conSock)
685                self.conSock.close()
686                self.conSock = 0
687                return
688            if msg['mpdid'] == 'localmpd':
689                msg['mpdid'] = self.myId
690            self.ring.rhsSock.send_dict_msg( {'cmd' : 'mpdexit', 'src' : self.myId,
691                                              'done' : 0, 'dest' : msg['mpdid']} )
692        elif msg['cmd'] == 'mpdringtest':
693            msg['src'] = self.myId
694            self.ring.rhsSock.send_dict_msg(msg)
695            # do not send an ack to console now; will send ringtest info later
696        elif msg['cmd'] == 'mpdlistjobs':
697            msgToSend = { 'cmd'  : 'local_mpdid', 'id' : self.myId }
698            self.conSock.send_dict_msg(msgToSend)
699            for jobid in self.activeJobs.keys():
700                for manPid in self.activeJobs[jobid]:
701                    msgToSend = { 'cmd' : 'mpdlistjobs_info',
702                                  'dest' : self.myId,
703                                  'jobid' : jobid,
704                                  'username' : self.activeJobs[jobid][manPid]['username'],
705                                  'host' : self.myHost,
706                                  'ifhn' : self.myIfhn,
707                                  'clipid' : str(self.activeJobs[jobid][manPid]['clipid']),
708                                  'sid' : str(manPid),  # may chg to actual sid later
709                                  'pgm'  : self.activeJobs[jobid][manPid]['pgm'],
710                                  'rank' : self.activeJobs[jobid][manPid]['rank'] }
711                    self.conSock.send_dict_msg(msgToSend)
712            msgToSend = { 'cmd'  : 'mpdlistjobs_trailer', 'dest' : self.myId }
713            self.ring.rhsSock.send_dict_msg(msgToSend)
714            # do not send an ack to console now; will send listjobs info later
715        elif msg['cmd'] == 'mpdkilljob':
716            # permit anyone to kill but use THEIR own username
717            #   thus, override any username specified by the user
718            if self.conSock.realUsername != 'root':
719                msg['username'] = self.conSock.realUsername
720            msg['src'] = self.myId
721            msg['handled'] = 0
722            if msg['mpdid'] == '':
723                msg['mpdid'] = self.myId
724            self.ring.rhsSock.send_dict_msg(msg)
725            # send ack to console after I get this msg back and do the kill myself
726        elif msg['cmd'] == 'mpdsigjob':
727            # permit anyone to sig but use THEIR own username
728            #   thus, override any username specified by the user
729            if self.conSock.realUsername != 'root':
730                msg['username'] = self.conSock.realUsername
731            msg['src'] = self.myId
732            msg['handled'] = 0
733            if msg['mpdid'] == '':
734                msg['mpdid'] = self.myId
735            self.ring.rhsSock.send_dict_msg(msg)
736            # send ack to console after I get this msg back
737        elif msg['cmd'] == 'verify_hosts_in_ring':
738            msgToSend = { 'cmd'  : 'verify_hosts_in_ring', 'dest' : self.myId,
739                          'host_list' : msg['host_list'] }
740            self.ring.rhsSock.send_dict_msg(msgToSend)
741            # do not send an ack to console now; will send trace info later
742        else:
743            msgToSend = { 'cmd' : 'invalid_msg_received_from_you' }
744            self.conSock.send_dict_msg(msgToSend)
745            badMsg = 'invalid msg received from console: %s' % (str(msg))
746            mpd_print(1, badMsg)
747            if syslog_module_available:
748                syslog.syslog(syslog.LOG_ERR,badMsg)
749    def handle_man_input(self,sock):
750        msg = sock.recv_dict_msg()
751        if not msg:
752            for jobid in self.activeJobs.keys():
753                deleted = 0
754                for manPid in self.activeJobs[jobid]:
755                    if sock == self.activeJobs[jobid][manPid]['socktoman']:
756			mpd_print(mpd_dbg_level,\
757                                  "Deleting %s %d" % (str(jobid),manPid))
758                        del self.activeJobs[jobid][manPid]
759                        if len(self.activeJobs[jobid]) == 0:
760                            del self.activeJobs[jobid]
761                        deleted = 1
762                        break
763                if deleted:
764                    break
765            self.streamHandler.del_handler(sock)
766            sock.close()
767            return
768        if not msg.has_key('cmd'):
769            mpd_print(1, 'INVALID msg for man request msg=:%s:' % (msg) )
770            msgToSend = { 'cmd' : 'invalid_msg' }
771            sock.send_dict_msg(msgToSend)
772            self.streamHandler.del_handler(sock)
773            sock.close()
774            return
775	# Who asks, and why?
776        # We have a failure that deletes the spawnerManPid from the
777	# activeJobs[jobid] variable.   The temporary work-around is
778        # to ignore this request if the target process is no longer
779	# in the activeJobs table.
780        if msg['cmd'] == 'client_info':
781            jobid = msg['jobid']
782            manPid = msg['manpid']
783            self.activeJobs[jobid][manPid]['clipid'] = msg['clipid']
784            if msg['spawner_manpid']  and  msg['rank'] == 0:
785                if msg['spawner_mpd'] == self.myId:
786                    spawnerManPid = msg['spawner_manpid']
787		    mpd_print(mpd_dbg_level,\
788                       "About to check %s:%s" % (str(jobid),str(spawnerManPid)))
789
790                    if not self.activeJobs[jobid].has_key(spawnerManPid):
791                        mpd_print(0,"Missing %d in %s" % (spawnerManPid,str(jobid)))
792                    elif not self.activeJobs[jobid][spawnerManPid].has_key('socktoman'):
793                        mpd_print(0,"Missing socktoman!")
794                    else:
795                        spawnerManSock = self.activeJobs[jobid][spawnerManPid]['socktoman']
796                        msgToSend = { 'cmd' : 'spawn_done_by_mpd', 'rc' : 0, 'reason' : '' }
797                        spawnerManSock.send_dict_msg(msgToSend)
798                else:
799                    self.ring.rhsSock.send_dict_msg(msg)
800        elif msg['cmd'] == 'spawn':
801            msg['mpdid_mpdrun_start'] = self.myId
802            msg['spawner_mpd'] = self.myId
803            msg['nstarted_on_this_loop'] = 0
804            msg['first_loop'] = 1
805            msg['jobalias'] = ''
806            msg['stdin_dest'] = '0'
807            msg['ringsize'] = 0
808            msg['ring_ncpus'] = 0
809            msg['gdb'] = 0
810            msg['gdba'] = ''
811            msg['totalview'] = 0
812            msg['ifhns'] = {}
813            # maps rank => hostname
814            msg['process_mapping'] = {}
815            self.spawnQ.append(msg)
816        elif msg['cmd'] == 'publish_name':
817            self.pmi_published_names[msg['service']] = msg['port']
818            msgToSend = { 'cmd' : 'publish_result', 'info' : 'ok' }
819            sock.send_dict_msg(msgToSend)
820        elif msg['cmd'] == 'lookup_name':
821            if self.pmi_published_names.has_key(msg['service']):
822                msgToSend = { 'cmd' : 'lookup_result', 'info' : 'ok',
823                              'port' : self.pmi_published_names[msg['service']] }
824                sock.send_dict_msg(msgToSend)
825            else:
826                msg['cmd'] = 'pmi_lookup_name'    # add pmi_
827                msg['src'] = self.myId
828                msg['port'] = 0    # invalid
829                self.ring.rhsSock.send_dict_msg(msg)
830        elif msg['cmd'] == 'unpublish_name':
831            if self.pmi_published_names.has_key(msg['service']):
832                del self.pmi_published_names[msg['service']]
833                msgToSend = { 'cmd' : 'unpublish_result', 'info' : 'ok' }
834                sock.send_dict_msg(msgToSend)
835            else:
836                msg['cmd'] = 'pmi_unpublish_name'    # add pmi_
837                msg['src'] = self.myId
838                self.ring.rhsSock.send_dict_msg(msg)
839        else:
840            mpd_print(1, 'INVALID request from man msg=:%s:' % (msg) )
841            msgToSend = { 'cmd' : 'invalid_request' }
842            sock.send_dict_msg(msgToSend)
843
844    def calculate_process_mapping(self,mapping_dict):
845        # mapping_dict maps ranks => hostnames
846        ranks = list(mapping_dict.keys())
847        ranks.sort()
848
849        # assign node ids based in first-come-first-serve order when iterating
850        # over the ranks in increasing order
851        next_id = 0
852        node_ids = {}
853        for rank in ranks:
854            host = mapping_dict[rank]
855            if not node_ids.has_key(host):
856                node_ids[host] = next_id
857                next_id += 1
858
859
860        # maps {node_id_A: set([rankX,rankY,...]), node_id_B:...}
861        node_to_ranks = {}
862        for rank in ranks:
863            node_id = node_ids[mapping_dict[rank]]
864            if not node_to_ranks.has_key(node_id):
865                node_to_ranks[node_id] = set([])
866            node_to_ranks[node_id].add(rank)
867
868        # we only handle two cases for now:
869        # 1. block regular
870        # 2. round-robin regular
871        # we do handle "remainder nodes" that might not be full
872        delta = -1
873        max_ranks_per_node = 0
874        for node_id in node_to_ranks.keys():
875            last_rank = -1
876            if len(node_to_ranks[node_id]) > max_ranks_per_node:
877                max_ranks_per_node = len(node_to_ranks[node_id])
878            ranks = list(node_to_ranks[node_id])
879            ranks.sort()
880            for rank in ranks:
881                if last_rank != -1:
882                    if delta == -1:
883                        if node_id == 0:
884                            delta = rank - last_rank
885                        else:
886                            # irregular case detected such as {0:A,1:B,2:B}
887                            mpd_print(1, "irregular case A detected")
888                            return ''
889                    elif (rank - last_rank) != delta:
890                        # irregular such as {0:A,1:B,2:A,3:A,4:B}
891                        mpd_print(1, "irregular case B detected")
892                        return ''
893                last_rank = rank
894
895        # another check (case caught in ticket #905) for layouts like {0:A,1:A,2:B,3:B,4:B}
896        if len(node_to_ranks.keys()) > 1:
897            first_size = len(node_to_ranks[0])
898            last_size  = len(node_to_ranks[len(node_to_ranks.keys())-1])
899            if (last_size > first_size):
900                mpd_print(1, "irregular case C1 detected")
901                return ''
902            in_remainder = False
903            node_ids = node_to_ranks.keys()
904            node_ids.sort()
905            for node_id in node_ids:
906                node_size = len(node_to_ranks[node_id])
907                if not in_remainder:
908                    if node_size == first_size:
909                        pass # OK
910                    elif node_size == last_size:
911                        in_remainder = True
912                    else:
913                        mpd_print(1, "irregular case C2 detected")
914                        return ''
915                else: # in_remainder
916                    if node_size != last_size:
917                        mpd_print(1, "irregular case C3 detected")
918                        return ''
919
920        num_nodes = len(node_to_ranks.keys())
921        if delta == 1:
922            return '(vector,(%d,%d,%d))' % (0,num_nodes,max_ranks_per_node)
923        else:
924            # either we are round-robin-regular (delta > 1) or there is only one
925            # process per node (delta == -1), either way results in the same
926            # mapping spec
927            return '(vector,(%d,%d,%d))' % (0,num_nodes,1)
928
929    def handle_lhs_input(self,sock):
930        msg = self.ring.lhsSock.recv_dict_msg()
931        if not msg:    # lost lhs; don't worry
932            mpd_print(0, "CLOSING self.ring.lhsSock ", self.ring.lhsSock )
933            self.streamHandler.del_handler(self.ring.lhsSock)
934            self.ring.lhsSock.close()
935            self.ring.lhsSock = 0
936            return
937        if msg['cmd'] == 'mpdrun'  or  msg['cmd'] == 'spawn':
938            if  msg.has_key('mpdid_mpdrun_start')  \
939            and msg['mpdid_mpdrun_start'] == self.myId:
940                if msg['first_loop']:
941                    self.currRingSize = msg['ringsize']
942                    self.currRingNCPUs = msg['ring_ncpus']
943                if msg['nstarted'] == msg['nprocs']:
944                    # we have started all processes in the job, tell the
945                    # requester this and stop forwarding the mpdrun/spawn
946                    # message around the loop
947                    if msg['cmd'] == 'spawn':
948                        self.spawnInProgress = 0
949                    if self.conSock:
950                        msgToSend = { 'cmd' : 'mpdrun_ack',
951                                      'ringsize' : self.currRingSize,
952                                      'ring_ncpus' : self.currRingNCPUs}
953                        self.conSock.send_dict_msg(msgToSend)
954                    # Tell all MPDs in the ring the final process mapping.  In
955                    # turn, they will inform all of their child mpdmans.
956                    # Only do this in the case of a regular mpdrun.  The spawn
957                    # case it too complicated to handle this way right now.
958                    if msg['cmd'] == 'mpdrun':
959                        process_mapping_str = self.calculate_process_mapping(msg['process_mapping'])
960                        msgToSend = { 'cmd' : 'process_mapping',
961                                      'jobid' : msg['jobid'],
962                                      'mpdid_mpdrun_start' : self.myId,
963                                      'process_mapping' : process_mapping_str }
964                        self.ring.rhsSock.send_dict_msg(msgToSend)
965                    return
966                if not msg['first_loop']  and  msg['nstarted_on_this_loop'] == 0:
967                    if msg.has_key('jobid'):
968                        if msg['cmd'] == 'mpdrun':
969                            msgToSend = { 'cmd' : 'abortjob', 'src' : self.myId,
970                                          'jobid' : msg['jobid'],
971                                          'reason' : 'some_procs_not_started' }
972                            self.ring.rhsSock.send_dict_msg(msgToSend)
973                        else:  # spawn
974                            msgToSend = { 'cmd' : 'startup_status', 'rc' : -1,
975                                          'reason' : 'some_procs_not_started' }
976                            jobid = msg['jobid']
977                            manPid = msg['spawner_manpid']
978                            manSock = self.activeJobs[jobid][manPid]['socktoman']
979                            manSock.send_dict_msg(msgToSend)
980                    if self.conSock:
981                        msgToSend = { 'cmd' : 'job_failed',
982                                      'reason' : 'some_procs_not_started',
983                                      'remaining_hosts' : msg['hosts'] }
984                        self.conSock.send_dict_msg(msgToSend)
985                    return
986                msg['first_loop'] = 0
987                msg['nstarted_on_this_loop'] = 0
988            self.do_mpdrun(msg)
989        elif msg['cmd'] == 'process_mapping':
990            # message transmission terminates once the message has made it all
991            # the way around the loop once
992            if msg['mpdid_mpdrun_start'] != self.myId:
993                self.ring.rhsSock.send_dict_msg(msg) # forward it on around
994
995            # send to all mpdman's for the jobid embedded in the msg
996            jobid = msg['jobid']
997
998            # there may be no entry for jobid in the activeJobs table if there
999            # weren't any processes from that job actually launched on our host
1000            if self.activeJobs.has_key(jobid):
1001                for manPid in self.activeJobs[jobid].keys():
1002                    manSock = self.activeJobs[jobid][manPid]['socktoman']
1003                    manSock.send_dict_msg(msg)
1004        elif msg['cmd'] == 'mpdtrace_info':
1005            if msg['dest'] == self.myId:
1006                if self.conSock:
1007                    self.conSock.send_dict_msg(msg)
1008            else:
1009                self.ring.rhsSock.send_dict_msg(msg)
1010        elif msg['cmd'] == 'mpdtrace_trailer':
1011            if msg['dest'] == self.myId:
1012                if self.conSock:
1013                    self.conSock.send_dict_msg(msg)
1014            else:
1015                msgToSend = { 'cmd'     : 'mpdtrace_info',
1016                              'dest'    : msg['dest'],
1017                              'id'      : self.myId,
1018                              'ifhn'    : self.myIfhn,
1019                              'lhsport' : '%s' % (self.ring.lhsPort),
1020                              'lhsifhn' : '%s' % (self.ring.lhsIfhn),
1021                              'rhsport' : '%s' % (self.ring.rhsPort),
1022                              'rhsifhn' : '%s' % (self.ring.rhsIfhn) }
1023                self.ring.rhsSock.send_dict_msg(msgToSend)
1024                self.ring.rhsSock.send_dict_msg(msg)
1025        elif msg['cmd'] == 'mpdlistjobs_info':
1026            if msg['dest'] == self.myId:
1027                if self.conSock:
1028                    self.conSock.send_dict_msg(msg)
1029            else:
1030                self.ring.rhsSock.send_dict_msg(msg)
1031        elif msg['cmd'] == 'mpdlistjobs_trailer':
1032            if msg['dest'] == self.myId:
1033                if self.conSock:
1034                    self.conSock.send_dict_msg(msg)
1035            else:
1036                for jobid in self.activeJobs.keys():
1037                    for manPid in self.activeJobs[jobid]:
1038                        msgToSend = { 'cmd' : 'mpdlistjobs_info',
1039                                      'dest' : msg['dest'],
1040                                      'jobid' : jobid,
1041                                      'username' : self.activeJobs[jobid][manPid]['username'],
1042                                      'host' : self.myHost,
1043                                      'ifhn' : self.myIfhn,
1044                                      'clipid' : str(self.activeJobs[jobid][manPid]['clipid']),
1045                                      'sid' : str(manPid),  # may chg to actual sid later
1046                                      'pgm' : self.activeJobs[jobid][manPid]['pgm'],
1047                                      'rank' : self.activeJobs[jobid][manPid]['rank'] }
1048                        self.ring.rhsSock.send_dict_msg(msgToSend)
1049                self.ring.rhsSock.send_dict_msg(msg)
1050        elif msg['cmd'] == 'mpdallexit':
1051            if self.allExiting:   # already seen this once
1052                self.exiting = 1  # set flag to exit main loop
1053            self.allExiting = 1
1054            self.ring.rhsSock.send_dict_msg(msg)
1055        elif msg['cmd'] == 'mpdexit':
1056            if msg['dest'] == self.myId:
1057                msg['done'] = 1    # do this first
1058            if msg['src'] == self.myId:    # may be src and dest
1059                if self.conSock:
1060                    if msg['done']:
1061                        self.conSock.send_dict_msg({'cmd' : 'mpdexit_ack'})
1062                    else:
1063                        self.conSock.send_dict_msg({'cmd' : 'mpdexit_failed'})
1064            else:
1065                self.ring.rhsSock.send_dict_msg(msg)
1066            if msg['dest'] == self.myId:
1067                self.exiting = 1
1068                self.ring.lhsSock.send_dict_msg( { 'cmd'     : 'mpdexiting',
1069                                                   'rhsifhn' : self.ring.rhsIfhn,
1070                                                   'rhsport' : self.ring.rhsPort })
1071        elif msg['cmd'] == 'mpdringtest':
1072            if msg['src'] != self.myId:
1073                self.ring.rhsSock.send_dict_msg(msg)
1074            else:
1075                numLoops = msg['numloops'] - 1
1076                if numLoops > 0:
1077                    msg['numloops'] = numLoops
1078                    self.ring.rhsSock.send_dict_msg(msg)
1079                else:
1080                    if self.conSock:    # may have closed it if user did ^C at console
1081                        self.conSock.send_dict_msg({'cmd' : 'mpdringtest_done' })
1082        elif msg['cmd'] == 'mpdsigjob':
1083            forwarded = 0
1084            if msg['handled']  and  msg['src'] != self.myId:
1085                self.ring.rhsSock.send_dict_msg(msg)
1086                forwarded = 1
1087            handledHere = 0
1088            for jobid in self.activeJobs.keys():
1089                sjobid = jobid.split('  ')  # jobnum and mpdid
1090                if (sjobid[0] == msg['jobnum']  and  sjobid[1] == msg['mpdid'])  \
1091                or (msg['jobalias']  and  sjobid[2] == msg['jobalias']):
1092                    for manPid in self.activeJobs[jobid].keys():
1093                        if self.activeJobs[jobid][manPid]['username'] == msg['username']  \
1094                        or msg['username'] == 'root':
1095                            manSock = self.activeJobs[jobid][manPid]['socktoman']
1096                            manSock.send_dict_msg( { 'cmd' : 'signal_to_handle',
1097                                                     's_or_g' : msg['s_or_g'],
1098                                                     'sigtype' : msg['sigtype'] } )
1099                            handledHere = 1
1100            if handledHere:
1101                msg['handled'] = 1
1102            if not forwarded  and  msg['src'] != self.myId:
1103                self.ring.rhsSock.send_dict_msg(msg)
1104            if msg['src'] == self.myId:
1105                if self.conSock:
1106                    self.conSock.send_dict_msg( {'cmd' : 'mpdsigjob_ack',
1107                                                 'handled' : msg['handled'] } )
1108        elif msg['cmd'] == 'mpdkilljob':
1109            forwarded = 0
1110            if msg['handled'] and msg['src'] != self.myId:
1111                self.ring.rhsSock.send_dict_msg(msg)
1112                forwarded = 1
1113            handledHere = 0
1114            for jobid in self.activeJobs.keys():
1115                sjobid = jobid.split('  ')  # jobnum and mpdid
1116                if (sjobid[0] == msg['jobnum']  and  sjobid[1] == msg['mpdid'])  \
1117                or (msg['jobalias']  and  sjobid[2] == msg['jobalias']):
1118                    for manPid in self.activeJobs[jobid].keys():
1119                        if self.activeJobs[jobid][manPid]['username'] == msg['username']  \
1120                        or msg['username'] == 'root':
1121                            try:
1122                                pgrp = manPid * (-1)  # neg manPid -> group
1123                                os.kill(pgrp,signal.SIGKILL)
1124                                cliPid = self.activeJobs[jobid][manPid]['clipid']
1125                                pgrp = cliPid * (-1)  # neg Pid -> group
1126                                os.kill(pgrp,signal.SIGKILL)  # neg Pid -> group
1127                                handledHere = 1
1128                            except:
1129                                pass
1130                    # del self.activeJobs[jobid]  ## handled when child goes away
1131            if handledHere:
1132                msg['handled'] = 1
1133            if not forwarded  and  msg['src'] != self.myId:
1134                self.ring.rhsSock.send_dict_msg(msg)
1135            if msg['src'] == self.myId:
1136                if self.conSock:
1137                    self.conSock.send_dict_msg( {'cmd' : 'mpdkilljob_ack',
1138                                                 'handled' : msg['handled'] } )
1139        elif msg['cmd'] == 'abortjob':
1140            if msg['src'] != self.myId:
1141                self.ring.rhsSock.send_dict_msg(msg)
1142            for jobid in self.activeJobs.keys():
1143                if jobid == msg['jobid']:
1144                    for manPid in self.activeJobs[jobid].keys():
1145                        manSocket = self.activeJobs[jobid][manPid]['socktoman']
1146                        if manSocket:
1147                            manSocket.send_dict_msg(msg)
1148                            sleep(0.5)  # give man a brief chance to deal with this
1149                        try:
1150                            pgrp = manPid * (-1)  # neg manPid -> group
1151                            os.kill(pgrp,signal.SIGKILL)
1152                            cliPid = self.activeJobs[jobid][manPid]['clipid']
1153                            pgrp = cliPid * (-1)  # neg Pid -> group
1154                            os.kill(pgrp,signal.SIGKILL)  # neg Pid -> group
1155                        except:
1156                            pass
1157                    # del self.activeJobs[jobid]  ## handled when child goes away
1158        elif msg['cmd'] == 'pulse':
1159            self.ring.lhsSock.send_dict_msg({'cmd':'pulse_ack'})
1160        elif msg['cmd'] == 'verify_hosts_in_ring':
1161            while self.myIfhn in msg['host_list']  or  self.myHost in msg['host_list']:
1162                if self.myIfhn in msg['host_list']:
1163                    msg['host_list'].remove(self.myIfhn)
1164                elif self.myHost in msg['host_list']:
1165                    msg['host_list'].remove(self.myHost)
1166            if msg['dest'] == self.myId:
1167                msgToSend = { 'cmd' : 'verify_hosts_in_ring_response',
1168                              'host_list' : msg['host_list'] }
1169                self.conSock.send_dict_msg(msgToSend)
1170            else:
1171                self.ring.rhsSock.send_dict_msg(msg)
1172        elif msg['cmd'] == 'pmi_lookup_name':
1173            if msg['src'] == self.myId:
1174                if msg.has_key('port') and msg['port'] != 0:
1175                    msgToSend = msg
1176                    msgToSend['cmd'] = 'lookup_result'
1177                    msgToSend['info'] = 'ok'
1178                else:
1179                    msgToSend = { 'cmd' : 'lookup_result', 'info' : 'unknown_service',
1180                                  'port' : 0}
1181                jobid = msg['jobid']
1182                manPid = msg['manpid']
1183                manSock = self.activeJobs[jobid][manPid]['socktoman']
1184                manSock.send_dict_msg(msgToSend)
1185            else:
1186                if self.pmi_published_names.has_key(msg['service']):
1187                    msg['port'] = self.pmi_published_names[msg['service']]
1188                self.ring.rhsSock.send_dict_msg(msg)
1189        elif msg['cmd'] == 'pmi_unpublish_name':
1190            if msg['src'] == self.myId:
1191                if msg.has_key('done'):
1192                    msgToSend = msg
1193                    msgToSend['cmd'] = 'unpublish_result'
1194                    msgToSend['info'] = 'ok'
1195                else:
1196                    msgToSend = { 'cmd' : 'unpublish_result', 'info' : 'unknown_service' }
1197                jobid = msg['jobid']
1198                manPid = msg['manpid']
1199                manSock = self.activeJobs[jobid][manPid]['socktoman']
1200                manSock.send_dict_msg(msgToSend)
1201            else:
1202                if self.pmi_published_names.has_key(msg['service']):
1203                    del self.pmi_published_names[msg['service']]
1204                    msg['done'] = 1
1205                self.ring.rhsSock.send_dict_msg(msg)
1206        elif msg['cmd'] == 'client_info':
1207            if msg['spawner_manpid']  and  msg['rank'] == 0:
1208                if msg['spawner_mpd'] == self.myId:
1209                    jobid = msg['jobid']
1210                    spawnerManPid = msg['spawner_manpid']
1211                    if self.activeJobs[jobid].has_key(spawnerManPid):
1212                        spawnerManSock = self.activeJobs[jobid][spawnerManPid]['socktoman']
1213                        msgToSend = { 'cmd' : 'spawn_done_by_mpd', 'rc' : 0, 'reason' : '' }
1214                        spawnerManSock.send_dict_msg(msgToSend)
1215                else:
1216                    self.ring.rhsSock.send_dict_msg(msg)
1217        else:
1218            mpd_print(1, 'unrecognized cmd from lhs: %s' % (msg) )
1219
1220    def handle_rhs_input(self,sock):
1221        if self.allExiting:
1222            return
1223        msg = sock.recv_dict_msg()
1224        if not msg:    # lost rhs; re-knit the ring
1225            if sock == self.ring.rhsSock:
1226                needToReenter = 1
1227            else:
1228                needToReenter = 0
1229            if sock == self.ring.rhsSock  and self.ring.lhsSock:
1230                self.streamHandler.del_handler(self.ring.lhsSock)
1231                self.ring.lhsSock.close()
1232                self.ring.lhsSock = 0
1233            if sock == self.ring.rhsSock  and self.ring.rhsSock:
1234                self.streamHandler.del_handler(self.ring.rhsSock)
1235                self.ring.rhsSock.close()
1236                self.ring.rhsSock = 0
1237            if needToReenter:
1238                mpd_print(1,'lost rhs; re-entering ring')
1239                rc = self.ring.reenter_ring(lhsHandler=self.handle_lhs_input,
1240                                            rhsHandler=self.handle_rhs_input,
1241                                            ntries=16)
1242                if rc == 0:
1243                    mpd_print(1,'back in ring')
1244		else:
1245                    mpd_print(1,'failed to reenter ring')
1246                    sys.exit(-1)
1247            return
1248        if msg['cmd'] == 'pulse_ack':
1249            self.pulse_cntr = 0
1250        elif msg['cmd'] == 'mpdexiting':    # for mpdexit
1251            if self.ring.rhsSock:
1252                self.streamHandler.del_handler(self.ring.rhsSock)
1253                self.ring.rhsSock.close()
1254                self.ring.rhsSock = 0
1255            # connect to new rhs
1256            self.ring.rhsIfhn = msg['rhsifhn']
1257            self.ring.rhsPort = int(msg['rhsport'])
1258            if self.ring.rhsIfhn == self.myIfhn  and  self.ring.rhsPort == self.parmdb['MPD_LISTEN_PORT']:
1259                rv = self.ring.connect_rhs(rhsHost=self.ring.rhsIfhn,
1260                                           rhsPort=self.ring.rhsPort,
1261                                           rhsHandler=self.handle_rhs_input,
1262                                           numTries=3)
1263                if rv[0] <=  0:  # connect did not succeed; may try again
1264                    mpd_print(1,"rhs connect failed")
1265                    sys.exit(-1)
1266                return
1267            self.ring.rhsSock = MPDSock(name='rhs')
1268            self.ring.rhsSock.connect((self.ring.rhsIfhn,self.ring.rhsPort))
1269            self.pulse_cntr = 0
1270            if not self.ring.rhsSock:
1271                mpd_print(1,'handle_rhs_input failed to obtain rhs socket')
1272                return
1273            msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'host' : self.myHost,
1274                          'ifhn' : self.myIfhn, 'port' : self.parmdb['MPD_LISTEN_PORT'] }
1275            self.ring.rhsSock.send_dict_msg(msgToSend)
1276            msg = self.ring.rhsSock.recv_dict_msg()
1277            if (not msg) or  \
1278               (not msg.has_key('cmd')) or  \
1279               (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')):
1280                mpd_print(1, 'failed to recv challenge from rhs; msg=:%s:' % (msg) )
1281            response = md5new(''.join([self.parmdb['MPD_SECRETWORD'],
1282                                       msg['randnum']])).digest()
1283            msgToSend = { 'cmd' : 'challenge_response',
1284                          'response' : response,
1285                          'host' : self.myHost, 'ifhn' : self.myIfhn,
1286                          'port' : self.parmdb['MPD_LISTEN_PORT'] }
1287            self.ring.rhsSock.send_dict_msg(msgToSend)
1288            msg = self.ring.rhsSock.recv_dict_msg()
1289            if (not msg) or  \
1290               (not msg.has_key('cmd')) or  \
1291               (msg['cmd'] != 'OK_to_enter_as_lhs'):
1292                mpd_print(1, 'NOT OK to enter ring; msg=:%s:' % (msg) )
1293            self.streamHandler.set_handler(self.ring.rhsSock,self.handle_rhs_input)
1294        else:
1295            mpd_print(1, 'unexpected from rhs; msg=:%s:' % (msg) )
1296        return
1297
1298    def do_mpdrun(self,msg):
1299        if self.parmdb['MPD_LOGFILE_TRUNC_SZ'] >= 0:
1300            try:
1301                logSize = os.stat(self.logFilename)[stat.ST_SIZE]
1302                if logSize > self.parmdb['MPD_LOGFILE_TRUNC_SZ']:
1303                    self.logFile.truncate(self.parmdb['MPD_LOGFILE_TRUNC_SZ'])
1304            except:
1305                pass
1306
1307        if msg.has_key('jobid'):
1308            jobid = msg['jobid']
1309        else:
1310            jobid = str(self.nextJobInt) + '  ' + self.myId + '  ' + msg['jobalias']
1311            self.nextJobInt += 1
1312            msg['jobid'] = jobid
1313        if msg['nstarted'] >= msg['nprocs']:
1314            self.ring.rhsSock.send_dict_msg(msg)  # forward it on around
1315            return
1316        hosts = msg['hosts']
1317        if self.myIfhn in hosts.values():
1318            hostsKeys = hosts.keys()
1319            hostsKeys.sort()
1320            for ranks in hostsKeys:
1321                if hosts[ranks] == self.myIfhn:
1322                    (lorank,hirank) = ranks
1323                    for rank in range(lorank,hirank+1):
1324                        self.run_one_cli(rank,msg)
1325                        # we use myHost under the assumption that there is only
1326                        # one mpd per user on a given host.  The ifhn only
1327                        # affects how the MPDs communicate with each other, not
1328                        # which host they are on
1329                        msg['process_mapping'][rank] = self.myHost
1330                        msg['nstarted'] += 1
1331                        msg['nstarted_on_this_loop'] += 1
1332                    del msg['hosts'][ranks]
1333        elif '_any_from_pool_' in hosts.values():
1334            hostsKeys = hosts.keys()
1335            hostsKeys.sort()
1336            for ranks in hostsKeys:
1337                if hosts[ranks] == '_any_from_pool_':
1338                    (lorank,hirank) = ranks
1339                    hostSpecPool = msg['host_spec_pool']
1340                    if self.myIfhn in hostSpecPool  or  self.myHost in hostSpecPool:
1341                        self.run_one_cli(lorank,msg)
1342                        msg['process_mapping'][lorank] = self.myHost
1343                        msg['nstarted'] += 1
1344                        msg['nstarted_on_this_loop'] += 1
1345                        del msg['hosts'][ranks]
1346                        if lorank < hirank:
1347                            msg['hosts'][(lorank+1,hirank)] = '_any_from_pool_'
1348                    break
1349        elif '_any_' in hosts.values():
1350            done = 0
1351            while not done:
1352                hostsKeys = hosts.keys()
1353                hostsKeys.sort()
1354                for ranks in hostsKeys:
1355                    if hosts[ranks] == '_any_':
1356                        (lorank,hirank) = ranks
1357                        self.run_one_cli(lorank,msg)
1358                        msg['process_mapping'][lorank] = self.myHost
1359                        msg['nstarted'] += 1
1360                        msg['nstarted_on_this_loop'] += 1
1361                        del msg['hosts'][ranks]
1362                        if lorank < hirank:
1363                            msg['hosts'][(lorank+1,hirank)] = '_any_'
1364                        # self.activeJobs maps:
1365                        # { jobid => { mpdman_pid => {...} } }
1366                        procsHereForJob = len(self.activeJobs[jobid].keys())
1367                        if procsHereForJob >= self.parmdb['MPD_NCPUS']:
1368                            break  # out of for loop
1369                # if no more to start via any or enough started here
1370                if '_any_' not in hosts.values() \
1371                or procsHereForJob >= self.parmdb['MPD_NCPUS']:
1372                    done = 1
1373        if msg['first_loop']:
1374            msg['ringsize'] += 1
1375            msg['ring_ncpus'] += self.parmdb['MPD_NCPUS']
1376        self.ring.rhsSock.send_dict_msg(msg)  # forward it on around
1377    def run_one_cli(self,currRank,msg):
1378        users = msg['users']
1379        for ranks in users.keys():
1380            (lo,hi) = ranks
1381            if currRank >= lo  and  currRank <= hi:
1382                username = users[ranks]
1383                break
1384        execs = msg['execs']
1385        for ranks in execs.keys():
1386            (lo,hi) = ranks
1387            if currRank >= lo  and  currRank <= hi:
1388                pgm = execs[ranks]
1389                break
1390        paths = msg['paths']
1391        for ranks in paths.keys():
1392            (lo,hi) = ranks
1393            if currRank >= lo  and  currRank <= hi:
1394                pathForExec = paths[ranks]
1395                break
1396        args = msg['args']
1397        for ranks in args.keys():
1398            (lo,hi) = ranks
1399            if currRank >= lo  and  currRank <= hi:
1400                pgmArgs = dumps(args[ranks])
1401                break
1402        envvars = msg['envvars']
1403        for ranks in envvars.keys():
1404            (lo,hi) = ranks
1405            if currRank >= lo  and  currRank <= hi:
1406                pgmEnvVars = dumps(envvars[ranks])
1407                break
1408        limits = msg['limits']
1409        for ranks in limits.keys():
1410            (lo,hi) = ranks
1411            if currRank >= lo  and  currRank <= hi:
1412                pgmLimits = dumps(limits[ranks])
1413                break
1414        cwds = msg['cwds']
1415        for ranks in cwds.keys():
1416            (lo,hi) = ranks
1417            if currRank >= lo  and  currRank <= hi:
1418                cwd = cwds[ranks]
1419                break
1420        umasks = msg['umasks']
1421        for ranks in umasks.keys():
1422            (lo,hi) = ranks
1423            if currRank >= lo  and  currRank <= hi:
1424                pgmUmask = umasks[ranks]
1425                break
1426        man_env = {}
1427        if msg['ifhns'].has_key(currRank):
1428            man_env['MPICH_INTERFACE_HOSTNAME'] = msg['ifhns'][currRank]
1429        else:
1430            man_env['MPICH_INTERFACE_HOSTNAME'] = self.myIfhn
1431        man_env.update(os.environ)    # may only want to mov non-MPD_ stuff
1432        man_env['MPDMAN_MYHOST'] = self.myHost
1433        man_env['MPDMAN_MYIFHN'] = self.myIfhn
1434        man_env['MPDMAN_JOBID'] = msg['jobid']
1435        man_env['MPDMAN_CLI_PGM'] = pgm
1436        man_env['MPDMAN_CLI_PATH'] = pathForExec
1437        man_env['MPDMAN_PGM_ARGS'] = pgmArgs
1438        man_env['MPDMAN_PGM_ENVVARS'] = pgmEnvVars
1439        man_env['MPDMAN_PGM_LIMITS'] = pgmLimits
1440        man_env['MPDMAN_CWD'] = cwd
1441        man_env['MPDMAN_UMASK'] = pgmUmask
1442        man_env['MPDMAN_SPAWNED'] = str(msg['spawned'])
1443        if msg.has_key('spawner_manpid'):
1444            man_env['MPDMAN_SPAWNER_MANPID'] = str(msg['spawner_manpid'])
1445        else:
1446            man_env['MPDMAN_SPAWNER_MANPID'] = '0'
1447        if msg.has_key('spawner_mpd'):
1448            man_env['MPDMAN_SPAWNER_MPD'] = msg['spawner_mpd']
1449        else:
1450            man_env['MPDMAN_SPAWNER_MPD'] = ''
1451        man_env['MPDMAN_NPROCS'] = str(msg['nprocs'])
1452        man_env['MPDMAN_MPD_LISTEN_PORT'] = str(self.parmdb['MPD_LISTEN_PORT'])
1453        man_env['MPDMAN_MPD_CONF_SECRETWORD'] = self.parmdb['MPD_SECRETWORD']
1454        man_env['MPDMAN_CONHOST'] = msg['conhost']
1455        man_env['MPDMAN_CONIFHN'] = msg['conifhn']
1456        man_env['MPDMAN_CONPORT'] = str(msg['conport'])
1457        man_env['MPDMAN_RANK'] = str(currRank)
1458        man_env['MPDMAN_POS_IN_RING'] = str(msg['nstarted'])
1459        man_env['MPDMAN_STDIN_DEST'] = msg['stdin_dest']
1460        man_env['MPDMAN_TOTALVIEW'] = str(msg['totalview'])
1461        man_env['MPDMAN_GDB'] = str(msg['gdb'])
1462        man_env['MPDMAN_GDBA'] = str(msg['gdba'])  # for attach to running pgm
1463        fullDirName = os.path.abspath(os.path.split(sys.argv[0])[0])  # normalize
1464        man_env['MPDMAN_FULLPATHDIR'] = fullDirName    # used to find gdbdrv
1465        man_env['MPDMAN_SINGINIT_PID']  = str(msg['singinitpid'])
1466        man_env['MPDMAN_SINGINIT_PORT'] = str(msg['singinitport'])
1467        man_env['MPDMAN_LINE_LABELS_FMT'] = msg['line_labels']
1468        if msg.has_key('rship'):
1469            man_env['MPDMAN_RSHIP'] = msg['rship']
1470            man_env['MPDMAN_MSHIP_HOST'] = msg['mship_host']
1471            man_env['MPDMAN_MSHIP_PORT'] = str(msg['mship_port'])
1472        if msg.has_key('doing_bnr'):
1473            man_env['MPDMAN_DOING_BNR'] = '1'
1474        else:
1475            man_env['MPDMAN_DOING_BNR'] = '0'
1476        if msg['nstarted'] == 0:
1477            manKVSTemplate = '%s_%s_%d' % \
1478                             (self.myHost,self.parmdb['MPD_LISTEN_PORT'],self.kvs_cntr)
1479            manKVSTemplate = sub('\.','_',manKVSTemplate)  # chg magpie.cs to magpie_cs
1480            manKVSTemplate = sub('\-','_',manKVSTemplate)  # chg node-0 to node_0
1481            self.kvs_cntr += 1
1482            msg['kvs_template'] = manKVSTemplate
1483        man_env['MPDMAN_KVS_TEMPLATE'] = msg['kvs_template']
1484	msg['username'] = username
1485        if hasattr(os,'fork'):
1486            (manPid,toManSock) = self.launch_mpdman_via_fork(msg,man_env)
1487            if not manPid:
1488                print '**** mpd: launch_client_via_fork_exec failed; exiting'
1489        elif subprocess_module_available:
1490            (manPid,toManSock) = self.launch_mpdman_via_subprocess(msg,man_env)
1491        else:
1492            mpd_print(1,'neither fork nor subprocess is available')
1493            sys.exit(-1)
1494        jobid = msg['jobid']
1495        if not self.activeJobs.has_key(jobid):
1496            self.activeJobs[jobid] = {}
1497        self.activeJobs[jobid][manPid] = { 'pgm' : pgm, 'rank' : currRank,
1498                                           'username' : username,
1499                                           'clipid' : -1,    # until report by man
1500                                           'socktoman' : toManSock }
1501        mpd_print(mpd_dbg_level,"Created entry for %s %d" % (str(jobid),manPid) )
1502    def launch_mpdman_via_fork(self,msg,man_env):
1503        man_env['MPDMAN_HOW_LAUNCHED'] = 'FORK'
1504        currRank = int(man_env['MPDMAN_RANK'])
1505        manListenSock = MPDListenSock('',0,name='tempsock')
1506        manListenPort = manListenSock.getsockname()[1]
1507        if msg['nstarted'] == 0:
1508            manEntryIfhn = ''
1509            manEntryPort = 0
1510            msg['pos0_host'] = self.myHost
1511            msg['pos0_ifhn'] = self.myIfhn
1512            msg['pos0_port'] = str(manListenPort)
1513            man_env['MPDMAN_POS0_IFHN'] = self.myIfhn
1514            man_env['MPDMAN_POS0_PORT'] = str(manListenPort)
1515        else:
1516            manEntryIfhn = msg['entry_ifhn']
1517            manEntryPort = msg['entry_port']
1518            man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn']
1519            man_env['MPDMAN_POS0_PORT'] = msg['pos0_port']
1520        man_env['MPDMAN_LHS_IFHN']  = manEntryIfhn
1521        man_env['MPDMAN_LHS_PORT'] = str(manEntryPort)
1522        man_env['MPDMAN_MY_LISTEN_FD'] = str(manListenSock.fileno())
1523        man_env['MPDMAN_MY_LISTEN_PORT'] = str(manListenPort)
1524        mpd_print(mpd_dbg_level,"About to get sockpair for mpdman")
1525        (toManSock,toMpdSock) = mpd_sockpair()
1526        mpd_print(mpd_dbg_level,"Found sockpair (%d,%d) for mpdman" % \
1527                                (toManSock.fileno(), toMpdSock.fileno()) )
1528        toManSock.name = 'to_man'
1529        toMpdSock.name = 'to_mpd'  ## to be used by mpdman below
1530        man_env['MPDMAN_TO_MPD_FD'] = str(toMpdSock.fileno())
1531        self.streamHandler.set_handler(toManSock,self.handle_man_input)
1532        msg['entry_host'] = self.myHost
1533        msg['entry_ifhn'] = self.myIfhn
1534        msg['entry_port'] = manListenPort
1535        maxTries = 6
1536        numTries = 0
1537        while numTries < maxTries:
1538            try:
1539                manPid = os.fork()
1540                errinfo = 0
1541            except OSError, errinfo:
1542                pass  ## could check for errinfo.errno == 35 (resource unavailable)
1543            if errinfo:
1544                sleep(1)
1545                numTries += 1
1546            else:
1547                break
1548        if numTries >= maxTries:
1549            return (0,0)
1550        if manPid == 0:
1551            self.conListenSock = 0    # don't want to clean up console if I am manager
1552            self.myId = '%s_man_%d' % (self.myHost,self.myPid)
1553            mpd_set_my_id(self.myId)
1554            self.streamHandler.close_all_active_streams()
1555            os.setpgrp()
1556            os.environ = man_env
1557            if hasattr(os,'getuid')  and  os.getuid() == 0  and  pwd_module_available:
1558		username = msg['username']
1559                try:
1560                    pwent = pwd.getpwnam(username)
1561                except:
1562                    mpd_print(1,'invalid username :%s: on %s' % (username,self.myHost))
1563                    msgToSend = {'cmd' : 'job_failed', 'reason' : 'invalid_username',
1564                                 'username' : username, 'host' : self.myHost }
1565                    self.conSock.send_dict_msg(msgToSend)
1566                    return
1567                uid = pwent[2]
1568                gid = pwent[3]
1569                os.setgroups(mpd_get_groups_for_username(username))
1570                os.setregid(gid,gid)
1571                try:
1572                    os.setreuid(uid,uid)
1573                except OSError, errmsg1:
1574                    try:
1575                        os.setuid(uid)
1576                    except OSError, errmsg2:
1577                        mpd_print(1,"unable to perform setreuid or setuid")
1578                        sys.exit(-1)
1579            import atexit    # need to use full name of _exithandlers
1580            atexit._exithandlers = []    # un-register handlers in atexit module
1581            # import profile
1582            # print 'profiling the manager'
1583            # profile.run('mpdman()')
1584            mpdman = MPDMan()
1585            mpdman.run()
1586            sys.exit(0)  # do NOT do cleanup (eliminated atexit handlers above)
1587        # After the fork, if we're the parent, close the other side of the
1588        # mpdpair sockets, as well as the listener socket
1589        manListenSock.close()
1590        toMpdSock.close()
1591        return (manPid,toManSock)
1592    def launch_mpdman_via_subprocess(self,msg,man_env):
1593        man_env['MPDMAN_HOW_LAUNCHED'] = 'SUBPROCESS'
1594        currRank = int(man_env['MPDMAN_RANK'])
1595        if msg['nstarted'] == 0:
1596            manEntryIfhn = ''
1597            manEntryPort = 0
1598        else:
1599            manEntryIfhn = msg['entry_ifhn']
1600            manEntryPort = msg['entry_port']
1601            man_env['MPDMAN_POS0_IFHN'] = msg['pos0_ifhn']
1602            man_env['MPDMAN_POS0_PORT'] = msg['pos0_port']
1603        man_env['MPDMAN_LHS_IFHN']  = manEntryIfhn
1604        man_env['MPDMAN_LHS_PORT'] = str(manEntryPort)
1605        tempListenSock = MPDListenSock()
1606        man_env['MPDMAN_MPD_PORT'] = str(tempListenSock.getsockname()[1])
1607        # python_executable = '\Python24\python.exe'
1608        python_executable = 'python2.4'
1609        fullDirName = man_env['MPDMAN_FULLPATHDIR']
1610        manCmd = os.path.join(fullDirName,'mpdman.py')
1611        runner = subprocess.Popen([python_executable,'-u',manCmd],  # only one 'python' arg
1612                                  bufsize=0,
1613                                  env=man_env,
1614                                  close_fds=False)
1615                                  ### stdin=subprocess.PIPE,stdout=subprocess.PIPE,
1616                                  ### stderr=subprocess.PIPE)
1617        manPid = runner.pid
1618        oldTimeout = socket.getdefaulttimeout()
1619        socket.setdefaulttimeout(8)
1620        try:
1621            (toManSock,toManAddr) = tempListenSock.accept()
1622        except Exception, errmsg:
1623            toManSock = 0
1624        socket.setdefaulttimeout(oldTimeout)
1625        tempListenSock.close()
1626        if not toManSock:
1627            mpd_print(1,'failed to recv msg from launched man')
1628            return (0,0)
1629        msgFromMan = toManSock.recv_dict_msg()
1630        if not msgFromMan  or  not msgFromMan.has_key('man_listen_port'):
1631            toManSock.close()
1632            mpd_print(1,'invalid msg from launched man')
1633            return (0,0)
1634        manListenPort = msgFromMan['man_listen_port']
1635        if currRank == 0:
1636            msg['pos0_host'] = self.myHost
1637            msg['pos0_ifhn'] = self.myIfhn
1638            msg['pos0_port'] = str(manListenPort)
1639        msg['entry_host'] = self.myHost
1640        msg['entry_ifhn'] = self.myIfhn
1641        msg['entry_port'] = manListenPort
1642        return (manPid,toManSock)
1643
# script entry point: when this file is executed directly (rather than
# imported), create the MPD object and call its run() method
if __name__ == '__main__':
    mpd = MPD()
    mpd.run()
1648