1#!/usr/bin/env python
2#
3#   (C) 2001 by Argonne National Laboratory.
4#       See COPYRIGHT in top-level directory.
5#
6
7# workaround to suppress deprecated module warnings in python2.6
8# see https://trac.mcs.anl.gov/projects/mpich2/ticket/362 for tracking
9import warnings
10warnings.filterwarnings('ignore', '.*the md5 module is deprecated.*', DeprecationWarning)
11warnings.filterwarnings('ignore', '.*the popen2 module is deprecated.*', DeprecationWarning)
12
13import sys, os, signal, popen2, socket, select, inspect
14
15from  cPickle   import  dumps, loads
16from  types     import  TupleType
17from  traceback import  extract_tb, extract_stack, format_list
18from  re        import  sub, split
19from  errno     import  EINTR, ECONNRESET, EISCONN, ECONNREFUSED, EPIPE
20from  md5       import  new as md5new
21from  time      import  sleep
22from  random    import  randrange, random
23
24try:
25    import pwd
26    pwd_module_available = 1
27except:
28    pwd_module_available = 0
29try:
30    import grp
31    grp_module_available = 1
32except:
33    grp_module_available = 0
34try:
35    import  syslog
36    syslog_module_available = 1
37except:
38    syslog_module_available = 0
39try:
40    import subprocess
41    subprocess_module_available = 1
42except:
43    subprocess_module_available = 0
44
45
# Module-wide state shared by the utilities below.  (A `global` statement at
# module level is a no-op, so these are plain module attributes; functions
# that rebind them declare `global` themselves.)
#   mpd_cli_app             -- for debug during mpich nightly tests
#   mpd_my_id               -- prefix for this process's diagnostic output
#   mpd_procedures_to_trace -- procedure names reported by mpd_trace_calls
#   mpd_my_hostname         -- presumably set by callers elsewhere (TODO confirm)
#   mpd_tmpdir              -- $MPD_TMPDIR if set, else '/tmp'
mpd_cli_app = ''
mpd_my_id = ''
mpd_procedures_to_trace = []
mpd_my_hostname = ''
# mpd_signum can be set by mpd_handle_signal to indicate which signal was recently caught;
# this can be useful below to pop out of loops that ordinarily continue after sigs
# NOTE: mpd_handle_signal must be called by the user, e.g. in his own signal handler
mpd_signum = 0
mpd_zc = 0    # NOTE(review): zc state; presumably managed by mpd_init_zc -- confirm
mpd_tmpdir = os.environ.get('MPD_TMPDIR','/tmp')

# For easier debugging, we provide this variable that is used in the
# mpd_print calls.  This makes it a little easier to debug problems involving
# communication with other processes, such as handling EINTR from signals.
mpd_dbg_level = 0
69
def mpd_set_dbg_level(flag):
    """Set mpd_dbg_level, the print flag passed to mpd_print at debug sites.

    A true value turns those debug messages on; a false one silences them.
    """
    global mpd_dbg_level
    mpd_dbg_level = flag
73
def mpd_set_my_id(myid=''):
    """Record the id that prefixes this process's diagnostic output.

    Called with no argument, it resets the id to the empty string.
    """
    global mpd_my_id
    mpd_my_id = myid
77
def mpd_set_tmpdir(tmpdir):
    """Override mpd_tmpdir (initialised from $MPD_TMPDIR, default '/tmp')."""
    global mpd_tmpdir
    mpd_tmpdir = tmpdir
81
def mpd_get_my_id():
    """Return the id previously recorded with mpd_set_my_id()."""
    # read-only access to a module global needs no `global` declaration
    return mpd_my_id
85
def mpd_set_cli_app(app):    # for debug during mpich nightly tests
    """Record the client application name.

    When set, the name (and the current directory) is appended to the
    reports of mpd_print_tb and mpd_uncaught_except_tb.
    """
    global mpd_cli_app
    mpd_cli_app = app
89
def mpd_handle_signal(signum,frame):
    """Remember which signal arrived most recently in mpd_signum.

    Takes the (signum, frame) arguments a signal handler receives; frame is
    ignored.  EINTR-retrying loops in this module consult mpd_signum to
    decide whether to give up (e.g. on SIGINT/SIGALRM) or retry.
    """
    global mpd_signum
    mpd_signum = signum
93
94def mpd_print(*args):
95    global mpd_my_id
96    if not args[0]:
97        return
98    stack = extract_stack()
99    callingProc = stack[-2][2]
100    callingLine = stack[-2][1]
101    printLine = '%s (%s %d): ' % (mpd_my_id,callingProc,callingLine)
102    for arg in args[1:]:
103        printLine = printLine + str(arg)
104    # We've seen an EINTR on the flush here
105    while 1:
106        try:
107            print printLine
108            break
109        except os.error, errinfo:
110            if errinfo[0] != EINTR:
111                raise os.error, errinfo
112    # end of while
113    while 1:
114        try:
115            sys.stdout.flush()
116            break
117        except os.error, errinfo:
118	    if errinfo[0] != EINTR:
119                raise os.error, errinfo
120    # end of while
121    if syslog_module_available:
122        syslog.syslog(syslog.LOG_INFO,printLine)
123
124def mpd_print_tb(*args):
125    global mpd_my_id
126    if not args[0]:
127        return
128    stack = extract_stack()
129    callingProc = stack[-2][2]
130    callingLine = stack[-2][1]
131    stack = extract_stack()
132    stack.reverse()
133    stack = stack[1:]
134    printLine = '%s (%s %d):' % (mpd_my_id,callingProc,callingLine)
135    for arg in args[1:]:
136        printLine = printLine + str(arg)
137    printLine += '\n  mpdtb:\n'
138    for line in format_list(stack):
139        line = sub(r'\n.*','',line)
140        splitLine = split(',',line)
141        splitLine[0] = sub('  File "(.*)"',lambda mo: mo.group(1),splitLine[0])
142        splitLine[1] = sub(' line ','',splitLine[1])
143        splitLine[2] = sub(' in ','',splitLine[2])
144        printLine = printLine + '    %s,  %s,  %s\n' % tuple(splitLine)
145    if mpd_cli_app:    # debug mpich apps in nightly tests
146        printLine += '    mpd_cli_app=%s\n' % (mpd_cli_app)
147        printLine += '    cwd=%s' % (os.getcwd())
148    print printLine
149    sys.stdout.flush()
150    if syslog_module_available:
151        syslog.syslog(syslog.LOG_INFO,printLine)
152
153def mpd_uncaught_except_tb(arg1,arg2,arg3):
154    global mpd_my_id
155    global mpd_cli_id
156    if mpd_my_id:
157        errstr = '%s: ' % (mpd_my_id)
158    else:
159        errstr = ''
160    errstr += 'mpd_uncaught_except_tb handling:\n'
161    errstr += '  %s: %s\n' % (arg1,arg2)
162    tb = extract_tb(arg3)
163    tb.reverse()
164    for tup in tb:
165        # errstr += '    file %s  line# %i  procedure %s\n        %s\n' % (tup)
166        errstr += '    %s  %i  %s\n        %s\n' % (tup)
167    if mpd_cli_app:    # debug mpich apps in nightly tests
168        errstr += '    mpd_cli_app=%s\n' % (mpd_cli_app)
169        errstr += '    cwd=%s' % (os.getcwd())
170    print errstr,
171    if syslog_module_available:
172        syslog.syslog(syslog.LOG_ERR, errstr)
173
def mpd_set_procedures_to_trace(procs):
    """Choose which procedure names mpd_trace_calls reports.

    procs is stored as-is (not copied) and tested with `in` against each
    traced frame's code name.
    """
    global mpd_procedures_to_trace
    mpd_procedures_to_trace = procs
