1#!/usr/bin/env python
3#   (C) 2001 by Argonne National Laboratory.
4#       See COPYRIGHT in top-level directory.
7# workaround to suppress deprecated module warnings in python2.6
8# see https://trac.mcs.anl.gov/projects/mpich2/ticket/362 for tracking
9import warnings
10warnings.filterwarnings('ignore', '.*the md5 module is deprecated.*', DeprecationWarning)
11warnings.filterwarnings('ignore', '.*the popen2 module is deprecated.*', DeprecationWarning)
13import sys, os, signal, popen2, socket, select, inspect
15from  cPickle   import  dumps, loads
16from  types     import  TupleType
17from  traceback import  extract_tb, extract_stack, format_list
18from  re        import  sub, split
20from  md5       import  new as md5new
21from  time      import  sleep
22from  random    import  randrange, random
25    import pwd
26    pwd_module_available = 1
28    pwd_module_available = 0
30    import grp
31    grp_module_available = 1
33    grp_module_available = 0
35    import  syslog
36    syslog_module_available = 1
38    syslog_module_available = 0
40    import subprocess
41    subprocess_module_available = 1
43    subprocess_module_available = 0
# Module-wide state shared by the utilities below.  (A 'global' statement
# at module level is a no-op; the functions below declare 'global' where
# they rebind these names.)
mpd_my_id = ''
mpd_my_hostname = ''
mpd_cli_app = ''               # for debug during mpich nightly tests
mpd_procedures_to_trace = []
# mpd_signum can be set by mpd_handle_signal to indicate which signal was
# recently caught; this can be useful below to pop out of loops that
# ordinarily continue after sigs.
# NOTE: mpd_handle_signal must be called by the user, e.g. in his own
# signal handler.
mpd_signum = 0
mpd_zc = 0
# Scratch directory: $MPD_TMPDIR when set, otherwise /tmp.
mpd_tmpdir = os.environ.get('MPD_TMPDIR','/tmp')

# For easier debugging, we provide this variable that is used in the
# mpd_print calls.  This makes it a little easier to debug problems involving
# communication with other processes, such as handling EINTR from signals.
mpd_dbg_level = 0
def mpd_set_dbg_level(flag):
    """Set the module-wide debug level, mpd_dbg_level.

    Many mpd_print calls in this module pass mpd_dbg_level as their enable
    flag, so a true value switches those debug messages on.
    """
    global mpd_dbg_level
    mpd_dbg_level = flag
def mpd_set_my_id(myid=''):
    """Set mpd_my_id, the tag that prefixes this module's diagnostics.

    Calling with no argument resets it to the empty string.
    """
    global mpd_my_id
    mpd_my_id = myid
def mpd_set_tmpdir(tmpdir):
    """Replace the module-wide scratch directory, mpd_tmpdir.

    At import time mpd_tmpdir is taken from $MPD_TMPDIR when that is set.
    """
    global mpd_tmpdir
    mpd_tmpdir = tmpdir
def mpd_get_my_id():
    """Return mpd_my_id, the tag set by mpd_set_my_id."""
    # Only reading the module global, so no 'global' declaration is needed.
    return mpd_my_id
def mpd_set_cli_app(app):    # for debug during mpich nightly tests
    """Record the client application name in mpd_cli_app.

    When it is non-empty, mpd_print_tb and mpd_uncaught_except_tb append it
    (and the current directory) to their reports.
    """
    global mpd_cli_app
    mpd_cli_app = app
def mpd_handle_signal(signum,frame):
    """Remember which signal was caught, in mpd_signum.

    The EINTR-retry loops in this module compare mpd_signum with SIGINT,
    SIGALRM, etc. to decide whether to give up.  This must be called by the
    user, e.g. from his own signal handler; frame is accepted only to match
    the signal-handler signature and is not used.
    """
    global mpd_signum
    mpd_signum = signum
94def mpd_print(*args):
95    global mpd_my_id
96    if not args[0]:
97        return
98    stack = extract_stack()
99    callingProc = stack[-2][2]
100    callingLine = stack[-2][1]
101    printLine = '%s (%s %d): ' % (mpd_my_id,callingProc,callingLine)
102    for arg in args[1:]:
103        printLine = printLine + str(arg)
104    # We've seen an EINTR on the flush here
105    while 1:
106        try:
107            print printLine
108            break
109        except os.error, errinfo:
110            if errinfo[0] != EINTR:
111                raise os.error, errinfo
112    # end of while
113    while 1:
114        try:
115            sys.stdout.flush()
116            break
117        except os.error, errinfo:
118	    if errinfo[0] != EINTR:
119                raise os.error, errinfo
120    # end of while
121    if syslog_module_available:
122        syslog.syslog(syslog.LOG_INFO,printLine)
124def mpd_print_tb(*args):
125    global mpd_my_id
126    if not args[0]:
127        return
128    stack = extract_stack()
129    callingProc = stack[-2][2]
130    callingLine = stack[-2][1]
131    stack = extract_stack()
132    stack.reverse()
133    stack = stack[1:]
134    printLine = '%s (%s %d):' % (mpd_my_id,callingProc,callingLine)
135    for arg in args[1:]:
136        printLine = printLine + str(arg)
137    printLine += '\n  mpdtb:\n'
138    for line in format_list(stack):
139        line = sub(r'\n.*','',line)
140        splitLine = split(',',line)
141        splitLine[0] = sub('  File "(.*)"',lambda mo: mo.group(1),splitLine[0])
142        splitLine[1] = sub(' line ','',splitLine[1])
143        splitLine[2] = sub(' in ','',splitLine[2])
144        printLine = printLine + '    %s,  %s,  %s\n' % tuple(splitLine)
145    if mpd_cli_app:    # debug mpich apps in nightly tests
146        printLine += '    mpd_cli_app=%s\n' % (mpd_cli_app)
147        printLine += '    cwd=%s' % (os.getcwd())
148    print printLine
149    sys.stdout.flush()
150    if syslog_module_available:
151        syslog.syslog(syslog.LOG_INFO,printLine)
153def mpd_uncaught_except_tb(arg1,arg2,arg3):
154    global mpd_my_id
155    global mpd_cli_id
156    if mpd_my_id:
157        errstr = '%s: ' % (mpd_my_id)
158    else:
159        errstr = ''
160    errstr += 'mpd_uncaught_except_tb handling:\n'
161    errstr += '  %s: %s\n' % (arg1,arg2)
162    tb = extract_tb(arg3)
163    tb.reverse()
164    for tup in tb:
165        # errstr += '    file %s  line# %i  procedure %s\n        %s\n' % (tup)
166        errstr += '    %s  %i  %s\n        %s\n' % (tup)
167    if mpd_cli_app:    # debug mpich apps in nightly tests
168        errstr += '    mpd_cli_app=%s\n' % (mpd_cli_app)
169        errstr += '    cwd=%s' % (os.getcwd())
170    print errstr,
171    if syslog_module_available:
172        syslog.syslog(syslog.LOG_ERR, errstr)
def mpd_set_procedures_to_trace(procs):
    """Set the procedure names that mpd_trace_calls reports on.

    procs is stored as-is (not copied) in mpd_procedures_to_trace and is
    tested with 'in' against each traced frame's code name.
    """
    global mpd_procedures_to_trace
    mpd_procedures_to_trace = procs
