1#!/usr/bin/env python
2#
3#   (C) 2001 by Argonne National Laboratory.
4#       See COPYRIGHT in top-level directory.
5#
6
7"""
8mpdman does NOT run as a standalone console program;
9    it is only exec'd (or imported) by mpd
10"""
11from time import ctime
12__author__ = "Ralph Butler and Rusty Lusk"
13__date__ = ctime()
14__version__ = "$Revision: 1.160 $"
15__credits__ = ""
16
17
18import sys, os, signal, socket
19
20from types    import ClassType
21from re       import findall, sub
22from cPickle  import loads
23from time     import sleep
24from urllib   import quote
25from mpdlib   import mpd_set_my_id, mpd_print, mpd_read_nbytes,  \
26                     mpd_sockpair, mpd_get_ranks_in_binary_tree, \
27                     mpd_get_my_username, mpd_set_cli_app,       \
28                     mpd_dbg_level, mpd_handle_signal,           \
29                     MPDSock, MPDListenSock, MPDStreamHandler, MPDRing
30
31try:
32    import  syslog
33    syslog_module_available = 1
34except:
35    syslog_module_available = 0
36try:
37    import  subprocess
38    subprocess_module_available = 1
39except:
40    subprocess_module_available = 0
41
42
# NOTE: a 'global' statement at module scope is a no-op in Python; this line
# only documents the names shared between MPDMan.run and the module-level
# SIGCHLD handler (clientPid, clientExited, clientExitStatus,
# clientExitStatusSent).
global clientPid, clientExited, clientExitStatus, clientExitStatusSent
44
class MPDMan(object):
    """Per-client process manager exec'd (or imported) by mpd.

    One MPDMan instance manages exactly one client process of a parallel
    job.  All real initialization happens in run(), driven entirely by
    MPDMAN_* environment variables set by the launching mpd.
    """
    def __init__(self):
        # intentionally empty; run() performs all setup from the environment
        pass
    def run(self):
        """Main entry point for the manager.

        Reads configuration from MPDMAN_* environment variables, joins (or
        builds) the ring of managers for this job, connects rank 0 to the
        console (mpiexec), sets up PMI and the stdio forwarding tree,
        launches the client program, then services all streams in a loop
        until the job-end barrier completes.  Does not return a value;
        exits the process on fatal setup errors.
        """
        global clientPid, clientExited, clientExitStatus, clientExitStatusSent
        clientExited = 0
        clientExitStatusSent = 0
        if hasattr(signal,'SIGCHLD'):
            signal.signal(signal.SIGCHLD,sigchld_handler)
        # ---- identity and position in the ring (all via environment) ----
        self.myHost    = os.environ['MPDMAN_MYHOST']
        self.myIfhn    = os.environ['MPDMAN_MYIFHN']
        self.myRank    = int(os.environ['MPDMAN_RANK'])
        self.posInRing = int(os.environ['MPDMAN_POS_IN_RING'])
        self.myId      = self.myHost + '_mpdman_' + str(self.myRank)
        self.spawned   = int(os.environ['MPDMAN_SPAWNED'])
        self.spawnInProgress = 0
        if self.spawned:
            self.myId = self.myId + '_s'
        # Note that in the spawned process case, this id for the mpdman
        # will not be unique (it needs something like the world number
        # or the pid of the mpdman process itself)
        mpd_set_my_id(myid=self.myId)
        self.clientPgm = os.environ['MPDMAN_CLI_PGM']
        mpd_set_cli_app(self.clientPgm)
        try:
            os.chdir(os.environ['MPDMAN_CWD'])
        except Exception, errmsg:
            # deliberately non-fatal: client launch may still work elsewhere
            errmsg =  '%s: invalid dir: %s' % (self.myId,os.environ['MPDMAN_CWD'])
            # print errmsg    ## may syslog it in some cases ?
        # ---- obtain the ring listen sock and the sock back to our mpd;
        #      how depends on whether mpd forked us or used subprocess ----
        if os.environ['MPDMAN_HOW_LAUNCHED'] == 'FORK':
            self.listenRingPort = int(os.environ['MPDMAN_MY_LISTEN_PORT'])
            listenRingFD = int(os.environ['MPDMAN_MY_LISTEN_FD'])  # closed in loop below
            self.listenRingSock = socket.fromfd(listenRingFD,socket.AF_INET,socket.SOCK_STREAM)
            self.listenRingSock = MPDSock(sock=self.listenRingSock)
            mpdFD = int(os.environ['MPDMAN_TO_MPD_FD'])  # closed in loop below
            self.mpdSock = socket.fromfd(mpdFD,socket.AF_INET,socket.SOCK_STREAM)
            self.mpdSock = MPDSock(sock=self.mpdSock)
        elif os.environ['MPDMAN_HOW_LAUNCHED'] == 'SUBPROCESS':
            # no inherited fds; create our own listen sock and connect to mpd,
            # then tell mpd which port we are listening on
            self.listenRingSock = MPDListenSock()
            self.listenRingPort = self.listenRingSock.getsockname()[1]
            mpdPort = int(os.environ['MPDMAN_MPD_PORT'])
            self.mpdSock = MPDSock()
            self.mpdSock.connect((self.myIfhn,mpdPort))
            self.mpdSock.send_dict_msg( {'man_listen_port' : self.listenRingPort} )
        else:
            mpd_print(1,'I cannot figure out how I was launched')
            sys.exit(-1)
        self.pos0Ifhn = os.environ['MPDMAN_POS0_IFHN']
        self.pos0Port = int(os.environ['MPDMAN_POS0_PORT'])
        # close unused fds before I grab any more
        # NOTE: this will also close syslog's fd inherited from mpd; re-opened below
        try:     max_fds = os.sysconf('SC_OPEN_MAX')
        except:  max_fds = 1024
        # FIXME This snippet causes problems on Fedora Core 12.  FC12's python
        # opens a file object to /etc/abrt/pyhook.conf.  Closing the fd out from
        # under the higher level object causes problems at exit time when the
        # higher level object is garbage collected.  See MPICH2 ticket #902 for
        # more information.
        #for fd in range(3,max_fds):
        #    if fd == self.mpdSock.fileno()  or  fd == self.listenRingSock.fileno():
        #        continue
        #    try:    os.close(fd)
        #    except: pass
        if syslog_module_available:
            syslog.openlog("mpdman",0,syslog.LOG_DAEMON)
            syslog.syslog(syslog.LOG_INFO,"mpdman starting new log; %s" % (self.myId) )
        # umask may arrive as hex ('0x..'), octal (leading '0'), or decimal
        self.umask = os.environ['MPDMAN_UMASK']
        if self.umask.startswith('0x'):
            self.umask = int(self.umask,16)
        elif self.umask.startswith('0'):
            self.umask = int(self.umask,8)
        else:
            self.umask = int(self.umask)
        self.oldumask = os.umask(self.umask)
        # ---- job parameters; args/env/limits arrive pickled in the env ----
        self.clientPgmArgs = loads(os.environ['MPDMAN_PGM_ARGS'])
        self.clientPgmEnv = loads(os.environ['MPDMAN_PGM_ENVVARS'])
        self.clientPgmLimits = loads(os.environ['MPDMAN_PGM_LIMITS'])
        self.jobid = os.environ['MPDMAN_JOBID']
        self.nprocs = int(os.environ['MPDMAN_NPROCS'])
        self.mpdPort = int(os.environ['MPDMAN_MPD_LISTEN_PORT'])
        self.mpdConfPasswd = os.environ['MPDMAN_MPD_CONF_SECRETWORD']
        os.environ['MPDMAN_MPD_CONF_SECRETWORD'] = ''  ## do NOT pass it on to clients
        self.kvs_template_from_env = os.environ['MPDMAN_KVS_TEMPLATE']
        self.conIfhn  = os.environ['MPDMAN_CONIFHN']
        self.conPort  = int(os.environ['MPDMAN_CONPORT'])
        self.lhsIfhn  = os.environ['MPDMAN_LHS_IFHN']
        self.lhsPort  = int(os.environ['MPDMAN_LHS_PORT'])
        self.stdinDest = os.environ['MPDMAN_STDIN_DEST']
        self.totalview = int(os.environ['MPDMAN_TOTALVIEW'])
        self.gdb = int(os.environ['MPDMAN_GDB'])
        self.gdba = os.environ['MPDMAN_GDBA']
        self.lineLabelFmt = os.environ['MPDMAN_LINE_LABELS_FMT']
        self.startStdoutLineLabel = 1
        self.startStderrLineLabel = 1
        self.singinitPID  = int(os.environ['MPDMAN_SINGINIT_PID'])
        self.singinitPORT = int(os.environ['MPDMAN_SINGINIT_PORT'])
        self.doingBNR = int(os.environ['MPDMAN_DOING_BNR'])
        # listen sock for non-ring traffic (stdio-tree children, spawned man0s)
        self.listenNonRingSock = MPDListenSock('',0,name='nonring_listen_sock')
        self.listenNonRingPort = self.listenNonRingSock.getsockname()[1]
        self.streamHandler = MPDStreamHandler()
        self.streamHandler.set_handler(self.mpdSock,self.handle_mpd_input)
        self.streamHandler.set_handler(self.listenNonRingSock,
                                       self.handle_nonring_connection)

        # set up pmi stuff early in case I was spawned
        self.universeSize = -1
        self.appnum = -1
        self.pmiVersion = 1
        self.pmiSubversion = 1
        self.KVSs = {}
        if self.singinitPID:
            # self.kvsname_template = 'singinit_kvs_'
            self.kvsname_template = 'singinit_kvs_' + str(os.getpid())
        else:
            self.kvsname_template = 'kvs_' + self.kvs_template_from_env + '_'
        self.default_kvsname = self.kvsname_template + '0'
        # kvs names must be identifier-like; sanitize host-derived chars
        self.default_kvsname = sub('\.','_',self.default_kvsname)  # magpie.cs to magpie_cs
        self.default_kvsname = sub('\-','_',self.default_kvsname)  # chg node-0 to node_0
        self.KVSs[self.default_kvsname] = {}
        # ---- build the client's environment ----
        cli_env = {}
        cli_env['MPICH_INTERFACE_HOSTNAME'] = os.environ['MPICH_INTERFACE_HOSTNAME']
        cli_env['MPICH_INTERFACE_HOSTNAME_R%d' % self.myRank] = os.environ['MPICH_INTERFACE_HOSTNAME']
        for k in self.clientPgmEnv.keys():
            if k.startswith('MPI_APPNUM'):
                self.appnum = self.clientPgmEnv[k]    # don't put in application env
            elif k.startswith('MPICH_INTERFACE_HOSTNAME'):
                continue    ## already put it in above
            else:
                cli_env[k] = self.clientPgmEnv[k]
        self.kvs_next_id = 1
        self.jobEndingEarly = 0
        self.pmiCollectiveJob = 0
        self.spawnedCnt = 0
        self.pmiSock = 0   # obtained later
        # ---- join the ring of managers; topology depends on posInRing ----
        self.ring = MPDRing(listenSock=self.listenRingSock,
                            streamHandler=self.streamHandler,
                            myIfhn=self.myIfhn)
        if self.nprocs == 1:
            # degenerate ring: I am my own lhs and rhs
            self.ring.create_single_mem_ring(ifhn=self.myIfhn,
                                             port=self.listenRingPort,
                                             lhsHandler=self.handle_lhs_input,
                                             rhsHandler=self.handle_rhs_input)
        else:
            if self.posInRing == 0:    # one 'end'
                self.ring.accept_rhs(rhsHandler=self.handle_rhs_input)
                self.ring.accept_lhs(lhsHandler=self.handle_lhs_input)
            elif self.posInRing == (self.nprocs-1):  # the other 'end'
                rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn,
                                           lhsPort=self.lhsPort,
                                           lhsHandler=self.handle_lhs_input,
                                           numTries=8)
                if rv[0] <= 0:
                    mpd_print(1,"lhs connect failed")
                    sys.exit(-1)
                # close the ring by connecting back to position 0
                self.rhsIfhn = self.pos0Ifhn
                self.rhsPort = self.pos0Port
                rv = self.ring.connect_rhs(rhsIfhn=self.rhsIfhn,
                                           rhsPort=self.rhsPort,
                                           rhsHandler=self.handle_rhs_input,
                                           numTries=8)
                if rv[0] <=  0:  # connect did not succeed; may try again
                    mpd_print(1,"rhs connect failed")
                    sys.exit(-1)
            else:  # ring members 'in the middle'
                rv = self.ring.connect_lhs(lhsIfhn=self.lhsIfhn,
                                           lhsPort=self.lhsPort,
                                           lhsHandler=self.handle_lhs_input,
                                           numTries=8)
                if rv[0] <= 0:
                    mpd_print(1,"lhs connect failed")
                    sys.exit(-1)
                self.ring.accept_rhs(rhsHandler=self.handle_rhs_input)

        # ---- rank 0 only: connect to the console (mpiexec) and exchange
        #      startup info; also open the stdout/stderr tree roots ----
        if self.myRank == 0:
            self.conSock = MPDSock(name='to_console')
            self.conSock.connect((self.conIfhn,self.conPort))
            self.streamHandler.set_handler(self.conSock,self.handle_console_input)
            if self.spawned:
                msgToSend = { 'cmd' : 'spawned_man0_is_up',
                              'spawned_id' : os.environ['MPDMAN_SPAWNED'] }
                self.conSock.send_dict_msg(msgToSend)
                msg = self.conSock.recv_dict_msg()
                # If there is a failure in the connection, this
                # receive will fail and if not handled, cause mpdman
                # to fail.  For now, we just check on a empty or unexpected
                # message
                if not msg or msg['cmd'] != 'preput_info_for_child':
                    mpd_print(1,'invalid msg from parent :%s:' % msg)
                    sys.exit(-1)
                try:
                    # seed our default kvs with the parent's preput values
                    for k in msg['kvs'].keys():
                        self.KVSs[self.default_kvsname][k] = msg['kvs'][k]
                except:
                    mpd_print(1,'failed to insert preput_info')
                    sys.exit(-1)
                msg = self.conSock.recv_dict_msg()
                if not msg  or  not msg.has_key('cmd')  or  msg['cmd'] != 'ringsize':
                    mpd_print(1,'spawned: bad msg from con; got: %s' % (msg) )
                    sys.exit(-1)
                self.universeSize = msg['ring_ncpus']
                # if the rshSock is closed, we'll get an AttributeError
                # exception about 'int' has no attribute 'send_dict_msg'
                # FIXME: Does every use of a sock on which send_dict_msg
                # is used need an "if xxxx.rhsSock:" test first?
                # Is there an else for those cases?
                self.ring.rhsSock.send_dict_msg(msg)  # forward it on
            else:
                msgToSend = { 'cmd' : 'man_checking_in' }
                self.conSock.send_dict_msg(msgToSend)
                msg = self.conSock.recv_dict_msg()
                if not msg  or  not msg.has_key('cmd')  or  msg['cmd'] != 'ringsize':
                    mpd_print(1,'invalid msg from con; expected ringsize got: %s' % (msg) )
                    sys.exit(-1)
                # user-supplied MPI_UNIVERSE_SIZE overrides the ring's cpu count
                if self.clientPgmEnv.has_key('MPI_UNIVERSE_SIZE'):
                    self.universeSize = int(self.clientPgmEnv['MPI_UNIVERSE_SIZE'])
                else:
                    self.universeSize = msg['ring_ncpus']
                self.ring.rhsSock.send_dict_msg(msg)
            ## NOTE: if you spawn a non-MPI job, it may not send this msg
            ## in which case the pgm will hang; the reason for this is that
            ## mpich2 does an Accept after the PMI_Spawn_multiple and a non-mpi
            ## pgm will never do the expected Connect.
            self.stdoutToConSock = MPDSock(name='stdout_to_console')
            self.stdoutToConSock.connect((self.conIfhn,self.conPort))
            if self.spawned:
                msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : self.myRank }
                self.stdoutToConSock.send_dict_msg(msgToSend)
            self.stderrToConSock = MPDSock(name='stderr_to_console')
            self.stderrToConSock.connect((self.conIfhn,self.conPort))
            if self.spawned:
                msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : self.myRank }
                self.stderrToConSock.send_dict_msg(msgToSend)
        else:
            self.conSock = 0
        # rank 0's stdio "parent" is the console; other ranks get theirs
        # later via info_from_parent_in_tree
        if self.myRank == 0:
            self.parentStdoutSock = self.stdoutToConSock
            self.parentStderrSock = self.stderrToConSock
        else:
            self.parentStdoutSock = 0
            self.parentStderrSock = 0
        msg = self.ring.lhsSock.recv_dict_msg()    # recv msg containing ringsize
        if not msg  or  not msg.has_key('cmd')  or  msg['cmd'] != 'ringsize':
            mpd_print(1,'invalid msg from lhs; expecting ringsize got: %s' % (msg) )
            sys.exit(-1)
        if self.myRank != 0:
            self.ring.rhsSock.send_dict_msg(msg)
            if self.clientPgmEnv.has_key('MPI_UNIVERSE_SIZE'):
                self.universeSize = int(self.clientPgmEnv['MPI_UNIVERSE_SIZE'])
            else:
                self.universeSize = msg['ring_ncpus']
        if self.doingBNR:
            # BNR mode: PMI traffic flows over a pre-made sockpair instead
            # of the pmi listen sock
            (self.pmiSock,self.cliBNRSock) = mpd_sockpair()
            self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
            cli_env['MAN_MSGS_FD'] = str(self.cliBNRSock.fileno())       ## BNR
        self.numDone = 0
        self.numWithIO = 2    # stdout and stderr so far
        self.numConndWithIO = 2
        # FIXME: This is the old singleton approach, which didn't allow
        # for more than one process to be a singleton
        if self.singinitPORT:
            # singleton-init: the client already exists; connect to it for
            # pmi and for its stdin/stdout/stderr
            self.pmiListenSock = 0
            self.pmiSock = MPDSock(name='pmi')
            self.pmiSock.connect((self.myIfhn,self.singinitPORT))
            self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
            self.pmiSock.send_char_msg('cmd=singinit authtype=none\n')
            line = self.pmiSock.recv_char_msg()
            charMsg = 'cmd=singinit_info rc=0 versionok=yes stdio=yes kvsname=%s\n' % (self.default_kvsname)
            self.pmiSock.send_char_msg(charMsg)

            sock_write_cli_stdin = MPDSock(name='write_cli_stdin')
            sock_write_cli_stdin.connect((self.myIfhn,self.singinitPORT))
            self.fd_write_cli_stdin = sock_write_cli_stdin.fileno()

            sock_read_cli_stdout = MPDSock(name='read_cli_stdout')
            sock_read_cli_stdout.connect((self.myIfhn,self.singinitPORT))
            self.fd_read_cli_stdout = sock_read_cli_stdout.fileno()

            sock_read_cli_stderr = MPDSock(name='read_cli_stderr')
            sock_read_cli_stderr.connect((self.myIfhn,self.singinitPORT))
            self.fd_read_cli_stderr = sock_read_cli_stderr.fileno()
        else:
            self.cliListenSock = MPDListenSock('',0,name='cli_listen_sock')  ## BNR
            self.cliListenPort = self.cliListenSock.getsockname()[1]         ## BNR
            self.pmiListenSock = MPDListenSock('',0,name='pmi_listen_sock')
            self.pmiListenPort = self.pmiListenSock.getsockname()[1]
        # ---- launch (or attach to) the client process ----
        self.subproc = 0    # default; use fork instead of subprocess
        if self.singinitPID:
            clientPid = self.singinitPID
        else:
            cli_env['PATH']      = os.environ['MPDMAN_CLI_PATH']
            cli_env['PMI_PORT']  = '%s:%s' % (self.myIfhn,self.pmiListenPort)
            cli_env['PMI_SIZE']  = str(self.nprocs)
            cli_env['PMI_RANK']  = str(self.myRank)
            cli_env['PMI_DEBUG'] = str(0)
            cli_env['PMI_TOTALVIEW'] = str(self.totalview)
            if self.spawned:
                cli_env['PMI_SPAWNED'] = '1'
            else:
                cli_env['PMI_SPAWNED'] = '0'
            if self.doingBNR:
                cli_env['MPD_TVDEBUG'] = str(0)                                   ## BNR
                cli_env['MPD_JID'] = os.environ['MPDMAN_JOBID']                   ## BNR
                cli_env['MPD_JSIZE'] = str(self.nprocs)                           ## BNR
                cli_env['MPD_JRANK'] = str(self.myRank)                           ## BNR
                cli_env['CLIENT_LISTENER_FD'] = str(self.cliListenSock.fileno())  ## BNR
            if hasattr(os,'fork'):
                # pipes for the client's stdio plus a handshake sockpair used
                # to release the client once the jobgo barrier completes
                (self.fd_read_cli_stdin, self.fd_write_cli_stdin ) = os.pipe()
                (self.fd_read_cli_stdout,self.fd_write_cli_stdout) = os.pipe()
                (self.fd_read_cli_stderr,self.fd_write_cli_stderr) = os.pipe()
                (self.handshake_sock_man_end,self.handshake_sock_cli_end) = mpd_sockpair()
                clientPid = self.launch_client_via_fork_exec(cli_env)
                if clientPid < 0:
                    print '**** mpdman: launch_client_via_fork_exec failed; exiting'
                    sys.exit(-1)
                elif clientPid > 0:
                    self.handshake_sock_cli_end.close()
                else:  # 0
                    self.handshake_sock_man_end.close()
            elif subprocess_module_available:
                clientPid = self.launch_client_via_subprocess(cli_env)  # may chg self.subproc
            else:
                mpd_print(1,'neither fork nor subprocess is available')
                sys.exit(-1)
        # if not initially a recvr of stdin (e.g. gdb) then give immediate eof to client
        if not in_stdinRcvrs(self.myRank,self.stdinDest):
            if self.subproc:    # must close subproc's file (not just the fd)
                self.subproc.stdin.close()
            else:
                os.close(self.fd_write_cli_stdin)
        if self.doingBNR:
            self.cliBNRSock.close()
        # tell our mpd about the client we just launched
        msgToSend = { 'cmd' : 'client_info', 'jobid' : self.jobid, 'clipid' : clientPid,
                      'manpid' : os.getpid(), 'rank' : self.myRank,
                      'spawner_manpid' : int(os.environ['MPDMAN_SPAWNER_MANPID']),
                      'spawner_mpd' : os.environ['MPDMAN_SPAWNER_MPD'] }
        self.mpdSock.send_dict_msg(msgToSend)

        if not self.subproc:
            self.streamHandler.set_handler(self.fd_read_cli_stdout,
                                           self.handle_cli_stdout_input)
            self.streamHandler.set_handler(self.fd_read_cli_stderr,
                                           self.handle_cli_stderr_input)
        self.waitPids = [clientPid]

        if self.pmiListenSock:
            self.streamHandler.set_handler(self.pmiListenSock,self.handle_pmi_connection)

        # begin setup of stdio tree
        (parent,lchild,rchild) = mpd_get_ranks_in_binary_tree(self.myRank,self.nprocs)
        self.spawnedChildSocks = []
        self.childrenStdoutTreeSocks = []
        self.childrenStderrTreeSocks = []
        # invite each child rank (if any) to connect back to our
        # non-ring listen port; the msg travels around the ring
        if lchild >= 0:
            self.numWithIO += 2    # stdout and stderr from child
            msgToSend = { 'cmd' : 'info_from_parent_in_tree',
                          'to_rank' : str(lchild),
                          'parent_ifhn'   : self.myIfhn,
                          'parent_port' : self.listenNonRingPort }
            self.ring.rhsSock.send_dict_msg(msgToSend)
        if rchild >= 0:
            self.numWithIO += 2    # stdout and stderr from child
            msgToSend = { 'cmd' : 'info_from_parent_in_tree',
                          'to_rank' : str(rchild),
                          'parent_ifhn'   : self.myIfhn,
                          'parent_port' : self.listenNonRingPort }
            self.ring.rhsSock.send_dict_msg(msgToSend)

        # optional remote-ship (copgm) helper process
        if os.environ.has_key('MPDMAN_RSHIP'):
            rship = os.environ['MPDMAN_RSHIP']
            # (rshipSock,rshipPort) = mpd_get_inet_listen_sock('',0)
            rshipPid = os.fork()
            if rshipPid == 0:
                os.environ['MPDCP_MSHIP_HOST'] = os.environ['MPDMAN_MSHIP_HOST']
                os.environ['MPDCP_MSHIP_PORT'] = os.environ['MPDMAN_MSHIP_PORT']
                os.environ['MPDCP_MSHIP_NPROCS'] = str(self.nprocs)
                os.environ['MPDCP_CLI_PID'] = str(clientPid)
                try:
                    os.execvpe(rship,[rship],os.environ)
                except Exception, errmsg:
                    # make sure my error msgs get to console
                    os.dup2(self.parentStdoutSock.fileno(),1)  # closes fd 1 (stdout) if open
                    os.dup2(self.parentStderrSock.fileno(),2)  # closes fd 2 (stderr) if open
                    mpd_print(1,'execvpe failed for copgm %s; errmsg=:%s:' % (rship,errmsg) )
                    sys.exit(-1)
                sys.exit(0)
            # rshipSock.close()
            self.waitPids.append(rshipPid)

        if not self.spawned:
            # receive the final process mapping from our MPD overlords
            msg = self.mpdSock.recv_dict_msg(timeout=-1)

            # a few defensive checks now to make sure that the various parts of the
            # code are all on the same page
            if not msg.has_key('cmd') or msg['cmd'] != 'process_mapping':
                mpd_print(1,'expected cmd="process_mapping", got cmd="%s" instead' % (msg.get('cmd','**not_present**')))
                sys.exit(-1)
            if msg['jobid'] != self.jobid:
                mpd_print(1,'expected jobid="%s", got jobid="%s" instead' % (self.jobid,msg['jobid']))
                sys.exit(-1)
            if not msg.has_key('process_mapping'):
                mpd_print(1,'expected msg to contain a process_mapping key')
                sys.exit(-1)
            self.KVSs[self.default_kvsname]['PMI_process_mapping'] = msg['process_mapping']


        # ---- main service loop: rank 0 originates the jobgo and
        #      end-barrier loops; everyone forwards around the ring ----
        self.tvReady = 0
        self.pmiBarrierInRecvd = 0
        self.holdingPMIBarrierLoop1 = 0
        if self.myRank == 0:
            self.holdingEndBarrierLoop1 = 1
            self.holdingJobgoLoop1 = { 'cmd' : 'jobgo_loop_1', 'procinfo' : [] }
        else:
            self.holdingEndBarrierLoop1 = 0
            self.holdingJobgoLoop1 = 0
        self.jobStarted = 0
        self.endBarrierDone = 0
        # Main Loop
        while not self.endBarrierDone:
            if self.numDone >= self.numWithIO  and  (self.singinitPID or self.subproc):
                # no SIGCHLD in these cases; infer exit from io completion
                clientExited = 1
                clientExitStatus = 0
            if self.holdingJobgoLoop1 and self.numConndWithIO >= self.numWithIO:
                # all expected io children have connected; release jobgo
                msgToSend = self.holdingJobgoLoop1
                self.ring.rhsSock.send_dict_msg(msgToSend)
                self.holdingJobgoLoop1 = 0
            rv = self.streamHandler.handle_active_streams(timeout=5.0)
            if rv[0] < 0:
                if type(rv[1]) == ClassType  and  rv[1] == KeyboardInterrupt: # ^C
                    sys.exit(-1)
            if clientExited:
                if self.jobStarted  and  not clientExitStatusSent:
                    msgToSend = { 'cmd' : 'client_exit_status', 'man_id' : self.myId,
                                  'cli_status' : clientExitStatus, 'cli_host' : self.myHost,
                                  'cli_ifhn' : self.myIfhn, 'cli_pid' : clientPid,
                                  'cli_rank' : self.myRank }
                    if self.myRank == 0:
                        if self.conSock:
                            try:
                                self.conSock.send_dict_msg(msgToSend)
                            except:
                                pass
                    else:
                        if self.ring.rhsSock:
                            self.ring.rhsSock.send_dict_msg(msgToSend)
                    clientExitStatusSent = 1
                if self.holdingEndBarrierLoop1 and self.numDone >= self.numWithIO:
                    self.holdingEndBarrierLoop1 = 0
                    msgToSend = {'cmd' : 'end_barrier_loop_1'}
                    self.ring.rhsSock.send_dict_msg(msgToSend)
        mpd_print(0000, "out of loop")
        # may want to wait for waitPids here