177
178def mpd_trace_calls(frame,event,args):
179    global mpd_my_id, mpd_procedures_to_trace
180    if frame.f_code.co_name not in mpd_procedures_to_trace:
181        return None
182    args_info = apply(inspect.formatargvalues,inspect.getargvalues(frame))
183    # Be VERY careful here; under AIX, it looked like EINTR is
184    # possible within print (!).
185    while (1):
186        try:
187            print '%s: ENTER %s in %s at line %d; ARGS=%s' % \
188          (mpd_my_id,frame.f_code.co_name,frame.f_code.co_filename,frame.f_lineno,args_info)
189            break
190        except os.error, errinfo:
191            if errinfo[0] != EINTR:
192                raise os.error, errinfo
193    # end of while
194    return mpd_trace_returns
195
196def mpd_trace_returns(frame,event,args):
197    global mpd_my_id
198    if event == 'return':
199        # Be VERY careful here; under AIX, it looked like EINTR is
200        # possible within print (!).
201        while (1):
202            try:
203                print '%s: EXIT %s at line %d ' % (mpd_my_id,frame.f_code.co_name,frame.f_lineno)
204                break
205            except os.error, errinfo:
206                if errinfo[0] != EINTR:
207                    raise os.error, errinfo
208        # end of while
209        return None
210    else:
211        return mpd_trace_returns
212
213def mpd_sockpair():
214    sock1 = MPDSock()
215    rc = sock1.sock.bind(('',0))
216    rc = sock1.sock.listen(5)
217    port1 = sock1.sock.getsockname()[1]
218    sock2 = MPDSock()
219    #
220    # We have encountered situations where the connection fails; as this is
221    # a connection to this process, we retry a few times in that case
222    # (seen on AIX)
223    #
224    try:
225        connAttempts = 0
226        while (1):
227            try:
228                rc = sock2.sock.connect(('localhost',port1))
229                break
230            except socket.error, errinfo:
231                # In some cases, connect will return EINTR and then on the
232                # next iteration, returns EISCONN.
233                if errinfo[0] == EISCONN:
234                    break
235                if errinfo[0] == ECONNREFUSED and connAttempts < 10:
236                    mpd_print(mpd_dbg_level,"Retrying on connection refused")
237                    connAttempts += 1
238                    sleep(random())
239                elif errinfo[0] != EINTR:
240                    mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1]))
241                    raise socket.error, errinfo
242	# End of the while
243    except socket.error, errinfo:
244        # we have seen at least one machine that needs it this way
245        # We've seen a failure here; it could be EINPROGRESS, EALREADY,
246        # or EADDRINUSE.  In that case, we may need to do something else
247	mpd_print(1,"connect error with %d %s" % (errinfo[0],errinfo[1]))
248        # Should this only attempt on ECONNREFUSED, ENETUNREACH, EADDRNOTAVAIL
249        # FIXME: Does this need a try/except?
250        while 1:
251            try:
252                rc = sock2.sock.connect(('',port1))
253                break
254            except socket.error, errinfo:
255                if errinfo[0] == EISCONN:
256                    break
257                elif errinfo[0] != EINTR:
258                    mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1]))
259                    raise socket.error, errinfo
260        # end of while
261    # Accept can fail on EINTR, so we handle that here
262    while (1):
263        try:
264            (sock3,addr) = sock1.sock.accept()
265            break
266        except socket.error, errinfo:
267            if errinfo[0] != EINTR:
268                mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1]))
269                raise socket.error, errinfo
270    # end of while
271    sock3 = MPDSock(sock=sock3)
272    sock1.close()
273    return (sock2,sock3)
274
def mpd_which(execName,user_path=None):
    """Locate an executable by searching a PATH-style directory list.

    user_path is an os.pathsep-separated list of directories.  When it is
    empty or None, $PATH is used, and '' is returned if $PATH is unset.
    Returns the first os.path.join(dir,execName) that is not a directory
    and is executable, or '' if none is found.
    """
    if not user_path:
        if 'PATH' not in os.environ:    # dict.has_key is deprecated
            return ''
        user_path = os.environ['PATH']
    for d in user_path.split(os.pathsep):
        fpn = os.path.join(d,execName)
        if os.path.isdir(fpn):  # follows symlinks; dirs can have execute permission
            continue
        if os.access(fpn,os.X_OK):    # NOTE access works based on real uid (not euid)
            return fpn
    return ''
288
def mpd_check_python_version():
    """Return sys.version_info if this interpreter predates Python 2.2, else 0."""
    # version_info: (major,minor,micro,releaselevel,serial); comparing the
    # (major,minor) prefix as a tuple covers both "major < 2" and "2.0/2.1".
    if sys.version_info[:2] < (2,2):
        return sys.version_info
    return 0
295
def mpd_version():
    """Return this mpd's version as (major, minor, micro, special)."""
    (major,minor,micro) = (1,0,1)
    special = 'July, 2006 release'
    return (major,minor,micro,special)
298
def mpd_get_my_username():
    """Return the login name of the user running this process.

    The passwd entry for our uid (via the optional pwd module) is
    preferred over the environment.  If pwd is unavailable, or has no
    entry for our uid (common in containers, where getpwuid raises
    KeyError), fall back to $USER, then $USERNAME, then the literal
    'unknown_username'.
    """
    if pwd_module_available:
        try:
            return pwd.getpwuid(os.getuid())[0]    # favor this over env
        except KeyError:
            pass    # uid has no passwd entry; try the environment instead
    for envVar in ('USER','USERNAME'):
        if envVar in os.environ:    # dict.has_key is deprecated
            return os.environ[envVar]
    return 'unknown_username'
309
def mpd_get_ranks_in_binary_tree(myRank,nprocs):
    """Return (parent, lchild, rchild) of myRank in a binary tree of nprocs ranks.

    Ranks 0..nprocs-1 form an implicit binary heap rooted at rank 0: the
    parent of r is (r-1)//2 and its children are 2r+1 and 2r+2.  A missing
    parent (for the root) or a child beyond nprocs-1 is reported as -1.
    """
    if myRank == 0:
        parent = -1
    else:
        # floor division keeps the rank an int even under true division
        # (`from __future__ import division`, -Qnew, or Python 3)
        parent = (myRank - 1) // 2
    lchild = (myRank * 2) + 1
    if lchild > (nprocs - 1):
        lchild = -1
    rchild = (myRank * 2) + 2
    if rchild > (nprocs - 1):
        rchild = -1
    return (parent,lchild,rchild)
322
def mpd_same_ips(host1,host2):    # hosts may be names or IPs
    """Return 1 if host1 and host2 resolve to at least one common IPv4 address.

    Returns 0 when they share no address, or when either name cannot be
    resolved or encoded.  Only Exception subclasses are caught, so
    KeyboardInterrupt/SystemExit are no longer swallowed as the previous
    bare `except:` did on Python 2.5+.
    """
    try:
        ips1 = socket.gethostbyname_ex(host1)[2]    # may fail if invalid host
        ips2 = socket.gethostbyname_ex(host2)[2]    # may fail if invalid host
    except Exception:
        return 0
    for ip in ips1:
        if ip in ips2:
            return 1
    return 0
334
335def mpd_read_nbytes(fd,nbytes):
336    global mpd_signum
337    rv = 0
338    while 1:
339        try:
340            mpd_signum = 0
341            rv = os.read(fd,nbytes)
342            break
343        except os.error, errinfo:
344            if errinfo[0] == EINTR:
345                if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
346                    break
347                else:
348                    continue
349            elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
350                break
351            else:
352                mpd_print(1, 'read error: %s' % os.strerror(errinfo[0]))
353                break
354        except KeyboardInterrupt, errinfo:
355            break
356        except Exception, errinfo:
357            mpd_print(1, 'other error after read %s :%s:' % ( errinfo.__class__, errinfo) )
358            break
359    return rv
360
def mpd_get_groups_for_username(username):
    """Return the gids of username's groups, primary group first.

    The primary gid comes from the passwd entry.  It is followed, without
    duplicates, by every group that lists username as a member.  Returns
    [] when either the pwd or the grp module is unavailable.
    """
    if not (pwd_module_available  and  grp_module_available):
        return []
    gids = [pwd.getpwnam(username)[3]]    # default (login) group for the user
    for grpEntry in grp.getgrall():
        if username in grpEntry[3]  and  grpEntry[2] not in gids:
            gids.append(grpEntry[2])
    return gids