178def mpd_trace_calls(frame,event,args):
179    global mpd_my_id, mpd_procedures_to_trace
180    if frame.f_code.co_name not in mpd_procedures_to_trace:
181        return None
182    args_info = apply(inspect.formatargvalues,inspect.getargvalues(frame))
183    # Be VERY careful here; under AIX, it looked like EINTR is
184    # possible within print (!).
185    while (1):
186        try:
187            print '%s: ENTER %s in %s at line %d; ARGS=%s' % \
188          (mpd_my_id,frame.f_code.co_name,frame.f_code.co_filename,frame.f_lineno,args_info)
189            break
190        except os.error, errinfo:
191            if errinfo[0] != EINTR:
192                raise os.error, errinfo
193    # end of while
194    return mpd_trace_returns
196def mpd_trace_returns(frame,event,args):
197    global mpd_my_id
198    if event == 'return':
199        # Be VERY careful here; under AIX, it looked like EINTR is
200        # possible within print (!).
201        while (1):
202            try:
203                print '%s: EXIT %s at line %d ' % (mpd_my_id,frame.f_code.co_name,frame.f_lineno)
204                break
205            except os.error, errinfo:
206                if errinfo[0] != EINTR:
207                    raise os.error, errinfo
208        # end of while
209        return None
210    else:
211        return mpd_trace_returns
213def mpd_sockpair():
214    sock1 = MPDSock()
215    rc = sock1.sock.bind(('',0))
216    rc = sock1.sock.listen(5)
217    port1 = sock1.sock.getsockname()[1]
218    sock2 = MPDSock()
219    #
220    # We have encountered situations where the connection fails; as this is
221    # a connection to this process, we retry a few times in that case
222    # (seen on AIX)
223    #
224    try:
225        connAttempts = 0
226        while (1):
227            try:
228                rc = sock2.sock.connect(('localhost',port1))
229                break
230            except socket.error, errinfo:
231                # In some cases, connect will return EINTR and then on the
232                # next iteration, returns EISCONN.
233                if errinfo[0] == EISCONN:
234                    break
235                if errinfo[0] == ECONNREFUSED and connAttempts < 10:
236                    mpd_print(mpd_dbg_level,"Retrying on connection refused")
237                    connAttempts += 1
238                    sleep(random())
239                elif errinfo[0] != EINTR:
240                    mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1]))
241                    raise socket.error, errinfo
242	# End of the while
243    except socket.error, errinfo:
244        # we have seen at least one machine that needs it this way
245        # We've seen a failure here; it could be EINPROGRESS, EALREADY,
246        # or EADDRINUSE.  In that case, we may need to do something else
247	mpd_print(1,"connect error with %d %s" % (errinfo[0],errinfo[1]))
248        # Should this only attempt on ECONNREFUSED, ENETUNREACH, EADDRNOTAVAIL
249        # FIXME: Does this need a try/except?
250        while 1:
251            try:
252                rc = sock2.sock.connect(('',port1))
253                break
254            except socket.error, errinfo:
255                if errinfo[0] == EISCONN:
256                    break
257                elif errinfo[0] != EINTR:
258                    mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1]))
259                    raise socket.error, errinfo
260        # end of while
261    # Accept can fail on EINTR, so we handle that here
262    while (1):
263        try:
264            (sock3,addr) = sock1.sock.accept()
265            break
266        except socket.error, errinfo:
267            if errinfo[0] != EINTR:
268                mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1]))
269                raise socket.error, errinfo
270    # end of while
271    sock3 = MPDSock(sock=sock3)
272    sock1.close()
273    return (sock2,sock3)
def mpd_which(execName,user_path=None):
    """Return the first executable execName on a search path, or ''.

    user_path is an os.pathsep-separated list of directories; when it is
    empty or None, $PATH is used (and '' is returned if PATH is unset).
    Directories named execName are skipped even if they are "executable".
    """
    if not user_path:
        if 'PATH' not in os.environ:
            return ''
        user_path = os.environ['PATH']
    for directory in user_path.split(os.pathsep):
        candidate = os.path.join(directory,execName)
        # isdir follows symlinks; dirs can have execute permission.
        # NOTE access works based on real uid (not euid).
        if not os.path.isdir(candidate)  and  os.access(candidate,os.X_OK):
            return candidate
    return ''
def mpd_check_python_version():
    """Return sys.version_info if this interpreter is older than 2.2, else 0.

    version_info is (major,minor,micro,releaselevel,serial); only the
    (major,minor) prefix is compared.
    """
    if sys.version_info[:2] < (2,2):
        return sys.version_info
    return 0
def mpd_version():
    """Return the mpd version as (major, minor, micro, special)."""
    (major,minor,micro) = (1,0,1)
    special = 'July, 2006 release'
    return (major,minor,micro,special)
def mpd_get_my_username():
    """Return the login name of the user running this process.

    When the pwd module is available, the password-database entry for the
    real uid is preferred.  If there is no such entry (pwd.getpwuid raises
    KeyError), or pwd is unavailable, fall back to $USER, then $USERNAME,
    then the literal 'unknown_username'.
    """
    if pwd_module_available:
        try:
            return pwd.getpwuid(os.getuid())[0]    # favor this over env
        except KeyError:
            pass    # uid has no passwd entry; fall back to the environment
    for envVar in ('USER','USERNAME'):
        if envVar in os.environ:
            return os.environ[envVar]
    return 'unknown_username'
def mpd_get_ranks_in_binary_tree(myRank,nprocs):
    """Return (parent, lchild, rchild) for myRank in a binary tree of nprocs.

    Ranks 0..nprocs-1 are laid out heap-style: the children of rank r are
    2r+1 and 2r+2, and its parent is (r-1)//2.  A missing parent (for rank
    0) or child (beyond nprocs-1) is reported as -1.
    """
    if myRank == 0:
        parent = -1
    else:
        # Floor division must be explicit: under true division (Python 3 or
        # 'from __future__ import division') '/' would give e.g. 0.5.
        parent = (myRank - 1) // 2
    lchild = (myRank * 2) + 1
    if lchild > (nprocs - 1):
        lchild = -1
    rchild = (myRank * 2) + 2
    if rchild > (nprocs - 1):
        rchild = -1
    return (parent,lchild,rchild)
def mpd_same_ips(host1,host2):    # hosts may be names or IPs
    """Return 1 if host1 and host2 share at least one IPv4 address, else 0.

    Each argument may be a host name or a dotted-quad address; it is
    resolved with socket.gethostbyname_ex.  A name that cannot be resolved
    yields 0.
    """
    try:
        ips1 = socket.gethostbyname_ex(host1)[2]    # may fail if invalid host
        ips2 = socket.gethostbyname_ex(host2)[2]    # may fail if invalid host
    except (socket.error,UnicodeError):
        # Resolution failures (gaierror/herror are socket.error subclasses)
        # and malformed names.  Unlike a bare except, this no longer
        # swallows KeyboardInterrupt or SystemExit.
        return 0
    for ip1 in ips1:
        if ip1 in ips2:
            return 1
    return 0
335def mpd_read_nbytes(fd,nbytes):
336    global mpd_signum
337    rv = 0
338    while 1:
339        try:
340            mpd_signum = 0
341            rv = os.read(fd,nbytes)
342            break
343        except os.error, errinfo:
344            if errinfo[0] == EINTR:
345                if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
346                    break
347                else:
348                    continue
349            elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
350                break
351            else:
352                mpd_print(1, 'read error: %s' % os.strerror(errinfo[0]))
353                break
354        except KeyboardInterrupt, errinfo:
355            break
356        except Exception, errinfo:
357            mpd_print(1, 'other error after read %s :%s:' % ( errinfo.__class__, errinfo) )
358            break
359    return rv
def mpd_get_groups_for_username(username):
    """Return the gids of username's groups, primary group first, no repeats.

    Supplementary groups are those whose member list contains username.
    Returns [] when the pwd or grp module is unavailable; an unknown
    username propagates pwd.getpwnam's KeyError.
    """
    if not (pwd_module_available  and  grp_module_available):
        return []
    userGroups = [pwd.getpwnam(username)[3]]    # default (primary) group
    for entry in grp.getgrall():
        gid = entry[2]
        if username in entry[3]  and  gid not in userGroups:
            userGroups.append(gid)
    return userGroups
