1#!/usr/bin/env python 2# 3# (C) 2001 by Argonne National Laboratory. 4# See COPYRIGHT in top-level directory. 5# 6 7# workaround to suppress deprecated module warnings in python2.6 8# see https://trac.mcs.anl.gov/projects/mpich2/ticket/362 for tracking 9import warnings 10warnings.filterwarnings('ignore', '.*the md5 module is deprecated.*', DeprecationWarning) 11warnings.filterwarnings('ignore', '.*the popen2 module is deprecated.*', DeprecationWarning) 12 13import sys, os, signal, popen2, socket, select, inspect 14 15from cPickle import dumps, loads 16from types import TupleType 17from traceback import extract_tb, extract_stack, format_list 18from re import sub, split 19from errno import EINTR, ECONNRESET, EISCONN, ECONNREFUSED, EPIPE 20from md5 import new as md5new 21from time import sleep 22from random import randrange, random 23 24try: 25 import pwd 26 pwd_module_available = 1 27except: 28 pwd_module_available = 0 29try: 30 import grp 31 grp_module_available = 1 32except: 33 grp_module_available = 0 34try: 35 import syslog 36 syslog_module_available = 1 37except: 38 syslog_module_available = 0 39try: 40 import subprocess 41 subprocess_module_available = 1 42except: 43 subprocess_module_available = 0 44 45 46# some global vars for some utilities 47global mpd_my_id, mpd_signum, mpd_my_hostname, mpd_procedures_to_trace 48global mpd_cli_app # for debug during mpich nightly tests 49global mpd_tmpdir 50mpd_cli_app = '' 51mpd_my_id = '' 52mpd_procedures_to_trace = [] 53mpd_my_hostname = '' 54# mpd_signum can be set by mpd_handle_signal to indicate which signal was recently caught; 55# this can be useful below to pop out of loops that ordinarily continue after sigs 56# NOTE: mpd_handle_signal must be called by the user, e.g. in his own signal handler 57mpd_signum = 0 58mpd_zc = 0 59if os.environ.has_key('MPD_TMPDIR'): 60 mpd_tmpdir = os.environ['MPD_TMPDIR'] 61else: 62 mpd_tmpdir = '/tmp' 63 64# For easier debugging, we provide this variable that is used in the 65# mpd_print calls. This makes it a little easier to debug problems involving 66# communication with other processes, such as handling EINTR from signals. 67global mpd_dbg_level 68mpd_dbg_level = 0 69 70def mpd_set_dbg_level(flag): 71 global mpd_dbg_level 72 mpd_dbg_level = flag 73 74def mpd_set_my_id(myid=''): 75 global mpd_my_id 76 mpd_my_id = myid 77 78def mpd_set_tmpdir(tmpdir): 79 global mpd_tmpdir 80 mpd_tmpdir = tmpdir 81 82def mpd_get_my_id(): 83 global mpd_my_id 84 return(mpd_my_id) 85 86def mpd_set_cli_app(app): # for debug during mpich nightly tests 87 global mpd_cli_app 88 mpd_cli_app = app 89 90def mpd_handle_signal(signum,frame): 91 global mpd_signum 92 mpd_signum = signum 93 94def mpd_print(*args): 95 global mpd_my_id 96 if not args[0]: 97 return 98 stack = extract_stack() 99 callingProc = stack[-2][2] 100 callingLine = stack[-2][1] 101 printLine = '%s (%s %d): ' % (mpd_my_id,callingProc,callingLine) 102 for arg in args[1:]: 103 printLine = printLine + str(arg) 104 # We've seen an EINTR on the flush here 105 while 1: 106 try: 107 print printLine 108 break 109 except os.error, errinfo: 110 if errinfo[0] != EINTR: 111 raise os.error, errinfo 112 # end of while 113 while 1: 114 try: 115 sys.stdout.flush() 116 break 117 except os.error, errinfo: 118 if errinfo[0] != EINTR: 119 raise os.error, errinfo 120 # end of while 121 if syslog_module_available: 122 syslog.syslog(syslog.LOG_INFO,printLine) 123 124def mpd_print_tb(*args): 125 global mpd_my_id 126 if not args[0]: 127 return 128 stack = extract_stack() 129 callingProc = stack[-2][2] 130 callingLine = stack[-2][1] 131 stack = extract_stack() 132 stack.reverse() 133 stack = stack[1:] 134 printLine = '%s (%s %d):' % (mpd_my_id,callingProc,callingLine) 135 for arg in args[1:]: 136 printLine = printLine + str(arg) 137 printLine += '\n mpdtb:\n' 138 for line in format_list(stack): 139 line = sub(r'\n.*','',line) 140 splitLine = split(',',line) 141 splitLine[0] = sub(' File "(.*)"',lambda mo: mo.group(1),splitLine[0]) 142 splitLine[1] = sub(' line ','',splitLine[1]) 143 splitLine[2] = sub(' in ','',splitLine[2]) 144 printLine = printLine + ' %s, %s, %s\n' % tuple(splitLine) 145 if mpd_cli_app: # debug mpich apps in nightly tests 146 printLine += ' mpd_cli_app=%s\n' % (mpd_cli_app) 147 printLine += ' cwd=%s' % (os.getcwd()) 148 print printLine 149 sys.stdout.flush() 150 if syslog_module_available: 151 syslog.syslog(syslog.LOG_INFO,printLine) 152 153def mpd_uncaught_except_tb(arg1,arg2,arg3): 154 global mpd_my_id 155 global mpd_cli_id 156 if mpd_my_id: 157 errstr = '%s: ' % (mpd_my_id) 158 else: 159 errstr = '' 160 errstr += 'mpd_uncaught_except_tb handling:\n' 161 errstr += ' %s: %s\n' % (arg1,arg2) 162 tb = extract_tb(arg3) 163 tb.reverse() 164 for tup in tb: 165 # errstr += ' file %s line# %i procedure %s\n %s\n' % (tup) 166 errstr += ' %s %i %s\n %s\n' % (tup) 167 if mpd_cli_app: # debug mpich apps in nightly tests 168 errstr += ' mpd_cli_app=%s\n' % (mpd_cli_app) 169 errstr += ' cwd=%s' % (os.getcwd()) 170 print errstr, 171 if syslog_module_available: 172 syslog.syslog(syslog.LOG_ERR, errstr) 173 174def mpd_set_procedures_to_trace(procs): 175 global mpd_procedures_to_trace 176 mpd_procedures_to_trace = procs 177 178def mpd_trace_calls(frame,event,args): 179 global mpd_my_id, mpd_procedures_to_trace 180 if frame.f_code.co_name not in mpd_procedures_to_trace: 181 return None 182 args_info = apply(inspect.formatargvalues,inspect.getargvalues(frame)) 183 # Be VERY careful here; under AIX, it looked like EINTR is 184 # possible within print (!). 185 while (1): 186 try: 187 print '%s: ENTER %s in %s at line %d; ARGS=%s' % \ 188 (mpd_my_id,frame.f_code.co_name,frame.f_code.co_filename,frame.f_lineno,args_info) 189 break 190 except os.error, errinfo: 191 if errinfo[0] != EINTR: 192 raise os.error, errinfo 193 # end of while 194 return mpd_trace_returns 195 196def mpd_trace_returns(frame,event,args): 197 global mpd_my_id 198 if event == 'return': 199 # Be VERY careful here; under AIX, it looked like EINTR is 200 # possible within print (!). 201 while (1): 202 try: 203 print '%s: EXIT %s at line %d ' % (mpd_my_id,frame.f_code.co_name,frame.f_lineno) 204 break 205 except os.error, errinfo: 206 if errinfo[0] != EINTR: 207 raise os.error, errinfo 208 # end of while 209 return None 210 else: 211 return mpd_trace_returns 212 213def mpd_sockpair(): 214 sock1 = MPDSock() 215 rc = sock1.sock.bind(('',0)) 216 rc = sock1.sock.listen(5) 217 port1 = sock1.sock.getsockname()[1] 218 sock2 = MPDSock() 219 # 220 # We have encountered situations where the connection fails; as this is 221 # a connection to this process, we retry a few times in that case 222 # (seen on AIX) 223 # 224 try: 225 connAttempts = 0 226 while (1): 227 try: 228 rc = sock2.sock.connect(('localhost',port1)) 229 break 230 except socket.error, errinfo: 231 # In some cases, connect will return EINTR and then on the 232 # next iteration, returns EISCONN. 233 if errinfo[0] == EISCONN: 234 break 235 if errinfo[0] == ECONNREFUSED and connAttempts < 10: 236 mpd_print(mpd_dbg_level,"Retrying on connection refused") 237 connAttempts += 1 238 sleep(random()) 239 elif errinfo[0] != EINTR: 240 mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1])) 241 raise socket.error, errinfo 242 # End of the while 243 except socket.error, errinfo: 244 # we have seen at least one machine that needs it this way 245 # We've seen a failure here; it could be EINPROGRESS, EALREADY, 246 # or EADDRINUSE. In that case, we may need to do something else 247 mpd_print(1,"connect error with %d %s" % (errinfo[0],errinfo[1])) 248 # Should this only attempt on ECONNREFUSED, ENETUNREACH, EADDRNOTAVAIL 249 # FIXME: Does this need a try/except? 250 while 1: 251 try: 252 rc = sock2.sock.connect(('',port1)) 253 break 254 except socket.error, errinfo: 255 if errinfo[0] == EISCONN: 256 break 257 elif errinfo[0] != EINTR: 258 mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1])) 259 raise socket.error, errinfo 260 # end of while 261 # Accept can fail on EINTR, so we handle that here 262 while (1): 263 try: 264 (sock3,addr) = sock1.sock.accept() 265 break 266 except socket.error, errinfo: 267 if errinfo[0] != EINTR: 268 mpd_print(1,"connect %d %s" % (errinfo[0],errinfo[1])) 269 raise socket.error, errinfo 270 # end of while 271 sock3 = MPDSock(sock=sock3) 272 sock1.close() 273 return (sock2,sock3) 274 275def mpd_which(execName,user_path=None): 276 if not user_path: 277 if os.environ.has_key('PATH'): 278 user_path = os.environ['PATH'] 279 else: 280 return '' 281 for d in user_path.split(os.pathsep): 282 fpn = os.path.join(d,execName) 283 if os.path.isdir(fpn): # follows symlinks; dirs can have execute permission 284 continue 285 if os.access(fpn,os.X_OK): # NOTE access works based on real uid (not euid) 286 return fpn 287 return '' 288 289def mpd_check_python_version(): 290 # version_info: (major,minor,micro,releaselevel,serial) 291 if (sys.version_info[0] < 2) or \ 292 (sys.version_info[0] == 2 and sys.version_info[1] < 2): 293 return sys.version_info 294 return 0 295 296def mpd_version(): 297 return (1,0,1,'July, 2006 release') # major, minor, micro, special 298 299def mpd_get_my_username(): 300 if pwd_module_available: 301 username = pwd.getpwuid(os.getuid())[0] # favor this over env 302 elif os.environ.has_key('USER'): 303 username = os.environ['USER'] 304 elif os.environ.has_key('USERNAME'): 305 username = os.environ['USERNAME'] 306 else: 307 username = 'unknown_username' 308 return username 309 310def mpd_get_ranks_in_binary_tree(myRank,nprocs): 311 if myRank == 0: 312 parent = -1; 313 else: 314 parent = (myRank - 1) / 2; 315 lchild = (myRank * 2) + 1 316 if lchild > (nprocs - 1): 317 lchild = -1; 318 rchild = (myRank * 2) + 2 319 if rchild > (nprocs - 1): 320 rchild = -1; 321 return (parent,lchild,rchild) 322 323def mpd_same_ips(host1,host2): # hosts may be names or IPs 324 try: 325 ips1 = socket.gethostbyname_ex(host1)[2] # may fail if invalid host 326 ips2 = socket.gethostbyname_ex(host2)[2] # may fail if invalid host 327 except: 328 return 0 329 for ip1 in ips1: 330 for ip2 in ips2: 331 if ip1 == ip2: 332 return 1 333 return 0 334 335def mpd_read_nbytes(fd,nbytes): 336 global mpd_signum 337 rv = 0 338 while 1: 339 try: 340 mpd_signum = 0 341 rv = os.read(fd,nbytes) 342 break 343 except os.error, errinfo: 344 if errinfo[0] == EINTR: 345 if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: 346 break 347 else: 348 continue 349 elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) 350 break 351 else: 352 mpd_print(1, 'read error: %s' % os.strerror(errinfo[0])) 353 break 354 except KeyboardInterrupt, errinfo: 355 break 356 except Exception, errinfo: 357 mpd_print(1, 'other error after read %s :%s:' % ( errinfo.__class__, errinfo) ) 358 break 359 return rv 360 361def mpd_get_groups_for_username(username): 362 if pwd_module_available and grp_module_available: 363 userGroups = [pwd.getpwnam(username)[3]] # default group for the user 364 allGroups = grp.getgrall(); 365 for group in allGroups: 366 if username in group[3] and group[2] not in userGroups: 367 userGroups.append(group[2]) 368 else: 369 userGroups = [] 370 return userGroups 371 372 373class MPDSock(object): 374 def __init__(self,family=socket.AF_INET,socktype=socket.SOCK_STREAM,proto=0, 375 sock=None,name=''): 376 if sock: 377 self.sock = sock 378 else: 379 self.sock = socket.socket(family=family,type=socktype,proto=proto) 380 self.name = name 381 self.type = socktype 382 self.family = family 383 ## used this when inherited from socket.socket (only works with py 2.3+) 384 ## socket.socket.__init__(self,family=family,type=socktype,proto=proto,_sock=sock) 385 def close(self): 386 self.sock.close() 387 def sendall(self,data): 388 self.sock.sendall(data) 389 def getsockname(self): 390 return self.sock.getsockname() 391 def fileno(self): 392 return self.sock.fileno() 393 394 def connect(self,*args): 395 # We handle EINTR in this method, unless it appears that a 396 # SIGINT or SIGALRM are delivered. In that case, we do not 397 # complete the connection (FIXME: make sure that all uses of this 398 # do the right thing in that case). 399 while 1: 400 try: 401 mpd_signum = 0 402 self.sock.connect(*args) 403 break 404 except socket.error, errinfo: 405 if errinfo[0] == EINTR: # sigchld, sigint, etc. 406 if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: 407 break 408 else: 409 continue 410 else: 411 raise socket.error, errinfo 412 # end of while 413 414 def accept(self,name='accepter'): 415 global mpd_signum 416 newsock = 0 417 newaddr = 0 418 while 1: 419 try: 420 mpd_signum = 0 421 (newsock,newaddr) = self.sock.accept() 422 break 423 except socket.error, errinfo: 424 if errinfo[0] == EINTR: # sigchld, sigint, etc. 425 if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: 426 break 427 else: 428 continue 429 elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) 430 break 431 else: 432 print '%s: accept error: %s' % (mpd_my_id,os.strerror(errinfo[0])) 433 break 434 except Exception, errinfo: 435 print '%s: failure doing accept : %s : %s' % \ 436 (mpd_my_id,errinfo.__class__,errinfo) 437 break 438 if newsock: 439 newsock = MPDSock(sock=newsock,name=name) # turn new socket into an MPDSock 440 return (newsock,newaddr) 441 def recv(self,nbytes): 442 global mpd_signum 443 data = 0 444 while 1: 445 try: 446 mpd_signum = 0 447 data = self.sock.recv(nbytes) 448 break 449 except socket.error, errinfo: 450 if errinfo[0] == EINTR: # sigchld, sigint, etc. 451 if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: 452 break 453 else: 454 continue 455 elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) 456 break 457 else: 458 print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0])) 459 break 460 except Exception, errinfo: 461 print '%s: failure doing recv %s :%s:' % \ 462 (mpd_my_id,errinfo.__class__,errinfo) 463 break 464 return data 465 def recv_dict_msg(self,timeout=None): 466 global mpd_signum 467 global mpd_dbg_level 468 469 mpd_print(mpd_dbg_level, \ 470 "Entering recv_dict_msg with timeout=%s" % (str(timeout))) 471 msg = {} 472 readyToRecv = 0 473 if timeout: 474 try: 475 # Loop while we get EINTR. 476 # FIXME: In some cases, we may want to exit if 477 # the signal was SIGINT. We need to restart if 478 # we see SIGCLD 479 while 1: 480 try: 481 mpd_signum = 0 482 if timeout == -1: 483 # use -1 to indicate indefinite timeout 484 (readyToRecv,unused1,unused2) = select.select([self.sock],[],[]) 485 else: 486 (readyToRecv,unused1,unused2) = select.select([self.sock],[],[],timeout) 487 break; 488 except os.error, errinfo: 489 if errinfo[0] == EINTR: 490 # Retry interrupted system calls 491 pass 492 else: 493 raise os.error, errinfo 494 # End of the while(1) 495 except select.error, errinfo: 496 if errinfo[0] == EINTR: 497 if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: 498 mpd_print(0,"sigint/alrm check"); 499 pass # assume timedout; returns {} below 500 elif mpd_signum == signal.SIGCLD: 501 mpd_print_tb(1,"mishandling sigchild in recv_dict_msg, errinfo=:%s" % (errinfo) ) 502 else: 503 mpd_print_tb(1,"Unhandled EINTR: errinfo=%s" % (errinfo) ) 504 else: 505 mpd_print(1, '%s: select error: %s' % (mpd_my_id,os.strerror(errinfo[0]))) 506 except KeyboardInterrupt, errinfo: 507 # print 'recv_dict_msg: keyboard interrupt during select' 508 mpd_print(0,"KeyboardInterrupt"); 509 return msg 510 except Exception, errinfo: 511 mpd_print(1, 'recv_dict_msg: exception during select %s :%s:' % \ 512 ( errinfo.__class__, errinfo)) 513 return msg 514 else: 515 readyToRecv = 1 516 if readyToRecv: 517 mpd_print(mpd_dbg_level,"readyToRecv"); 518 try: 519 pickledLen = '' 520 tempRecvd = '' 521 lenLeft = 8 522 while lenLeft: 523 while (1): 524 try: 525 tempRecvd = self.sock.recv(lenLeft) 526 # FIXME: Shouldn't this block until there is a 527 # message unless it raises an exception. 528 # Is no message an EOF, and in that case, 529 # do we really want to immediately delete 530 # the corresponding entry? 531 #if not pickledLen: 532 # mpd_print(1,"continuing because recv failed") 533 # continue 534 break 535 except socket.error,errinfo: 536 if errinfo[0] == EINTR: 537 mpd_print(mpd_dbg_level,"Saw EINTR") 538 pass 539 elif errinfo[0] == ECONNRESET: 540 mpd_print(mpd_dbg_level,"Saw ECONNRESET, ignore (return null msg)") 541 return msg; 542 else: 543 mpd_print_tb(1,"recv_dict_msg: sock.recv(8): errinfo=:%s:" % (errinfo)) 544 raise socket.error,errinfo 545 # end of while(1) 546 if not tempRecvd: 547 break 548 pickledLen += tempRecvd 549 lenLeft -= len(tempRecvd) 550 if not pickledLen: 551 mpd_print(mpd_dbg_level,"no pickeled len") 552 if pickledLen: 553 pickledLen = int(pickledLen) 554 pickledMsg = '' 555 lenLeft = pickledLen 556 while lenLeft: 557 while (1): 558 try: 559 recvdMsg = self.sock.recv(lenLeft) 560 break 561 except socket.error,errinfo: 562 if errinfo[0] == EINTR: 563 pass 564 else: 565 mpd_print_tb(1,"recv_dict_msg: sock.recv(8): errinfo=:%s:" % (errinfo)) 566 raise socket.error,errinfo 567 # end of while(1) 568 569 pickledMsg += recvdMsg 570 lenLeft -= len(recvdMsg) 571 msg = loads(pickledMsg) 572 except socket.error, errinfo: 573 if errinfo[0] == EINTR: 574 mpd_print(1, "Unhandled EINTR on sock.recv") 575 return msg 576 elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) 577 mpd_print(mpd_dbg_level,"Connection reset") 578 pass # socket.error: (104, 'Connection reset by peer') 579 else: 580 mpd_print_tb(1,'recv_dict_msg: socket error: errinfo=:%s:' % (errinfo)) 581 except StandardError, errmsg: # any built-in exceptions 582 mpd_print_tb(1, 'recv_dict_msg: errmsg=:%s:' % (errmsg) ) 583 except Exception, errmsg: 584 mpd_print_tb(1, 'recv_dict_msg failed on sock %s errmsg=:%s:' % \ 585 (self.name,errmsg) ) 586 if mpd_dbg_level: 587 if msg: 588 mpd_print(1,"Returning with non-null msg, length = %d, head = %s" % (pickledLen,pickledMsg[0:32].replace('\n','<NL>') ) ) 589 else: 590 mpd_print(1,"Returning with null msg" ) 591 return msg 592 def recv_char_msg(self): 593 return self.recv_one_line() # use leading len later 594 def recv_one_line(self): 595 msg = '' 596 # A failure with EINTR was observed here, so a loop to retry on 597 # EINTR has been added 598 try: 599 while 1: 600 try: 601 c = self.sock.recv(1) 602 break 603 except socket.error, errinfo: 604 if errinfo[0] != EINTR: 605 raise socket.error, errinfo 606 # end of while 607 except socket.error, errinfo: 608 if errinfo[0] == EINTR: # sigchld, sigint, etc. 609 # This should no longer happen (handled above) 610 mpd_print_tb( 1, "Unhandled EINTR in sock.recv" ); 611 return msg 612 elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) 613 return msg 614 else: 615 print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0])) 616 sys.exit(-1) 617 except Exception, errmsg: 618 c = '' 619 msg = '' 620 mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) ) 621 if c: 622 while c != '\n': 623 msg += c 624 try: 625 c = self.sock.recv(1) 626 except socket.error, errinfo: 627 if errinfo[0] == EINTR: # sigchld, sigint, etc. 628 return msg 629 elif errinfo[0] == ECONNRESET: # connection reset (treat as eof) 630 return msg 631 else: 632 print '%s: recv error: %s' % (mpd_my_id,os.strerror(errinfo[0])) 633 sys.exit(-1) 634 except Exception, errmsg: 635 c = '' 636 msg = '' 637 mpd_print_tb(1, 'recv_char_msg: errmsg=:%s:' % (errmsg) ) 638 break 639 msg += c 640 return msg 641 642 # The default behavior on an error needs to be to handle and/or report 643 # it. Otherwise, we all waste time trying to figure out why 644 # the code is silently failing. I've set the default for errprint 645 # to YES rather than NO. 646 def send_dict_msg(self,msg,errprint=1): 647 pickledMsg = dumps(msg) 648 # FIXME: Does this automatically handle EINTR, or does it need an 649 # except os.error, errinfo: and check on errinfo[0] == EINTR 650 try: 651 while 1: 652 try: 653 self.sendall( "%08d%s" % (len(pickledMsg),pickledMsg) ) 654 break 655 except socket.error, errmsg: 656 if errmsg[0] == EPIPE \ 657 or errmsg[0] == ECONNRESET \ 658 or errmsg[0] == EINTR: 659 # silent failure on pipe failure, as we usually 660 # just want to discard messages in this case 661 # (We need to plan error handling more thoroughly) 662 break ## RMB: chgd from pass 663 else: 664 raise socket.error, errmsg 665 # end of While 666 except Exception, errmsg: 667 mpd_print_tb(errprint,'send_dict_msg: sock=%s errmsg=:%s:' % (self.name,errmsg)) 668 def send_char_msg(self,msg,errprint=1): 669 try: 670 while 1: 671 try: 672 self.sock.sendall(msg) 673 break 674 except socket.error, errmsg: 675 if errmsg[0] == EPIPE: 676 # silent failure on pipe failure, as we usually 677 # just want to discard messages in this case 678 # (We need to plan error handling more thoroughly) 679 pass 680 if errmsg[0] != EINTR: 681 raise socket.error, errmsg 682 # end of While 683 except Exception, errmsg: 684 mpd_print_tb(errprint,'send_char_msg: sock=%s errmsg=:%s:' % (self.name,errmsg)) 685 686class MPDListenSock(MPDSock): 687 def __init__(self,host='',port=0,filename='',listen=5,name='listener',**kargs): 688 MPDSock.__init__(self,name=name,**kargs) 689 self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 690 if filename: 691 self.sock.bind(filename) 692 self.sock.listen(listen) 693 return 694 # see if we have a PORT_RANGE environment variable 695 try: 696 port_range = os.environ['MPIEXEC_PORT_RANGE'] 697 (low_port, high_port) = map(int, port_range.split(':')) 698 except: 699 try: 700 port_range = os.environ['MPICH_PORT_RANGE'] 701 (low_port, high_port) = map(int, port_range.split(':')) 702 except: 703 (low_port,high_port) = (0,0) 704 if low_port < 0 or high_port < low_port: 705 (low_port,high_port) = (0,0) 706 if low_port != 0 and high_port != 0: 707 if port == 0: 708 port = low_port 709 while 1: 710 try: 711 self.sock.bind((host,port)) 712 self.sock.listen(listen) 713 break 714 except socket.error, e: 715 port += 1 716 if port <= high_port: 717 self.sock.close() 718 MPDSock.__init__(self,name=name,**kargs) 719 self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 720 continue 721 else: 722 mpd_print_tb(1,'** no free ports in MPICH_PORT_RANGE') 723 sys.exit(-1) 724 else: # else use the explicitly specified port 725 if port < low_port or port > high_port: 726 mpd_print_tb(1,'** port %d is outside MPICH_PORT_RANGE' % port) 727 sys.exit(-1) 728 self.sock.bind((host,port)) # go ahead and bind 729 self.sock.listen(listen) 730 else: 731 self.sock.bind((host,port)) # no port range set, so just bind as usual 732 self.sock.listen(listen) 733 734class MPDStreamHandler(object): 735 def __init__(self): 736 self.activeStreams = {} 737 def set_handler(self,stream,handler,args=()): 738 self.activeStreams[stream] = (handler,args) 739 def del_handler(self,stream): 740 if self.activeStreams.has_key(stream): 741 del self.activeStreams[stream] 742 def close_all_active_streams(self): 743 for stream in self.activeStreams.keys(): 744 del self.activeStreams[stream] 745 stream.close() 746 def handle_active_streams(self,streams=None,timeout=0.1): 747 global mpd_signum 748 while 1: 749 if streams: 750 streamsToSelect = streams 751 else: 752 streamsToSelect = self.activeStreams.keys() 753 readyStreams = [] 754 try: 755 mpd_signum = 0 756 (readyStreams,u1,u2) = select.select(streamsToSelect,[],[],timeout) 757 break 758 except select.error, errinfo: 759 if errinfo[0] == EINTR: 760 if mpd_signum == signal.SIGCHLD: 761 break 762 if mpd_signum == signal.SIGINT or mpd_signum == signal.SIGALRM: 763 break 764 else: 765 continue 766 else: 767 print '%s: handle_active_streams: select error: %s' % \ 768 (mpd_my_id,os.strerror(errinfo[0])) 769 return (-1,os.strerror(errinfo[0])) 770 except KeyboardInterrupt, errinfo: 771 # print 'handle_active_streams: keyboard interrupt during select' 772 return (-1,errinfo.__class__,errinfo) 773 except Exception, errinfo: 774 print 'handle_active_streams: exception during select %s :%s:' % \ 775 ( errinfo.__class__, errinfo) 776 return (-1,errinfo.__class__,errinfo) 777 for stream in readyStreams: 778 if self.activeStreams.has_key(stream): 779 (handler,args) = self.activeStreams[stream] 780 handler(stream,*args) 781 else: 782 # this is not nec bad; an active stream (handler) may 783 # have been deleted by earlier handler in this loop 784 print '*** OOPS, unknown stream in handle_active_streams' 785 return (len(readyStreams),0) # len >= 0 786 787class MPDRing(object): 788 def __init__(self,listenSock=None,streamHandler=None,secretword='', 789 myIfhn='',entryIfhn='',entryPort=0,zcMyLevel=0): 790 if not streamHandler: 791 mpd_print(1, "must supply handler for new conns in ring") 792 sys.exit(-1) 793 if not listenSock: 794 mpd_print(1, "must supply listenSock for new ring") 795 sys.exit(-1) 796 if not myIfhn: 797 mpd_print(1, "must supply myIfhn for new ring") 798 sys.exit(-1) 799 self.secretword = secretword 800 self.myIfhn = myIfhn 801 self.generation = 0 802 self.listenSock = listenSock 803 self.listenPort = self.listenSock.sock.getsockname()[1] 804 self.streamHandler = streamHandler 805 self.streamHandler.set_handler(self.listenSock,self.handle_ring_listener_connection) 806 self.entryIfhn = entryIfhn 807 self.entryPort = entryPort 808 self.lhsIfhn = '' 809 self.lhsPort = 0 810 self.rhsIfhn = '' 811 self.rhsPort = 0 812 self.lhsSock = 0 813 self.rhsSock = 0 814 self.lhsHandler = None 815 self.rhsHandler = None 816 self.zcMyLevel = zcMyLevel 817 if self.zcMyLevel: 818 mpd_init_zc(self.myIfhn,self.zcMyLevel) 819 def create_single_mem_ring(self,ifhn='',port=0,lhsHandler=None,rhsHandler=None): 820 self.lhsSock,self.rhsSock = mpd_sockpair() 821 self.lhsIfhn = ifhn 822 self.lhsPort = port 823 self.rhsIfhn = ifhn 824 self.rhsPort = port 825 self.lhsHandler = lhsHandler 826 self.streamHandler.set_handler(self.lhsSock,lhsHandler) 827 self.rhsHandler = rhsHandler 828 self.streamHandler.set_handler(self.rhsSock,rhsHandler) 829 def reenter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=5): 830 if mpd_zc: 831 mpd_close_zc() 832 mpd_init_zc(self.myIfhn,self.zcMyLevel) 833 rc = -1 834 numTries = 0 835 self.generation += 1 836 while rc < 0 and numTries < ntries: 837 numTries += 1 838 rc = self.enter_ring(entryIfhn=entryIfhn,entryPort=entryPort, 839 lhsHandler=lhsHandler,rhsHandler=rhsHandler, 840 ntries=1) 841 sleepTime = random() * 1.5 # a single random is between 0 and 1 842 sleep(sleepTime) 843 mpd_print(1,'reenter_ring rc=%d after numTries=%d' % (rc,numTries) ) 844 return rc 845 def enter_ring(self,entryIfhn='',entryPort=0,lhsHandler='',rhsHandler='',ntries=1): 846 if not lhsHandler or not rhsHandler: 847 print 'missing handler for enter_ring' 848 sys.exit(-1) 849 if not entryIfhn: 850 entryIfhn = self.entryIfhn 851 if not entryPort: 852 entryPort = self.entryPort 853 if not entryIfhn and mpd_zc: 854 if self.zcMyLevel == 1: 855 (entryHost,entryPort) = ('',0) 856 else: 857 (entryIfhn,entryPort) = mpd_find_zc_peer(self.zcMyLevel-1) 858 if not entryPort: 859 print "FAILED TO FIND A PEER AT LEVEL", self.zcMyLevel-1 860 sys.exit(-1) 861 print "ENTRY INFO", (entryIfhn,entryPort) 862 if not entryIfhn: 863 self.create_single_mem_ring(ifhn=self.myIfhn, 864 port=self.listenPort, 865 lhsHandler=lhsHandler, 866 rhsHandler=rhsHandler) 867 else: 868 rv = self.connect_lhs(lhsIfhn=entryIfhn, 869 lhsPort=entryPort, 870 lhsHandler=lhsHandler, 871 numTries=ntries) 872 if rv[0] <= 0: # connect failed with problem 873 mpd_print(1,"lhs connect failed") 874 return -1 875 if rv[1]: # rhsifhn and rhsport 876 rhsIfhn = rv[1][0] 877 rhsPort = rv[1][1] 878 else: 879 mpd_print(1,"did not recv rhs host&port from lhs") 880 return -1 881 rv = self.connect_rhs(rhsIfhn=rhsIfhn, 882 rhsPort=rhsPort, 883 rhsHandler=rhsHandler, 884 numTries=ntries) 885 if rv[0] <= 0: # connect did not succeed; may try again 886 mpd_print(1,"rhs connect failed") 887 return -1 888 if mpd_zc: 889 mpd_register_zc(self.myIfhn,self.zcMyLevel) 890 return 0 891 def connect_lhs(self,lhsIfhn='',lhsPort=0,lhsHandler=None,numTries=1): 892 if not lhsHandler: 893 mpd_print(1, "must supply handler for lhs in ring") 894 return (-1,None) 895 if not lhsIfhn: 896 mpd_print(1, "must supply host for lhs in ring") 897 return (-1,None) 898 self.lhsIfhn = lhsIfhn 899 if not lhsPort: 900 mpd_print(1, "must supply port for lhs in ring") 901 return (-1,None) 902 self.lhsPort = lhsPort 903 numConnTries = 0 904 while numConnTries < numTries: 905 numConnTries += 1 906 self.lhsSock = MPDSock(name='lhs') 907 try: 908 self.lhsSock.connect((self.lhsIfhn,self.lhsPort)) 909 except socket.error, errinfo: 910 print '%s: conn error in connect_lhs: %s' % \ 911 (mpd_my_id,os.strerror(errinfo[0])) 912 self.lhsSock.close() 913 self.lhsSock = 0 914 sleep(random()) 915 continue 916 break 917 if not self.lhsSock or numConnTries > numTries: 918 mpd_print(1,'failed to connect to lhs at %s %d' % (self.lhsIfhn,self.lhsPort)) 919 return (0,None) 920 msgToSend = { 'cmd' : 'request_to_enter_as_rhs', 'ifhn' : self.myIfhn, 921 'port' : self.listenPort, 922 'mpd_version' : mpd_version() } 923 self.lhsSock.send_dict_msg(msgToSend) 924 msg = self.lhsSock.recv_dict_msg() 925 if (not msg) \ 926 or (not msg.has_key('cmd')) \ 927 or (not msg['cmd'] == 'challenge') \ 928 or (not msg.has_key('randnum')) \ 929 or (not msg.has_key('generation')): 930 mpd_print(1,'invalid challenge from %s %d: %s' % \ 931 (self.lhsIfhn,self.lhsPort,msg) ) 932 return (-1,None) 933 if msg['generation'] < self.generation: 934 mpd_print(1,'bad generation from lhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation)) 935 return(-1,'bad_generation') # RMB: try again here later 936 response = md5new(''.join([self.secretword,msg['randnum']])).digest() 937 msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 938 'ifhn' : self.myIfhn, 'port' : self.listenPort } 939 self.lhsSock.send_dict_msg(msgToSend) 940 msg = self.lhsSock.recv_dict_msg() 941 if (not msg) \ 942 or (not msg.has_key('cmd')) \ 943 or (not msg['cmd'] == 'OK_to_enter_as_rhs'): 944 mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords') 945 return (-1,None) 946 self.lhsHandler = lhsHandler 947 self.streamHandler.set_handler(self.lhsSock,lhsHandler) 948 if msg.has_key('rhsifhn') and msg.has_key('rhsport'): 949 return (1,(msg['rhsifhn'],msg['rhsport'])) 950 else: 951 return (1,None) 952 def connect_rhs(self,rhsIfhn='',rhsPort=0,rhsHandler=None,numTries=1): 953 if not rhsHandler: 954 mpd_print(1, "must supply handler for rhs in ring") 955 return (-1,None) 956 if not rhsIfhn: 957 mpd_print(1, "must supply host for rhs in ring") 958 return (-1,None) 959 self.rhsIfhn = rhsIfhn 960 if not rhsPort: 961 mpd_print(1, "must supply port for rhs in ring") 962 return (-1,None) 963 self.rhsPort = rhsPort 964 numConnTries = 0 965 while numConnTries < numTries: 966 numConnTries += 1 967 self.rhsSock = MPDSock(name='rhs') 968 try: 969 self.rhsSock.connect((self.rhsIfhn,self.rhsPort)) 970 except socket.error, errinfo: 971 print '%s: conn error in connect_rhs: %s' % \ 972 (mpd_my_id,os.strerror(errinfo[0])) 973 self.rhsSock.close() 974 self.rhsSock = 0 975 sleep(random()) 976 continue 977 break 978 if not self.rhsSock or numConnTries > numTries: 979 mpd_print(1,'failed to connect to rhs at %s %d' % (self.rhsIfhn,self.rhsPort)) 980 return (0,None) 981 msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'ifhn' : self.myIfhn, 982 'port' : self.listenPort, 983 'mpd_version' : mpd_version() } 984 self.rhsSock.send_dict_msg(msgToSend) 985 msg = self.rhsSock.recv_dict_msg() 986 if (not msg) \ 987 or (not msg.has_key('cmd')) \ 988 or (not msg['cmd'] == 'challenge') \ 989 or (not msg.has_key('randnum')) \ 990 or (not msg.has_key('generation')): 991 mpd_print(1,'invalid challenge from %s %d: %s' % (self.rhsIfhn,rhsPort,msg) ) 992 return (-1,None) 993 if msg['generation'] < self.generation: 994 mpd_print(1,'bad generation from rhs; lhsgen=%d mygen=%d' % (msg['generation'],self.generation)) 995 return(-1,'bad_generation') # RMB: try again here later 996 response = md5new(''.join([self.secretword,msg['randnum']])).digest() 997 msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 998 'ifhn' : self.myIfhn, 'port' : self.listenPort } 999 self.rhsSock.send_dict_msg(msgToSend) 1000 msg = self.rhsSock.recv_dict_msg() 1001 if (not msg) \ 1002 or (not msg.has_key('cmd')) \ 1003 or (not msg['cmd'] == 'OK_to_enter_as_lhs'): 1004 mpd_print(1,'NOT OK to enter ring; one likely cause: mismatched secretwords') 1005 return (-1,None) 1006 self.rhsHandler = rhsHandler 1007 self.streamHandler.set_handler(self.rhsSock,rhsHandler) 1008 if msg.has_key('lhsifhn') and msg.has_key('lhsport'): 1009 return (1,(msg['lhsifhn'],msg['lhsport'])) 1010 else: 1011 return (1,None) 1012 def accept_lhs(self,lhsHandler=None): 1013 self.lhsHandler = lhsHandler 1014 newsock = self.handle_ring_listener_connection(self.listenSock) 1015 self.handle_lhs_challenge_response(newsock) 1016 self.streamHandler.set_handler(self.lhsSock,lhsHandler) 1017 def accept_rhs(self,rhsHandler=None): 1018 self.rhsHandler = rhsHandler 1019 newsock = self.handle_ring_listener_connection(self.listenSock) 1020 self.handle_rhs_challenge_response(newsock) 1021 self.streamHandler.set_handler(self.rhsSock,rhsHandler) 1022 def handle_ring_listener_connection(self,sock): 1023 randHiRange = 10000 1024 (newsock,newaddr) = sock.accept() 1025 newsock.name = 'candidate_to_enter_ring' 1026 msg = newsock.recv_dict_msg() 1027 if (not msg) or \ 1028 (not msg.has_key('cmd')) or (not msg.has_key('ifhn')) or \ 1029 (not msg.has_key('port')): 1030 mpd_print(1, 'INVALID msg from new connection :%s: msg=:%s:' % (newaddr,msg) ) 1031 newsock.close() 1032 return None 1033 if msg.has_key('mpd_version'): # ping, etc may not have one 1034 if msg['mpd_version'] != mpd_version(): 1035 msgToSend = { 'cmd' : 'entry_rejected_bad_mpd_version', 1036 'your_version' : msg['mpd_version'], 1037 'my_version' : mpd_version() } 1038 newsock.send_dict_msg(msgToSend) 1039 newsock.close() 1040 return None 1041 randNumStr = '%04d' % (randrange(1,randHiRange)) # 0001-(hi-1), inclusive 1042 newsock.correctChallengeResponse = \ 1043 md5new(''.join([self.secretword,randNumStr])).digest() 1044 msgToSend = { 'cmd' : 'challenge', 'randnum' : randNumStr, 1045 'generation' : self.generation } 1046 newsock.send_dict_msg(msgToSend) 1047 if msg['cmd'] == 'request_to_enter_as_lhs': 1048 self.streamHandler.set_handler(newsock,self.handle_lhs_challenge_response) 1049 newsock.name = 'candidate_for_lhs_challenged' 1050 return newsock 1051 elif msg['cmd'] == 'request_to_enter_as_rhs': 1052 self.streamHandler.set_handler(newsock,self.handle_rhs_challenge_response) 1053 newsock.name = 'candidate_for_rhs_challenged' 1054 return newsock 1055 elif msg['cmd'] == 'ping': 1056 # already sent challenge instead of ack 1057 newsock.close() 1058 return None 1059 else: 1060 mpd_print(1, 'INVALID msg from new connection :%s: msg=:%s:' % (newaddr,msg) ) 1061 newsock.close() 1062 return None 1063 return None 1064 def handle_lhs_challenge_response(self,sock): 1065 msg = sock.recv_dict_msg() 1066 if (not msg) or \ 1067 (not msg.has_key('cmd')) or (not msg.has_key('response')) or \ 1068 (not msg.has_key('ifhn')) or (not msg.has_key('port')) or \ 1069 (not msg['response'] == sock.correctChallengeResponse): 1070 mpd_print(1, 'INVALID msg for lhs response msg=:%s:' % (msg) ) 1071 msgToSend = { 'cmd' : 'invalid_response' } 1072 sock.send_dict_msg(msgToSend) 1073 self.streamHandler.del_handler(sock) 1074 sock.close() 1075 else: 1076 msgToSend = { 'cmd' : 'OK_to_enter_as_lhs' } 1077 sock.send_dict_msg(msgToSend) 1078 if self.lhsSock: 1079 self.streamHandler.del_handler(self.lhsSock) 1080 self.lhsSock.close() 1081 self.lhsSock = sock 1082 self.lhsIfhn = msg['ifhn'] 1083 self.lhsPort = int(msg['port']) 1084 self.streamHandler.set_handler(self.lhsSock,self.lhsHandler) 1085 self.lhsSock.name = 'lhs' 1086 def handle_rhs_challenge_response(self,sock): 1087 msg = sock.recv_dict_msg() 1088 if (not msg) or \ 1089 (not msg.has_key('cmd')) or (not msg.has_key('response')) or \ 1090 (not msg.has_key('ifhn')) or (not msg.has_key('port')): 1091 mpd_print(1, 'INVALID msg for rhs response msg=:%s:' % (msg) ) 1092 msgToSend = { 'cmd' : 'invalid_response' } 1093 sock.send_dict_msg(msgToSend) 1094 self.streamHandler.del_handler(sock) 1095 sock.close() 1096 elif msg['response'] != sock.correctChallengeResponse: 1097 mpd_print(1, 'INVALID response in rhs response msg=:%s:' % (msg) ) 1098 msgToSend = { 'cmd' : 'invalid_response' } 1099 sock.send_dict_msg(msgToSend) 1100 self.streamHandler.del_handler(sock) 1101 sock.close() 1102 elif msg['response'] == 'bad_generation': 1103 mpd_print(1, 'someone failed entering my ring gen=%d msg=%s' % \ 1104 (self.generation,msg) ) 1105 self.streamHandler.del_handler(sock) 1106 sock.close() 1107 else: 1108 msgToSend = { 'cmd' : 'OK_to_enter_as_rhs', 'rhsifhn' : self.rhsIfhn, 1109 'rhsip' : self.rhsIfhn, 'rhsport' : self.rhsPort } 1110 sock.send_dict_msg(msgToSend) 1111 if self.rhsSock: 1112 self.streamHandler.del_handler(self.rhsSock) 1113 self.rhsSock.close() 1114 self.rhsSock = sock 1115 self.rhsIfhn = msg['ifhn'] 1116 self.rhsPort = int(msg['port']) 1117 self.streamHandler.set_handler(self.rhsSock,self.rhsHandler) 1118 self.rhsSock.name = 'rhs' 1119 1120class MPDConListenSock(MPDListenSock): 1121 def __init__(self,name='console_listen',secretword='',**kargs): 1122 if os.environ.has_key('MPD_CON_EXT'): 1123 self.conExt = '_' + os.environ['MPD_CON_EXT'] 1124 else: 1125 self.conExt = '' 1126 self.conFilename = mpd_tmpdir + '/mpd2.console_' + mpd_get_my_username() + self.conExt 1127 self.secretword = secretword 1128 consoleAlreadyExists = 0 1129 if hasattr(socket,'AF_UNIX'): 1130 sockFamily = socket.AF_UNIX 1131 else: 1132 sockFamily = socket.AF_INET 1133 if os.environ.has_key('MPD_CON_INET_HOST_PORT'): 1134 sockFamily = socket.AF_INET # override above-assigned value 1135 (conHost,conPort) = os.environ['MPD_CON_INET_HOST_PORT'].split(':') 1136 conPort = int(conPort) 1137 else: 1138 (conHost,conPort) = ('',0) 1139 if os.access(self.conFilename,os.R_OK): # if console there, see if mpd listening 1140 if hasattr(socket,'AF_UNIX') and sockFamily == socket.AF_UNIX: 1141 tempSock = MPDSock(family=socket.AF_UNIX) 1142 try: 1143 tempSock.connect(self.conFilename) 1144 consoleAlreadyExists = 1 1145 except Exception, errmsg: 1146 os.unlink(self.conFilename) 1147 tempSock.close() 1148 else: 1149 if not conPort: 1150 conFile = open(self.conFilename) 1151 for line in conFile: 1152 line = line.strip() 1153 (k,v) = line.split('=') 1154 if k == 'port': 1155 conPort = int(v) 1156 conFile.close() 1157 tempSock = MPDSock() 1158 try: 1159 tempSock.sock.connect(('localhost',conPort)) 1160 consoleAlreadyExists = 1 1161 except Exception, errmsg: 1162 os.unlink(self.conFilename) 1163 tempSock.close() 1164 if consoleAlreadyExists: 1165 print 'An mpd is already running with console at %s on %s. ' % \ 1166 (self.conFilename, socket.gethostname()) 1167 print 'Start mpd with the -n option for a second mpd on same host.' 1168 if syslog_module_available: 1169 syslog.syslog(syslog.LOG_ERR, 1170 "%s: exiting; an mpd is already using the console" % \ 1171 (mpd_my_id)) 1172 sys.exit(-1) 1173 if hasattr(socket,'AF_UNIX') and sockFamily == socket.AF_UNIX: 1174 MPDListenSock.__init__(self,family=sockFamily,socktype=socket.SOCK_STREAM, 1175 filename=self.conFilename,listen=1,name=name) 1176 else: 1177 MPDListenSock.__init__(self,family=sockFamily,socktype=socket.SOCK_STREAM, 1178 port=conPort,listen=1,name=name) 1179 conFD = os.open(self.conFilename,os.O_CREAT|os.O_WRONLY|os.O_EXCL,0600) 1180 self.port = self.sock.getsockname()[1] 1181 os.write(conFD,'port=%d\n' % (self.port) ) 1182 os.close(conFD) 1183 1184class MPDConClientSock(MPDSock): 1185 def __init__(self,name='console_to_mpd',mpdroot='',secretword='',**kargs): 1186 MPDSock.__init__(self) 1187 self.sock = 0 1188 if os.environ.has_key('MPD_CON_EXT'): 1189 self.conExt = '_' + os.environ['MPD_CON_EXT'] 1190 else: 1191 self.conExt = '' 1192 self.secretword = secretword 1193 if mpdroot: 1194 self.conFilename = mpd_tmpdir + '/mpd2.console_root' + self.conExt 1195 self.sock = MPDSock(family=socket.AF_UNIX,name=name) 1196 rootpid = os.fork() 1197 if rootpid == 0: 1198 os.execvpe(mpdroot,[mpdroot,self.conFilename,str(self.sock.fileno())],{}) 1199 mpd_print(1,'failed to exec mpdroot (%s)' % mpdroot ) 1200 sys.exit(-1) 1201 else: 1202 (pid,status) = os.waitpid(rootpid,0) 1203 if os.WIFSIGNALED(status): 1204 status = status & 0x007f # AND off core flag 1205 else: 1206 status = os.WEXITSTATUS(status) 1207 if status != 0: 1208 mpd_print(1,'forked process failed; status=%s' % status) 1209 sys.exit(-1) 1210 else: 1211 self.conFilename = mpd_tmpdir + '/mpd2.console_' + mpd_get_my_username() + self.conExt 1212 if hasattr(socket,'AF_UNIX'): 1213 sockFamily = socket.AF_UNIX 1214 else: 1215 sockFamily = socket.AF_INET 1216 if os.environ.has_key('MPD_CON_INET_HOST_PORT'): 1217 sockFamily = socket.AF_INET # override above-assigned value 1218 (conHost,conPort) = os.environ['MPD_CON_INET_HOST_PORT'].split(':') 1219 conPort = int(conPort) 1220 else: 1221 (conHost,conPort) = ('',0) 1222 self.sock = MPDSock(family=sockFamily,socktype=socket.SOCK_STREAM,name=name) 1223 if hasattr(socket,'AF_UNIX') and sockFamily == socket.AF_UNIX: 1224 if hasattr(signal,'alarm'): 1225 oldAlarmTime = signal.alarm(8) 1226 else: # assume python2.3 or later 1227 oldTimeout = socket.getdefaulttimeout() 1228 socket.setdefaulttimeout(8) 1229 try: 1230 self.sock.connect(self.conFilename) 1231 except Exception, errmsg: 1232 self.sock.close() 1233 self.sock = 0 1234 if hasattr(signal,'alarm'): 1235 signal.alarm(oldAlarmTime) 1236 else: # assume python2.3 or later 1237 socket.setdefaulttimeout(oldTimeout) 1238 if self.sock: 1239 # this is done by mpdroot otherwise 1240 msgToSend = 'realusername=%s secretword=UNUSED\n' % \ 1241 mpd_get_my_username() 1242 self.sock.send_char_msg(msgToSend) 1243 else: 1244 if not conPort: 1245 conFile = open(self.conFilename) 1246 for line in conFile: 1247 line = line.strip() 1248 (k,v) = line.split('=') 1249 if k == 'port': 1250 conPort = int(v) 1251 conFile.close() 1252 if conHost: 1253 conIfhn = socket.gethostbyname_ex(conHost)[2][0] 1254 else: 1255 conIfhn = 'localhost' 1256 self.sock = MPDSock(name=name) 1257 if hasattr(signal,'alarm'): 1258 oldAlarmTime = signal.alarm(8) 1259 else: # assume python2.3 or later 1260 oldTimeout = socket.getdefaulttimeout() 1261 socket.setdefaulttimeout(8) 1262 try: 1263 self.sock.connect((conIfhn,conPort)) 1264 except Exception, errmsg: 1265 mpd_print(1,"failed to connect to host %s port %d" % \ 1266 (conIfhn,conPort) ) 1267 self.sock.close() 1268 self.sock = 0 1269 if hasattr(signal,'alarm'): 1270 signal.alarm(oldAlarmTime) 1271 else: # assume python2.3 or later 1272 socket.setdefaulttimeout(oldTimeout) 1273 if not self.sock: 1274 print '%s: cannot connect to local mpd (%s); possible causes:' % \ 1275 (mpd_my_id,self.conFilename) 1276 print ' 1. no mpd is running on this host' 1277 print ' 2. an mpd is running but was started without a "console" (-n option)' 1278 print 'In case 1, you can start an mpd on this host with:' 1279 print ' mpd &' 1280 print 'and you will be able to run jobs just on this host.' 1281 print 'For more details on starting mpds on a set of hosts, see' 1282 print 'the MPICH2 Installation Guide.' 1283 sys.exit(-1) 1284 msgToSend = { 'cmd' : 'con_init' } 1285 self.sock.send_dict_msg(msgToSend) 1286 msg = self.sock.recv_dict_msg() 1287 if not msg: 1288 mpd_print(1,'expected con_challenge from mpd; got eof') 1289 sys.exit(-1) 1290 if msg['cmd'] != 'con_challenge': 1291 mpd_print(1,'expected con_challenge from mpd; got msg=:%s:' % (msg) ) 1292 sys.exit(-1) 1293 randVal = self.secretword + str(msg['randnum']) 1294 response = md5new(randVal).digest() 1295 msgToSend = { 'cmd' : 'con_challenge_response', 'response' : response, 1296 'realusername' : mpd_get_my_username() } 1297 self.sock.send_dict_msg(msgToSend) 1298 msg = self.sock.recv_dict_msg() 1299 if not msg or msg['cmd'] != 'valid_response': 1300 mpd_print(1,'expected valid_response from mpd; got msg=:%s:' % (msg) ) 1301 sys.exit(-1) 1302 if not self.sock: 1303 print '%s: cannot connect to local mpd (%s); possible causes:' % \ 1304 (mpd_my_id,self.conFilename) 1305 print ' 1. no mpd is running on this host' 1306 print ' 2. an mpd is running but was started without a "console" (-n option)' 1307 print 'In case 1, you can start an mpd on this host with:' 1308 print ' mpd &' 1309 print 'and you will be able to run jobs just on this host.' 1310 print 'For more details on starting mpds on a set of hosts, see' 1311 print 'the MPICH2 Installation Guide.' 1312 sys.exit(-1) 1313 1314class MPDParmDB(dict): 1315 def __init__(self,orderedSources=[]): 1316 dict.__init__(self) 1317 self.orderedSources = orderedSources 1318 self.db = {} 1319 for src in orderedSources: # highest to lowest 1320 self.db[src] = {} 1321 def __setitem__(self,sk_tup,val): 1322 if type(sk_tup) != TupleType or len(sk_tup) != 2: 1323 mpd_print_tb(1,"must use a 2-tuple as key in a parm db; invalid: %s" % (sk_tup) ) 1324 sys.exit(-1) 1325 s,k = sk_tup 1326 for src in self.orderedSources: 1327 if src == s: 1328 self.db[src][k] = val 1329 break 1330 else: 1331 mpd_print_tb(1,"invalid src specified for insert into parm db; src=%s" % (src) ) 1332 sys.exit(-1) 1333 def __getitem__(self,key): 1334 for src in self.orderedSources: 1335 if self.db[src].has_key(key): 1336 return self.db[src][key] 1337 raise KeyError, "key %s not found in parm db" % (key) 1338 def has_key(self,key): 1339 for src in self.orderedSources: 1340 if self.db[src].has_key(key): 1341 return 1 1342 return 0 1343 def printall(self): 1344 print "MPDRUN's PARMDB; values from all sources:" 1345 for src in self.orderedSources: 1346 print ' %s (source)' % (src) 1347 for key in self.db[src].keys(): 1348 print ' %s = %s' % (key,self.db[src][key]) 1349 def printdef(self): 1350 print "MPDRUN's PARMDB; default values only:" 1351 printed = {} 1352 for src in self.orderedSources: 1353 for key in self.db[src]: 1354 if not printed.has_key(key): 1355 printed[key] = 1 1356 print ' %s %s = %s' % (src,key,self.db[src][key]) 1357 def get_parms_from_env(self,parmsToOverride): 1358 for k in parmsToOverride.keys(): 1359 if os.environ.has_key(k): 1360 self[('env',k)] = os.environ[k] 1361 def get_parms_from_rcfile(self,parmsToOverride,errIfMissingFile=0): 1362 if os.environ.has_key('MPD_CONF_FILE') and os.access(os.environ['MPD_CONF_FILE'], os.R_OK): 1363 parmsRCFilename = os.environ['MPD_CONF_FILE'] 1364 elif hasattr(os,'getuid') and os.getuid() == 0: # if ROOT 1365 parmsRCFilename = os.path.abspath('/etc/mpd.conf') 1366 elif os.environ.has_key('HOME') and os.access(os.path.join(os.environ['HOME'], '.mpd.conf'), os.R_OK): 1367 parmsRCFilename = os.path.join(os.environ['HOME'],'.mpd.conf') 1368 elif os.environ.has_key('HOMEPATH'): # e.g. win32 1369 parmsRCFilename = os.path.join(os.environ['HOMEPATH'],'.mpd.conf') 1370 else: 1371 print 'unable to find mpd.conf file' 1372 sys.exit(-1) 1373 if sys.platform == 'win32': 1374 mode = 0x80 # fake it 1375 else: 1376 try: 1377 mode = os.stat(parmsRCFilename)[0] 1378 except: 1379 mode = '' 1380 # sometimes a missing file is OK, e.g. when user running with root's mpd 1381 if not mode and not errIfMissingFile: 1382 return 1383 if not mode: 1384 print 'configuration file %s not found' % (parmsRCFilename) 1385 print 'A file named .mpd.conf file must be present in the user\'s home' 1386 print 'directory (/etc/mpd.conf if root) with read and write access' 1387 print 'only for the user, and must contain at least a line with:' 1388 print 'MPD_SECRETWORD=<secretword>' 1389 print 'One way to safely create this file is to do the following:' 1390 print ' cd $HOME' 1391 print ' touch .mpd.conf' 1392 print ' chmod 600 .mpd.conf' 1393 print 'and then use an editor to insert a line like' 1394 print ' MPD_SECRETWORD=mr45-j9z' 1395 print 'into the file. (Of course use some other secret word than mr45-j9z.)' 1396 sys.exit(-1) 1397 if (mode & 0x3f): 1398 print 'configuration file %s is accessible by others' % (parmsRCFilename) 1399 print 'change permissions to allow read and write access only by you' 1400 sys.exit(-1) 1401 parmsRCFile = open(parmsRCFilename) 1402 for line in parmsRCFile: 1403 lineWithoutComments = line.split('#')[0] # will at least be '' 1404 lineWithoutComments = lineWithoutComments.strip() 1405 if not lineWithoutComments: 1406 continue 1407 splitLine = lineWithoutComments.split('=') 1408 if not splitLine[0]: # [''] 1409 print 'warning: unrecognized (null) key in %s' % (parmsRCFilename) 1410 continue 1411 if len(splitLine) == 2: 1412 (k,v) = splitLine 1413 origKey = k 1414 if k == 'secretword': # for bkwd-compat 1415 k = 'MPD_SECRETWORD' 1416 if k in parmsToOverride.keys(): 1417 if k != 'MPD_SECRETWORD' and v.isdigit(): 1418 v = int(v) 1419 self[('rcfile',k)] = v 1420 else: 1421 mpd_print(1, 'line in mpd conf is not key=val pair; line=:%s:' % (line) ) 1422 1423class MPDTest(object): 1424 def __init__(self): 1425 pass 1426 def run(self,cmd='',expIn = '',chkEC=0,expEC=0,chkOut=0,expOut='',ordOut=0, 1427 grepOut=0, exitOnFail=1): 1428 rv = {} 1429 if chkOut and grepOut: 1430 print "grepOut and chkOut are mutually exclusive" 1431 sys.exit(-1) 1432 outLines = [] 1433 if subprocess_module_available: 1434 import re 1435 cmd = re.split(r'\s+',cmd) 1436 runner = subprocess.Popen(cmd,bufsize=0,env=os.environ,close_fds=True, 1437 stdin=subprocess.PIPE,stdout=subprocess.PIPE, 1438 stderr=subprocess.PIPE) 1439 if expIn: 1440 runner.stdin.write(expIn) 1441 runner.stdin.close() 1442 for line in runner.stdout: 1443 outLines.append(line[:-1]) # strip newlines 1444 for line in runner.stderr: 1445 outLines.append(line[:-1]) # strip newlines 1446 rv['pid'] = runner.pid 1447 rv['EC'] = runner.wait() 1448 elif hasattr(popen2,'Popen4'): # delete when python2.4+ is common 1449 runner = popen2.Popen4(cmd) 1450 if expIn: 1451 runner.tochild.write(expIn) 1452 runner.tochild.close() 1453 for line in runner.fromchild: 1454 outLines.append(line[:-1]) # strip newlines 1455 rv['pid'] = runner.pid 1456 rv['EC'] = runner.wait() 1457 else: 1458 mpd_print(1,'can not run with either subprocess or popen2-Popen4') 1459 sys.exit(-1) 1460 rv['OUT'] = outLines[:] 1461 if chkEC and expEC != rv['EC']: 1462 print "bad exit code from test: %s" % (cmd) 1463 print " expected exitcode=%d ; got %d" % (expEC,rv['EC']) 1464 print "output from cmd:" 1465 for line in outLines: 1466 print line 1467 if exitOnFail: 1468 sys.exit(-1) 1469 if chkOut: 1470 orderOK = 1 1471 expOut = expOut.split('\n')[:-1] # leave off trailing '' 1472 for line in outLines[:]: # copy of outLines 1473 if line in expOut: 1474 if ordOut and line != expOut[0]: 1475 orderOK = 0 1476 break # count rest of outLines as bad 1477 expOut.remove(line) 1478 outLines.remove(line) 1479 if not orderOK: 1480 print "lines out of order in output for test: %s" % (cmd) 1481 for line in outLines: 1482 print line 1483 if exitOnFail: 1484 sys.exit(-1) 1485 if expOut: 1486 print "some required lines not found in output for test: %s" % (cmd) 1487 for line in outLines: 1488 print line 1489 if exitOnFail: 1490 sys.exit(-1) 1491 if outLines: 1492 print "extra lines in output for test: %s" % (cmd) 1493 for line in outLines: 1494 print line 1495 if exitOnFail: 1496 sys.exit(-1) 1497 elif grepOut: 1498 foundCnt = 0 1499 for expLine in expOut: 1500 for outLine in outLines: 1501 if outLine.find(expLine) >= 0: 1502 foundCnt += 1 1503 if foundCnt < len(expOut): 1504 print "some lines not matched for test: %s" % (cmd) 1505 for line in outLines: 1506 print line 1507 if exitOnFail: 1508 sys.exit(-1) 1509 return rv 1510 1511#### experimental code for zeroconf 1512def mpd_init_zc(ifhn,my_level): 1513 import threading, Zeroconf 1514 global mpd_zc 1515 mpd_zc = Zeroconf.Zeroconf() 1516 class ListenerForPeers(object): 1517 def __init__(self): 1518 mpd_zc.peers = {} 1519 mpd_zc.peersLock = threading.Lock() 1520 mpd_zc.peers_available_event = threading.Event() 1521 def removeService(self, zc, service_type, name): 1522 mpd_zc.peersLock.acquire() 1523 del mpd_zc.peers[name] 1524 print "removed", name ; sys.stdout.flush() 1525 mpd_zc.peersLock.release() 1526 def addService(self, zc, service_type, name): 1527 info = zc.getServiceInfo(service_type, name) 1528 if info: 1529 if info.properties['username'] != mpd_get_my_username(): 1530 return 1531 mpd_zc.peersLock.acquire() 1532 mpd_zc.peers[name] = info 1533 print "added peer:", name, info.properties ; sys.stdout.flush() 1534 mpd_zc.peersLock.release() 1535 mpd_zc.peers_available_event.set() 1536 else: 1537 print "OOPS NO INFO FOR", name ; sys.stdout.flush() 1538 service_type = "_mpdzc._tcp.local." 1539 listenerForPeers = ListenerForPeers() 1540 browser = Zeroconf.ServiceBrowser(mpd_zc,service_type,listenerForPeers) 1541 ## sleep(1.5) # give browser a chance to find some peers 1542def mpd_find_zc_peer(peer_level): 1543 print "finding a peer at level %d..." % (peer_level) ; sys.stdout.flush() 1544 mpd_zc.peers_available_event.wait(5) 1545 for (peername,info) in mpd_zc.peers.items(): 1546 if info.properties['mpdid'] == mpd_my_id: 1547 continue 1548 if info.properties['level'] != peer_level: 1549 continue 1550 peerAddr = str(socket.inet_ntoa(info.getAddress())) 1551 peerPort = info.getPort() 1552 return(peerAddr,peerPort) 1553 return ('',0) 1554def mpd_register_zc(ifhn,level): 1555 import Zeroconf 1556 service_type = "_mpdzc._tcp.local." 1557 service_ifhn = socket.inet_aton(ifhn) 1558 service_host = socket.gethostname() 1559 service_port = int(mpd_my_id.split('_')[1]) 1560 svc = Zeroconf.ServiceInfo(service_type, 1561 mpd_my_id + service_type, 1562 address = service_ifhn, 1563 port = service_port, 1564 weight = 0, priority = 0, 1565 properties = { 'description': 'mpd', 1566 'mpdid' : mpd_my_id, 1567 'level' : level, 1568 'username' : mpd_get_my_username() } 1569 ) 1570 mpd_zc.registerService(svc) 1571def mpd_close_zc(): 1572 if mpd_zc: 1573 mpd_zc.close() 1574 1575 1576# code for testing 1577 1578def _handle_msg(sock): 1579 msg = sock.recv_dict_msg() 1580 print 'recvd msg=:%s:' % (msg) 1581 1582if __name__ == '__main__': 1583 sh = MPDStreamHandler() 1584 (tsock1,tsock2) = mpd_sockpair() 1585 tsock1.name = 'tsock1_connected_to_tsock2' 1586 sh.set_handler(tsock1,_handle_msg) 1587 tsock2.send_dict_msg( {'msgtype' : 'hello'} ) 1588 sh.handle_active_streams() 1589 # just to demo a listen sock 1590 lsock = MPDListenSock('',9999,name='listen_sock') 1591 print lsock.name, lsock.getsockname()[1] 1592 1593 ### import sys 1594 ### sys.excepthook = mpd_uncaught_except_tb 1595 ### i = 1/0 1596