371
372
373class MPDSock(object):
374    def __init__(self,family=socket.AF_INET,socktype=socket.SOCK_STREAM,proto=0,
375                 sock=None,name=''):
376        if sock:
377            self.sock = sock
378        else:
379            self.sock = socket.socket(family=family,type=socktype,proto=proto)
380        self.name = name
381        self.type = socktype
382        self.family = family
383        ## used this when inherited from socket.socket (only works with py 2.3+)
384        ## socket.socket.__init__(self,family=family,type=socktype,proto=proto,_sock=sock)
385    def close(self):
386        self.sock.close()
387    def sendall(self,data):
388        self.sock.sendall(data)
389    def getsockname(self):
390        return self.sock.getsockname()
391    def fileno(self):
392        return self.sock.fileno()
393
394    def connect(self,*args):
395        # We handle EINTR in this method, unless it appears that a
396        # SIGINT or SIGALRM are delivered.  In that case, we do not
397        # complete the connection (FIXME: make sure that all uses of this
398        # do the right thing in that case).
399        while 1:
400            try:
401                mpd_signum = 0
402                self.sock.connect(*args)
403                break
404            except socket.error, errinfo:
405                if errinfo[0] == EINTR:   # sigchld, sigint, etc.
406                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
407                        break
408                    else:
409                        continue
410                else:
411                    raise socket.error, errinfo
412        # end of while
413
414    def accept(self,name='accepter'):
415        global mpd_signum
416        newsock = 0
417        newaddr = 0
418        while 1:
419            try:
420                mpd_signum = 0
421                (newsock,newaddr) = self.sock.accept()
422                break
423            except socket.error, errinfo:
424                if errinfo[0] == EINTR:   # sigchld, sigint, etc.
425                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
426                        break
427                    else:
428                        continue
429                elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
430                    break
431                else:
432                    print '%s: accept error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
433                    break
434            except Exception, errinfo:
435                print '%s: failure doing accept : %s : %s' % \
436                      (mpd_my_id,errinfo.__class__,errinfo)
437                break
438        if newsock:
439            newsock = MPDSock(sock=newsock,name=name)    # turn new socket into an MPDSock
440        return (newsock,newaddr)
441    def recv(self,nbytes):
442        global mpd_signum
443        data = 0
444        while 1:
445            try:
446                mpd_signum = 0
447                data = self.sock.recv(nbytes)
448                break
449            except socket.error, errinfo:
450                if errinfo[0] == EINTR:   # sigchld, sigint, etc.
451                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
452                        break
453                    else:
454                        continue
455                elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
456                    break
457                else:
458                    print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
459                    break
460            except Exception, errinfo:
461                print '%s: failure doing recv %s :%s:' % \
462                      (mpd_my_id,errinfo.__class__,errinfo)
463                break
464        return data
465    def recv_dict_msg(self,timeout=None):
466        global mpd_signum
467        global mpd_dbg_level
468
469        mpd_print(mpd_dbg_level, \
470                  "Entering recv_dict_msg with timeout=%s" % (str(timeout)))
471        msg = {}
472        readyToRecv = 0
473        if timeout:
474            try:
475		# Loop while we get EINTR.
476                # FIXME: In some cases, we may want to exit if
477	        # the signal was SIGINT.  We need to restart if
478                # we see SIGCLD
479                while 1:
480                    try:
481		        mpd_signum = 0
482                        if timeout == -1:
483                            # use -1 to indicate indefinite timeout
484                            (readyToRecv,unused1,unused2) = select.select([self.sock],[],[])
485                        else:
486                            (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout)
487                        break;
488                    except os.error, errinfo:
489                        if errinfo[0] == EINTR:
490                            # Retry interrupted system calls
491                            pass
492                        else:
493                            raise os.error, errinfo
494                # End of the while(1)
495            except select.error, errinfo:
496                if errinfo[0] == EINTR:
497                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
498                        mpd_print(0,"sigint/alrm check");
499                        pass   # assume timedout; returns {} below
500                    elif mpd_signum == signal.SIGCLD:
501                        mpd_print_tb(1,"mishandling sigchild in recv_dict_msg, errinfo=:%s" % (errinfo) )
502                    else:
503                        mpd_print_tb(1,"Unhandled EINTR: errinfo=%s" % (errinfo) )
504                else:
505                    mpd_print(1, '%s: select error: %s' % (mpd_my_id,os.strerror(errinfo[0])))
506            except KeyboardInterrupt, errinfo:
507                # print 'recv_dict_msg: keyboard interrupt during select'
508                mpd_print(0,"KeyboardInterrupt");
509                return msg
510            except Exception, errinfo:
511                mpd_print(1, 'recv_dict_msg: exception during select %s :%s:' % \
512                      ( errinfo.__class__, errinfo))
513                return msg
514        else:
515            readyToRecv = 1
516        if readyToRecv:
517            mpd_print(mpd_dbg_level,"readyToRecv");
518            try:
519                pickledLen = ''
520                tempRecvd = ''
521                lenLeft = 8
522                while lenLeft:
523                    while (1):
524                        try:
525                            tempRecvd = self.sock.recv(lenLeft)
526                            # FIXME: Shouldn't this block until there is a
527                            # message unless it raises an exception.
528                            # Is no message an EOF, and in that case,
529                            # do we really want to immediately delete
530                            # the corresponding entry?
531                            #if not pickledLen:
532                            #    mpd_print(1,"continuing because recv failed")
533                            #    continue
534                            break
535                        except socket.error,errinfo:
536                            if errinfo[0] == EINTR:
537                                mpd_print(mpd_dbg_level,"Saw EINTR")
538                                pass
539                            elif errinfo[0] == ECONNRESET:
540                                mpd_print(mpd_dbg_level,"Saw ECONNRESET, ignore (return null msg)")
541                                return msg;
542                            else:
543                                mpd_print_tb(1,"recv_dict_msg: sock.recv(8): errinfo=:%s:" % (errinfo))
544                                raise socket.error,errinfo
545                    # end of while(1)
546                    if not tempRecvd:
547                         break
548                    pickledLen += tempRecvd
549                    lenLeft -= len(tempRecvd)
550                if not pickledLen:
551                    mpd_print(mpd_dbg_level,"no pickeled len")
552                if pickledLen:
553                    pickledLen = int(pickledLen)
554                    pickledMsg = ''
555                    lenLeft = pickledLen
556                    while lenLeft:
557                        while (1):
558                            try:
559                                recvdMsg = self.sock.recv(lenLeft)
560                                break
561                            except socket.error,errinfo:
562                                if errinfo[0] == EINTR:
563                                    pass
564                                else:
565                                    mpd_print_tb(1,"recv_dict_msg: sock.recv(8): errinfo=:%s:" % (errinfo))
566                                    raise socket.error,errinfo
567                        # end of while(1)
568
569                        pickledMsg += recvdMsg
570                        lenLeft -= len(recvdMsg)
571                    msg = loads(pickledMsg)
572            except socket.error, errinfo:
573                if errinfo[0] == EINTR:
574                    mpd_print(1, "Unhandled EINTR on sock.recv")
575                    return msg
576                elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
577                    mpd_print(mpd_dbg_level,"Connection reset")
578                    pass   # socket.error: (104, 'Connection reset by peer')
579                else:
580                    mpd_print_tb(1,'recv_dict_msg: socket error: errinfo=:%s:' % (errinfo))
581            except StandardError, errmsg:    # any built-in exceptions
582                mpd_print_tb(1, 'recv_dict_msg: errmsg=:%s:' % (errmsg) )
583            except Exception, errmsg:
584                mpd_print_tb(1, 'recv_dict_msg failed on sock %s errmsg=:%s:' % \
585                             (self.name,errmsg) )
586        if mpd_dbg_level:
587            if msg:
588                mpd_print(1,"Returning with non-null msg, length = %d, head = %s" % (pickledLen,pickledMsg[0:32].replace('\n','<NL>') ) )
589	    else:
590                mpd_print(1,"Returning with null msg" )
591        return msg
592    def recv_char_msg(self):
593        return self.recv_one_line()  # use leading len later
594    def recv_one_line(self):
595        msg = ''
596	# A failure with EINTR was observed here, so a loop to retry on
597        # EINTR has been added
598        try:
599            while 1:
600                try:
601                    c = self.sock.recv(1)
602                    break
603                except socket.error, errinfo:
604                    if errinfo[0] != EINTR:
605                        raise socket.error, errinfo
606            # end of while
607        except socket.error, errinfo:
608            if errinfo[0] == EINTR:   # sigchld, sigint, etc.
609                # This should no longer happen (handled above)
610                mpd_print_tb( 1,  "Unhandled EINTR in sock.recv" );
611                return msg
612            elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
613                return msg
614            else:
615                print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
616                sys.exit(-1)
617        except Exception, errmsg:
618            c = ''
619            msg = ''
620            mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) )
621        if c:
622            while c != '\n':
623                msg += c
624                try:
625                    c = self.sock.recv(1)
626                except socket.error, errinfo:
627                    if errinfo[0] == EINTR:   # sigchld, sigint, etc.
628                        return msg
629                    elif errinfo[0] == ECONNRESET:   # connection reset (treat as eof)
630                        return msg
631                    else:
632                        print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0]))
633                        sys.exit(-1)
634                except Exception, errmsg:
635                    c = ''
636                    msg = ''
637                    mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) )
638                    break
639            msg += c
640        return msg
641
642    # The default behavior on an error needs to be to handle and/or report
643    # it.  Otherwise, we all waste time trying to figure out why
644    # the code is silently failing.  I've set the default for errprint
645    # to YES rather than NO.
646    def send_dict_msg(self,msg,errprint=1):
647        pickledMsg = dumps(msg)
648        # FIXME: Does this automatically handle EINTR, or does it need an
649        # except os.error, errinfo: and check on errinfo[0] == EINTR
650        try:
651            while 1:
652                try:
653                    self.sendall( "%08d%s" % (len(pickledMsg),pickledMsg) )
654                    break
655                except socket.error, errmsg:
656		    if errmsg[0] == EPIPE  \
657                    or errmsg[0] == ECONNRESET \
658                    or errmsg[0] == EINTR:
659			# silent failure on pipe failure, as we usually
660                        # just want to discard messages in this case
661                        # (We need to plan error handling more thoroughly)
662                        break  ## RMB: chgd from pass
663                    else:
664                        raise socket.error, errmsg
665            # end of While
666        except Exception, errmsg:
667            mpd_print_tb(errprint,'send_dict_msg: sock=%s errmsg=:%s:' % (self.name,errmsg))
668    def send_char_msg(self,msg,errprint=1):
669        try:
670            while 1:
671                try:
672                    self.sock.sendall(msg)
673                    break
674                except socket.error, errmsg:
675		    if errmsg[0] == EPIPE:
676			# silent failure on pipe failure, as we usually
677                        # just want to discard messages in this case
678                        # (We need to plan error handling more thoroughly)
679                        pass
680                    if errmsg[0] != EINTR:
681                        raise socket.error, errmsg
682            # end of While
683        except Exception, errmsg:
684            mpd_print_tb(errprint,'send_char_msg: sock=%s errmsg=:%s:' % (self.name,errmsg))
685
class MPDListenSock(MPDSock):
    """A listening MPDSock, optionally confined to a configured port range.

    If filename is given, the socket is bound to that path and listens
    (presumably the caller passes family=socket.AF_UNIX in kargs -- TODO
    confirm).  Otherwise it binds to (host,port) and listens with a backlog
    of `listen`.  SO_REUSEADDR is set in both cases.

    A port range applies when MPIEXEC_PORT_RANGE (checked first) or
    MPICH_PORT_RANGE holds a valid "low:high" with both ends nonzero:
      * port == 0: the first port in the range that binds is used; if none
        does, a traceback is printed and the process exits;
      * explicit port: it must lie inside the range, else a traceback is
        printed and the process exits.
    Remaining kargs are passed through to MPDSock.__init__.
    """
    def __init__(self,host='',port=0,filename='',listen=5,name='listener',**kargs):
        MPDSock.__init__(self,name=name,**kargs)
        self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        if filename:
            self.sock.bind(filename)
            self.sock.listen(listen)
            return
        (low_port,high_port) = self._get_port_range()
        if low_port == 0  or  high_port == 0:
            self.sock.bind((host,port))  # no port range set, so just bind as usual
            self.sock.listen(listen)
        elif port == 0:
            port = low_port
            while 1:
                try:
                    self.sock.bind((host,port))
                    self.sock.listen(listen)
                    break
                except socket.error:
                    port += 1
                    if port > high_port:
                        mpd_print_tb(1,'** no free ports in MPICH_PORT_RANGE')
                        sys.exit(-1)
                    # start over with a fresh socket before trying the next port
                    self.sock.close()
                    MPDSock.__init__(self,name=name,**kargs)
                    self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        else:  # else use the explicitly specified port
            if port < low_port  or  port > high_port:
                mpd_print_tb(1,'** port %d is outside MPICH_PORT_RANGE' % port)
                sys.exit(-1)
            self.sock.bind((host,port))  # go ahead and bind
            self.sock.listen(listen)

    def _get_port_range(self):
        """Return (low,high) from the port-range environment, or (0,0).

        MPIEXEC_PORT_RANGE is tried first, then MPICH_PORT_RANGE.  A
        variable that is unset or not of the form "int:int" is skipped
        (previously a bare except: did this and could hide anything).  The
        first parseable one wins; if it has low < 0 or high < low, the
        range is disabled with (0,0).
        """
        for envVar in ('MPIEXEC_PORT_RANGE','MPICH_PORT_RANGE'):
            try:
                (low_port,high_port) = map(int, os.environ[envVar].split(':'))
            except (KeyError,ValueError):
                continue
            if low_port < 0  or  high_port < low_port:
                return (0,0)
            return (low_port,high_port)
        return (0,0)