373class MPDSock(object):
374    def __init__(self,family=socket.AF_INET,socktype=socket.SOCK_STREAM,proto=0,
375                 sock=None,name=''):
376        if sock:
377            self.sock = sock
378        else:
379            self.sock = socket.socket(family=family,type=socktype,proto=proto)
380        self.name = name
381        self.type = socktype
382        self.family = family
383        ## used this when inherited from socket.socket (only works with py 2.3+)
384        ## socket.socket.__init__(self,family=family,type=socktype,proto=proto,_sock=sock)
385    def close(self):
386        self.sock.close()
387    def sendall(self,data):
388        self.sock.sendall(data)
389    def getsockname(self):
390        return self.sock.getsockname()
391    def fileno(self):
392        return self.sock.fileno()
394    def connect(self,*args):
395        # We handle EINTR in this method, unless it appears that a
396        # SIGINT or SIGALRM are delivered.  In that case, we do not
397        # complete the connection (FIXME: make sure that all uses of this
398        # do the right thing in that case).
399        while 1:
400            try:
401                mpd_signum = 0
402                self.sock.connect(*args)
403                break
404            except socket.error, errinfo:
405                if errinfo[0] == EINTR:   # sigchld, sigint, etc.
406                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
407                        break
408                    else:
409                        continue
410                else:
411                    raise socket.error, errinfo
412        # end of while
414    def accept(self,name='accepter'):
415        global mpd_signum
416        newsock = 0
417        newaddr = 0
418        while 1:
419            try:
420                mpd_signum = 0
421                (newsock,newaddr) = self.sock.accept()
422                break
423            except socket.error, errinfo:
424                if errinfo[0] == EINTR:   # sigchld, sigint, etc.
425                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
426                        break
427                    else:
428                        continue
429                elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
430                    break
431                else:
432                    print '%s: accept error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
433                    break
434            except Exception, errinfo:
435                print '%s: failure doing accept : %s : %s' % \
436                      (mpd_my_id,errinfo.__class__,errinfo)
437                break
438        if newsock:
439            newsock = MPDSock(sock=newsock,name=name)    # turn new socket into an MPDSock
440        return (newsock,newaddr)
441    def recv(self,nbytes):
442        global mpd_signum
443        data = 0
444        while 1:
445            try:
446                mpd_signum = 0
447                data = self.sock.recv(nbytes)
448                break
449            except socket.error, errinfo:
450                if errinfo[0] == EINTR:   # sigchld, sigint, etc.
451                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
452                        break
453                    else:
454                        continue
455                elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
456                    break
457                else:
458                    print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
459                    break
460            except Exception, errinfo:
461                print '%s: failure doing recv %s :%s:' % \
462                      (mpd_my_id,errinfo.__class__,errinfo)
463                break
464        return data
465    def recv_dict_msg(self,timeout=None):
466        global mpd_signum
467        global mpd_dbg_level
469        mpd_print(mpd_dbg_level, \
470                  "Entering recv_dict_msg with timeout=%s" % (str(timeout)))
471        msg = {}
472        readyToRecv = 0
473        if timeout:
474            try:
475		# Loop while we get EINTR.
476                # FIXME: In some cases, we may want to exit if
477	        # the signal was SIGINT.  We need to restart if
478                # we see SIGCLD
479                while 1:
480                    try:
481		        mpd_signum = 0
482                        if timeout == -1:
483                            # use -1 to indicate indefinite timeout
484                            (readyToRecv,unused1,unused2) = select.select([self.sock],[],[])
485                        else:
486                            (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout)
487                        break;
488                    except os.error, errinfo:
489                        if errinfo[0] == EINTR:
490                            # Retry interrupted system calls
491                            pass
492                        else:
493                            raise os.error, errinfo
494                # End of the while(1)
495            except select.error, errinfo:
496                if errinfo[0] == EINTR:
497                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
498                        mpd_print(0,"sigint/alrm check");
499                        pass   # assume timedout; returns {} below
500                    elif mpd_signum == signal.SIGCLD:
501                        mpd_print_tb(1,"mishandling sigchild in recv_dict_msg, errinfo=:%s" % (errinfo) )
502                    else:
503                        mpd_print_tb(1,"Unhandled EINTR: errinfo=%s" % (errinfo) )
504                else:
505                    mpd_print(1, '%s: select error: %s' % (mpd_my_id,os.strerror(errinfo[0])))
506            except KeyboardInterrupt, errinfo:
507                # print 'recv_dict_msg: keyboard interrupt during select'
508                mpd_print(0,"KeyboardInterrupt");
509                return msg
510            except Exception, errinfo:
511                mpd_print(1, 'recv_dict_msg: exception during select %s :%s:' % \
512                      ( errinfo.__class__, errinfo))
513                return msg
514        else:
515            readyToRecv = 1
516        if readyToRecv:
517            mpd_print(mpd_dbg_level,"readyToRecv");
518            try:
519                pickledLen = ''
520                tempRecvd = ''
521                lenLeft = 8
522                while lenLeft:
523                    while (1):
524                        try:
525                            tempRecvd = self.sock.recv(lenLeft)
526                            # FIXME: Shouldn't this block until there is a
527                            # message unless it raises an exception.
528                            # Is no message an EOF, and in that case,
529                            # do we really want to immediately delete
530                            # the corresponding entry?
531                            #if not pickledLen:
532                            #    mpd_print(1,"continuing because recv failed")
533                            #    continue
534                            break
535                        except socket.error,errinfo:
536                            if errinfo[0] == EINTR:
537                                mpd_print(mpd_dbg_level,"Saw EINTR")
538                                pass
539                            elif errinfo[0] == ECONNRESET:
540                                mpd_print(mpd_dbg_level,"Saw ECONNRESET, ignore (return null msg)")
541                                return msg;
542                            else:
543                                mpd_print_tb(1,"recv_dict_msg: sock.recv(8): errinfo=:%s:" % (errinfo))
544                                raise socket.error,errinfo
545                    # end of while(1)
546                    if not tempRecvd:
547                         break
548                    pickledLen += tempRecvd
549                    lenLeft -= len(tempRecvd)
550                if not pickledLen:
551                    mpd_print(mpd_dbg_level,"no pickeled len")
552                if pickledLen:
553                    pickledLen = int(pickledLen)
554                    pickledMsg = ''
555                    lenLeft = pickledLen
556                    while lenLeft:
557                        while (1):
558                            try:
559                                recvdMsg = self.sock.recv(lenLeft)
560                                break
561                            except socket.error,errinfo:
562                                if errinfo[0] == EINTR:
563                                    pass
564                                else:
565                                    mpd_print_tb(1,"recv_dict_msg: sock.recv(8): errinfo=:%s:" % (errinfo))
566                                    raise socket.error,errinfo
567                        # end of while(1)
569                        pickledMsg += recvdMsg
570                        lenLeft -= len(recvdMsg)
571                    msg = loads(pickledMsg)
572            except socket.error, errinfo:
573                if errinfo[0] == EINTR:
574                    mpd_print(1, "Unhandled EINTR on sock.recv")
575                    return msg
576                elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
577                    mpd_print(mpd_dbg_level,"Connection reset")
578                    pass   # socket.error: (104, 'Connection reset by peer')
579                else:
580                    mpd_print_tb(1,'recv_dict_msg: socket error: errinfo=:%s:' % (errinfo))
581            except StandardError, errmsg:    # any built-in exceptions
582                mpd_print_tb(1, 'recv_dict_msg: errmsg=:%s:' % (errmsg) )
583            except Exception, errmsg:
584                mpd_print_tb(1, 'recv_dict_msg failed on sock %s errmsg=:%s:' % \
585                             (self.name,errmsg) )
586        if mpd_dbg_level:
587            if msg:
588                mpd_print(1,"Returning with non-null msg, length = %d, head = %s" % (pickledLen,pickledMsg[0:32].replace('\n','<NL>') ) )
589	    else:
590                mpd_print(1,"Returning with null msg" )
591        return msg
592    def recv_char_msg(self):
593        return self.recv_one_line()  # use leading len later
594    def recv_one_line(self):
595        msg = ''
596	# A failure with EINTR was observed here, so a loop to retry on
597        # EINTR has been added
598        try:
599            while 1:
600                try:
601                    c = self.sock.recv(1)
602                    break
603                except socket.error, errinfo:
604                    if errinfo[0] != EINTR:
605                        raise socket.error, errinfo
606            # end of while
607        except socket.error, errinfo:
608            if errinfo[0] == EINTR:   # sigchld, sigint, etc.
609                # This should no longer happen (handled above)
610                mpd_print_tb( 1,  "Unhandled EINTR in sock.recv" );
611                return msg
612            elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
613                return msg
614            else:
615                print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
616                sys.exit(-1)
617        except Exception, errmsg:
618            c = ''
619            msg = ''
620            mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) )
621        if c:
622            while c != '\n':
623                msg += c
624                try:
625                    c = self.sock.recv(1)
626                except socket.error, errinfo:
627                    if errinfo[0] == EINTR:   # sigchld, sigint, etc.
628                        return msg
629                    elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
630                        return msg
631                    else:
632                        print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
633                        sys.exit(-1)
634                except Exception, errmsg:
635                    c = ''
636                    msg = ''
637                    mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) )
638                    break
639            msg += c
640        return msg
642    # The default behavior on an error needs to be to handle and/or report
643    # it.  Otherwise, we all waste time trying to figure out why
644    # the code is silently failing.  I've set the default for errprint
645    # to YES rather than NO.
646    def send_dict_msg(self,msg,errprint=1):
647        pickledMsg = dumps(msg)
648        # FIXME: Does this automatically handle EINTR, or does it need an
649        # except os.error, errinfo: and check on errinfo[0] == EINTR
650        try:
651            while 1:
652                try:
653                    self.sendall( "%08d%s" % (len(pickledMsg),pickledMsg) )
654                    break
655                except socket.error, errmsg:
656		    if errmsg[0] == EPIPE  \
657                    or errmsg[0] == ECONNRESET \
658                    or errmsg[0] == EINTR:
659			# silent failure on pipe failure, as we usually
660                        # just want to discard messages in this case
661                        # (We need to plan error handling more thoroughly)
662                        break  ## RMB: chgd from pass
663                    else:
664                        raise socket.error, errmsg
665            # end of While
666        except Exception, errmsg:
667            mpd_print_tb(errprint,'send_dict_msg: sock=%s errmsg=:%s:' % (self.name,errmsg))
668    def send_char_msg(self,msg,errprint=1):
669        try:
670            while 1:
671                try:
672                    self.sock.sendall(msg)
673                    break
674                except socket.error, errmsg:
675		    if errmsg[0] == EPIPE:
676			# silent failure on pipe failure, as we usually
677                        # just want to discard messages in this case
678                        # (We need to plan error handling more thoroughly)
679                        pass
680                    if errmsg[0] != EINTR:
681                        raise socket.error, errmsg
682            # end of While
683        except Exception, errmsg:
684            mpd_print_tb(errprint,'send_char_msg: sock=%s errmsg=:%s:' % (self.name,errmsg))
class MPDListenSock(MPDSock):
    """An MPDSock that is bound and listening when construction returns.

    If filename is given, the socket is bound to that path.  Otherwise it
    is bound to (host,port).  A port range "low:high" may be supplied via
    $MPIEXEC_PORT_RANGE or, failing that, $MPICH_PORT_RANGE:
      * with port == 0, the first port in the range that binds is used;
      * an explicit port must lie inside the range.
    Range exhaustion or violation is reported and exits with sys.exit(-1).
    listen is the backlog passed to listen(); extra keyword arguments go to
    MPDSock.__init__.
    """
    def __init__(self,host='',port=0,filename='',listen=5,name='listener',**kargs):
        MPDSock.__init__(self,name=name,**kargs)
        self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        if filename:
            self.sock.bind(filename)
            self.sock.listen(listen)
            return
        # see if we have a PORT_RANGE environment variable; a missing or
        # malformed value falls through to the next variable, then to (0,0).
        (low_port,high_port) = (0,0)
        for rangeVar in ('MPIEXEC_PORT_RANGE','MPICH_PORT_RANGE'):
            try:
                (low_port,high_port) = map(int,os.environ[rangeVar].split(':'))
                break
            except (KeyError,ValueError):    # unset, non-integer, or not "a:b"
                (low_port,high_port) = (0,0)
        if low_port < 0  or  high_port < low_port:
            (low_port,high_port) = (0,0)
        if low_port != 0  and  high_port != 0:
            if port == 0:
                port = low_port
                while 1:
                    try:
                        self.sock.bind((host,port))
                        self.sock.listen(listen)
                        break
                    except socket.error:
                        port += 1
                        if port > high_port:
                            mpd_print_tb(1,'** no free ports in MPICH_PORT_RANGE')
                            sys.exit(-1)
                        # start again with a fresh socket for the next port
                        self.sock.close()
                        MPDSock.__init__(self,name=name,**kargs)
                        self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
            else:  # else use the explicitly specified port
                if port < low_port  or  port > high_port:
                    mpd_print_tb(1,'** port %d is outside MPICH_PORT_RANGE' % port)
                    sys.exit(-1)
                self.sock.bind((host,port))  # go ahead and bind
                self.sock.listen(listen)
        else:
            self.sock.bind((host,port))  # no port range set, so just bind as usual
            self.sock.listen(listen)