497    def handle_nonring_connection(self,sock):
498        (tempSock,tempConnAddr) = self.listenNonRingSock.accept()
499        msg = tempSock.recv_dict_msg()
500        if msg  and  msg.has_key('cmd'):
501            if msg['cmd'] == 'child_in_stdout_tree':
502                self.streamHandler.set_handler(tempSock,self.handle_child_stdout_tree_input)
503                self.childrenStdoutTreeSocks.append(tempSock)
504                self.numConndWithIO += 1
505            elif msg['cmd'] == 'child_in_stderr_tree':
506                self.streamHandler.set_handler(tempSock,self.handle_child_stderr_tree_input)
507                self.childrenStderrTreeSocks.append(tempSock)
508                self.numConndWithIO += 1
509            elif msg['cmd'] == 'spawned_man0_is_up':
510                self.streamHandler.set_handler(tempSock,self.handle_spawned_child_input)
511                self.spawnedChildSocks.append(tempSock)
512                tempID = msg['spawned_id']
513                spawnedKVSname = 'mpdman_kvs_for_spawned_' + tempID
514                msgToSend = { 'cmd' : 'preput_info_for_child',
515                              'kvs' : self.KVSs[spawnedKVSname] }
516                tempSock.send_dict_msg(msgToSend)
517                msgToSend = { 'cmd' : 'ringsize', 'ring_ncpus' : self.universeSize }
518                tempSock.send_dict_msg(msgToSend)
519            else:
520                mpd_print(1, 'unknown msg recvd on listenNonRingSock :%s:' % (msg) )
521    def handle_lhs_input(self,sock):
522        msg = self.ring.lhsSock.recv_dict_msg()
523        if not msg:
524            mpd_print(0000, 'lhs died' )
525            self.streamHandler.del_handler(self.ring.lhsSock)
526            self.ring.lhsSock.close()
527        elif msg['cmd'] == 'jobgo_loop_1':
528            if self.myRank == 0:
529                if self.totalview:
530                    msg['procinfo'].insert(0,(socket.gethostname(),self.clientPgm,clientPid))
531                # let console pgm proceed
532                msgToSend = { 'cmd' : 'job_started', 'jobid' : self.jobid,
533                              'procinfo' : msg['procinfo'] }
534                self.conSock.send_dict_msg(msgToSend,errprint=0)
535                msgToSend = { 'cmd' : 'jobgo_loop_2' }
536                self.ring.rhsSock.send_dict_msg(msgToSend)
537            else:
538                if self.totalview:
539                    msg['procinfo'].append((socket.gethostname(),self.clientPgm,clientPid))
540                if self.numConndWithIO >= self.numWithIO:
541                    self.ring.rhsSock.send_dict_msg(msg)  # forward it on
542                else:
543                    self.holdingJobgoLoop1 = msg
544        elif msg['cmd'] == 'jobgo_loop_2':
545            if self.myRank != 0:
546                self.ring.rhsSock.send_dict_msg(msg)  # forward it on
547            if not self.singinitPID:
548                self.handshake_sock_man_end.send_char_msg('go\n')
549                self.handshake_sock_man_end.close()
550            self.jobStarted = 1
551        elif msg['cmd'] == 'info_from_parent_in_tree':
552            if int(msg['to_rank']) == self.myRank:
553                self.parentIfhn = msg['parent_ifhn']
554                self.parentPort = msg['parent_port']
555                self.parentStdoutSock = MPDSock(name='stdout_ro_parent')
556                self.parentStdoutSock.connect((self.parentIfhn,self.parentPort))
557                msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : self.myRank }
558                self.parentStdoutSock.send_dict_msg(msgToSend)
559                self.parentStderrSock = MPDSock(name='stderr_ro_parent')
560                self.parentStderrSock.connect((self.parentIfhn,self.parentPort))
561                msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : self.myRank }
562                self.parentStderrSock.send_dict_msg(msgToSend)
563            else:
564                self.ring.rhsSock.send_dict_msg(msg)
565        elif msg['cmd'] == 'end_barrier_loop_1':
566            if self.myRank == 0:
567                msgToSend = { 'cmd' : 'end_barrier_loop_2' }
568                self.ring.rhsSock.send_dict_msg(msgToSend)
569            else:
570                if self.numDone >= self.numWithIO:
571                    if self.ring.rhsSock:
572                        self.ring.rhsSock.send_dict_msg(msg)
573                else:
574                    self.holdingEndBarrierLoop1 = 1
575        elif msg['cmd'] == 'end_barrier_loop_2':
576            self.endBarrierDone = 1
577            if self.myRank != 0:
578                self.ring.rhsSock.send_dict_msg(msg)
579        elif msg['cmd'] == 'pmi_barrier_loop_1':
580            if self.myRank == 0:
581                msgToSend = { 'cmd' : 'pmi_barrier_loop_2' }
582                self.ring.rhsSock.send_dict_msg(msgToSend)
583                if self.doingBNR:    ## BNR
584                    pmiMsgToSend = 'cmd=client_bnr_fence_out\n'
585                    self.pmiSock.send_char_msg(pmiMsgToSend)
586                    sleep(0.1)  # minor pause before intr
587                    os.kill(clientPid,signal.SIGUSR1)
588                else:
589                    if self.pmiSock:
590                        pmiMsgToSend = 'cmd=barrier_out\n'
591                        self.pmiSock.send_char_msg(pmiMsgToSend)
592            else:
593                self.holdingPMIBarrierLoop1 = 1
594                if self.pmiBarrierInRecvd:
595                    self.ring.rhsSock.send_dict_msg(msg)
596        elif msg['cmd'] == 'pmi_barrier_loop_2':
597            self.pmiBarrierInRecvd = 0
598            self.holdingPMIBarrierLoop1 = 0
599            if self.myRank != 0:
600                self.ring.rhsSock.send_dict_msg(msg)
601                if self.doingBNR:    ## BNR
602                    pmiMsgToSend = 'cmd=client_bnr_fence_out\n'
603                    self.pmiSock.send_char_msg(pmiMsgToSend)
604                    sleep(0.1)  # minor pause before intr
605                    os.kill(clientPid,signal.SIGUSR1)
606                else:
607                    if self.pmiSock:
608                        pmiMsgToSend = 'cmd=barrier_out\n'
609                        self.pmiSock.send_char_msg(pmiMsgToSend)
610        elif msg['cmd'] == 'pmi_get':
611            if msg['from_rank'] == self.myRank:
612                if self.pmiSock:  # may have disappeared in early shutdown
613                    pmiMsgToSend = 'cmd=get_result rc=-1 key="%s"\n' % msg['key']
614                    self.pmiSock.send_char_msg(pmiMsgToSend)
615            else:
616                key = msg['key']
617                kvsname = msg['kvsname']
618                if self.KVSs.has_key(kvsname)  and  self.KVSs[kvsname].has_key(key):
619                    value = self.KVSs[kvsname][key]
620                    msgToSend = { 'cmd' : 'response_to_pmi_get', 'key' : key,
621                                  'kvsname' : kvsname, 'value' : value,
622                                  'to_rank' : msg['from_rank'] }
623                    self.ring.rhsSock.send_dict_msg(msgToSend)
624                else:
625                    self.ring.rhsSock.send_dict_msg(msg)
626        elif msg['cmd'] == 'pmi_getbyidx':
627            if msg['from_rank'] == self.myRank:
628                if self.pmiSock:  # may have disappeared in early shutdown
629                    self.KVSs[self.default_kvsname].update(msg['kvs'])
630                    if self.KVSs[self.default_kvsname].keys():
631                        key = self.KVSs[self.default_kvsname].keys()[0]
632                        val = self.KVSs[self.default_kvsname][key]
633                        pmiMsgToSend = 'cmd=getbyidx_results rc=0 nextidx=1 key=%s val=%s\n' % \
634                                       (key,val)
635                    else:
636                        pmiMsgToSend = 'cmd=getbyidx_results rc=-2 reason=no_more_keyvals\n'
637                    self.pmiSock.send_char_msg(pmiMsgToSend)
638            else:
639                msg['kvs'].update(self.KVSs[self.default_kvsname])
640                self.ring.rhsSock.send_dict_msg(msg)
641        elif msg['cmd'] == 'response_to_pmi_get':
642            # [goodell@ 2009-05-05] The next few lines add caching in to the kvs
643            # gets to improve lookup performance and reduce MPI_Init times.
644            # Note that this doesn't handle consistency correctly if PMI is ever
645            # changed to permit overwriting keyvals.
646            if msg['kvsname'] not in self.KVSs.keys():
647                self.KVSs[msg['kvsname']] = dict()
648            self.KVSs[msg['kvsname']][msg['key']] = msg['value']
649
650            if msg['to_rank'] == self.myRank:
651                if self.pmiSock:  # may have disappeared in early shutdown
652                    pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (msg['value'])
653                    self.pmiSock.send_char_msg(pmiMsgToSend)
654            else:
655                self.ring.rhsSock.send_dict_msg(msg)
656        elif msg['cmd'] == 'signal':
657            if msg['signo'] == 'SIGINT':
658                if not self.gdb:
659                    self.jobEndingEarly = 1
660                for s in self.spawnedChildSocks:
661                    s.send_dict_msg(msg)
662                if self.myRank != 0:
663                    if self.ring.rhsSock:  # still alive ?
664                        self.ring.rhsSock.send_dict_msg(msg)
665                    if self.gdb:
666                        os.kill(clientPid,signal.SIGINT)
667                    else:
668                        try:
669                            pgrp = clientPid * (-1)   # neg Pid -> group
670                            os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
671                        except:
672                            pass
673            elif msg['signo'] == 'SIGKILL':
674                self.jobEndingEarly = 1
675                for s in self.spawnedChildSocks:
676                    s.send_dict_msg(msg)
677                if self.myRank != 0:
678                    if self.ring.rhsSock:  # still alive ?
679                        self.ring.rhsSock.send_dict_msg(msg)
680                    if self.gdb:
681                        os.kill(clientPid,signal.SIGUSR1)   # tell gdb driver to kill all
682                    else:
683                        try:
684                            pgrp = clientPid * (-1)   # neg Pid -> group
685                            os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
686                        except:
687                            pass
688            elif msg['signo'] == 'SIGTSTP':
689                if msg['dest'] != self.myId:
690                    self.ring.rhsSock.send_dict_msg(msg)
691                    try:
692                        pgrp = clientPid * (-1)   # neg Pid -> group
693                        os.kill(pgrp,signal.SIGTSTP)   # may be reaped by sighandler
694                    except:
695                        pass
696            elif msg['signo'] == 'SIGCONT':
697                if msg['dest'] != self.myId:
698                    self.ring.rhsSock.send_dict_msg(msg)
699                    try:
700                        pgrp = clientPid * (-1)   # neg Pid -> group
701                        os.kill(pgrp,signal.SIGCONT)   # may be reaped by sighandler
702                    except:
703                        pass
704        elif msg['cmd'] == 'client_exit_status':
705            if self.myRank == 0:
706                if self.conSock:
707                    self.conSock.send_dict_msg(msg,errprint=0)
708            else:
709                if self.ring.rhsSock:
710                    self.ring.rhsSock.send_dict_msg(msg)
711        elif msg['cmd'] == 'collective_abort':
712            self.jobEndingEarly = 1
713            if msg['src'] != self.myId:
714                if self.ring.rhsSock:  # still alive ?
715                    self.ring.rhsSock.send_dict_msg(msg)
716            if self.conSock:
717                msgToSend = { 'cmd' : 'job_aborted_early', 'jobid' : self.jobid,
718                              'rank' : msg['rank'],
719                              'exit_status' : msg['exit_status'] }
720                self.conSock.send_dict_msg(msgToSend,errprint=0)
721            try:
722                pgrp = clientPid * (-1)   # neg Pid -> group
723                os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
724            except:
725                pass
726        elif msg['cmd'] == 'startup_status':
727            if msg['rc'] < 0:
728                self.jobEndingEarly = 1
729                try:
730                    pgrp = clientPid * (-1)   # neg Pid -> group
731                    os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
732                except:
733                    pass
734            ##### RMB if msg['src'] == self.myId:
735            if self.myRank == 0:
736                if self.conSock:
737                    self.conSock.send_dict_msg(msg,errprint=0)
738            else:
739                if msg['src'] != self.myId  and  self.ring.rhsSock:  # rhs still alive ?
740                    self.ring.rhsSock.send_dict_msg(msg)
741        elif msg['cmd'] == 'stdin_from_user':
742            if msg['src'] != self.myId:
743                self.ring.rhsSock.send_dict_msg(msg)
744                if in_stdinRcvrs(self.myRank,self.stdinDest):
745                    if msg.has_key('eof'):
746                        if self.subproc:    # must close subproc's file (not just the fd)
747                            self.subproc.stdin.close()
748                        else:
749                            os.close(self.fd_write_cli_stdin)
750                    else:
751                        os.write(self.fd_write_cli_stdin,msg['line'])
752        elif msg['cmd'] == 'stdin_dest':
753            if msg['src'] != self.myId:
754                self.stdinDest = msg['stdin_procs']
755                self.ring.rhsSock.send_dict_msg(msg)
756        elif msg['cmd'] == 'interrupt_peer_with_msg':    ## BNR
757            if int(msg['torank']) == self.myRank:
758                if self.pmiSock:  # may have disappeared in early shutdown
759                    pmiMsgToSend = '%s\n' % (msg['msg'])
760                    self.pmiSock.send_char_msg(pmiMsgToSend)
761                    sleep(0.1)  # minor pause before intr
762                    os.kill(clientPid,signal.SIGUSR1)
763            else:
764                self.ring.rhsSock.send_dict_msg(msg)
765        elif msg['cmd'] == 'tv_ready':
766            self.tvReady = 1
767            if self.myRank != 0:
768                msg['src'] = self.myId
769                self.ring.rhsSock.send_dict_msg(msg)
770                if self.pmiSock:    # should be valid sock if running tv
771                    pmiMsgToSend = 'cmd=tv_ready\n'
772                    self.pmiSock.send_char_msg(pmiMsgToSend)
773        else:
774            mpd_print(1, 'unexpected msg recvd on lhsSock :%s:' % msg )
775
776    def handle_rhs_input(self,sock):
777        msg = self.ring.rhsSock.recv_dict_msg()  #### NOT USING msg; should I ?
778        mpd_print(0000, 'rhs died' )
779        self.streamHandler.del_handler(self.ring.rhsSock)
780        self.ring.rhsSock.close()
781        self.ring.rhsSock = 0
782    def handle_cli_stdout_input(self,sock):
783        line = mpd_read_nbytes(sock,1024)  # sock is self.fd_read_cli_stdout
784        if not line:
785            if self.subproc:    # must close subproc's file (not just the fd)
786                self.subproc.stdout.close()
787            else:
788                self.streamHandler.del_handler(self.fd_read_cli_stdout)
789                os.close(self.fd_read_cli_stdout)
790            self.numDone += 1
791            if self.numDone >= self.numWithIO:
792                if self.parentStdoutSock:
793                    self.parentStdoutSock.close()
794                    self.parentStdoutSock = 0
795                if self.parentStderrSock:
796                    self.parentStderrSock.close()
797                    self.parentStderrSock = 0
798        else:
799            if self.parentStdoutSock:
800                if self.lineLabelFmt:
801                    lineLabel = self.create_line_label(self.lineLabelFmt,self.spawned)
802                    splitLine = line.split('\n',1024)
803                    if self.startStdoutLineLabel:
804                        line = lineLabel
805                    else:
806                        line = ''
807                    if splitLine[-1] == '':
808                        self.startStdoutLineLabel = 1
809                        del splitLine[-1]
810                    else:
811                        self.startStdoutLineLabel = 0
812                    for s in splitLine[0:-1]:
813                        line = line + s + '\n' + lineLabel
814                    line = line + splitLine[-1]
815                    if self.startStdoutLineLabel:
816                        line = line + '\n'
817                self.parentStdoutSock.send_char_msg(line,errprint=0)
818        return line
819    def handle_cli_stderr_input(self,sock):
820        line = mpd_read_nbytes(sock,1024)  # sock is self.fd_read_cli_stderr
821        if not line:
822            if self.subproc:    # must close subproc's file (not just the fd)
823                self.subproc.stderr.close()
824            else:
825                self.streamHandler.del_handler(self.fd_read_cli_stderr)
826                os.close(self.fd_read_cli_stderr)
827            self.numDone += 1
828            if self.numDone >= self.numWithIO:
829                if self.parentStdoutSock:
830                    self.parentStdoutSock.close()
831                    self.parentStdoutSock = 0
832                if self.parentStderrSock:
833                    self.parentStderrSock.close()
834                    self.parentStderrSock = 0
835        else:
836            if self.parentStderrSock:
837                if self.lineLabelFmt:
838                    lineLabel = self.create_line_label(self.lineLabelFmt,self.spawned)
839                    splitLine = line.split('\n',1024)
840                    if self.startStderrLineLabel:
841                        line = lineLabel
842                    else:
843                        line = ''
844                    if splitLine[-1] == '':
845                        self.startStderrLineLabel = 1
846                        del splitLine[-1]
847                    else:
848                        self.startStderrLineLabel = 0
849                    for s in splitLine[0:-1]:
850                        line = line + s + '\n' + lineLabel
851                    line = line + splitLine[-1]
852                    if self.startStderrLineLabel:
853                        line = line + '\n'
854                self.parentStderrSock.send_char_msg(line,errprint=0)
855        return line
856    def handle_child_stdout_tree_input(self,sock):
857        if self.lineLabelFmt:
858            line = sock.recv_one_line()
859        else:
860            line = sock.recv(1024)
861        if not line:
862            self.streamHandler.del_handler(sock)
863            sock.close()
864            self.numDone += 1
865            if self.numDone >= self.numWithIO:
866                if self.parentStdoutSock:
867                    self.parentStdoutSock.close()
868                    self.parentStdoutSock = 0
869                if self.parentStderrSock:
870                    self.parentStderrSock.close()
871                    self.parentStderrSock = 0
872        else:
873            if self.parentStdoutSock:
874                self.parentStdoutSock.send_char_msg(line,errprint=0)
875                # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) )
876    def handle_child_stderr_tree_input(self,sock):
877        if self.lineLabelFmt:
878            line = sock.recv_one_line()
879        else:
880            line = sock.recv(1024)
881        if not line:
882            self.streamHandler.del_handler(sock)
883            sock.close()
884            self.numDone += 1
885            if self.numDone >= self.numWithIO:
886                if self.parentStdoutSock:
887                    self.parentStdoutSock.close()
888                    self.parentStdoutSock = 0
889                if self.parentStderrSock:
890                    self.parentStderrSock.close()
891                    self.parentStderrSock = 0
892        else:
893            if self.parentStderrSock:
894                self.parentStderrSock.send_char_msg(line,errprint=0)
895                # parentStdoutSock.sendall('FWD by %d: |%s|' % (self.myRank,line) )
896    def handle_spawned_child_input(self,sock):
897        msg = sock.recv_dict_msg()
898        if not msg:
899            self.streamHandler.del_handler(sock)
900            self.spawnedChildSocks.remove(sock)
901            sock.close()
902        elif msg['cmd'] == 'job_started':
903            pass
904        elif msg['cmd'] == 'client_exit_status':
905            if self.myRank == 0:
906                if self.conSock:
907                    self.conSock.send_dict_msg(msg,errprint=0)
908            else:
909                if self.ring.rhsSock:
910                    self.ring.rhsSock.send_dict_msg(msg)
911        elif msg['cmd'] == 'job_aborted_early':
912            if self.conSock:
913                msgToSend = { 'cmd' : 'job_aborted_early', 'jobid' : msg['jobid'],
914                              'rank' : msg['rank'],
915                              'exit_status' : msg['exit_status'] }
916                self.conSock.send_dict_msg(msgToSend,errprint=0)
917        elif msg['cmd'] == 'startup_status':
918            # remember this rc to put in spawn_result
919            self.spawnInProgress['errcodes'][msg['rank']] = msg['rc']
920            if None not in self.spawnInProgress['errcodes']:  # if all errcodes are now filled in
921                # send pmi msg to spawner
922                strerrcodes = ''  # put errcodes in str format for pmi msg
923                for ec in self.spawnInProgress['errcodes']:
924                    strerrcodes = strerrcodes + str(ec) + ','
925                strerrcodes = strerrcodes[:-1]
926                if self.pmiSock:  # may have disappeared in early shutdown
927                    # may want to make rc < 0 if any errcode is < 0
928                    pmiMsgToSend = 'cmd=spawn_result rc=0 errcodes=%s\n' % (strerrcodes)
929                    self.pmiSock.send_char_msg(pmiMsgToSend)
930                self.spawnInProgress = 0
931        else:
932            mpd_print(1, "unrecognized msg from spawned child :%s:" % msg )
933    def handle_pmi_connection(self,sock):
934        if self.pmiSock:  # already have one
935            pmiMsgToSend = 'cmd=you_already_have_an_open_pmi_conn_to_me\n'
936            self.pmiSock.send_char_msg(pmiMsgToSend)
937            self.streamHandler.del_handler(self.pmiSock)
938            self.pmiSock.close()
939            self.pmiSock = 0
940            errmsg = "mpdman: invalid attempt to open 2 simultaneous pmi connections\n" + \
941                     "  client=%s  cwd=%s" % (self.clientPgm,os.environ['MPDMAN_CWD'])
942            print errmsg ; sys.stdout.flush()
943            clientExitStatus = 137  # assume kill -9 below
944            msgToSend = { 'cmd' : 'collective_abort',
945                          'src' : self.myId, 'rank' : self.myRank,
946                          'exit_status' : clientExitStatus }
947            self.ring.rhsSock.send_dict_msg(msgToSend)
948            return
949        (self.pmiSock,tempConnAddr) = self.pmiListenSock.accept()
950        # the following lines are commented out so that we can support a process
951        # that runs 2 MPI pgms in tandem  (e.g. mpish at ANL)
952        ##### del socksToSelect[pmiListenSock]
953        ##### pmiListenSock.close()
954        if not self.pmiSock:
955            mpd_print(1,"failed accept for pmi connection from client")
956            sys.exit(-1)
957        self.pmiSock.name = 'pmi'
958        self.streamHandler.set_handler(self.pmiSock,self.handle_pmi_input)
959        if self.tvReady:
960            pmiMsgToSend = 'cmd=tv_ready\n'
961            self.pmiSock.send_char_msg(pmiMsgToSend)
962    def handle_pmi_input(self,sock):
963        global clientPid, clientExited, clientExitStatus, clientExitStatusSent
964        if self.spawnInProgress:
965            return
966        line = self.pmiSock.recv_char_msg()
967        if not line:
968            self.streamHandler.del_handler(self.pmiSock)
969            self.pmiSock.close()
970            self.pmiSock = 0
971            if self.pmiCollectiveJob:
972                if self.ring.rhsSock:  # still alive ?
973                    if not self.jobEndingEarly:  # if I did not already know this
974                        if not clientExited:
975                            clientExitStatus = 137  # assume kill -9 below
976                        msgToSend = { 'cmd' : 'collective_abort',
977                                      'src' : self.myId, 'rank' : self.myRank,
978                                      'exit_status' : clientExitStatus }
979                        self.ring.rhsSock.send_dict_msg(msgToSend)
980                try:
981                    pgrp = clientPid * (-1)   # neg Pid -> group
982                    os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
983                except:
984                    pass
985            return
986        if line.startswith('mcmd='):
987            parsedMsg = {}
988            line = line.rstrip()
989            splitLine = line.split('=',1)
990            parsedMsg['cmd'] = splitLine[1]
991            line = ''
992            while not line.startswith('endcmd'):
993                line = self.pmiSock.recv_char_msg()
994                if not line.startswith('endcmd'):
995                    line = line.rstrip()
996                    splitLine = line.split('=',1)
997                    parsedMsg[splitLine[0]] = splitLine[1]
998        else:
999            parsedMsg = parse_pmi_msg(line)
1000        if not parsedMsg.has_key('cmd'):
1001            pmiMsgToSend = 'cmd=unparseable_msg rc=-1\n'
1002            self.pmiSock.send_char_msg(pmiMsgToSend)
1003            return
1004        # startup_status may be sent here from new process BEFORE starting client
1005        if parsedMsg['cmd'] == 'startup_status':
1006            msgToSend = { 'cmd' : 'startup_status', 'src' : self.myId,
1007                          'rc' : parsedMsg['rc'],
1008                          'jobid' : self.jobid, 'rank' : self.myRank,
1009                          'exec' : parsedMsg['exec'], 'reason' : parsedMsg['reason']  }
1010            if self.ring.rhsSock:
1011                self.ring.rhsSock.send_dict_msg(msgToSend)
1012        elif parsedMsg['cmd'] == 'init':
1013            self.pmiCollectiveJob = 1
1014            version = int(parsedMsg['pmi_version'])
1015            subversion = int(parsedMsg['pmi_subversion'])
1016            if self.pmiVersion == version  and  self.pmiSubversion >= subversion:
1017                rc = 0
1018            else:
1019                rc = -1
1020            pmiMsgToSend = 'cmd=response_to_init pmi_version=%d pmi_subversion=%d rc=%d\n' % \
1021                           (self.pmiVersion,self.pmiSubversion,rc)
1022            self.pmiSock.send_char_msg(pmiMsgToSend)
1023            msgToSend = { 'cmd' : 'startup_status', 'src' : self.myId, 'rc' : 0,
1024                          'jobid' : self.jobid, 'rank' : self.myRank,
1025                          'exec' : '', 'reason' : ''  }
1026            self.ring.rhsSock.send_dict_msg(msgToSend)
1027        elif parsedMsg['cmd'] == 'get_my_kvsname':
1028            pmiMsgToSend = 'cmd=my_kvsname kvsname=%s\n' % (self.default_kvsname)
1029            self.pmiSock.send_char_msg(pmiMsgToSend)
1030        elif parsedMsg['cmd'] == 'get_maxes':
1031            pmiMsgToSend = 'cmd=maxes kvsname_max=4096 ' + \
1032                           'keylen_max=4096 vallen_max=4096\n'
1033            self.pmiSock.send_char_msg(pmiMsgToSend)
1034        elif parsedMsg['cmd'] == 'get_universe_size':
1035            pmiMsgToSend = 'cmd=universe_size size=%s\n' % (self.universeSize)
1036            self.pmiSock.send_char_msg(pmiMsgToSend)
1037        elif parsedMsg['cmd'] == 'get_appnum':
1038            pmiMsgToSend = 'cmd=appnum appnum=%s\n' % (self.appnum)
1039            self.pmiSock.send_char_msg(pmiMsgToSend)
1040        elif parsedMsg['cmd'] == 'publish_name':
1041            msgToSend = { 'cmd' : 'publish_name',
1042                          'service' : parsedMsg['service'],
1043                          'port' : parsedMsg['port'],
1044                          'jobid' : self.jobid,
1045                          'manpid' : os.getpid() }
1046            self.mpdSock.send_dict_msg(msgToSend)
1047        elif parsedMsg['cmd'] == 'unpublish_name':
1048            msgToSend = { 'cmd' : 'unpublish_name',
1049                          'service' : parsedMsg['service'],
1050                          'jobid' : self.jobid,
1051                          'manpid' : os.getpid() }
1052            self.mpdSock.send_dict_msg(msgToSend)
1053        elif parsedMsg['cmd'] == 'lookup_name':
1054            msgToSend = { 'cmd' : 'lookup_name',
1055                          'service' : parsedMsg['service'],
1056                          'jobid' : self.jobid,
1057                          'manpid' : os.getpid() }
1058            self.mpdSock.send_dict_msg(msgToSend)
1059        elif parsedMsg['cmd'] == 'create_kvs':
1060            new_kvsname = self.kvsname_template + str(self.kvs_next_id)
1061            self.KVSs[new_kvsname] = {}
1062            self.kvs_next_id += 1
1063            pmiMsgToSend = 'cmd=newkvs kvsname=%s\n' % (new_kvsname)
1064            self.pmiSock.send_char_msg(pmiMsgToSend)
1065        elif parsedMsg['cmd'] == 'destroy_kvs':
1066            kvsname = parsedMsg['kvsname']
1067            try:
1068                del self.KVSs[kvsname]
1069                pmiMsgToSend = 'cmd=kvs_destroyed rc=0\n'
1070            except:
1071                pmiMsgToSend = 'cmd=kvs_destroyed rc=-1\n'
1072            self.pmiSock.send_char_msg(pmiMsgToSend)
1073        elif parsedMsg['cmd'] == 'put':
1074            kvsname = parsedMsg['kvsname']
1075            key = parsedMsg['key']
1076            value = parsedMsg['value']
1077            try:
1078                self.KVSs[kvsname][key] = value
1079                pmiMsgToSend = 'cmd=put_result rc=0\n'
1080                self.pmiSock.send_char_msg(pmiMsgToSend)
1081            except Exception, errmsg:
1082                pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg
1083                self.pmiSock.send_char_msg(pmiMsgToSend)
1084        elif parsedMsg['cmd'] == 'barrier_in':
1085            self.pmiBarrierInRecvd = 1
1086            if self.myRank == 0  or  self.holdingPMIBarrierLoop1:
1087                msgToSend = { 'cmd' : 'pmi_barrier_loop_1' }
1088                self.ring.rhsSock.send_dict_msg(msgToSend)
1089        elif parsedMsg['cmd'] == 'get':
1090            key = parsedMsg['key']
1091            kvsname = parsedMsg['kvsname']
1092            if self.KVSs.has_key(kvsname)  and  self.KVSs[kvsname].has_key(key):
1093                value = self.KVSs[kvsname][key]
1094                pmiMsgToSend = 'cmd=get_result rc=0 value=%s\n' % (value)
1095                self.pmiSock.send_char_msg(pmiMsgToSend)
1096            else:
1097                msgToSend = { 'cmd' : 'pmi_get', 'key' : key,
1098                              'kvsname' : kvsname, 'from_rank' : self.myRank }
1099                self.ring.rhsSock.send_dict_msg(msgToSend)
1100        elif parsedMsg['cmd'] == 'getbyidx':
1101            kvsname = parsedMsg['kvsname']
1102            idx = int(parsedMsg['idx'])
1103            if idx == 0:
1104                msgToSend = { 'cmd' : 'pmi_getbyidx', 'kvsname' : kvsname,
1105                              'from_rank' : self.myRank,
1106                              'kvs' : self.KVSs[self.default_kvsname] }
1107                self.ring.rhsSock.send_dict_msg(msgToSend)
1108            else:
1109                if len(self.KVSs[self.default_kvsname].keys()) > idx:
1110                    key = self.KVSs[self.default_kvsname].keys()[idx]
1111                    val = self.KVSs[self.default_kvsname][key]
1112                    nextidx = idx + 1
1113                    pmiMsgToSend = 'cmd=getbyidx_results rc=0 nextidx=%d key=%s val=%s\n' % \
1114                                   (nextidx,key,val)
1115                else:
1116                    pmiMsgToSend = 'cmd=getbyidx_results rc=-2 reason=no_more_keyvals\n'
1117                self.pmiSock.send_char_msg(pmiMsgToSend)
1118        elif parsedMsg['cmd'] == 'spawn':
1119            ## This code really is handling PMI_Spawn_multiple.  It translates a
1120            ## sequence of separate spawn messages into a single message to send
1121            ## to the mpd.  It keeps track by the "totspawns" and "spawnssofar"
1122            ## parameters in the incoming message.  The first message has
1123            ## "spawnssofar" set to 1.
1124            ##
1125            ## This proc may produce stdout and stderr; do this early so I
1126            ## won't exit before child sets up its conns with me.
1127            ## NOTE: if you spawn a non-MPI job, it may not send these msgs
1128            ## in which case adding 2 to numWithIO will cause the pgm to hang.
1129            totspawns = int(parsedMsg['totspawns'])
1130            spawnssofar = int(parsedMsg['spawnssofar'])
1131            if spawnssofar == 1: # this is the first of possibly several spawn msgs
1132                self.numWithIO += 2
1133                self.tpsf = 0             # total processes spawned so far
1134                self.spawnExecs = {}      # part of MPI_Spawn_multiple args
1135                self.spawnHosts = {}      # comes from info
1136                self.spawnUsers = {}      # always the current user
1137                self.spawnCwds  = {}      # could come from info, but doesn't yet
1138                self.spawnUmasks = {}     # could come from info, but doesn't yet
1139                self.spawnPaths = {}      # could come from info, but doesn't yet
1140                self.spawnEnvvars = {}    # whole environment from mpiexec, plus appnum
1141                self.spawnLimits = {}
1142                self.spawnArgs = {}
1143            self.spawnNprocs  = int(parsedMsg['nprocs']) # num procs in this spawn
1144            pmiInfo = {}
1145            for i in range(0,int(parsedMsg['info_num'])):
1146                info_key = parsedMsg['info_key_%d' % i]
1147                info_val = parsedMsg['info_val_%d' % i]
1148                pmiInfo[info_key] = info_val
1149
1150            if pmiInfo.has_key('host'):
1151                try:
1152                    toIfhn = socket.gethostbyname_ex(pmiInfo['host'])[2][0]
1153                    self.spawnHosts[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = toIfhn
1154                except:
1155                    mpd_print(1, "unable to obtain host info for :%s:" % (pmiInfo['host']))
1156                    pmiMsgToSend = 'cmd=spawn_result rc=-2 status=unknown_host\n'
1157                    self.pmiSock.send_char_msg(pmiMsgToSend)
1158                    return
1159            else:
1160                self.spawnHosts[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = '_any_'
1161            if pmiInfo.has_key('path'):
1162                self.spawnPaths[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['path']
1163            else:
1164                self.spawnPaths[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_CLI_PATH']
1165            if pmiInfo.has_key('wdir'):
1166                self.spawnCwds[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['wdir']
1167            else:
1168                self.spawnCwds[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = os.environ['MPDMAN_CWD']
1169            if pmiInfo.has_key('umask'):
1170                self.spawnUmasks[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = pmiInfo['umask']
1171            else:
1172                self.spawnUmasks[(self.tpsf,self.tpsf+self.spawnNprocs-1)]  = os.environ['MPDMAN_UMASK']
1173            self.spawnExecs[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = parsedMsg['execname']
1174            self.spawnUsers[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = mpd_get_my_username()
1175            self.spawnEnv = {}
1176            self.spawnEnv.update(os.environ)
1177            self.spawnEnv['MPI_APPNUM'] = str(spawnssofar-1)
1178            self.spawnEnvvars[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = self.spawnEnv
1179            self.spawnLimits[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = {} # not implemented yet
1180            ##### args[(tpsf,tpsf+spawnNprocs-1) = [ parsedMsg['args'] ]
1181            ##### args[(tpsf,tpsf+spawnNprocs-1) = [ 'AA', 'BB', 'CC' ]
1182            cliArgs = []
1183            cliArgcnt = int(parsedMsg['argcnt'])
1184            for i in range(1,cliArgcnt+1):    # start at 1
1185                cliArgs.append(parsedMsg['arg%d' % i])
1186            self.spawnArgs[(self.tpsf,self.tpsf+self.spawnNprocs-1)] = cliArgs
1187            self.tpsf += self.spawnNprocs
1188
1189            if totspawns == spawnssofar:    # This is the last in the spawn sequence
1190                self.spawnedCnt += 1    # non-zero to use for creating kvsname in msg below
1191                msgToSend = { 'cmd'          : 'spawn',
1192                              'conhost'      : self.myHost,
1193                              'conifhn'      : self.myIfhn,
1194                              'conport'      : self.listenNonRingPort,
1195                              'spawned'      : self.spawnedCnt,
1196                              'jobid'        : self.jobid,
1197                              'nstarted'     : 0,
1198                              'nprocs'       : self.tpsf,
1199                              'hosts'        : self.spawnHosts,
1200                              'execs'        : self.spawnExecs,
1201                              'users'        : self.spawnUsers,
1202                              'cwds'         : self.spawnCwds,
1203                              'umasks'       : self.spawnUmasks,
1204                              'paths'        : self.spawnPaths,
1205                              'args'         : self.spawnArgs,
1206                              'envvars'      : self.spawnEnvvars,
1207                              'limits'       : self.spawnLimits,
1208                              'singinitpid'  : 0,
1209                              'singinitport' : 0,
1210                            }
1211                msgToSend['line_labels'] = self.lineLabelFmt
1212                msgToSend['spawner_manpid'] = os.getpid()
1213                self.mpdSock.send_dict_msg(msgToSend)
1214                self.spawnInProgress = parsedMsg
1215                self.spawnInProgress['errcodes'] = [None] * self.tpsf  # one for each spawn
1216                # I could send the preput_info along but will keep it here
1217                # and let the spawnee call me up and ask for it; he will
1218                # call me anyway since I am his parent in the tree.  So, I
1219                # will create a KVS to hold the info until he calls
1220                self.spawnedKVSname = 'mpdman_kvs_for_spawned_' + str(self.spawnedCnt)
1221                self.KVSs[self.spawnedKVSname] = {}
1222                preput_num = int(parsedMsg['preput_num'])
1223                for i in range(0,preput_num):
1224                    preput_key = parsedMsg['preput_key_%d' % i]
1225                    preput_val = parsedMsg['preput_val_%d' % i]
1226                    self.KVSs[self.spawnedKVSname][preput_key] = preput_val
1227        elif parsedMsg['cmd'] == 'finalize':
1228            # the following lines are present to support a process that runs
1229            # 2 MPI pgms in tandem (e.g. mpish at ANL)
1230            self.KVSs = {}
1231            self.KVSs[self.default_kvsname] = {}
1232            self.kvs_next_id = 1
1233            self.jobEndingEarly = 0
1234            self.pmiCollectiveJob = 0
1235            self.spawnedCnt = 0
1236            pmiMsgToSend = 'cmd=finalize_ack\n'
1237            self.pmiSock.send_char_msg(pmiMsgToSend)
1238        elif parsedMsg['cmd'] == 'client_bnr_fence_in':    ## BNR
1239            self.pmiBarrierInRecvd = 1
1240            if self.myRank == 0  or  self.holdingPMIBarrierLoop1:
1241                msgToSend = { 'cmd' : 'pmi_barrier_loop_1' }
1242                self.ring.rhsSock.send_dict_msg(msgToSend)
1243        elif parsedMsg['cmd'] == 'client_bnr_put':         ## BNR
1244            key = parsedMsg['attr']
1245            value = parsedMsg['val']
1246            try:
1247                self.KVSs[self.default_kvsname][key] = value
1248                pmiMsgToSend = 'cmd=put_result rc=0\n'
1249                self.pmiSock.send_char_msg(pmiMsgToSend)
1250            except Exception, errmsg:
1251                pmiMsgToSend = 'cmd=put_result rc=-1 msg="%s"\n' % errmsg
1252                self.pmiSock.send_char_msg(pmiMsgToSend)
1253        elif parsedMsg['cmd'] == 'client_bnr_get':          ## BNR
1254            key = parsedMsg['attr']
1255            if self.KVSs[self.default_kvsname].has_key(key):
1256                value = self.KVSs[self.default_kvsname][key]
1257                pmiMsgToSend = 'cmd=client_bnr_get_output rc=0 val=%s\n' % (value)
1258                self.pmiSock.send_char_msg(pmiMsgToSend)
1259            else:
1260                msgToSend = { 'cmd' : 'bnr_get', 'key' : key,
1261                              'kvsname' : kvsname, 'from_rank' : self.myRank }
1262                self.ring.rhsSock.send_dict_msg(msgToSend)
1263        elif parsedMsg['cmd'] == 'client_ready':               ## BNR
1264            ## continue to wait for accepting_signals
1265            pass
1266        elif parsedMsg['cmd'] == 'accepting_signals':          ## BNR
1267            ## handle it like a barrier_in ??
1268            self.pmiBarrierInRecvd = 1
1269            self.doingBNR = 1    ## BNR  # set again is OK
1270        elif parsedMsg['cmd'] == 'interrupt_peer_with_msg':    ## BNR
1271            self.ring.rhsSock.send_dict_msg(parsedMsg)
1272        else:
1273            mpd_print(1, "unrecognized pmi msg :%s:" % line )
    def handle_console_input(self,sock):
        """Dispatch one message arriving from the console (mpdrun/mpiexec).

        Reads from self.conSock (the sock arg is supplied by the stream
        handler but is not used directly).  An empty read means the console
        has gone away: close down the console and parent-stdio sockets,
        ask the rest of the ring to SIGKILL, and kill the local client's
        process group.  Otherwise act on msg['cmd']:
          'signal'          - forward around the ring / to spawned children
                              and deliver to the local client
          'stdin_from_user' - forward around the ring and write to the
                              client's stdin if this rank is a receiver
          'stdin_dest'      - update which ranks receive stdin
          'tv_ready'        - note totalview is ready; tell client via PMI
        """
        msg = self.conSock.recv_dict_msg()
        if not msg:    # console disappeared; tear down and kill the job
            if self.conSock:
                self.streamHandler.del_handler(self.conSock)
                self.conSock.close()
                self.conSock = 0
            if self.parentStdoutSock:
                self.streamHandler.del_handler(self.parentStdoutSock)
                self.parentStdoutSock.close()
                self.parentStdoutSock = 0
            if self.parentStderrSock:
                self.streamHandler.del_handler(self.parentStderrSock)
                self.parentStderrSock.close()
                self.parentStderrSock = 0
            if self.ring.rhsSock:
                # propagate the kill around the ring so the other ranks die too
                msgToSend = { 'cmd' : 'signal', 'signo' : 'SIGKILL' }
                self.ring.rhsSock.send_dict_msg(msgToSend)
            try:
                pgrp = clientPid * (-1)   # neg Pid -> group
                os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
            except:
                pass
        elif msg['cmd'] == 'signal':
            if msg['signo'] == 'SIGINT':
                self.ring.rhsSock.send_dict_msg(msg)
                for s in self.spawnedChildSocks:
                    s.send_dict_msg(msg)
                if self.gdb:
                    # under gdb, interrupt the gdb driver itself
                    os.kill(clientPid,signal.SIGINT)
                else:
                    # note: the client's process group gets SIGKILL, not SIGINT
                    try:
                        pgrp = clientPid * (-1)   # neg Pid -> group
                        os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
                    except:
                        pass
            elif msg['signo'] == 'SIGKILL':
                try:
                    self.ring.rhsSock.send_dict_msg(msg)
                except:
                    pass
                for s in self.spawnedChildSocks:
                    try:
                        s.send_dict_msg(msg)
                    except:
                        pass
                if self.gdb:
                    os.kill(clientPid,signal.SIGUSR1)    # tell gdb driver to kill all
                else:
                    try:
                        pgrp = clientPid * (-1)   # neg Pid -> group
                        os.kill(pgrp,signal.SIGKILL)   # may be reaped by sighandler
                    except:
                        pass
            elif msg['signo'] == 'SIGTSTP':
                # 'dest' marks where the msg started so it stops after one lap
                msg['dest'] = self.myId
                self.ring.rhsSock.send_dict_msg(msg)
                try:
                    pgrp = clientPid * (-1)   # neg Pid -> group
                    os.kill(pgrp,signal.SIGTSTP)   # may be reaped by sighandler
                except:
                    pass
            elif msg['signo'] == 'SIGCONT':
                msg['dest'] = self.myId
                self.ring.rhsSock.send_dict_msg(msg)
                try:
                    pgrp = clientPid * (-1)   # neg Pid -> group
                    os.kill(pgrp,signal.SIGCONT)   # may be reaped by sighandler
                except:
                    pass
        elif msg['cmd'] == 'stdin_from_user':
            msg['src'] = self.myId
            if self.ring.rhsSock:
                # Only send to rhs if that sock is open
                self.ring.rhsSock.send_dict_msg(msg)
            if in_stdinRcvrs(self.myRank,self.stdinDest):
                try:
                    if msg.has_key('eof'):
                        if self.subproc:    # must close subproc's file (not just the fd)
                            self.subproc.stdin.close()
                        else:
                            os.close(self.fd_write_cli_stdin)
                    else:
                        os.write(self.fd_write_cli_stdin,msg['line'])
                except:
                    mpd_print(1, 'cannot send stdin to client')
        elif msg['cmd'] == 'stdin_dest':
            self.stdinDest = msg['stdin_procs']
            msg['src'] = self.myId
            if self.ring.rhsSock:
                # Only send to rhs if that sock is open
                self.ring.rhsSock.send_dict_msg(msg)
        elif msg['cmd'] == 'tv_ready':
            self.tvReady = 1
            msg['src'] = self.myId
            self.ring.rhsSock.send_dict_msg(msg)
            if self.pmiSock:    # should be valid sock if running tv
                pmiMsgToSend = 'cmd=tv_ready\n'
                self.pmiSock.send_char_msg(pmiMsgToSend)
        else:
            mpd_print(1, 'unexpected msg recvd on conSock :%s:' % msg )
1375    def handle_mpd_input(self,sock):
1376        msg = self.mpdSock.recv_dict_msg()
1377        mpd_print(0000, 'msg recvd on mpdSock :%s:' % msg )
1378        if not msg:
1379            if self.conSock:
1380                msgToSend = { 'cmd' : 'job_aborted', 'reason' : 'mpd disappeared',
1381                              'jobid' : self.jobid }
1382                self.conSock.send_dict_msg(msgToSend,errprint=0)
1383                self.streamHandler.del_handler(self.conSock)
1384                self.conSock.close()
1385                self.conSock = 0
1386            try:
1387                os.kill(0,signal.SIGKILL)  # pid 0 -> all in my process group
1388            except:
1389                pass
1390            sys.exit(0)
1391        if msg['cmd'] == 'abortjob':
1392            mpd_print(1, "job aborted by mpd; reason=%s" % (msg['reason']))
1393        elif msg['cmd'] == 'startup_status':  # probably some hosts not found
1394            if self.pmiSock:  # may have disappeared in early shutdown
1395                pmiMsgToSend = 'cmd=spawn_result rc=-1 errcodes='' reason=%s\n' % (msg['reason'])
1396                self.pmiSock.send_char_msg(pmiMsgToSend)
1397        elif msg['cmd'] == 'signal_to_handle'  and  msg.has_key('sigtype'):
1398            if msg['sigtype'].isdigit():
1399                signum = int(msg['sigtype'])
1400            else:
1401                exec('signum = %s' % 'signal.SIG' + msg['sigtype'])
1402            try:
1403                if msg['s_or_g'] == 's':    # single (not entire group)
1404                    pgrp = clientPid          # just client process
1405                else:
1406                    pgrp = clientPid * (-1)   # neg Pid -> process group
1407                os.kill(pgrp,signum)
1408            except Exception, errmsg:
1409                mpd_print(1, 'invalid signal (%d) from mpd' % (signum) )
1410        elif msg['cmd'] == 'publish_result':
1411            if self.pmiSock:
1412                pmiMsgToSend = 'cmd=publish_result info=%s\n' % (msg['info'])
1413                self.pmiSock.send_char_msg(pmiMsgToSend)
1414        elif msg['cmd'] == 'unpublish_result':
1415            if self.pmiSock:
1416                pmiMsgToSend = 'cmd=unpublish_result info=%s\n' % (msg['info'])
1417                self.pmiSock.send_char_msg(pmiMsgToSend)
1418        elif msg['cmd'] == 'lookup_result':
1419            if self.pmiSock:
1420                pmiMsgToSend = 'cmd=lookup_result info=%s port=%s\n' % \
1421                               (msg['info'],msg['port'])
1422                self.pmiSock.send_char_msg(pmiMsgToSend)
1423        elif msg['cmd'] == 'spawn_done_by_mpd':
1424            pass
1425        else:
1426            mpd_print(1, 'invalid msg recvd on mpdSock :%s:' % msg )
    def launch_client_via_fork_exec(self,cli_env):
        """Fork and exec the MPI client program with environment cli_env.

        Returns the child's pid on success or -1 if fork keeps failing.
        The child: closes the manager's ring/listen/console sockets,
        becomes its own process-group leader, wires the pre-created
        stdin/stdout/stderr pipe fds onto fds 0/1/2, waits for a 'go'
        handshake from the manager, applies resource limits, and execs
        the client (optionally under the mpdgdbdrv gdb driver).  Limit or
        exec failures are reported back to the manager over a fresh PMI
        socket as a 'startup_status' message.
        """
        maxTries = 6
        numTries = 0
        # fork can fail transiently (e.g. resource unavailable); retry a few times
        while numTries < maxTries:
            try:
                cliPid = os.fork()
                errinfo = 0
            except OSError, errinfo:
                pass  ## could check for errinfo.errno == 35 (resource unavailable)
            if errinfo:
                sleep(1)
                numTries += 1
            else:
                break
        if numTries >= maxTries:
            ## print '**** mpdman: fork failed for launching client'
            return -1
        if cliPid == 0:
            # ---- child side ----
            mpd_set_my_id(socket.gethostname() + '_man_before_exec_client_' + `os.getpid()`)
            # drop all manager-only sockets so the client doesn't inherit them
            self.ring.lhsSock.close()
            self.ring.rhsSock.close()
            self.listenRingSock.close()
            if self.conSock:
                self.streamHandler.del_handler(self.conSock)
                self.conSock.close()
                self.conSock = 0
            self.pmiListenSock.close()
            os.setpgrp()    # own process group so the whole app can be signaled

            os.close(self.fd_write_cli_stdin)
            os.dup2(self.fd_read_cli_stdin,0)  # closes fd 0 (stdin) if open

            # to simply print on the mpd's tty:
            #     comment out the next lines
            os.close(self.fd_read_cli_stdout)
            os.dup2(self.fd_write_cli_stdout,1)  # closes fd 1 (stdout) if open
            os.close(self.fd_write_cli_stdout)
            os.close(self.fd_read_cli_stderr)
            os.dup2(self.fd_write_cli_stderr,2)  # closes fd 2 (stderr) if open
            os.close(self.fd_write_cli_stderr)

            # block until the manager says all ranks are ready to exec
            msg = self.handshake_sock_cli_end.recv_char_msg()
            if not msg.startswith('go'):
                mpd_print(1,'%s: invalid go msg from man :%s:' % (self.myId,msg) )
                sys.exit(-1)
            self.handshake_sock_cli_end.close()

            self.clientPgmArgs = [self.clientPgm] + self.clientPgmArgs
            errmsg = set_limits(self.clientPgmLimits)
            if errmsg:
                # report the failure to the manager over a new PMI connection
                self.pmiSock = MPDSock(name='pmi')
                self.pmiSock.connect((self.myIfhn,self.pmiListenPort))
                reason = quote(str(errmsg))
                pmiMsgToSend = 'cmd=startup_status rc=-1 reason=%s exec=%s\n' % \
                               (reason,self.clientPgm)
                self.pmiSock.send_char_msg(pmiMsgToSend)
                sys.exit(0)
            try:
                mpd_print(0000, 'execing clientPgm=:%s:' % (self.clientPgm) )
                if self.gdb:
                    # run the client under the mpdgdbdrv.py gdb driver
                    fullDirName = os.environ['MPDMAN_FULLPATHDIR']
                    gdbdrv = os.path.join(fullDirName,'mpdgdbdrv.py')
                    if not os.access(gdbdrv,os.X_OK):
                        print 'mpdman: cannot execute mpdgdbdrv %s' % gdbdrv
                        sys.exit(0);
                    if self.gdba:
                        self.clientPgmArgs.insert(0,'-attach')
                    self.clientPgmArgs.insert(0,self.clientPgm)
                    os.execvpe(gdbdrv,self.clientPgmArgs,cli_env)    # client
                else:
                    os.environ['PATH'] = cli_env['PATH']
                    os.execvpe(self.clientPgm,self.clientPgmArgs,cli_env)    # client
            except Exception, errmsg:
                # print '%s: could not run %s; probably executable file not found' % \
                #        (self.myId,clientPgm)
                self.pmiSock = MPDSock(name='pmi')
                self.pmiSock.connect((self.myIfhn,self.pmiListenPort))
                reason = quote(str(errmsg))
                pmiMsgToSend = 'cmd=startup_status rc=-1 reason=%s exec=%s\n' % \
                               (reason,self.clientPgm)
                self.pmiSock.send_char_msg(pmiMsgToSend)
                sys.exit(0)
            sys.exit(0)
        # ---- parent side: close the child's ends of the pipes ----
        if not self.singinitPORT:
            os.close(self.fd_read_cli_stdin)
            os.close(self.fd_write_cli_stdout)
            os.close(self.fd_write_cli_stderr)
            self.cliListenSock.close()
        return cliPid
    def launch_client_via_subprocess(self,cli_env):
        """Start the client via the subprocess module (non-fork platforms).

        Launches mpdwrapcli.py under a python interpreter with piped
        stdin/stdout/stderr, spawns one daemon-less thread per output
        stream to pump client stdout/stderr through the usual handlers,
        and completes a handshake by accepting the wrapper's connection
        on a temporary listen socket.  Returns the subprocess pid.
        """
        import threading
        def read_fd_with_func(fd,func):
            # pump the stream until EOF (func returns '' at EOF)
            line = 'x'
            while line:
                line = func(fd)
        tempListenSock = MPDListenSock()
        tempListenPort = tempListenSock.getsockname()[1]
        # python_executable = '\Python24\python.exe'
        python_executable = 'python2.4'
        fullDirName = os.environ['MPDMAN_FULLPATHDIR']
        mpdwrapcli = os.path.join(fullDirName,'mpdwrapcli.py')
        # wrapper argv: listen port, pgm to exec, then argv[0] + user args
        wrapCmdAndArgs = [ mpdwrapcli, str(tempListenPort),
                           self.clientPgm, self.clientPgm ] + self.clientPgmArgs
        cli_env.update(os.environ) ######  RMB: MAY NEED VARS OTHER THAN PATH ?????
        self.subproc = subprocess.Popen([python_executable,'-u'] + wrapCmdAndArgs,
                                        bufsize=0,env=cli_env,close_fds=False,
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
        self.fd_write_cli_stdin = self.subproc.stdin.fileno()
        stdout_thread = threading.Thread(target=read_fd_with_func,
                                         args=(self.subproc.stdout.fileno(),
                                               self.handle_cli_stdout_input))
        stdout_thread.start()
        stderr_thread = threading.Thread(target=read_fd_with_func,
                                         args=(self.subproc.stderr.fileno(),
                                               self.handle_cli_stderr_input))
        stderr_thread.start()
        # wrapper connects back to confirm it started
        (self.handshake_sock_man_end,tempAddr) = tempListenSock.accept()
        cliPid = self.subproc.pid
        ## an mpd_print wreaks havoc here; simple prints are OK (probably a stack issue)
        # mpd_print(0000,"CLIPID=%d" % cliPid)
        # print "CLIPID=%d" % cliPid  ;  sys.stdout.flush()
        return cliPid
1551    def create_line_label(self,line_label_fmt,spawned):
1552        lineLabel = ''  # default is no label
1553        if line_label_fmt:
1554            i = 0
1555            while i < len(line_label_fmt):
1556                if line_label_fmt[i] == '%':
1557                    fmtchar = line_label_fmt[i+1]
1558                    i += 2
1559                    if fmtchar == 'r':
1560                        lineLabel += str(self.myRank)
1561                    elif fmtchar == 'h':
1562                        lineLabel += self.myHost
1563                else:
1564                    lineLabel += line_label_fmt[i]
1565                    i += 1
1566            if spawned:
1567                lineLabel += ',' + str(spawned) + ': '    # spawned is actually a count
1568            else:
1569                lineLabel += ': '
1570        return lineLabel
1571
def in_stdinRcvrs(myRank,stdinDest):
    """Return 1 if myRank is selected by the stdin destination spec.

    stdinDest is a comma-separated list of ranks and inclusive ranges,
    e.g. '0,3-5,9'.  Returns 0 when myRank matches none of them.
    """
    for spec in stdinDest.split(','):
        bounds = spec.split('-')
        if len(bounds) == 1:
            # single rank entry
            if int(bounds[0]) == myRank:
                return 1
        elif int(bounds[0]) <= myRank <= int(bounds[1]):
            # inclusive range entry
            return 1
    return 0
1583
1584
def parse_pmi_msg(msg):
    """Parse a PMI wire message of 'key=value' tokens into a dict.

    Each whitespace-delimited token is split on the FIRST '=' only, so
    values may themselves contain '=' characters (the previous split
    truncated such values at the second '=').  Returns {} if the message
    cannot be parsed.
    """
    parsed_msg = {}
    try:
        for token in findall(r'\S+',msg):
            (key,val) = token.split('=',1)   # split once: keep '=' inside values
            parsed_msg[key] = val
    except Exception:
        print('unable to parse pmi msg :%s:' % msg)
        parsed_msg = {}
    return parsed_msg
1596
def set_limits(limits):
    """Apply resource limits to the current (soon-to-exec) process.

    limits maps limit names (e.g. 'core', 'cpu', 'stack') to integer
    values; each is set as both the soft and hard rlimit.  Returns 0 on
    success, or an error message string (non-zero) on failure — callers
    treat any truthy return as a startup error.
    """
    try:
        import resource
    except:
        return 'unable to import resource module to set limits'
    for limtype in limits.keys():
        limit = int(limits[limtype])
        try:
            if   limtype == 'core':
                resource.setrlimit(resource.RLIMIT_CORE,(limit,limit))
            elif limtype == 'cpu':
                resource.setrlimit(resource.RLIMIT_CPU,(limit,limit))
            elif limtype == 'fsize':
                resource.setrlimit(resource.RLIMIT_FSIZE,(limit,limit))
            elif limtype == 'data':
                resource.setrlimit(resource.RLIMIT_DATA,(limit,limit))
            elif limtype == 'stack':
                resource.setrlimit(resource.RLIMIT_STACK,(limit,limit))
            elif limtype == 'rss':
                resource.setrlimit(resource.RLIMIT_RSS,(limit,limit))
            elif limtype == 'nproc':
                resource.setrlimit(resource.RLIMIT_NPROC,(limit,limit))
            elif limtype == 'nofile':
                resource.setrlimit(resource.RLIMIT_NOFILE,(limit,limit))
            elif limtype == 'ofile':
                # NOTE(review): RLIMIT_OFILE and RLIMIT_VMEM below exist only
                # on some platforms; an AttributeError here would NOT be
                # caught by the (NameError,ImportError) handler — verify
                resource.setrlimit(resource.RLIMIT_OFILE,(limit,limit))
            elif limtype == 'memloc':
                # key is spelled 'memloc' (sic) for RLIMIT_MEMLOCK
                resource.setrlimit(resource.RLIMIT_MEMLOCK,(limit,limit))
            elif  limtype == 'as':
                resource.setrlimit(resource.RLIMIT_AS,(limit,limit))
            elif  limtype == 'vmem':
                resource.setrlimit(resource.RLIMIT_VMEM,(limit,limit))
            else:
                raise NameError, 'invalid resource name: %s' % limtype  # validated at mpdrun
        except (NameError,ImportError), errmsg:
            return errmsg
    return 0
1634
def sigchld_handler(signum,frame):
    """SIGCHLD handler: reap finished children without blocking.

    Loops because a single SIGCHLD delivery may cover several exited
    children.  When the reaped child is the MPI client process, record
    its exit status in the module globals and chain to mpdlib's common
    signal handling.
    """
    global clientPid, clientExited, clientExitStatus, clientExitStatusSent
    done = 0
    while not done:
        try:
            (pid,status) = os.waitpid(-1,os.WNOHANG)
            if pid == 0:    # no existing child process is finished
                done = 1
            if pid == clientPid:
                clientExited = 1
                clientExitStatus = status
                mpd_handle_signal(signum,0)
        except:    # e.g. OSError/ECHILD once no children remain
            done = 1
1649
1650
if __name__ == '__main__':
    # mpdman is exec'd by mpd with a fully prepared environment; a missing
    # MPDMAN_CLI_PGM means a human started it from the command line.
    # ('in' replaces the deprecated dict.has_key and works on all versions.)
    if 'MPDMAN_CLI_PGM' not in os.environ:    # assume invoked from keyboard
        print(__doc__)
        sys.exit(-1)
    mpdman = MPDMan()
    mpdman.run()
1657