733
734class MPDStreamHandler(object):
735    def __init__(self):
736        self.activeStreams = {}
737    def set_handler(self,stream,handler,args=()):
738        self.activeStreams[stream] = (handler,args)
739    def del_handler(self,stream):
740        if self.activeStreams.has_key(stream):
741            del self.activeStreams[stream]
742    def close_all_active_streams(self):
743        for stream in self.activeStreams.keys():
744            del self.activeStreams[stream]
745            stream.close()
746    def handle_active_streams(self,streams=None,timeout=0.1):
747        global mpd_signum
748        while 1:
749            if streams:
750                streamsToSelect = streams
751            else:
752                streamsToSelect = self.activeStreams.keys()
753            readyStreams = []
754            try:
755                mpd_signum = 0
756                (readyStreams,u1,u2) = select.select(streamsToSelect,[],[],timeout)
757                break
758            except select.error, errinfo:
759                if errinfo[0] == EINTR:
760                    if mpd_signum == signal.SIGCHLD:
761                        break
762                    if mpd_signum == signal.SIGINT  or  mpd_signum == signal.SIGALRM:
763                        break
764                    else:
765                        continue
766                else:
767                    print '%s: handle_active_streams: select error: %s' % \
768                          (mpd_my_id,os.strerror(errinfo[0]))
769                    return (-1,os.strerror(errinfo[0]))
770            except KeyboardInterrupt, errinfo:
771                # print 'handle_active_streams: keyboard interrupt during select'
772                return (-1,errinfo.__class__,errinfo)
773            except Exception, errinfo:
774                print 'handle_active_streams: exception during select %s :%s:' % \
775                      ( errinfo.__class__, errinfo)
776                return (-1,errinfo.__class__,errinfo)
777        for stream in readyStreams:
778            if self.activeStreams.has_key(stream):
779                (handler,args) = self.activeStreams[stream]
780                handler(stream,*args)
781            else:
782                # this is not nec bad; an active stream (handler) may
783                # have been deleted by earlier handler in this loop
784                print '*** OOPS, unknown stream in handle_active_streams'
785        return (len(readyStreams),0)  #  len >= 0
786
787class MPDRing(object):
788    def __init__(self,listenSock=None,streamHandler=None,secretword='',
789                 myIfhn='',entryIfhn='',entryPort=0,zcMyLevel=0):
790        if not streamHandler:
791            mpd_print(1, "must supply handler for new conns in ring")
792            sys.exit(-1)
793        if not listenSock:
794            mpd_print(1, "must supply listenSock for new ring")
795            sys.exit(-1)
796        if not myIfhn:
797            mpd_print(1, "must supply myIfhn for new ring")
798            sys.exit(-1)
799        self.secretword = secretword
800        self.myIfhn     = myIfhn
801        self.generation = 0
802        self.listenSock = listenSock
803        self.listenPort = self.listenSock.sock.getsockname()[1]
804        self.streamHandler = streamHandler
805        self.streamHandler.set_handler(self.listenSock,self.handle_ring_listener_connection)
806        self.entryIfhn = entryIfhn
807        self.entryPort = entryPort
808        self.lhsIfhn = ''
809        self.lhsPort = 0
810        self.rhsIfhn = ''
811        self.rhsPort = 0
812        self.lhsSock = 0
813        self.rhsSock = 0
814        self.lhsHandler = None
815        self.rhsHandler = None
816        self.zcMyLevel = zcMyLevel
817        if self.zcMyLevel:
818            mpd_init_zc(self.myIfhn,self.zcMyLevel)
819    def create_single_mem_ring(self,ifhn='',port=0,lhsHandler=None,rhsHandler=None):
820        self.lhsSock,self.rhsSock = mpd_sockpair()
821        self.lhsIfhn = ifhn
822        self.lhsPort = port
823        self.rhsIfhn = ifhn
824        self.rhsPort = port
825        self.lhsHandler = lhsHandler
826        self.streamHandler.set_handler(self.lhsSock,lhsHandler)
827        self.rhsHandler = rhsHandler
828        self.streamHandler.set_handler(self.rhsSock,rhsHandler)
829    def reenter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=5):
830        if mpd_zc:
831            mpd_close_zc()
832            mpd_init_zc(self.myIfhn,self.zcMyLevel)
833        rc = -1
834        numTries = 0
835	self.generation += 1
836        while rc < 0  and  numTries < ntries:
837            numTries += 1
838            rc = self.enter_ring(entryIfhn=entryIfhn,entryPort=entryPort,
839                                 lhsHandler=lhsHandler,rhsHandler=rhsHandler,
840				 ntries=1)
841	    sleepTime = random() * 1.5  # a single random is between 0 and 1
842            sleep(sleepTime)
843        mpd_print(1,'reenter_ring rc=%d after numTries=%d' % (rc,numTries) )
844        return rc
845    def enter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=1):
846        if not lhsHandler  or  not rhsHandler:
847            print 'missing handler for enter_ring'
848            sys.exit(-1)
849        if not entryIfhn:
850            entryIfhn = self.entryIfhn
851        if not entryPort:
852            entryPort = self.entryPort
853        if not entryIfhn  and  mpd_zc:
854            if self.zcMyLevel == 1:
855                (entryHost,entryPort) = ('',0)
856            else:
857                (entryIfhn,entryPort) = mpd_find_zc_peer(self.zcMyLevel-1)
858                if not entryPort:
859                    print "FAILED TO FIND A PEER AT LEVEL", self.zcMyLevel-1
860                    sys.exit(-1)
861            print "ENTRY INFO", (entryIfhn,entryPort)
862        if not entryIfhn:
863            self.create_single_mem_ring(ifhn=self.myIfhn,
864                                        port=self.listenPort,
865                                        lhsHandler=lhsHandler,
866                                        rhsHandler=rhsHandler)
867        else:
868            rv = self.connect_lhs(lhsIfhn=entryIfhn,
869                                  lhsPort=entryPort,
870                                  lhsHandler=lhsHandler,
871                                  numTries=ntries)
872            if rv[0] <= 0:  # connect failed with problem
873                mpd_print(1,"lhs connect failed")
874                return -1
875            if rv[1]:  # rhsifhn and rhsport
876                rhsIfhn = rv[1][0]
877                rhsPort = rv[1][1]
878            else:
879                mpd_print(1,"did not recv rhs host&port from lhs")
880                return -1
881            rv = self.connect_rhs(rhsIfhn=rhsIfhn,
882                                  rhsPort=rhsPort,
883                                  rhsHandler=rhsHandler,
884                                  numTries=ntries)
885            if rv[0] <=  0:  # connect did not succeed; may try again
886                mpd_print(1,"rhs connect failed")
887                return -1
888        if mpd_zc:
889            mpd_register_zc(self.myIfhn,self.zcMyLevel)
890        return 0
891    def connect_lhs(self,lhsIfhn='',lhsPort=0,lhsHandler=None,numTries=1):
892        if not lhsHandler:
893            mpd_print(1, "must supply handler for lhs in ring")
894            return (-1,None)
895        if not lhsIfhn:
896            mpd_print(1, "must supply host for lhs in ring")
897            return (-1,None)
898        self.lhsIfhn = lhsIfhn
899        if not lhsPort:
900            mpd_print(1, "must supply port for lhs in ring")
901            return (-1,None)
902        self.lhsPort = lhsPort
903        numConnTries = 0
904        while numConnTries < numTries:
905            numConnTries += 1
906            self.lhsSock = MPDSock(name='lhs')
907            try:
908                self.lhsSock.connect((self.lhsIfhn,self.lhsPort))
909            except socket.error, errinfo:
910                print '%s: conn error in connect_lhs: %s' % \
911                      (mpd_my_id,os.strerror(errinfo[0]))
912                self.lhsSock.close()
913                self.lhsSock = 0
914                sleep(random())
915                continue
916            break
917        if not self.lhsSock  or  numConnTries > numTries:
918            mpd_print(1,'failed to connect to lhs at %s %d' % (self.lhsIfhn,self.lhsPort))
919            return (0,None)
920        msgToSend = { 'cmd' : 'request_to_enter_as_rhs', 'ifhn' : self.myIfhn,
921                      'port' : self.listenPort,
922                      'mpd_version' : mpd_version() }
923        self.lhsSock.send_dict_msg(msgToSend)
924        msg = self.lhsSock.recv_dict_msg()
925        if (not msg) \
926        or (not msg.has_key('cmd')) \
927        or (not msg['cmd'] == 'challenge') \
928        or (not msg.has_key('randnum')) \
929        or (not msg.has_key('generation')):
930            mpd_print(1,'invalid challenge from %s %d: %s' % \
931                      (self.lhsIfhn,self.lhsPort,msg) )
932            return (-1,None)
933        if msg['generation'] < self.generation:
934            mpd_print(1,'bad generation from lhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation))
935            return(-1,'bad_generation')  # RMB: try again here later
936        response = md5new(''.join([self.secretword,msg['randnum']])).digest()
937        msgToSend = { 'cmd' : 'challenge_response', 'response' : response,
938                      'ifhn' : self.myIfhn, 'port' : self.listenPort }
939        self.lhsSock.send_dict_msg(msgToSend)
940        msg = self.lhsSock.recv_dict_msg()
941        if (not msg) \
942        or (not msg.has_key('cmd')) \
943        or (not msg['cmd'] == 'OK_to_enter_as_rhs'):
944            mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords')
945            return (-1,None)
946        self.lhsHandler = lhsHandler
947        self.streamHandler.set_handler(self.lhsSock,lhsHandler)
948        if msg.has_key('rhsifhn') and msg.has_key('rhsport'):
949            return (1,(msg['rhsifhn'],msg['rhsport']))
950        else:
951            return (1,None)
952    def connect_rhs(self,rhsIfhn='',rhsPort=0,rhsHandler=None,numTries=1):
953        if not rhsHandler:
954            mpd_print(1, "must supply handler for rhs in ring")
955            return (-1,None)
956        if not rhsIfhn:
957            mpd_print(1, "must supply host for rhs in ring")
958            return (-1,None)
959        self.rhsIfhn = rhsIfhn
960        if not rhsPort:
961            mpd_print(1, "must supply port for rhs in ring")
962            return (-1,None)
963        self.rhsPort = rhsPort
964        numConnTries = 0
965        while numConnTries < numTries:
966            numConnTries += 1
967            self.rhsSock = MPDSock(name='rhs')
968            try:
969                self.rhsSock.connect((self.rhsIfhn,self.rhsPort))
970            except socket.error, errinfo:
971                print '%s: conn error in connect_rhs: %s' % \
972                      (mpd_my_id,os.strerror(errinfo[0]))
973                self.rhsSock.close()
974                self.rhsSock = 0
975                sleep(random())
976                continue
977            break
978        if not self.rhsSock or numConnTries > numTries:
979            mpd_print(1,'failed to connect to rhs at %s %d' % (self.rhsIfhn,self.rhsPort))
980            return (0,None)
981        msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'ifhn' : self.myIfhn,
982                      'port' : self.listenPort,
983                      'mpd_version' : mpd_version() }
984        self.rhsSock.send_dict_msg(msgToSend)
985        msg = self.rhsSock.recv_dict_msg()
986        if (not msg) \
987        or (not msg.has_key('cmd')) \
988        or (not msg['cmd'] == 'challenge') \
989        or (not msg.has_key('randnum')) \
990        or (not msg.has_key('generation')):
991            mpd_print(1,'invalid challenge from %s %d: %s' % (self.rhsIfhn,rhsPort,msg) )
992            return (-1,None)
993        if msg['generation'] < self.generation:
994            mpd_print(1,'bad generation from rhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation))
995            return(-1,'bad_generation')  # RMB: try again here later
996        response = md5new(''.join([self.secretword,msg['randnum']])).digest()
997        msgToSend = { 'cmd' : 'challenge_response', 'response' : response,
998                      'ifhn' : self.myIfhn, 'port' : self.listenPort }
999        self.rhsSock.send_dict_msg(msgToSend)
1000        msg = self.rhsSock.recv_dict_msg()
1001        if (not msg) \
1002        or (not msg.has_key('cmd')) \
1003        or (not msg['cmd'] == 'OK_to_enter_as_lhs'):
1004            mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords')
1005            return (-1,None)
1006        self.rhsHandler = rhsHandler
1007        self.streamHandler.set_handler(self.rhsSock,rhsHandler)
1008        if msg.has_key('lhsifhn') and msg.has_key('lhsport'):
1009            return (1,(msg['lhsifhn'],msg['lhsport']))
1010        else:
1011            return (1,None)
1012    def accept_lhs(self,lhsHandler=None):
1013        self.lhsHandler = lhsHandler
1014        newsock = self.handle_ring_listener_connection(self.listenSock)
1015        self.handle_lhs_challenge_response(newsock)
1016        self.streamHandler.set_handler(self.lhsSock,lhsHandler)
1017    def accept_rhs(self,rhsHandler=None):
1018        self.rhsHandler = rhsHandler
1019        newsock = self.handle_ring_listener_connection(self.listenSock)
1020        self.handle_rhs_challenge_response(newsock)
1021        self.streamHandler.set_handler(self.rhsSock,rhsHandler)
1022    def handle_ring_listener_connection(self,sock):
1023        randHiRange = 10000
1024        (newsock,newaddr) = sock.accept()
1025        newsock.name = 'candidate_to_enter_ring'
1026        msg = newsock.recv_dict_msg()
1027        if (not msg) or \
1028           (not msg.has_key('cmd')) or (not msg.has_key('ifhn')) or  \
1029           (not msg.has_key('port')):
1030            mpd_print(1, 'INVALID msg from new connection :%s: msg=:%s:' % (newaddr,msg) )
1031            newsock.close()
1032            return None
1033        if msg.has_key('mpd_version'):  # ping, etc may not have one
1034            if msg['mpd_version'] != mpd_version():
1035                msgToSend = { 'cmd' : 'entry_rejected_bad_mpd_version',
1036                              'your_version' : msg['mpd_version'],
1037                              'my_version' : mpd_version() }
1038                newsock.send_dict_msg(msgToSend)
1039                newsock.close()
1040                return None
1041        randNumStr = '%04d' % (randrange(1,randHiRange))  # 0001-(hi-1), inclusive
1042        newsock.correctChallengeResponse = \
1043                         md5new(''.join([self.secretword,randNumStr])).digest()
1044        msgToSend = { 'cmd' : 'challenge', 'randnum' : randNumStr,
1045                      'generation' : self.generation }
1046        newsock.send_dict_msg(msgToSend)
1047        if msg['cmd'] == 'request_to_enter_as_lhs':
1048            self.streamHandler.set_handler(newsock,self.handle_lhs_challenge_response)
1049            newsock.name = 'candidate_for_lhs_challenged'
1050            return newsock
1051        elif msg['cmd'] == 'request_to_enter_as_rhs':
1052            self.streamHandler.set_handler(newsock,self.handle_rhs_challenge_response)
1053            newsock.name = 'candidate_for_rhs_challenged'
1054            return newsock
1055        elif msg['cmd'] == 'ping':
1056            # already sent challenge instead of ack
1057            newsock.close()
1058            return None
1059        else:
1060            mpd_print(1, 'INVALID msg from new connection :%s:  msg=:%s:' % (newaddr,msg) )
1061            newsock.close()
1062            return None
1063        return None
1064    def handle_lhs_challenge_response(self,sock):
1065        msg = sock.recv_dict_msg()
1066        if (not msg)   or  \
1067           (not msg.has_key('cmd'))   or  (not msg.has_key('response'))  or  \
1068           (not msg.has_key('ifhn'))  or  (not msg.has_key('port'))  or  \
1069           (not msg['response'] == sock.correctChallengeResponse):
1070            mpd_print(1, 'INVALID msg for lhs response msg=:%s:' % (msg) )
1071            msgToSend = { 'cmd' : 'invalid_response' }
1072            sock.send_dict_msg(msgToSend)
1073            self.streamHandler.del_handler(sock)
1074            sock.close()
1075        else:
1076            msgToSend = { 'cmd' : 'OK_to_enter_as_lhs' }
1077            sock.send_dict_msg(msgToSend)
1078            if self.lhsSock:
1079                self.streamHandler.del_handler(self.lhsSock)
1080                self.lhsSock.close()
1081            self.lhsSock = sock
1082            self.lhsIfhn = msg['ifhn']
1083            self.lhsPort = int(msg['port'])
1084            self.streamHandler.set_handler(self.lhsSock,self.lhsHandler)
1085            self.lhsSock.name = 'lhs'
1086    def handle_rhs_challenge_response(self,sock):
1087        msg = sock.recv_dict_msg()
1088        if (not msg)   or  \
1089           (not msg.has_key('cmd'))   or  (not msg.has_key('response'))  or  \
1090           (not msg.has_key('ifhn'))  or  (not msg.has_key('port')):
1091            mpd_print(1, 'INVALID msg for rhs response msg=:%s:' % (msg) )
1092            msgToSend = { 'cmd' : 'invalid_response' }
1093            sock.send_dict_msg(msgToSend)
1094            self.streamHandler.del_handler(sock)
1095            sock.close()
1096        elif msg['response'] != sock.correctChallengeResponse:
1097            mpd_print(1, 'INVALID response in rhs response msg=:%s:' % (msg) )
1098            msgToSend = { 'cmd' : 'invalid_response' }
1099            sock.send_dict_msg(msgToSend)
1100            self.streamHandler.del_handler(sock)
1101            sock.close()
1102        elif msg['response'] == 'bad_generation':
1103            mpd_print(1, 'someone failed entering my ring gen=%d msg=%s' % \
1104                      (self.generation,msg) )
1105            self.streamHandler.del_handler(sock)
1106            sock.close()
1107        else:
1108            msgToSend = { 'cmd' : 'OK_to_enter_as_rhs', 'rhsifhn' : self.rhsIfhn,
1109                          'rhsip' : self.rhsIfhn, 'rhsport' : self.rhsPort }
1110            sock.send_dict_msg(msgToSend)
1111            if self.rhsSock:
1112                self.streamHandler.del_handler(self.rhsSock)
1113                self.rhsSock.close()
1114            self.rhsSock = sock
1115            self.rhsIfhn   = msg['ifhn']
1116            self.rhsPort = int(msg['port'])
1117            self.streamHandler.set_handler(self.rhsSock,self.rhsHandler)
1118            self.rhsSock.name = 'rhs'
1119
1120class MPDConListenSock(MPDListenSock):
1121    def __init__(self,name='console_listen',secretword='',**kargs):
1122        if os.environ.has_key('MPD_CON_EXT'):
1123            self.conExt = '_'  + os.environ['MPD_CON_EXT']
1124        else:
1125            self.conExt = ''
1126        self.conFilename = mpd_tmpdir + '/mpd2.console_' + mpd_get_my_username() + self.conExt
1127        self.secretword = secretword
1128        consoleAlreadyExists = 0
1129        if hasattr(socket,'AF_UNIX'):
1130            sockFamily = socket.AF_UNIX
1131        else:
1132            sockFamily = socket.AF_INET
1133        if os.environ.has_key('MPD_CON_INET_HOST_PORT'):
1134            sockFamily = socket.AF_INET    # override above-assigned value
1135            (conHost,conPort) = os.environ['MPD_CON_INET_HOST_PORT'].split(':')
1136            conPort = int(conPort)
1137        else:
1138            (conHost,conPort) = ('',0)
1139        if os.access(self.conFilename,os.R_OK):    # if console there, see if mpd listening
1140            if hasattr(socket,'AF_UNIX')  and  sockFamily == socket.AF_UNIX:
1141                tempSock = MPDSock(family=socket.AF_UNIX)
1142                try:
1143                    tempSock.connect(self.conFilename)
1144                    consoleAlreadyExists = 1
1145                except Exception, errmsg:
1146                    os.unlink(self.conFilename)
1147                tempSock.close()
1148            else:
1149                if not conPort:
1150                    conFile = open(self.conFilename)
1151                    for line in conFile:
1152                        line = line.strip()
1153                        (k,v) = line.split('=')
1154                        if k == 'port':
1155                            conPort = int(v)
1156                    conFile.close()
1157                tempSock = MPDSock()
1158                try:
1159                    tempSock.sock.connect(('localhost',conPort))
1160                    consoleAlreadyExists = 1
1161                except Exception, errmsg:
1162                    os.unlink(self.conFilename)
1163                tempSock.close()
1164        if consoleAlreadyExists:
1165            print 'An mpd is already running with console at %s on %s. ' % \
1166                  (self.conFilename, socket.gethostname())
1167            print 'Start mpd with the -n option for a second mpd on same host.'
1168            if syslog_module_available:
1169                syslog.syslog(syslog.LOG_ERR,
1170                              "%s: exiting; an mpd is already using the console" % \
1171                              (mpd_my_id))
1172            sys.exit(-1)
1173        if hasattr(socket,'AF_UNIX')  and  sockFamily == socket.AF_UNIX:
1174            MPDListenSock.__init__(self,family=sockFamily,socktype=socket.SOCK_STREAM,
1175                                   filename=self.conFilename,listen=1,name=name)
1176        else:
1177            MPDListenSock.__init__(self,family=sockFamily,socktype=socket.SOCK_STREAM,
1178                                   port=conPort,listen=1,name=name)
1179            conFD = os.open(self.conFilename,os.O_CREAT|os.O_WRONLY|os.O_EXCL,0600)
1180            self.port = self.sock.getsockname()[1]
1181            os.write(conFD,'port=%d\n' % (self.port) )
1182            os.close(conFD)
1183
1184class MPDConClientSock(MPDSock):
1185    def __init__(self,name='console_to_mpd',mpdroot='',secretword='',**kargs):
1186        MPDSock.__init__(self)
1187        self.sock = 0
1188        if os.environ.has_key('MPD_CON_EXT'):
1189            self.conExt = '_'  + os.environ['MPD_CON_EXT']
1190        else:
1191            self.conExt = ''
1192        self.secretword = secretword
1193        if mpdroot:
1194            self.conFilename = mpd_tmpdir + '/mpd2.console_root' + self.conExt
1195            self.sock = MPDSock(family=socket.AF_UNIX,name=name)
1196            rootpid = os.fork()
1197            if rootpid == 0:
1198                os.execvpe(mpdroot,[mpdroot,self.conFilename,str(self.sock.fileno())],{})
1199                mpd_print(1,'failed to exec mpdroot (%s)' % mpdroot )
1200                sys.exit(-1)
1201            else:
1202                (pid,status) = os.waitpid(rootpid,0)
1203                if os.WIFSIGNALED(status):
1204                    status = status & 0x007f  # AND off core flag
1205                else:
1206                    status = os.WEXITSTATUS(status)
1207                if status != 0:
1208                    mpd_print(1,'forked process failed; status=%s' % status)
1209                    sys.exit(-1)
1210        else:
1211            self.conFilename = mpd_tmpdir + '/mpd2.console_' + mpd_get_my_username() + self.conExt
1212            if hasattr(socket,'AF_UNIX'):
1213                sockFamily = socket.AF_UNIX
1214            else:
1215                sockFamily = socket.AF_INET
1216            if os.environ.has_key('MPD_CON_INET_HOST_PORT'):
1217                sockFamily = socket.AF_INET    # override above-assigned value
1218                (conHost,conPort) = os.environ['MPD_CON_INET_HOST_PORT'].split(':')
1219                conPort = int(conPort)
1220            else:
1221                (conHost,conPort) = ('',0)
1222            self.sock = MPDSock(family=sockFamily,socktype=socket.SOCK_STREAM,name=name)
1223            if hasattr(socket,'AF_UNIX')  and  sockFamily == socket.AF_UNIX:
1224                if hasattr(signal,'alarm'):
1225                    oldAlarmTime = signal.alarm(8)
1226                else:    # assume python2.3 or later
1227                    oldTimeout = socket.getdefaulttimeout()
1228                    socket.setdefaulttimeout(8)
1229                try:
1230                    self.sock.connect(self.conFilename)
1231                except Exception, errmsg:
1232                    self.sock.close()
1233                    self.sock = 0
1234                if hasattr(signal,'alarm'):
1235                    signal.alarm(oldAlarmTime)
1236                else:    # assume python2.3 or later
1237                    socket.setdefaulttimeout(oldTimeout)
1238                if self.sock:
1239                    # this is done by mpdroot otherwise
1240                    msgToSend = 'realusername=%s secretword=UNUSED\n' % \
1241                                mpd_get_my_username()
1242                    self.sock.send_char_msg(msgToSend)
1243            else:
1244                if not conPort:
1245                    conFile = open(self.conFilename)
1246                    for line in conFile:
1247                        line = line.strip()
1248                        (k,v) = line.split('=')
1249                        if k == 'port':
1250                            conPort = int(v)
1251                    conFile.close()
1252                if conHost:
1253                    conIfhn = socket.gethostbyname_ex(conHost)[2][0]
1254                else:
1255                    conIfhn = 'localhost'
1256                self.sock = MPDSock(name=name)
1257                if hasattr(signal,'alarm'):
1258                    oldAlarmTime = signal.alarm(8)
1259                else:    # assume python2.3 or later
1260                    oldTimeout = socket.getdefaulttimeout()
1261                    socket.setdefaulttimeout(8)
1262                try:
1263                    self.sock.connect((conIfhn,conPort))
1264                except Exception, errmsg:
1265                    mpd_print(1,"failed to connect to host %s port %d" % \
1266                              (conIfhn,conPort) )
1267                    self.sock.close()
1268                    self.sock = 0
1269                if hasattr(signal,'alarm'):
1270                    signal.alarm(oldAlarmTime)
1271                else:    # assume python2.3 or later
1272                    socket.setdefaulttimeout(oldTimeout)
1273                if not self.sock:
1274                    print '%s: cannot connect to local mpd (%s); possible causes:' % \
1275                          (mpd_my_id,self.conFilename)
1276                    print '  1. no mpd is running on this host'
1277                    print '  2. an mpd is running but was started without a "console" (-n option)'
1278                    print 'In case 1, you can start an mpd on this host with:'
1279                    print '    mpd &'
1280                    print 'and you will be able to run jobs just on this host.'
1281                    print 'For more details on starting mpds on a set of hosts, see'
1282                    print 'the MPICH2 Installation Guide.'
1283                    sys.exit(-1)
1284                msgToSend = { 'cmd' : 'con_init' }
1285                self.sock.send_dict_msg(msgToSend)
1286                msg = self.sock.recv_dict_msg()
1287                if not msg:
1288                    mpd_print(1,'expected con_challenge from mpd; got eof')
1289                    sys.exit(-1)
1290                if msg['cmd'] != 'con_challenge':
1291                    mpd_print(1,'expected con_challenge from mpd; got msg=:%s:' % (msg) )
1292                    sys.exit(-1)
1293                randVal = self.secretword + str(msg['randnum'])
1294                response = md5new(randVal).digest()
1295                msgToSend = { 'cmd' : 'con_challenge_response', 'response' : response,
1296                              'realusername' : mpd_get_my_username() }
1297                self.sock.send_dict_msg(msgToSend)
1298                msg = self.sock.recv_dict_msg()
1299                if not msg  or  msg['cmd'] != 'valid_response':
1300                    mpd_print(1,'expected valid_response from mpd; got msg=:%s:' % (msg) )
1301                    sys.exit(-1)
1302        if not self.sock:
1303            print '%s: cannot connect to local mpd (%s); possible causes:' % \
1304                  (mpd_my_id,self.conFilename)
1305            print '  1. no mpd is running on this host'
1306            print '  2. an mpd is running but was started without a "console" (-n option)'
1307            print 'In case 1, you can start an mpd on this host with:'
1308            print '    mpd &'
1309            print 'and you will be able to run jobs just on this host.'
1310            print 'For more details on starting mpds on a set of hosts, see'
1311            print 'the MPICH2 Installation Guide.'
1312            sys.exit(-1)
1313
1314class MPDParmDB(dict):
1315    def __init__(self,orderedSources=[]):
1316        dict.__init__(self)
1317        self.orderedSources = orderedSources
1318        self.db = {}
1319        for src in orderedSources:  # highest to lowest
1320            self.db[src] = {}
1321    def __setitem__(self,sk_tup,val):
1322        if type(sk_tup) != TupleType  or  len(sk_tup) != 2:
1323            mpd_print_tb(1,"must use a 2-tuple as key in a parm db; invalid: %s" % (sk_tup) )
1324            sys.exit(-1)
1325        s,k = sk_tup
1326        for src in self.orderedSources:
1327            if src == s:
1328                self.db[src][k] = val
1329                break
1330        else:
1331            mpd_print_tb(1,"invalid src specified for insert into parm db; src=%s" % (src) )
1332            sys.exit(-1)
1333    def __getitem__(self,key):
1334        for src in self.orderedSources:
1335            if self.db[src].has_key(key):
1336                return self.db[src][key]
1337        raise KeyError, "key %s not found in parm db" % (key)
1338    def has_key(self,key):
1339        for src in self.orderedSources:
1340            if self.db[src].has_key(key):
1341                return 1
1342        return 0
1343    def printall(self):
1344        print "MPDRUN's PARMDB; values from all sources:"
1345        for src in self.orderedSources:
1346            print '  %s (source)' % (src)
1347            for key in self.db[src].keys():
1348                print '    %s = %s' % (key,self.db[src][key])
1349    def printdef(self):
1350        print "MPDRUN's PARMDB; default values only:"
1351        printed = {}
1352        for src in self.orderedSources:
1353            for key in self.db[src]:
1354                if not printed.has_key(key):
1355                    printed[key] = 1
1356                    print '  %s  %s = %s' % (src,key,self.db[src][key])
1357    def get_parms_from_env(self,parmsToOverride):
1358        for k in parmsToOverride.keys():
1359            if os.environ.has_key(k):
1360                self[('env',k)] = os.environ[k]
1361    def get_parms_from_rcfile(self,parmsToOverride,errIfMissingFile=0):
1362        if os.environ.has_key('MPD_CONF_FILE') and os.access(os.environ['MPD_CONF_FILE'], os.R_OK):
1363            parmsRCFilename = os.environ['MPD_CONF_FILE']
1364        elif hasattr(os,'getuid')  and  os.getuid() == 0:    # if ROOT
1365            parmsRCFilename = os.path.abspath('/etc/mpd.conf')
1366        elif os.environ.has_key('HOME') and os.access(os.path.join(os.environ['HOME'], '.mpd.conf'), os.R_OK):
1367            parmsRCFilename = os.path.join(os.environ['HOME'],'.mpd.conf')
1368        elif os.environ.has_key('HOMEPATH'):    # e.g. win32
1369            parmsRCFilename = os.path.join(os.environ['HOMEPATH'],'.mpd.conf')
1370        else:
1371            print 'unable to find mpd.conf file'
1372            sys.exit(-1)
1373        if sys.platform == 'win32':
1374            mode = 0x80   # fake it
1375        else:
1376            try:
1377                mode = os.stat(parmsRCFilename)[0]
1378            except:
1379                mode = ''
1380	# sometimes a missing file is OK, e.g. when user running with root's mpd
1381        if not mode  and  not errIfMissingFile:
1382            return
1383        if not mode:
1384            print 'configuration file %s not found' % (parmsRCFilename)
1385            print 'A file named .mpd.conf file must be present in the user\'s home'
1386            print 'directory (/etc/mpd.conf if root) with read and write access'
1387            print 'only for the user, and must contain at least a line with:'
1388            print 'MPD_SECRETWORD=<secretword>'
1389            print 'One way to safely create this file is to do the following:'
1390            print '  cd $HOME'
1391            print '  touch .mpd.conf'
1392            print '  chmod 600 .mpd.conf'
1393            print 'and then use an editor to insert a line like'
1394            print '  MPD_SECRETWORD=mr45-j9z'
1395            print 'into the file.  (Of course use some other secret word than mr45-j9z.)'
1396            sys.exit(-1)
1397        if  (mode & 0x3f):
1398            print 'configuration file %s is accessible by others' % (parmsRCFilename)
1399            print 'change permissions to allow read and write access only by you'
1400            sys.exit(-1)
1401        parmsRCFile = open(parmsRCFilename)
1402        for line in parmsRCFile:
1403            lineWithoutComments = line.split('#')[0]    # will at least be ''
1404            lineWithoutComments = lineWithoutComments.strip()
1405            if not lineWithoutComments:
1406                continue
1407            splitLine = lineWithoutComments.split('=')
1408            if not splitLine[0]:    # ['']
1409                print 'warning: unrecognized (null) key in %s' % (parmsRCFilename)
1410                continue
1411            if len(splitLine) == 2:
1412                (k,v) = splitLine
1413                origKey = k
1414                if k == 'secretword':    # for bkwd-compat
1415                    k = 'MPD_SECRETWORD'
1416                if k in parmsToOverride.keys():
1417                    if k != 'MPD_SECRETWORD'  and  v.isdigit():
1418                        v = int(v)
1419                    self[('rcfile',k)] = v
1420            else:
1421                mpd_print(1, 'line in mpd conf is not key=val pair; line=:%s:' % (line) )
1422
class MPDTest(object):
    """Helper used by mpd test scripts to run one command and check it.

    Each call to run() launches a command, captures its output lines, and
    compares the exit code and/or output against expectations, printing a
    diagnostic (and, by default, exiting with -1) when a check fails.
    """
    def __init__(self):
        pass

    def _split_lines(text):
        # Split text into lines without their newline terminators.  Only the
        # trailing empty piece produced by a final newline is dropped, so a
        # last line that lacks a newline keeps its final character (slicing
        # line[:-1] off every line would truncate it).
        lines = text.split('\n')
        if lines and lines[-1] == '':
            lines.pop()
        return lines
    _split_lines = staticmethod(_split_lines)   # pre-2.4 compatible form

    def run(self,cmd='',expIn = '',chkEC=0,expEC=0,chkOut=0,expOut='',ordOut=0,
            grepOut=0, exitOnFail=1):
        """Run cmd, optionally check its results, and return what happened.

        cmd        -- command line; with the subprocess module it is split on
                      whitespace (no shell quoting), otherwise it is handed
                      to popen2.Popen4 as-is
        expIn      -- text written to the command's stdin (stdin is then
                      closed)
        chkEC      -- if true, the exit code must equal expEC
        chkOut     -- if true, the output lines must be exactly the lines of
                      expOut (in any order, unless ordOut is true)
        grepOut    -- if true, every line of expOut must occur as a substring
                      of some output line; mutually exclusive with chkOut
        expOut     -- expected output: newline-separated text, or a list of
                      lines
        exitOnFail -- if true, sys.exit(-1) as soon as a check fails

        Returns a dict with 'pid', 'EC' (the value returned by wait()) and
        'OUT' (output lines with newlines removed; with the subprocess module
        all stdout lines come first, then all stderr lines).
        """
        rv = {}
        if chkOut and grepOut:
            print("grepOut and chkOut are mutually exclusive")
            sys.exit(-1)
        if subprocess_module_available:
            runner = subprocess.Popen(cmd.split(),bufsize=0,env=os.environ,
                                      close_fds=True,stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
            # communicate() feeds stdin and drains stdout and stderr together;
            # reading stdout to EOF before touching stderr would deadlock once
            # the child fills the stderr pipe buffer.
            (outText,errText) = runner.communicate(expIn or None)
            outLines = self._split_lines(outText) + self._split_lines(errText)
            rv['pid'] = runner.pid
            rv['EC'] = runner.wait()    # communicate() already waited
        elif hasattr(popen2,'Popen4'):    # delete when python2.4+ is common
            runner = popen2.Popen4(cmd)
            if expIn:
                runner.tochild.write(expIn)
            runner.tochild.close()
            outLines = self._split_lines(runner.fromchild.read())
            runner.fromchild.close()
            rv['pid'] = runner.pid
            rv['EC'] = runner.wait()
        else:
            mpd_print(1,'can not run with either subprocess or popen2-Popen4')
            sys.exit(-1)
        rv['OUT'] = outLines[:]    # outLines is consumed by the checks below
        if chkEC  and  expEC != rv['EC']:
            print("bad exit code from test: %s" % (cmd))
            print("   expected exitcode=%d ; got %d" % (expEC,rv['EC']))
            print("output from cmd:")
            for line in outLines:
                print(line)
            if exitOnFail:
                sys.exit(-1)
        if not (chkOut or grepOut):
            return rv
        # Accept expected output either as text or as a list of lines; a copy
        # is taken so a caller's list is never modified.
        if hasattr(expOut,'split'):
            expLines = self._split_lines(expOut)
        else:
            expLines = list(expOut)
        if chkOut:
            orderOK = 1
            for line in outLines[:]:    # iterate a copy; matches are removed
                if line in expLines:
                    if ordOut and line != expLines[0]:
                        orderOK = 0
                        break  # count rest of outLines as bad
                    expLines.remove(line)
                    outLines.remove(line)
            if not orderOK:
                print("lines out of order in output for test: %s" % (cmd))
                for line in outLines:
                    print(line)
                if exitOnFail:
                    sys.exit(-1)
            if expLines:
                print("some required lines not found in output for test: %s" % (cmd))
                for line in outLines:
                    print(line)
                if exitOnFail:
                    sys.exit(-1)
            if outLines:
                print("extra lines in output for test: %s" % (cmd))
                for line in outLines:
                    print(line)
                if exitOnFail:
                    sys.exit(-1)
        else:    # grepOut
            # Each expected line must be found at least once; counting every
            # match would let one line matching twice hide a missing one.
            missing = []
            for expLine in expLines:
                found = 0
                for outLine in outLines:
                    if outLine.find(expLine) >= 0:
                        found = 1
                        break
                if not found:
                    missing.append(expLine)
            if missing:
                print("some lines not matched for test: %s" % (cmd))
                for line in outLines:
                    print(line)
                if exitOnFail:
                    sys.exit(-1)
        return rv
1510
1511#### experimental code for zeroconf
def mpd_init_zc(ifhn,my_level):
    """Create the global Zeroconf instance and start browsing for mpd peers.

    Sets the module-global mpd_zc and attaches to it:
      peers                 -- dict mapping service name to service info,
                               holding only services advertised by the
                               current user
      peersLock             -- threading.Lock guarding peers
      peers_available_event -- threading.Event set whenever a peer is added
    ifhn and my_level are accepted for interface compatibility but are not
    used here.
    """
    import threading, Zeroconf
    global mpd_zc
    mpd_zc = Zeroconf.Zeroconf()
    class ListenerForPeers(object):
        # Listener handed to Zeroconf.ServiceBrowser; its addService and
        # removeService methods are called as "_mpdzc" services come and go.
        def __init__(self):
            mpd_zc.peers = {}
            mpd_zc.peersLock = threading.Lock()
            mpd_zc.peers_available_event = threading.Event()
        def removeService(self, zc, service_type, name):
            # addService skips other users' services, so name may never have
            # been recorded.  A bare del would raise KeyError while holding
            # peersLock and leave the lock held for good.
            mpd_zc.peersLock.acquire()
            try:
                removed = mpd_zc.peers.pop(name, None)
            finally:
                mpd_zc.peersLock.release()
            if removed is not None:
                print("removed %s" % (name,)) ; sys.stdout.flush()
        def addService(self, zc, service_type, name):
            info = zc.getServiceInfo(service_type, name)
            if not info:
                print("OOPS NO INFO FOR %s" % (name,)) ; sys.stdout.flush()
                return
            # only track mpds that belong to the same user
            if info.properties.get('username') != mpd_get_my_username():
                return
            mpd_zc.peersLock.acquire()
            try:
                mpd_zc.peers[name] = info
            finally:
                mpd_zc.peersLock.release()
            print("added peer: %s %s" % (name, info.properties)) ; sys.stdout.flush()
            mpd_zc.peers_available_event.set()
    service_type = "_mpdzc._tcp.local."
    listenerForPeers = ListenerForPeers()
    Zeroconf.ServiceBrowser(mpd_zc,service_type,listenerForPeers)
    ##  sleep(1.5)  # give browser a chance to find some peers
def mpd_find_zc_peer(peer_level):
    """Return (address, port) of a known zeroconf peer at peer_level.

    Waits up to 5 seconds for the peer table to become non-empty, then
    returns the first recorded peer whose 'level' property equals peer_level
    and whose 'mpdid' property is not this mpd's id.  Returns ('', 0) when
    no such peer is known.  Requires mpd_init_zc to have been called.
    """
    print("finding a peer at level %d..." % (peer_level)) ; sys.stdout.flush()
    mpd_zc.peers_available_event.wait(5)
    # The listener callbacks modify peers under peersLock; take a snapshot
    # under the same lock rather than iterating the live dict.
    mpd_zc.peersLock.acquire()
    try:
        peerInfos = list(mpd_zc.peers.values())
    finally:
        mpd_zc.peersLock.release()
    for info in peerInfos:
        props = info.properties
        if props.get('mpdid') == mpd_my_id:
            continue    # that is ourselves
        if props.get('level') != peer_level:
            continue
        peerAddr = socket.inet_ntoa(info.getAddress())
        return (peerAddr, info.getPort())
    return ('',0)
def mpd_register_zc(ifhn,level):
    """Advertise this mpd as a "_mpdzc._tcp.local." zeroconf service.

    ifhn  -- dotted-quad IPv4 address to advertise (converted by inet_aton)
    level -- published unchanged as the service's 'level' property
    The service name is mpd_my_id followed by the service type, and the
    published properties include the mpd id and the current user's name.
    Requires mpd_init_zc to have created mpd_zc.
    """
    import Zeroconf
    service_type = "_mpdzc._tcp.local."
    service_ifhn = socket.inet_aton(ifhn)
    # NOTE(review): takes the port from the second '_'-separated field of
    # mpd_my_id, which assumes the id looks like '<host>_<port>' -- confirm.
    service_port = int(mpd_my_id.split('_')[1])
    properties = { 'description': 'mpd',
                   'mpdid' : mpd_my_id,
                   'level' : level,
                   'username' : mpd_get_my_username() }
    svc = Zeroconf.ServiceInfo(service_type,
                               mpd_my_id + service_type,
                               address = service_ifhn,
                               port = service_port,
                               weight = 0, priority = 0,
                               properties = properties)
    mpd_zc.registerService(svc)
def mpd_close_zc():
    """Close the global Zeroconf instance if mpd_init_zc created one."""
    if not mpd_zc:
        return    # still the initial placeholder: zeroconf never started
    mpd_zc.close()
1574
1575
1576# code for testing
1577
def _handle_msg(sock):
    """Self-test callback: receive one dict message from sock and print it."""
    received = sock.recv_dict_msg()
    print('recvd msg=:%s:' % received)
1581
if __name__ == '__main__':
    # Self-test: send one dict message across a connected socket pair and let
    # the stream handler dispatch it to _handle_msg, which prints it.
    streamHandler = MPDStreamHandler()
    (receiver,sender) = mpd_sockpair()
    receiver.name = 'tsock1_connected_to_tsock2'
    streamHandler.set_handler(receiver,_handle_msg)
    sender.send_dict_msg( {'msgtype' : 'hello'} )
    streamHandler.handle_active_streams()
    # Demonstrate a listening socket: print its name and bound port.
    listener = MPDListenSock('',9999,name='listen_sock')
    print('%s %s' % (listener.name, listener.getsockname()[1]))

    ### import sys
    ### sys.excepthook = mpd_uncaught_except_tb
    ### i = 1/0
1596