734class MPDStreamHandler(object):
735    def __init__(self):
736        self.activeStreams = {}
737    def set_handler(self,stream,handler,args=()):
738        self.activeStreams[stream] = (handler,args)
739    def del_handler(self,stream):
740        if self.activeStreams.has_key(stream):
741            del self.activeStreams[stream]
742    def close_all_active_streams(self):
743        for stream in self.activeStreams.keys():
744            del self.activeStreams[stream]
745            stream.close()
746    def handle_active_streams(self,streams=None,timeout=0.1):
747        global mpd_signum
748        while 1:
749            if streams:
750                streamsToSelect = streams
751            else:
752                streamsToSelect = self.activeStreams.keys()
753            readyStreams = []
754            try:
755                mpd_signum = 0
756                (readyStreams,u1,u2) = select.select(streamsToSelect,[],[],timeout)
757                break
758            except select.error, errinfo:
759                if errinfo[0] == EINTR:
760                    if mpd_signum == signal.SIGCHLD:
761                        break
762                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
763                        break
764                    else:
765                        continue
766                else:
767                    print '%s: handle_active_streams: select error: %s' % \
768                          (mpd_my_id,os.strerror(errinfo[0]))
769                    return (-1,os.strerror(errinfo[0]))
770            except KeyboardInterrupt, errinfo:
771                # print 'handle_active_streams: keyboard interrupt during select'
772                return (-1,errinfo.__class__,errinfo)
773            except Exception, errinfo:
774                print 'handle_active_streams: exception during select %s :%s:' % \
775                      ( errinfo.__class__, errinfo)
776                return (-1,errinfo.__class__,errinfo)
777        for stream in readyStreams:
778            if self.activeStreams.has_key(stream):
779                (handler,args) = self.activeStreams[stream]
780                handler(stream,*args)
781            else:
782                # this is not nec bad; an active stream (handler) may
783                # have been deleted by earlier handler in this loop
784                print '*** OOPS, unknown stream in handle_active_streams'
785        return (len(readyStreams),0)  #  len >= 0
787class MPDRing(object):
788    def __init__(self,listenSock=None,streamHandler=None,secretword='',
789                 myIfhn='',entryIfhn='',entryPort=0,zcMyLevel=0):
790        if not streamHandler:
791            mpd_print(1, "must supply handler for new conns in ring")
792            sys.exit(-1)
793        if not listenSock:
794            mpd_print(1, "must supply listenSock for new ring")
795            sys.exit(-1)
796        if not myIfhn:
797            mpd_print(1, "must supply myIfhn for new ring")
798            sys.exit(-1)
799        self.secretword = secretword
800        self.myIfhn     = myIfhn
801        self.generation = 0
802        self.listenSock = listenSock
803        self.listenPort = self.listenSock.sock.getsockname()[1]
804        self.streamHandler = streamHandler
805        self.streamHandler.set_handler(self.listenSock,self.handle_ring_listener_connection)
806        self.entryIfhn = entryIfhn
807        self.entryPort = entryPort
808        self.lhsIfhn = ''
809        self.lhsPort = 0
810        self.rhsIfhn = ''
811        self.rhsPort = 0
812        self.lhsSock = 0
813        self.rhsSock = 0
814        self.lhsHandler = None
815        self.rhsHandler = None
816        self.zcMyLevel = zcMyLevel
817        if self.zcMyLevel:
818            mpd_init_zc(self.myIfhn,self.zcMyLevel)
819    def create_single_mem_ring(self,ifhn='',port=0,lhsHandler=None,rhsHandler=None):
820        self.lhsSock,self.rhsSock = mpd_sockpair()
821        self.lhsIfhn = ifhn
822        self.lhsPort = port
823        self.rhsIfhn = ifhn
824        self.rhsPort = port
825        self.lhsHandler = lhsHandler
826        self.streamHandler.set_handler(self.lhsSock,lhsHandler)
827        self.rhsHandler = rhsHandler
828        self.streamHandler.set_handler(self.rhsSock,rhsHandler)
829    def reenter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=5):
830        if mpd_zc:
831            mpd_close_zc()
832            mpd_init_zc(self.myIfhn,self.zcMyLevel)
833        rc = -1
834        numTries = 0
835	self.generation += 1
836        while rc < 0  and  numTries < ntries:
837            numTries += 1
838            rc = self.enter_ring(entryIfhn=entryIfhn,entryPort=entryPort,
839                                 lhsHandler=lhsHandler,rhsHandler=rhsHandler,
840				 ntries=1)
841	    sleepTime = random() * 1.5  # a single random is between 0 and 1
842            sleep(sleepTime)
843        mpd_print(1,'reenter_ring rc=%d after numTries=%d' % (rc,numTries) )
844        return rc
845    def enter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=1):
846        if not lhsHandler  or  not rhsHandler:
847            print 'missing handler for enter_ring'
848            sys.exit(-1)
849        if not entryIfhn:
850            entryIfhn = self.entryIfhn
851        if not entryPort:
852            entryPort = self.entryPort
853        if not entryIfhn  and  mpd_zc:
854            if self.zcMyLevel == 1:
855                (entryHost,entryPort) = ('',0)
856            else:
857                (entryIfhn,entryPort) = mpd_find_zc_peer(self.zcMyLevel-1)
858                if not entryPort:
859                    print "FAILED TO FIND A PEER AT LEVEL", self.zcMyLevel-1
860                    sys.exit(-1)
861            print "ENTRY INFO", (entryIfhn,entryPort)
862        if not entryIfhn:
863            self.create_single_mem_ring(ifhn=self.myIfhn,
864                                        port=self.listenPort,
865                                        lhsHandler=lhsHandler,
866                                        rhsHandler=rhsHandler)
867        else:
868            rv = self.connect_lhs(lhsIfhn=entryIfhn,
869                                  lhsPort=entryPort,
870                                  lhsHandler=lhsHandler,
871                                  numTries=ntries)
872            if rv[0] <= 0:  # connect failed with problem
873                mpd_print(1,"lhs connect failed")
874                return -1
875            if rv[1]:  # rhsifhn and rhsport
876                rhsIfhn = rv[1][0]
877                rhsPort = rv[1][1]
878            else:
879                mpd_print(1,"did not recv rhs host&port from lhs")
880                return -1
881            rv = self.connect_rhs(rhsIfhn=rhsIfhn,
882                                  rhsPort=rhsPort,
883                                  rhsHandler=rhsHandler,
884                                  numTries=ntries)
885            if rv[0] <=  0:  # connect did not succeed; may try again
886                mpd_print(1,"rhs connect failed")
887                return -1
888        if mpd_zc:
889            mpd_register_zc(self.myIfhn,self.zcMyLevel)
890        return 0
891    def connect_lhs(self,lhsIfhn='',lhsPort=0,lhsHandler=None,numTries=1):
892        if not lhsHandler:
893            mpd_print(1, "must supply handler for lhs in ring")
894            return (-1,None)
895        if not lhsIfhn:
896            mpd_print(1, "must supply host for lhs in ring")
897            return (-1,None)
898        self.lhsIfhn = lhsIfhn
899        if not lhsPort:
900            mpd_print(1, "must supply port for lhs in ring")
901            return (-1,None)
902        self.lhsPort = lhsPort
903        numConnTries = 0
904        while numConnTries < numTries:
905            numConnTries += 1
906            self.lhsSock = MPDSock(name='lhs')
907            try:
908                self.lhsSock.connect((self.lhsIfhn,self.lhsPort))
909            except socket.error, errinfo:
910                print '%s: conn error in connect_lhs: %s' % \
911                      (mpd_my_id,os.strerror(errinfo[0]))
912                self.lhsSock.close()
913                self.lhsSock = 0
914                sleep(random())
915                continue
916            break
917        if not self.lhsSock  or  numConnTries > numTries:
918            mpd_print(1,'failed to connect to lhs at %s %d' % (self.lhsIfhn,self.lhsPort))
919            return (0,None)
920        msgToSend = { 'cmd' : 'request_to_enter_as_rhs', 'ifhn' : self.myIfhn,
921                      'port' : self.listenPort,
922                      'mpd_version' : mpd_version() }
923        self.lhsSock.send_dict_msg(msgToSend)
924        msg = self.lhsSock.recv_dict_msg()
925        if (not msg) \
926        or (not msg.has_key('cmd')) \
927        or (not msg['cmd'] == 'challenge') \
928        or (not msg.has_key('randnum')) \
929        or (not msg.has_key('generation')):
930            mpd_print(1,'invalid challenge from %s %d: %s' % \
931                      (self.lhsIfhn,self.lhsPort,msg) )
932            return (-1,None)
933        if msg['generation'] < self.generation:
934            mpd_print(1,'bad generation from lhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation))
935            return(-1,'bad_generation')  # RMB: try again here later
936        response = md5new(''.join([self.secretword,msg['randnum']])).digest()
937        msgToSend = { 'cmd' : 'challenge_response', 'response' : response,
938                      'ifhn' : self.myIfhn, 'port' : self.listenPort }
939        self.lhsSock.send_dict_msg(msgToSend)
940        msg = self.lhsSock.recv_dict_msg()
941        if (not msg) \
942        or (not msg.has_key('cmd')) \
943        or (not msg['cmd'] == 'OK_to_enter_as_rhs'):
944            mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords')
945            return (-1,None)
946        self.lhsHandler = lhsHandler
947        self.streamHandler.set_handler(self.lhsSock,lhsHandler)
948        if msg.has_key('rhsifhn') and msg.has_key('rhsport'):
949            return (1,(msg['rhsifhn'],msg['rhsport']))
950        else:
951            return (1,None)
952    def connect_rhs(self,rhsIfhn='',rhsPort=0,rhsHandler=None,numTries=1):
953        if not rhsHandler:
954            mpd_print(1, "must supply handler for rhs in ring")
955            return (-1,None)
956        if not rhsIfhn:
957            mpd_print(1, "must supply host for rhs in ring")
958            return (-1,None)
959        self.rhsIfhn = rhsIfhn
960        if not rhsPort:
961            mpd_print(1, "must supply port for rhs in ring")
962            return (-1,None)
963        self.rhsPort = rhsPort
964        numConnTries = 0
965        while numConnTries < numTries:
966            numConnTries += 1
967            self.rhsSock = MPDSock(name='rhs')
968            try:
969                self.rhsSock.connect((self.rhsIfhn,self.rhsPort))
970            except socket.error, errinfo:
971                print '%s: conn error in connect_rhs: %s' % \
972                      (mpd_my_id,os.strerror(errinfo[0]))
973                self.rhsSock.close()
974                self.rhsSock = 0
975                sleep(random())
976                continue
977            break
978        if not self.rhsSock or numConnTries > numTries:
979            mpd_print(1,'failed to connect to rhs at %s %d' % (self.rhsIfhn,self.rhsPort))
980            return (0,None)
981        msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'ifhn' : self.myIfhn,
982                      'port' : self.listenPort,
983                      'mpd_version' : mpd_version() }
984        self.rhsSock.send_dict_msg(msgToSend)
985        msg = self.rhsSock.recv_dict_msg()
986        if (not msg) \
987        or (not msg.has_key('cmd')) \
988        or (not msg['cmd'] == 'challenge') \
989        or (not msg.has_key('randnum')) \
990        or (not msg.has_key('generation')):
991            mpd_print(1,'invalid challenge from %s %d: %s' % (self.rhsIfhn,rhsPort,msg) )
992            return (-1,None)
993        if msg['generation'] < self.generation:
994            mpd_print(1,'bad generation from rhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation))
995            return(-1,'bad_generation')  # RMB: try again here later
996        response = md5new(''.join([self.secretword,msg['randnum']])).digest()
997        msgToSend = { 'cmd' : 'challenge_response', 'response' : response,
998                      'ifhn' : self.myIfhn, 'port' : self.listenPort }
999        self.rhsSock.send_dict_msg(msgToSend)
1000        msg = self.rhsSock.recv_dict_msg()
1001        if (not msg) \
1002        or (not msg.has_key('cmd')) \
1003        or (not msg['cmd'] == 'OK_to_enter_as_lhs'):
1004            mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords')
1005            return (-1,None)
1006        self.rhsHandler = rhsHandler
1007        self.streamHandler.set_handler(self.rhsSock,rhsHandler)
1008        if msg.has_key('lhsifhn') and msg.has_key('lhsport'):
1009            return (1,(msg['lhsifhn'],msg['lhsport']))
1010        else:
1011            return (1,None)
1012    def accept_lhs(self,lhsHandler=None):
1013        self.lhsHandler = lhsHandler
1014        newsock = self.handle_ring_listener_connection(self.listenSock)
1015        self.handle_lhs_challenge_response(newsock)
1016        self.streamHandler.set_handler(self.lhsSock,lhsHandler)
1017    def accept_rhs(self,rhsHandler=None):
1018        self.rhsHandler = rhsHandler
1019        newsock = self.handle_ring_listener_connection(self.listenSock)
1020        self.handle_rhs_challenge_response(newsock)
1021        self.streamHandler.set_handler(self.rhsSock,rhsHandler)
1022    def handle_ring_listener_connection(self,sock):
1023        randHiRange = 10000
1024        (newsock,newaddr) = sock.accept()
1025        newsock.name = 'candidate_to_enter_ring'
1026        msg = newsock.recv_dict_msg()
1027        if (not msg) or \
1028           (not msg.has_key('cmd')) or (not msg.has_key('ifhn')) or  \
1029           (not msg.has_key('port')):
1030            mpd_print(1, 'INVALID msg from new connection :%s: msg=:%s:' % (newaddr,msg) )
1031            newsock.close()
1032            return None
1033        if msg.has_key('mpd_version'):  # ping, etc may not have one
1034            if msg['mpd_version'] != mpd_version():
1035                msgToSend = { 'cmd' : 'entry_rejected_bad_mpd_version',
1036                              'your_version' : msg['mpd_version'],
1037                              'my_version' : mpd_version() }
1038                newsock.send_dict_msg(msgToSend)
1039                newsock.close()
1040                return None
1041        randNumStr = '%04d' % (randrange(1,randHiRange))  # 0001-(hi-1), inclusive
1042        newsock.correctChallengeResponse = \
1043                         md5new(''.join([self.secretword,randNumStr])).digest()
1044        msgToSend = { 'cmd' : 'challenge', 'randnum' : randNumStr,
1045                      'generation' : self.generation }
1046        newsock.send_dict_msg(msgToSend)
1047        if msg['cmd'] == 'request_to_enter_as_lhs':
1048            self.streamHandler.set_handler(newsock,self.handle_lhs_challenge_response)
1049            newsock.name = 'candidate_for_lhs_challenged'
1050            return newsock
1051        elif msg['cmd'] == 'request_to_enter_as_rhs':
1052            self.streamHandler.set_handler(newsock,self.handle_rhs_challenge_response)
1053            newsock.name = 'candidate_for_rhs_challenged'
1054            return newsock
1055        elif msg['cmd'] == 'ping':
1056            # already sent challenge instead of ack
1057            newsock.close()
1058            return None
1059        else:
1060            mpd_print(1, 'INVALID msg from new connection :%s:  msg=:%s:' % (newaddr,msg) )
1061            newsock.close()
1062            return None
1063        return None
1064    def handle_lhs_challenge_response(self,sock):
1065        msg = sock.recv_dict_msg()
1066        if (not msg)   or  \
1067           (not msg.has_key('cmd'))   or  (not msg.has_key('response'))  or  \
1068           (not msg.has_key('ifhn'))  or  (not msg.has_key('port'))  or  \
1069           (not msg['response'] == sock.correctChallengeResponse):
1070            mpd_print(1, 'INVALID msg for lhs response msg=:%s:' % (msg) )
1071            msgToSend = { 'cmd' : 'invalid_response' }
1072            sock.send_dict_msg(msgToSend)
1073            self.streamHandler.del_handler(sock)
1074            sock.close()
1075        else:
1076            msgToSend = { 'cmd' : 'OK_to_enter_as_lhs' }
1077            sock.send_dict_msg(msgToSend)
1078            if self.lhsSock:
1079                self.streamHandler.del_handler(self.lhsSock)
1080                self.lhsSock.close()
1081            self.lhsSock = sock
1082            self.lhsIfhn = msg['ifhn']
1083            self.lhsPort = int(msg['port'])
1084            self.streamHandler.set_handler(self.lhsSock,self.lhsHandler)
1085            self.lhsSock.name = 'lhs'
1086    def handle_rhs_challenge_response(self,sock):
1087        msg = sock.recv_dict_msg()
1088        if (not msg)   or  \
1089           (not msg.has_key('cmd'))   or  (not msg.has_key('response'))  or  \
1090           (not msg.has_key('ifhn'))  or  (not msg.has_key('port')):
1091            mpd_print(1, 'INVALID msg for rhs response msg=:%s:' % (msg) )
1092            msgToSend = { 'cmd' : 'invalid_response' }
1093            sock.send_dict_msg(msgToSend)
1094            self.streamHandler.del_handler(sock)
1095            sock.close()
1096        elif msg['response'] != sock.correctChallengeResponse:
1097            mpd_print(1, 'INVALID response in rhs response msg=:%s:' % (msg) )
1098            msgToSend = { 'cmd' : 'invalid_response' }
1099            sock.send_dict_msg(msgToSend)
1100            self.streamHandler.del_handler(sock)
1101            sock.close()
1102        elif msg['response'] == 'bad_generation':
1103            mpd_print(1, 'someone failed entering my ring gen=%d msg=%s' % \
1104                      (self.generation,msg) )
1105            self.streamHandler.del_handler(sock)
1106            sock.close()
1107        else:
1108            msgToSend = { 'cmd' : 'OK_to_enter_as_rhs', 'rhsifhn' : self.rhsIfhn,
1109                          'rhsip' : self.rhsIfhn, 'rhsport' : self.rhsPort }
1110            sock.send_dict_msg(msgToSend)
1111            if self.rhsSock:
1112                self.streamHandler.del_handler(self.rhsSock)
1113                self.rhsSock.close()
1114            self.rhsSock = sock
1115            self.rhsIfhn   = msg['ifhn']
1116            self.rhsPort = int(msg['port'])
1117            self.streamHandler.set_handler(self.rhsSock,self.rhsHandler)
1118            self.rhsSock.name = 'rhs'
1120class MPDConListenSock(MPDListenSock):
1121    def __init__(self,name='console_listen',secretword='',**kargs):
1122        if os.environ.has_key('MPD_CON_EXT'):
1123            self.conExt = '_'  + os.environ['MPD_CON_EXT']
1124        else:
1125            self.conExt = ''
1126        self.conFilename = mpd_tmpdir + '/mpd2.console_' + mpd_get_my_username() + self.conExt
1127        self.secretword = secretword
1128        consoleAlreadyExists = 0
1129        if hasattr(socket,'AF_UNIX'):
1130            sockFamily = socket.AF_UNIX
1131        else:
1132            sockFamily = socket.AF_INET
1133        if os.environ.has_key('MPD_CON_INET_HOST_PORT'):
1134            sockFamily = socket.AF_INET    # override above-assigned value
1135            (conHost,conPort) = os.environ['MPD_CON_INET_HOST_PORT'].split(':')
1136            conPort = int(conPort)
1137        else:
1138            (conHost,conPort) = ('',0)
1139        if os.access(self.conFilename,os.R_OK):    # if console there, see if mpd listening
1140            if hasattr(socket,'AF_UNIX')  and  sockFamily == socket.AF_UNIX:
1141                tempSock = MPDSock(family=socket.AF_UNIX)
1142                try:
1143                    tempSock.connect(self.conFilename)
1144                    consoleAlreadyExists = 1
1145                except Exception, errmsg:
1146                    os.unlink(self.conFilename)
1147                tempSock.close()
1148            else:
1149                if not conPort:
1150                    conFile = open(self.conFilename)
1151                    for line in conFile:
1152                        line = line.strip()
1153                        (k,v) = line.split('=')
1154                        if k == 'port':
1155                            conPort = int(v)
1156                    conFile.close()
1157                tempSock = MPDSock()
1158                try:
1159                    tempSock.sock.connect(('localhost',conPort))
1160                    consoleAlreadyExists = 1
1161                except Exception, errmsg:
1162                    os.unlink(self.conFilename)
1163                tempSock.close()
1164        if consoleAlreadyExists:
1165            print 'An mpd is already running with console at %s on %s. ' % \
1166                  (self.conFilename, socket.gethostname())
1167            print 'Start mpd with the -n option for a second mpd on same host.'
1168            if syslog_module_available:
1169                syslog.syslog(syslog.LOG_ERR,
1170                              "%s: exiting; an mpd is already using the console" % \
1171                              (mpd_my_id))
1172            sys.exit(-1)
1173        if hasattr(socket,'AF_UNIX')  and  sockFamily == socket.AF_UNIX:
1174            MPDListenSock.__init__(self,family=sockFamily,socktype=socket.SOCK_STREAM,
1175                                   filename=self.conFilename,listen=1,name=name)
1176        else:
1177            MPDListenSock.__init__(self,family=sockFamily,socktype=socket.SOCK_STREAM,
1178                                   port=conPort,listen=1,name=name)
1179            conFD = os.open(self.conFilename,os.O_CREAT|os.O_WRONLY|os.O_EXCL,0600)
1180            self.port = self.sock.getsockname()[1]
1181            os.write(conFD,'port=%d\n' % (self.port) )
1182            os.close(conFD)
1184class MPDConClientSock(MPDSock):
1185    def __init__(self,name='console_to_mpd',mpdroot='',secretword='',**kargs):
1186        MPDSock.__init__(self)
1187        self.sock = 0
1188        if os.environ.has_key('MPD_CON_EXT'):
1189            self.conExt = '_'  + os.environ['MPD_CON_EXT']
1190        else:
1191            self.conExt = ''
1192        self.secretword = secretword
1193        if mpdroot:
1194            self.conFilename = mpd_tmpdir + '/mpd2.console_root' + self.conExt
1195            self.sock = MPDSock(family=socket.AF_UNIX,name=name)
1196            rootpid = os.fork()
1197            if rootpid == 0:
1198                os.execvpe(mpdroot,[mpdroot,self.conFilename,str(self.sock.fileno())],{})
1199                mpd_print(1,'failed to exec mpdroot (%s)' % mpdroot )
1200                sys.exit(-1)
1201            else:
1202                (pid,status) = os.waitpid(rootpid,0)
1203                if os.WIFSIGNALED(status):
1204                    status = status & 0x007f  # AND off core flag
1205                else:
1206                    status = os.WEXITSTATUS(status)
1207                if status != 0:
1208                    mpd_print(1,'forked process failed; status=%s' % status)
1209                    sys.exit(-1)
1210        else:
1211            self.conFilename = mpd_tmpdir + '/mpd2.console_' + mpd_get_my_username() + self.conExt
1212            if hasattr(socket,'AF_UNIX'):
1213                sockFamily = socket.AF_UNIX
1214            else:
1215                sockFamily = socket.AF_INET
1216            if os.environ.has_key('MPD_CON_INET_HOST_PORT'):
1217                sockFamily = socket.AF_INET    # override above-assigned value
1218                (conHost,conPort) = os.environ['MPD_CON_INET_HOST_PORT'].split(':')
1219                conPort = int(conPort)
1220            else:
1221                (conHost,conPort) = ('',0)
1222            self.sock = MPDSock(family=sockFamily,socktype=socket.SOCK_STREAM,name=name)
1223            if hasattr(socket,'AF_UNIX')  and  sockFamily == socket.AF_UNIX:
1224                if hasattr(signal,'alarm'):
1225                    oldAlarmTime = signal.alarm(8)
1226                else:    # assume python2.3 or later
1227                    oldTimeout = socket.getdefaulttimeout()
1228                    socket.setdefaulttimeout(8)
1229                try:
1230                    self.sock.connect(self.conFilename)
1231                except Exception, errmsg:
1232                    self.sock.close()
1233                    self.sock = 0
1234                if hasattr(signal,'alarm'):
1235                    signal.alarm(oldAlarmTime)
1236                else:    # assume python2.3 or later
1237                    socket.setdefaulttimeout(oldTimeout)
1238                if self.sock:
1239                    # this is done by mpdroot otherwise
1240                    msgToSend = 'realusername=%s secretword=UNUSED\n' % \
1241                                mpd_get_my_username()
1242                    self.sock.send_char_msg(msgToSend)
1243            else:
1244                if not conPort:
1245                    conFile = open(self.conFilename)
1246                    for line in conFile:
1247                        line = line.strip()
1248                        (k,v) = line.split('=')
1249                        if k == 'port':
1250                            conPort = int(v)
1251                    conFile.close()
1252                if conHost:
1253                    conIfhn = socket.gethostbyname_ex(conHost)[2][0]
1254                else:
1255                    conIfhn = 'localhost'
1256                self.sock = MPDSock(name=name)
1257                if hasattr(signal,'alarm'):
1258                    oldAlarmTime = signal.alarm(8)
1259                else:    # assume python2.3 or later
1260                    oldTimeout = socket.getdefaulttimeout()
1261                    socket.setdefaulttimeout(8)
1262                try:
1263                    self.sock.connect((conIfhn,conPort))
1264                except Exception, errmsg:
1265                    mpd_print(1,"failed to connect to host %s port %d" % \
1266                              (conIfhn,conPort) )
1267                    self.sock.close()
1268                    self.sock = 0
1269                if hasattr(signal,'alarm'):
1270                    signal.alarm(oldAlarmTime)
1271                else:    # assume python2.3 or later
1272                    socket.setdefaulttimeout(oldTimeout)
1273                if not self.sock:
1274                    print '%s: cannot connect to local mpd (%s); possible causes:' % \
1275                          (mpd_my_id,self.conFilename)
1276                    print '  1. no mpd is running on this host'
1277                    print '  2. an mpd is running but was started without a "console" (-n option)'
1278                    print 'In case 1, you can start an mpd on this host with:'
1279                    print '    mpd &'
1280                    print 'and you will be able to run jobs just on this host.'
1281                    print 'For more details on starting mpds on a set of hosts, see'
1282                    print 'the MPICH2 Installation Guide.'
1283                    sys.exit(-1)
1284                msgToSend = { 'cmd' : 'con_init' }
1285                self.sock.send_dict_msg(msgToSend)
1286                msg = self.sock.recv_dict_msg()
1287                if not msg:
1288                    mpd_print(1,'expected con_challenge from mpd; got eof')
1289                    sys.exit(-1)
1290                if msg['cmd'] != 'con_challenge':
1291                    mpd_print(1,'expected con_challenge from mpd; got msg=:%s:' % (msg) )
1292                    sys.exit(-1)
1293                randVal = self.secretword + str(msg['randnum'])
1294                response = md5new(randVal).digest()
1295                msgToSend = { 'cmd' : 'con_challenge_response', 'response' : response,
1296                              'realusername' : mpd_get_my_username() }
1297                self.sock.send_dict_msg(msgToSend)
1298                msg = self.sock.recv_dict_msg()
1299                if not msg  or  msg['cmd'] != 'valid_response':
1300                    mpd_print(1,'expected valid_response from mpd; got msg=:%s:' % (msg) )
1301                    sys.exit(-1)
1302        if not self.sock:
1303            print '%s: cannot connect to local mpd (%s); possible causes:' % \
1304                  (mpd_my_id,self.conFilename)
1305            print '  1. no mpd is running on this host'
1306            print '  2. an mpd is running but was started without a "console" (-n option)'
1307            print 'In case 1, you can start an mpd on this host with:'
1308            print '    mpd &'
1309            print 'and you will be able to run jobs just on this host.'
1310            print 'For more details on starting mpds on a set of hosts, see'
1311            print 'the MPICH2 Installation Guide.'
1312            sys.exit(-1)
class MPDParmDB(dict):
    """Parameter database layered by source.

    orderedSources lists the sources from highest to lowest precedence.
    Values are stored with db[(source,key)] = value and read with db[key],
    which returns the value from the first source in orderedSources that
    defines key.  Entries live in self.db rather than in the underlying
    dict, so only the methods defined here see them.
    """
    def __init__(self,orderedSources=None):
        dict.__init__(self)
        if orderedSources is None:
            orderedSources = []    # fresh list per instance, not a shared default
        self.orderedSources = orderedSources
        self.db = {}
        for src in orderedSources:  # highest to lowest
            self.db[src] = {}
    def __setitem__(self,sk_tup,val):
        """Store val for key k in source s, where sk_tup is the 2-tuple (s,k).

        Reports via mpd_print_tb and exits if sk_tup is not a 2-tuple or s
        is not one of orderedSources.
        """
        if not isinstance(sk_tup,tuple)  or  len(sk_tup) != 2:
            # (sk_tup,) so that a tuple key is formatted as one value
            mpd_print_tb(1,"must use a 2-tuple as key in a parm db; invalid: %s" % (sk_tup,) )
            sys.exit(-1)
        (s,k) = sk_tup
        if s not in self.orderedSources:
            # report the source that was requested, not a leftover loop value
            mpd_print_tb(1,"invalid src specified for insert into parm db; src=%s" % (s,) )
            sys.exit(-1)
        self.db[s][k] = val
    def __getitem__(self,key):
        """Return key's value from the highest-precedence source defining it.

        Raises KeyError if no source defines key.
        """
        for src in self.orderedSources:
            if key in self.db[src]:
                return self.db[src][key]
        raise KeyError("key %s not found in parm db" % (key,))
    def has_key(self,key):
        """Return 1 if any source defines key, else 0."""
        for src in self.orderedSources:
            if key in self.db[src]:
                return 1
        return 0
    def __contains__(self,key):
        # route 'key in db' through the layered lookup instead of the
        # (unused) underlying dict
        return self.has_key(key) == 1
    def printall(self):
        """Print every source and all of its key/value pairs."""
        print("MPDRUN's PARMDB; values from all sources:")
        for src in self.orderedSources:
            print('  %s (source)' % (src))
            for key in self.db[src]:
                print('    %s = %s' % (key,self.db[src][key]))
    def printdef(self):
        """Print each key once, with the value from its highest-precedence source."""
        print("MPDRUN's PARMDB; default values only:")
        printed = {}
        for src in self.orderedSources:
            for key in self.db[src]:
                if key not in printed:
                    printed[key] = 1
                    print('  %s  %s = %s' % (src,key,self.db[src][key]))
    def get_parms_from_env(self,parmsToOverride):
        """Copy each key of parmsToOverride that is set in os.environ into the 'env' source."""
        for k in parmsToOverride.keys():
            if k in os.environ:
                self[('env',k)] = os.environ[k]
    def get_parms_from_rcfile(self,parmsToOverride,errIfMissingFile=0):
        """Load key=value settings from the mpd config file into the 'rcfile' source.

        The file is chosen in this order:
          * $MPD_CONF_FILE, if readable
          * /etc/mpd.conf, when running as root
          * $HOME/.mpd.conf, if readable
          * $HOMEPATH/.mpd.conf
        Text after '#' is ignored.  Only keys present in parmsToOverride are
        stored.  'secretword' is accepted as an alias for MPD_SECRETWORD, and
        all-digit values (except the secretword) become ints.
        A missing file is ignored unless errIfMissingFile is set.  The process
        exits if:
          * no candidate file exists
          * the file is missing and errIfMissingFile is set
          * the file has any group/other permission bits set
        """
        if 'MPD_CONF_FILE' in os.environ and os.access(os.environ['MPD_CONF_FILE'], os.R_OK):
            parmsRCFilename = os.environ['MPD_CONF_FILE']
        elif hasattr(os,'getuid')  and  os.getuid() == 0:    # if ROOT
            parmsRCFilename = os.path.abspath('/etc/mpd.conf')
        elif 'HOME' in os.environ and os.access(os.path.join(os.environ['HOME'], '.mpd.conf'), os.R_OK):
            parmsRCFilename = os.path.join(os.environ['HOME'],'.mpd.conf')
        elif 'HOMEPATH' in os.environ:    # e.g. win32
            parmsRCFilename = os.path.join(os.environ['HOMEPATH'],'.mpd.conf')
        else:
            print('unable to find mpd.conf file')
            sys.exit(-1)
        if sys.platform == 'win32':
            mode = 0x80   # fake it
        else:
            try:
                mode = os.stat(parmsRCFilename)[0]
            except OSError:
                mode = ''
        # sometimes a missing file is OK, e.g. when user running with root's mpd
        if not mode  and  not errIfMissingFile:
            return
        if not mode:
            print('configuration file %s not found' % (parmsRCFilename))
            print('A file named .mpd.conf file must be present in the user\'s home')
            print('directory (/etc/mpd.conf if root) with read and write access')
            print('only for the user, and must contain at least a line with:')
            print('MPD_SECRETWORD=<secretword>')
            print('One way to safely create this file is to do the following:')
            print('  cd $HOME')
            print('  touch .mpd.conf')
            print('  chmod 600 .mpd.conf')
            print('and then use an editor to insert a line like')
            print('  MPD_SECRETWORD=mr45-j9z')
            print('into the file.  (Of course use some other secret word than mr45-j9z.)')
            sys.exit(-1)
        if  (mode & 0x3f):    # any group/other rwx bit (octal 077)
            print('configuration file %s is accessible by others' % (parmsRCFilename))
            print('change permissions to allow read and write access only by you')
            sys.exit(-1)
        parmsRCFile = open(parmsRCFilename)
        try:
            for line in parmsRCFile:
                lineWithoutComments = line.split('#')[0].strip()    # will at least be ''
                if not lineWithoutComments:
                    continue
                # split on the first '=' only, so values may themselves contain '='
                splitLine = lineWithoutComments.split('=',1)
                if not splitLine[0]:    # ['']
                    print('warning: unrecognized (null) key in %s' % (parmsRCFilename))
                    continue
                if len(splitLine) == 2:
                    (k,v) = splitLine
                    if k == 'secretword':    # for bkwd-compat
                        k = 'MPD_SECRETWORD'
                    if k in parmsToOverride.keys():
                        if k != 'MPD_SECRETWORD'  and  v.isdigit():
                            v = int(v)
                        self[('rcfile',k)] = v
                else:
                    mpd_print(1, 'line in mpd conf is not key=val pair; line=:%s:' % (line) )
        finally:
            parmsRCFile.close()
1423class MPDTest(object):
1424    def __init__(self):
1425        pass
1426    def run(self,cmd='',expIn = '',chkEC=0,expEC=0,chkOut=0,expOut='',ordOut=0,
1427            grepOut=0, exitOnFail=1):
1428        rv = {}
1429        if chkOut and grepOut:
1430            print "grepOut and chkOut are mutually exclusive"
1431            sys.exit(-1)
1432        outLines = []
1433        if subprocess_module_available:
1434            import re
1435            cmd = re.split(r'\s+',cmd)
1436            runner = subprocess.Popen(cmd,bufsize=0,env=os.environ,close_fds=True,
1437                                      stdin=subprocess.PIPE,stdout=subprocess.PIPE,
1438                                      stderr=subprocess.PIPE)
1439            if expIn:
1440                runner.stdin.write(expIn)
1441            runner.stdin.close()
1442            for line in runner.stdout:
1443                outLines.append(line[:-1])    # strip newlines
1444            for line in runner.stderr:
1445                outLines.append(line[:-1])    # strip newlines
1446            rv['pid'] = runner.pid
1447            rv['EC'] = runner.wait()
1448        elif hasattr(popen2,'Popen4'):    # delete when python2.4+ is common
1449            runner = popen2.Popen4(cmd)
1450            if expIn:
1451                runner.tochild.write(expIn)
1452            runner.tochild.close()
1453            for line in runner.fromchild:
1454                outLines.append(line[:-1])    # strip newlines
1455            rv['pid'] = runner.pid
1456            rv['EC'] = runner.wait()
1457        else:
1458            mpd_print(1,'can not run with either subprocess or popen2-Popen4')
1459            sys.exit(-1)
1460        rv['OUT'] = outLines[:]
1461        if chkEC  and  expEC != rv['EC']:
1462            print "bad exit code from test: %s" % (cmd)
1463            print "   expected exitcode=%d ; got %d" % (expEC,rv['EC'])
1464            print "output from cmd:"
1465            for line in outLines:
1466                print line
1467            if exitOnFail:
1468                sys.exit(-1)
1469        if chkOut:
1470            orderOK = 1
1471            expOut = expOut.split('\n')[:-1]  # leave off trailing ''
1472            for line in outLines[:]:    # copy of outLines
1473                if line in expOut:
1474                    if ordOut and line != expOut[0]:
1475                        orderOK = 0
1476                        break  # count rest of outLines as bad
1477                    expOut.remove(line)
1478                    outLines.remove(line)
1479            if not orderOK:
1480                print "lines out of order in output for test: %s" % (cmd)
1481                for line in outLines:
1482                    print line
1483                if exitOnFail:
1484                    sys.exit(-1)
1485            if expOut:
1486                print "some required lines not found in output for test: %s" % (cmd)
1487                for line in outLines:
1488                    print line
1489                if exitOnFail:
1490                    sys.exit(-1)
1491            if outLines:
1492                print "extra lines in output for test: %s" % (cmd)
1493                for line in outLines:
1494                    print line
1495                if exitOnFail:
1496                    sys.exit(-1)
1497        elif grepOut:
1498            foundCnt = 0
1499            for expLine in expOut:
1500                for outLine in outLines:
1501                    if outLine.find(expLine) >= 0:
1502                        foundCnt += 1
1503            if foundCnt < len(expOut):
1504                print "some lines not matched for test: %s" % (cmd)
1505                for line in outLines:
1506                     print line
1507                if exitOnFail:
1508                    sys.exit(-1)
1509        return rv
1511#### experimental code for zeroconf
1512def mpd_init_zc(ifhn,my_level):
1513    import threading, Zeroconf
1514    global mpd_zc
1515    mpd_zc = Zeroconf.Zeroconf()
1516    class ListenerForPeers(object):
1517        def __init__(self):
1518            mpd_zc.peers = {}
1519            mpd_zc.peersLock = threading.Lock()
1520            mpd_zc.peers_available_event = threading.Event()
1521        def removeService(self, zc, service_type, name):
1522            mpd_zc.peersLock.acquire()
1523            del mpd_zc.peers[name]
1524            print "removed", name ; sys.stdout.flush()
1525            mpd_zc.peersLock.release()
1526        def addService(self, zc, service_type, name):
1527            info = zc.getServiceInfo(service_type, name)
1528            if info:
1529                if info.properties['username'] != mpd_get_my_username():
1530                    return
1531                mpd_zc.peersLock.acquire()
1532                mpd_zc.peers[name] = info
1533                print "added peer:", name, info.properties ; sys.stdout.flush()
1534                mpd_zc.peersLock.release()
1535                mpd_zc.peers_available_event.set()
1536            else:
1537                print "OOPS NO INFO FOR", name ; sys.stdout.flush()
1538    service_type = "_mpdzc._tcp.local."
1539    listenerForPeers = ListenerForPeers()
1540    browser = Zeroconf.ServiceBrowser(mpd_zc,service_type,listenerForPeers)
1541    ##  sleep(1.5)  # give browser a chance to find some peers
1542def mpd_find_zc_peer(peer_level):
1543    print "finding a peer at level %d..." % (peer_level) ; sys.stdout.flush()
1544    mpd_zc.peers_available_event.wait(5)
1545    for (peername,info) in mpd_zc.peers.items():
1546        if info.properties['mpdid'] == mpd_my_id:
1547            continue
1548        if info.properties['level'] != peer_level:
1549            continue
1550        peerAddr = str(socket.inet_ntoa(info.getAddress()))
1551        peerPort = info.getPort()
1552        return(peerAddr,peerPort)
1553    return ('',0)
def mpd_register_zc(ifhn,level):
    """Advertise this mpd via zeroconf as an "_mpdzc._tcp.local." service.

    ifhn  -- IPv4 address string (converted with socket.inet_aton) to advertise
    level -- value published as the service's 'level' property
    The service is named mpd_my_id + the service type.  Its port is taken
    from mpd_my_id, and it carries 'description', 'mpdid', 'level' and
    'username' properties.  Requires mpd_init_zc to have created mpd_zc.
    """
    import Zeroconf
    service_type = "_mpdzc._tcp.local."
    service_ifhn = socket.inet_aton(ifhn)
    # NOTE(review): assumes mpd_my_id looks like "<host>_<port>"; a host part
    # containing '_' would make this pick the wrong field -- confirm
    service_port = int(mpd_my_id.split('_')[1])
    svc = Zeroconf.ServiceInfo(service_type,
                               mpd_my_id + service_type,
                               address = service_ifhn,
                               port = service_port,
                               weight = 0, priority = 0,
                               properties = { 'description': 'mpd',
                                              'mpdid' : mpd_my_id,
                                              'level' : level,
                                              'username' : mpd_get_my_username() }
                               )
    mpd_zc.registerService(svc)
def mpd_close_zc():
    """Close the Zeroconf instance created by mpd_init_zc; no-op if there is none."""
    # mpd_zc stays 0 (falsy) until mpd_init_zc replaces it
    if not mpd_zc:
        return
    mpd_zc.close()
1576# code for testing
1578def _handle_msg(sock):
1579    msg = sock.recv_dict_msg()
1580    print 'recvd msg=:%s:' % (msg)
1582if __name__ == '__main__':
1583    sh = MPDStreamHandler()
1584    (tsock1,tsock2) = mpd_sockpair()
1585    tsock1.name = 'tsock1_connected_to_tsock2'
1586    sh.set_handler(tsock1,_handle_msg)
1587    tsock2.send_dict_msg( {'msgtype' : 'hello'} )
1588    sh.handle_active_streams()
1589    # just to demo a listen sock
1590    lsock = MPDListenSock('',9999,name='listen_sock')
1591    print lsock.name, lsock.getsockname()[1]
1593    ### import sys
1594    ### sys.excepthook = mpd_uncaught_except_tb
1595    ### i = 1/0