1#!/usr/bin/python3
2
3"""
4dispy: Distribute computations among CPUs/cores on a single machine or machines
5in cluster(s), grid, cloud etc. for parallel execution.  See
6http://dispy.sourceforge.net or https://pgiri.github.com/dispy for details.
7"""
8
9import os
10import sys
11import time
12import socket
13import inspect
14import stat
15import threading
16import re
17import ssl
18import hashlib
19import traceback
20import shelve
21import datetime
22import atexit
23import functools
24import queue
25import numbers
26import collections
27import struct
28import errno
29import platform
30import itertools
31import copy
32import types
33try:
34    import netifaces
35except ImportError:
36    netifaces = None
37
38import pycos
39from pycos import Task, Pycos, AsyncSocket, Singleton, serialize, deserialize
40
41__author__ = "Giridhar Pemmasani (pgiri@yahoo.com)"
42__email__ = "pgiri@yahoo.com"
43__copyright__ = "Copyright 2011, Giridhar Pemmasani"
44__contributors__ = []
45__maintainer__ = "Giridhar Pemmasani (pgiri@yahoo.com)"
46__license__ = "Apache 2.0"
47__url__ = "http://dispy.sourceforge.net"
48__status__ = "Production"
49__version__ = "4.10.6"
50
51__all__ = ['logger', 'DispyJob', 'DispyNode', 'NodeAllocate', 'JobCluster', 'SharedJobCluster']
52
53_dispy_version = __version__
54MsgTimeout = 10
55IPV6_MULTICAST_GROUP = 'ff05::b409:3171:9705:6159:5134:70'
56IPV4_MULTICAST_GROUP = '239.255.51.34'
57# PyPI / pip packaging adjusts assertion below for Python 3.7+
58assert sys.version_info.major == 3 and sys.version_info.minor >= 7, \
59    ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' %
60     (__file__, sys.version_info.major, sys.version_info.minor))
61
62
63class DispyJob(object):
64    """Job scheduled for execution with dispy.
65
66    Once a job is scheduled (with a tuple of arguments), the __call__
67    method can be invoked. This will wait until the job is
68    complete. The result of the call (either the return value in case
69    of python methods or the exit value in case of programs) will be
70    returned; the result is also available as .result member if
71    needed.  In addition, any output, error, exception messages from
72    the job will be available as .stdout, .stderr and .exception
73    members. The time when the job was submitted for execution on a
74    node will be available as .start_time and when the job results
75    became available as .end_time.
76
77    .id field is initially set to None and may be assigned by user to
78    any value that is appropriate.  This may be useful, for example,
79    to distinguish one job from another.
80
81    .status is read-only field; it is set to one of Created, Running,
82    Finished, Cancelled, Terminated and ProvisionalResult, indicating
83    current status of job.  If job is created for SharedJobCluster,
84    status is not updated to Running when job is actually running.
85
86    .ip_addr is read-inly field; it is set to IP address of node that
87    executed job.
88
89    .finish is a read-only event that is set when a job's results are
90    available.
91
92    """
93
94    __slots__ = ('id', 'result', 'stdout', 'stderr', 'exception',
95                 'submit_time', 'start_time', 'end_time', 'status',
96                 'ip_addr', 'finish', '_args', '_kwargs', '_dispy_job_', '_uid')
97
98    Created = 5
99    Running = 6
100    ProvisionalResult = 7
101    # NB: Cancelled, Terminated and Finished status should have
102    # values in that order, as PriorityQueue sorts data.
103    # Thus, if a job with provisional result is already in the queue
104    # and a job is finished, finished/terminated job is processed (in
105    # callback) last.
106    Cancelled = 8
107    Terminated = 9
108    Abandoned = 10
109    Finished = 11
110
111    id_iter = itertools.count(start=1)
112
113    def __init__(self, job_id, args, kwargs):
114        # id can be assigned by user as appropriate (e.g., to distinguish jobs)
115        if job_id:
116            self.id = job_id
117        else:
118            self.id = next(DispyJob.id_iter)
119        # rest are read-only
120        self.result = None
121        self.stdout = None
122        self.stderr = None
123        self.exception = None
124        self.submit_time = time.time()
125        self.start_time = None
126        self.end_time = None
127        self.status = DispyJob.Created
128        self.ip_addr = None
129        self.finish = threading.Event()
130
131        # rest are for dispy implementation only - these are opaque to clients
132        self._args = args
133        self._kwargs = kwargs
134        self._dispy_job_ = None
135        self._uid = id(self)
136
137    def __call__(self, clear=False):
138        self.finish.wait()
139        if clear:
140            self.finish.clear()
141        return self.result
142
143    def __lt__(self, other):
144        if isinstance(self._dispy_job_, _DispyJob_):
145            if isinstance(other._dispy_job_, _DispyJob_):
146                return self._dispy_job_ < other._dispy_job_
147            else:
148                return True
149        else:
150            return False
151
152
153class DispyNodeAvailInfo(object):
154    """A node's status is represented as available CPU as percent, memory in
155    bytes and disk as bytes. This information is passed to NodeAllocte.allocate
156    method and in cluster status callback with status DispyNode.AvailInfo.
157    """
158    def __init__(self, cpu, memory, disk, swap):
159        self.cpu = cpu
160        self.memory = memory
161        self.disk = disk
162        self.swap = swap
163
164
165class DispyNode(object):
166    """If 'cluster_status' is used when creating cluster, that function
167    is called with an instance of this class as first argument.
168    See 'cluster_status' in JobCluster below.
169    """
170
171    Initialized = DispyJob.Created - 1
172    Closed = DispyJob.Finished + 5
173    AvailInfo = Closed + 1
174
175    def __init__(self, ip_addr, name, cpus):
176        self.ip_addr = ip_addr
177        self.name = name
178        self.cpus = cpus
179        self.avail_cpus = cpus
180        self.busy = 0
181        self.jobs_done = 0
182        self.cpu_time = 0.0
183        self.tx = 0
184        self.rx = 0
185        self.update_time = 0
186        self.avail_info = None
187
188
189class NodeAllocate(object):
190    """Objects of this class describe if / how many CPUs in a node are
191    allocated to clusters.
192
193    Each element of 'nodes' passed to JobCluster or SharedJobCluster
194    is an object of this class; if the element passed is a string
195    (host name or IP address), a tuple (see documentation for
196    details), it is converted to NodeAllocate object with
197    '_parse_node_allocs' function.
198
199    This class can be specialized (inherited) to override, for
200    example, 'allocate' method.
201    """
202    def __init__(self, host, port=None, cpus=0):
203        self.ip_addr = _node_ipaddr(host)
204        if not self.ip_addr:
205            logger.warning('host "%s" is invalid', host)
206            self.ip_rex = ''
207        else:
208            self.ip_rex = self.ip_addr.replace('.', '\\.').replace('*', '.*')
209        if port:
210            try:
211                port = int(port)
212                assert port > 0
213            except Exception:
214                logger.warning('port must be > 0 for node "%s"', host)
215                port = None
216        self.port = port
217        if cpus:
218            try:
219                cpus = int(cpus)
220            except Exception:
221                logger.warning('Invalid cpus for "%s" ignored', host)
222                cpus = 0
223        self.cpus = cpus
224
225    def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform=''):
226        """When a node is found, dispy calls this method with the
227        cluster for which the node is being allocated, IP address,
228        name and CPUs available on that node. This method should
229        return a number indicating number of CPUs to use. If return
230        value is 0, the node is not used for that cluster.
231        """
232        if re.match(self.ip_rex, ip_addr):
233            if self.cpus > 0:
234                cpus = min(cpus, self.cpus)
235            elif (cpus + self.cpus) > 0:
236                cpus = cpus + self.cpus
237            return cpus
238        return 0
239
240
241# a cluster's "status" function (not "cluster_status" callback)
242# returns this structure; "nodes" is list of DispyNode objects and
243# "jobs_pending" is number of jobs that are not done yet
244ClusterStatus = collections.namedtuple('ClusterStatus', ['nodes', 'jobs_pending'])
245
246
247def num_min(*args):
248    items = [arg for arg in args if isinstance(arg, numbers.Number)]
249    if not items:
250        return None
251    return min(items)
252
253
254def num_max(*args):
255    items = [arg for arg in args if isinstance(arg, numbers.Number)]
256    if not items:
257        return None
258    return max(items)
259
260
261def _same_file(tgt, xf):
262    """Internal use only.
263    """
264    # TODO: compare checksum?
265    try:
266        stat_buf = os.stat(tgt)
267        if (stat_buf.st_size == xf.stat_buf.st_size and
268            abs(stat_buf.st_mtime - xf.stat_buf.st_mtime) <= 1):
269            return True
270    except Exception:
271        return False
272
273
274def auth_code(secret, sign):
275    return hashlib.sha1((secret + sign).encode()).hexdigest().encode()
276
277
278def _node_ipaddr(node):
279    """Internal use only.
280    """
281    if not node:
282        return None
283    if node.find('*') >= 0:
284        return node
285    try:
286        info = socket.getaddrinfo(node, None)[0]
287    except Exception:
288        return None
289
290    ip_addr = info[-1][0]
291    if info[0] == socket.AF_INET6:
292        # canonicalize so different platforms resolve to same string
293        ip_addr = ip_addr.split('%')[0]
294        ip_addr = re.sub(r'^0+', '', ip_addr)
295        ip_addr = re.sub(r':0+', ':', ip_addr)
296        ip_addr = re.sub(r'::+', '::', ip_addr)
297    return ip_addr
298
299
300def _parse_node_allocs(nodes):
301    """Internal use only.
302    """
303    node_allocs = []
304    for node in nodes:
305        if isinstance(node, NodeAllocate):
306            node_allocs.append(node)
307        elif isinstance(node, str):
308            node_allocs.append(NodeAllocate(node))
309        elif isinstance(node, dict):
310            node_allocs.append(NodeAllocate(node.get('host', '*'), node.get('port', None),
311                                            node.get('cpus', 0)))
312        elif isinstance(node, tuple):
313            node_allocs.append(NodeAllocate(*node))
314        elif isinstance(node, list):
315            node_allocs.append(NodeAllocate(*tuple(node)))
316        elif isinstance(node, DispyNode):
317            node_allocs.append(NodeAllocate(node.ip_addr))
318        else:
319            logger.warning('Ignoring node specification %s', type(node))
320    return [node_alloc for node_alloc in node_allocs if node_alloc.ip_addr]
321
322
323def host_addrinfo(host=None, socket_family=None, ipv4_multicast=False):
324    """If 'host' is given (as either host name or IP address), resolve it and
325    fill AddrInfo structure. If 'host' is not given, netifaces module is used to
326    find appropriate IP address. If 'socket_family' is given, IP address with that
327    'socket_family' is used. It should be either 'socket.AF_INET' (for IPv4) or
328    'socket.AF_INET6' (for IPv6).
329    """
330
331    class AddrInfo(object):
332        def __init__(self, family, ip, ifn, broadcast, netmask):
333            self.family = family
334            self.ip = ip
335            self.ifn = ifn
336            if family == socket.AF_INET and ipv4_multicast:
337                self.broadcast = IPV4_MULTICAST_GROUP
338            else:
339                self.broadcast = broadcast
340            self.netmask = netmask
341            self.ext_ip_addr = None
342            if os.name == 'nt':
343                self.bind_addr = ip
344            elif platform.system() in ('Darwin', 'DragonFlyBSD', 'FreeBSD', 'OpenBSD', 'NetBSD'):
345                if family == socket.AF_INET and (not ipv4_multicast):
346                    self.bind_addr = ''
347                else:
348                    self.bind_addr = self.broadcast
349            else:
350                self.bind_addr = self.broadcast
351
352    def canonical_ipv6(ip_addr):
353        # canonicalize so different platforms resolve to same string
354        ip_addr = ip_addr.split('%')[0]
355        ip_addr = re.sub(r'^0+', '', ip_addr)
356        ip_addr = re.sub(r':0+', ':', ip_addr)
357        ip_addr = re.sub(r'::+', '::', ip_addr)
358        return ip_addr
359
360    if socket_family:
361        if socket_family not in (socket.AF_INET, socket.AF_INET6):
362            return None
363    hosts = []
364    if host:
365        best = None
366        for addr in socket.getaddrinfo(host, None):
367            if socket_family and addr[0] != socket_family:
368                continue
369            if not best or addr[0] == socket.AF_INET:
370                best = addr
371        if best:
372            socket_family = best[0]
373            if best[0] == socket.AF_INET6:
374                addr = canonical_ipv6(best[-1][0])
375            else:
376                addr = best[-1][0]
377            hosts.append(addr)
378        else:
379            return None
380
381    if socket_family:
382        socket_families = [socket_family]
383    else:
384        socket_families = [socket.AF_INET, socket.AF_INET6]
385
386    addrinfos = []
387    if netifaces:
388        for iface in netifaces.interfaces():
389            ifn = 0
390            iface_infos = []
391            for sock_family in socket_families:
392                for link in netifaces.ifaddresses(iface).get(sock_family, []):
393                    netmask = link.get('netmask', None)
394                    if sock_family == socket.AF_INET:
395                        addr = str(link['addr'])
396                        broadcast = link.get('broadcast', '<broadcast>')
397                        # Windows seems to have broadcast same as addr
398                        if broadcast.startswith(addr):
399                            broadcast = '<broadcast>'
400                        try:
401                            addrs = socket.getaddrinfo(addr, None, sock_family, socket.SOCK_STREAM)
402                        except Exception:
403                            addrs = []
404                        for addr in addrs:
405                            if hosts and addr[-1][0] not in hosts:
406                                continue
407                            addrinfo = AddrInfo(sock_family, addr[-1][0], addr[-1][-1],
408                                                broadcast, netmask)
409                            iface_infos.append(addrinfo)
410                    else:  # sock_family == socket.AF_INET6
411                        addr = str(link['addr'])
412                        broadcast = link.get('broadcast', IPV6_MULTICAST_GROUP)
413                        if broadcast.startswith(addr):
414                            broadcast = IPV6_MULTICAST_GROUP
415                        if_sfx = ['']
416                        if not ifn and ('%' not in addr.split(':')[-1]):
417                            if_sfx.append('%' + iface)
418                        for sfx in if_sfx:
419                            if ifn and sfx:
420                                break
421                            try:
422                                addrs = socket.getaddrinfo(addr + sfx, None, sock_family,
423                                                           socket.SOCK_STREAM)
424                            except Exception:
425                                continue
426                            for addr in addrs:
427                                if addr[-1][-1]:
428                                    if ifn and addr[-1][-1] != ifn:
429                                        logger.warning('inconsistent scope IDs for %s: %s != %s',
430                                                       iface, ifn, addr[-1][-1])
431                                    ifn = addr[-1][-1]
432                                if sfx:
433                                    continue
434                                addr = canonical_ipv6(addr[-1][0])
435                                if hosts and addr not in hosts:
436                                    continue
437                                addrinfo = AddrInfo(sock_family, addr, ifn, broadcast, netmask)
438                                iface_infos.append(addrinfo)
439                        if ifn:
440                            for addrinfo in iface_infos:
441                                if not addrinfo.ifn:
442                                    addrinfo.ifn = ifn
443            addrinfos.extend(iface_infos)
444
445    else:
446        if not host:
447            host = socket.gethostname()
448        netmask = None
449        for sock_family in socket_families:
450            try:
451                addrs = socket.getaddrinfo(host, None, sock_family, socket.SOCK_STREAM)
452            except Exception:
453                continue
454            for addr in addrs:
455                ifn = addr[-1][-1]
456                if sock_family == socket.AF_INET:
457                    broadcast = '<broadcast>'
458                    addr = addr[-1][0]
459                else:  # sock_family == socket.AF_INET6
460                    addr = canonical_ipv6(addr[-1][0])
461                    broadcast = IPV6_MULTICAST_GROUP
462                    logger.warning('IPv6 may not work without "netifaces" package!')
463                addrinfo = AddrInfo(sock_family, addr, ifn, broadcast, netmask)
464                if hosts:
465                    if addrinfo.ip in hosts:
466                        return addrinfo
467                    else:
468                        continue
469                addrinfos.append(addrinfo)
470
471    best = None
472    for sock_family in socket_families:
473        for addrinfo in addrinfos:
474            if addrinfo.ip in hosts:
475                return addrinfo
476            if addrinfo.family != sock_family:
477                continue
478            if addrinfo.family == socket.AF_INET:
479                if not best or (len(best.ip) < len(addrinfo.ip)) or best.ip.startswith('127'):
480                    best = addrinfo
481            else:
482                if addrinfo.ip.startswith('fd'):
483                    # TODO: How to detect / avoid temporary addresses (privacy extensions)?
484                    if addrinfo.ifn:
485                        return addrinfo
486                if not best or (len(best.ip) < len(addrinfo.ip)) or best.ip.startswith('fe80'):
487                    best = addrinfo
488        if best and best.family == socket.AF_INET and (not best.ip.startswith('127')):
489            return best
490    return best
491
492
493# This tuple stores information about partial functions; for
494# now 'setup' and 'cleanup' functions can be partial functions.
495# TODO: useful to have 'compute' as partial function as well?
496_Function = collections.namedtuple('_Function', ['name', 'args', 'kwargs'])
497logger = pycos.Logger('dispy')
498
499
500class _Compute(object):
501    """Internal use only.
502    """
503    func_type = 1
504    prog_type = 2
505
506    def __init__(self, compute_type, name):
507        assert compute_type == _Compute.func_type or compute_type == _Compute.prog_type
508        self.type = compute_type
509        self.name = name
510        self.id = None
511        self.code = ''
512        self.dest_path = None
513        self.xfer_files = set()
514        self.reentrant = False
515        self.exclusive = True
516        self.setup = None
517        self.cleanup = None
518        self.scheduler_ip_addr = None
519        self.scheduler_port = None
520        self.node_ip_addr = None
521        self.auth = None
522        self.job_result_port = None
523        self.pulse_interval = None
524
525
526class _XferFile(object):
527    """Internal use only.
528    """
529    def __init__(self, name, dest_path, compute_id=None):
530        self.name = name
531        self.dest_path = dest_path
532        self.compute_id = compute_id
533        self.stat_buf = os.stat(name)
534        self.sep = os.sep
535
536
537class _Node(object):
538    """Internal use only.
539    """
540    __slots__ = ['ip_addr', 'port', 'name', 'cpus', 'avail_cpus', 'busy', 'cpu_time', 'clusters',
541                 'auth', 'secret', 'keyfile', 'certfile', 'last_pulse', 'scheduler_ip_addr',
542                 'pending_jobs', 'avail_info', 'platform', 'sock_family', 'tx', 'rx']
543
544    def __init__(self, ip_addr, port, cpus, sign, secret, platform='',
545                 keyfile=None, certfile=None):
546        self.ip_addr = ip_addr
547        if re.match('\d+\.', ip_addr):
548            self.sock_family = socket.AF_INET
549        else:
550            self.sock_family = socket.AF_INET6
551        self.port = port
552        self.name = None
553        self.cpus = cpus
554        self.avail_cpus = cpus
555        self.busy = 0.0
556        self.cpu_time = 0.0
557        self.clusters = set()
558        self.auth = auth_code(secret, sign)
559        self.secret = secret
560        self.keyfile = keyfile
561        self.certfile = certfile
562        self.last_pulse = None
563        self.scheduler_ip_addr = None
564        self.pending_jobs = []
565        self.avail_info = None
566        self.platform = platform
567        self.tx = 0
568        self.rx = 0
569
570    def setup(self, compute, exclusive=True, task=None):
571        # generator
572        compute.scheduler_ip_addr = self.scheduler_ip_addr
573        compute.node_ip_addr = self.ip_addr
574        compute.exclusive = exclusive
575        reply = yield self.send(b'COMPUTE:' + serialize(compute), task=task)
576        try:
577            cpus = deserialize(reply)
578            assert isinstance(cpus, int) and cpus > 0
579        except Exception:
580            logger.warning('Transfer of computation "%s" to %s failed', compute.name, self.ip_addr)
581            return(-1)
582        if not self.cpus:
583            self.cpus = cpus
584        for xf in compute.xfer_files:
585            resp = yield self.xfer_file(xf, task=task)
586            if resp < 0:
587                logger.error('Could not transfer file "%s"', xf.name)
588                yield self.close(compute, task=task)
589                return(resp)
590
591        resp = yield self.send(b'SETUP:' + serialize(compute.id), timeout=0, task=task)
592        if not isinstance(resp, int) or resp < 0:
593            if isinstance(resp, bytearray):
594                resp = resp.decode()
595            logger.warning('Setup of computation "%s" on %s failed: %s',
596                           compute.name, self.ip_addr, resp)
597            yield self.close(compute, task=task)
598            return(resp)
599        self.last_pulse = time.time()
600        return(0)
601
602    def send(self, msg, reply=True, timeout=MsgTimeout, task=None):
603        # generator
604        sock = socket.socket(self.sock_family, socket.SOCK_STREAM)
605        sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile)
606        sock.settimeout(timeout)
607        try:
608            yield sock.connect((self.ip_addr, self.port))
609            yield sock.sendall(self.auth)
610            yield sock.send_msg(msg)
611            if reply:
612                resp = yield sock.recv_msg()
613            else:
614                resp = len(msg)
615        except Exception:
616            logger.error('Could not connect to %s:%s, %s',
617                         self.ip_addr, self.port, traceback.format_exc())
618            # TODO: mark this node down, reschedule on different node?
619            raise
620        finally:
621            sock.close()
622
623        if resp == b'ACK':
624            resp = len(msg)
625            self.tx += len(msg)
626        return(resp)
627
628    def xfer_file(self, xf, task=None):
629        # generator
630        sock = socket.socket(self.sock_family, socket.SOCK_STREAM)
631        sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile)
632        sock.settimeout(MsgTimeout)
633        try:
634            yield sock.connect((self.ip_addr, self.port))
635            yield sock.sendall(self.auth)
636            yield sock.send_msg(b'FILEXFER:' + serialize(xf))
637            recvd = yield sock.recv_msg()
638            recvd = deserialize(recvd)
639            with open(xf.name, 'rb') as fd:
640                sent = 0
641                while sent == recvd:
642                    data = fd.read(1024000)
643                    if not data:
644                        break
645                    yield sock.sendall(data)
646                    sent += len(data)
647                    recvd = yield sock.recv_msg()
648                    recvd = deserialize(recvd)
649                    self.tx += sent
650            if recvd == xf.stat_buf.st_size:
651                resp = sent
652            else:
653                resp = -1
654        except Exception:
655            logger.error('Could not transfer %s to %s', xf.name, self.ip_addr)
656            # TODO: mark this node down, reschedule on different node?
657            resp = -1
658        finally:
659            sock.close()
660        return(resp)
661
662    def close(self, compute, terminate_pending=False, task=None):
663        # generator
664        logger.debug('Closing node %s for %s / %s', self.ip_addr, compute.name, compute.id)
665        req = {'compute_id': compute.id, 'auth': compute.auth, 'node_ip_addr': self.ip_addr,
666               'terminate_pending': terminate_pending}
667        try:
668            yield self.send(b'CLOSE:' + serialize(req), reply=True, task=task)
669        except Exception:
670            logger.debug('Deleting computation %s/%s from %s failed',
671                         compute.id, compute.name, self.ip_addr)
672
673
674class _DispyJob_(object):
675    """Internal use only.
676    """
677
678    __slots__ = ('job', 'uid', 'compute_id', 'hash', 'node', 'pinned',
679                 'xfer_files', '_args', '_kwargs', 'code')
680
681    def __init__(self, compute_id, job_id, args, kwargs):
682        job_deps = kwargs.pop('dispy_job_depends', [])
683        self.job = DispyJob(job_id, args, kwargs)
684        self.job._dispy_job_ = self
685        self._args = self.job._args
686        self._kwargs = self.job._kwargs
687        self.uid = None
688        self.compute_id = compute_id
689        self.hash = ''.join(hex(_)[2:] for _ in os.urandom(10))
690        self.node = None
691        self.pinned = None
692        self.xfer_files = []
693        self.code = ''
694        depend_ids = set()
695        cwd = os.getcwd()
696        for dep in job_deps:
697            if isinstance(dep, str) or inspect.ismodule(dep):
698                if inspect.ismodule(dep):
699                    name = dep.__file__
700                    if name.endswith('.pyc'):
701                        name = name[:-1]
702                    if not name.endswith('.py'):
703                        logger.warning('Invalid module "%s" - must be python source.', dep)
704                        continue
705                    if name.startswith(cwd):
706                        dst = os.path.dirname(name[len(cwd):].lstrip(os.sep))
707                    elif dep.__package__:
708                        dst = dep.__package__.replace('.', os.sep)
709                    else:
710                        dst = os.path.dirname(dep.__name__.replace('.', os.sep))
711                else:
712                    name = os.path.abspath(dep)
713                    if name.startswith(cwd):
714                        dst = os.path.dirname(name[len(cwd):].lstrip(os.sep))
715                    else:
716                        dst = '.'
717                if name in depend_ids:
718                    continue
719                self.xfer_files.append(_XferFile(name, dst, compute_id))
720                depend_ids.add(name)
721            elif (inspect.isfunction(dep) or inspect.isclass(dep) or
722                  (hasattr(dep, '__class__') and hasattr(dep, '__module__'))):
723                if inspect.isfunction(dep) or inspect.isclass(dep):
724                    pass
725                elif hasattr(dep, '__class__') and inspect.isclass(dep.__class__):
726                    dep = dep.__class__
727                if id(dep) in depend_ids:
728                    continue
729                try:
730                    lines = inspect.getsourcelines(dep)[0]
731                except Exception:
732                    logger.warning('Invalid job depends eleement "%s"; ignoring it.', dep)
733                    continue
734                lines[0] = lines[0].lstrip()
735                self.code += '\n' + ''.join(lines)
736                depend_ids.add(id(dep))
737            else:
738                logger.warning('Invalid job depends element "%s"; ignoring it.', dep)
739
740    def __getstate__(self):
741        state = {'uid': self.uid, 'hash': self.hash, 'compute_id': self.compute_id,
742                 '_args': self._args if isinstance(self._args, bytes) else serialize(self._args),
743                 '_kwargs': self._kwargs if isinstance(self._kwargs, bytes)
744                                         else serialize(self._kwargs),
745                 'xfer_files': self.xfer_files, 'code': self.code}
746        return state
747
748    def __setstate__(self, state):
749        for k, v in state.items():
750            setattr(self, k, v)
751
752    def __lt__(self, other):
753        return self.uid < other.uid
754
755    def __eq__(self, other):
756        return isinstance(other, _DispyJob_) and self.uid == other.uid
757
758    def run(self, task=None):
759        # generator
760        logger.debug('Running job %s on %s', self.uid, self.node.ip_addr)
761        self.job.start_time = time.time()
762        tx = 0
763        for xf in self.xfer_files:
764            sent = yield self.node.xfer_file(xf, task=task)
765            if sent < 0:
766                logger.warning('Transfer of file "%s" to %s failed', xf.name, self.node.ip_addr)
767                raise Exception(-1)
768            tx += sent
769        resp = yield self.node.send(b'JOB:' + serialize(self), task=task)
770        # TODO: deal with NAKs (reschedule?)
771        if isinstance(resp, int) and resp >= 0:
772            tx += resp
773        else:
774            logger.warning('Failed to run %s on %s: %s', self.job.id, self.node.ip_addr, resp)
775            raise Exception(str(resp))
776        return(tx)
777
778    def finish(self, status):
779        job = self.job
780        job.status = status
781        if status != DispyJob.ProvisionalResult:
782            job._args = ()
783            job._kwargs = {}
784            self.job._dispy_job_ = None
785            self.job = None
786        job.finish.set()
787
788
789class _JobReply(object):
790    """Internal use only.
791    """
792    def __init__(self, _job, ip_addr, status=None, keyfile=None, certfile=None):
793        self.uid = _job.uid
794        self.hash = _job.hash
795        self.ip_addr = ip_addr
796        self.status = status
797        self.result = None
798        self.stdout = None
799        self.stderr = None
800        self.exception = None
801        self.start_time = 0
802        self.end_time = 0
803
804
805class _Cluster(object, metaclass=Singleton):
806    """Internal use only.
807    """
808
809    def __init__(self, ip_addr=None, ext_ip_addr=None, port=None, node_port=None,
810                 ipv4_udp_multicast=False, shared=False, secret='', keyfile=None, certfile=None,
811                 recover_file=None):
812        if not hasattr(self, 'pycos'):
813            self.pycos = Pycos()
814            logger.info('dispy client version: %s', __version__)
815            self.ipv4_udp_multicast = bool(ipv4_udp_multicast)
816            self.addrinfos = {}
817            if isinstance(ip_addr, list):
818                ip_addrs = ip_addr
819            else:
820                ip_addrs = [ip_addr]
821            if isinstance(ext_ip_addr, list):
822                ext_ip_addrs = ext_ip_addr
823            else:
824                ext_ip_addrs = [ext_ip_addr]
825
826            for i in range(len(ip_addrs)):
827                ip_addr = ip_addrs[i]
828                if i < len(ext_ip_addrs):
829                    ext_ip_addr = ext_ip_addrs[i]
830                else:
831                    ext_ip_addr = None
832                addrinfo = host_addrinfo(host=ip_addr, ipv4_multicast=self.ipv4_udp_multicast)
833                if not addrinfo:
834                    logger.warning('Ignoring invalid ip_addr %s', ip_addr)
835                    continue
836                if ext_ip_addr:
837                    ext_ip_addr = _node_ipaddr(ext_ip_addr)
838                    if not ext_ip_addr:
839                        logger.warning('Ignoring invalid ext_ip_addr %s', ext_ip_addrs[i])
840                if not ext_ip_addr:
841                    ext_ip_addr = addrinfo.ip
842                addrinfo.ext_ip_addr = ext_ip_addr
843                self.addrinfos[addrinfo.ext_ip_addr] = addrinfo
844            if not self.addrinfos:
845                raise Exception('No valid IP address found')
846
847            if port:
848                port = int(port)
849            else:
850                if shared:
851                    port = 0
852                else:
853                    port = 51347
854            self.port = port
855            if node_port:
856                node_port = int(node_port)
857            else:
858                node_port = 51348
859            self.node_port = node_port
860            self._nodes = {}
861            self.secret = secret
862            self.keyfile = keyfile
863            self.certfile = certfile
864            self.shared = shared
865            self.pulse_interval = None
866            self.ping_interval = None
867            self.poll_interval = None
868            self.dest_path = os.getcwd()  # TODO: make it an option?
869
870            self._clusters = {}
871            self._sched_jobs = {}
872            self._sched_event = pycos.Event()
873            self._abandoned_jobs = {}
874            self.terminate = False
875            self.sign = hashlib.sha1(os.urandom(20))
876            for ext_ip_addr in self.addrinfos:
877                self.sign.update(ext_ip_addr.encode())
878            self.sign = self.sign.hexdigest()
879            self.auth = auth_code(self.secret, self.sign)
880
881            if isinstance(recover_file, str):
882                self.recover_file = recover_file
883            else:
884                now = datetime.datetime.now()
885                self.recover_file = '_dispy_%.4i%.2i%.2i%.2i%.2i%.2i' % \
886                                    (now.year, now.month, now.day,
887                                     now.hour, now.minute, now.second)
888            atexit.register(self.shutdown)
889            self.timer_task = Task(self.timer_proc)
890
891            try:
892                self.shelf = shelve.open(self.recover_file, flag='c', writeback=True)
893                self.shelf['_cluster'] = {'ip_addrs': ip_addrs, 'ext_ip_addrs': ext_ip_addrs,
894                                          'port': self.port, 'sign': self.sign,
895                                          'secret': self.secret, 'auth': self.auth,
896                                          'keyfile': self.keyfile, 'certfile': self.certfile}
897                self.shelf.sync()
898            except Exception:
899                raise Exception('Could not create fault recover file "%s"' %
900                                self.recover_file)
901            logger.info('Storing fault recovery information in "%s"', self.recover_file)
902
903            self.select_job_node = self.load_balance_schedule
904            self._scheduler = Task(self._schedule_jobs)
905            self.start_time = time.time()
906            self.compute_id = int(1000 * self.start_time)
907
908            self.worker_Q = queue.Queue()
909            self.worker_thread = threading.Thread(target=self.worker)
910            self.worker_thread.daemon = True
911            self.worker_thread.start()
912
913            if self.shared:
914                port_bound_event = None
915            else:
916                port_bound_event = pycos.Event()
917            self.tcp_tasks = []
918            self.udp_tasks = []
919            udp_addrinfos = {}
920            for addrinfo in self.addrinfos.values():
921                self.tcp_tasks.append(Task(self.tcp_server, addrinfo, port_bound_event))
922                if self.shared:
923                    continue
924                udp_addrinfos[addrinfo.bind_addr] = addrinfo
925
926            for bind_addr, addrinfo in udp_addrinfos.items():
927                self.udp_tasks.append(Task(self.udp_server, bind_addr, addrinfo, port_bound_event))
928
929            # Under Windows dispynode may send objects with
930            # '__mp_main__' scope, so make an alias to '__main__'.
931            # TODO: Make alias even if client is not Windows? It is
932            # possible the client is not Windows, but a node is.
933            if os.name == 'nt' and '__mp_main__' not in sys.modules:
934                sys.modules['__mp_main__'] = sys.modules['__main__']
935
936    def udp_server(self, bind_addr, addrinfo, port_bound_event, task=None):
937        # generator
938        task.set_daemon()
939        udp_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
940        # udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
941
942        while 1:
943            try:
944                udp_sock.bind((bind_addr, self.port))
945            except socket.error as exc:
946                if exc.errno == errno.EADDRINUSE:
947                    logger.warning('Port %s seems to be used by another program ...', self.port)
948                else:
949                    logger.warning('Error binding to port %s: %s ...', self.port, exc.errno)
950                yield task.sleep(5)
951            except Exception:
952                logger.warning('Could not bind to port %s: %s', self.port, traceback.format_exc())
953                yield task.sleep(5)
954            else:
955                break
956
957        if addrinfo.family == socket.AF_INET:
958            if self.ipv4_udp_multicast:
959                mreq = socket.inet_aton(addrinfo.broadcast) + socket.inet_aton(addrinfo.ip)
960                udp_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
961        else:  # addrinfo.family == socket.AF_INET6
962            mreq = socket.inet_pton(addrinfo.family, addrinfo.broadcast)
963            mreq += struct.pack('@I', addrinfo.ifn)
964            udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
965            try:
966                udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
967            except Exception:
968                pass
969        port_bound_event.set()
970        del port_bound_event
971        while 1:
972            try:
973                msg, addr = yield udp_sock.recvfrom(1000)
974            except GeneratorExit:
975                break
976            if msg.startswith(b'PING:'):
977                try:
978                    info = deserialize(msg[len(b'PING:'):])
979                    if info['version'] != _dispy_version:
980                        logger.warning('Ignoring %s due to version mismatch', addr[0])
981                        continue
982                    assert info['port'] > 0
983                    assert info['ip_addr']
984                    # socket.inet_aton(status['ip_addr'])
985                except Exception:
986                    # logger.debug(traceback.format_exc())
987                    logger.debug('Ignoring node %s', addr[0])
988                    continue
989                auth = auth_code(self.secret, info['sign'])
990                node = self._nodes.get(info['ip_addr'], None)
991                if node and node.auth == auth:
992                    continue
993                sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM),
994                                   keyfile=self.keyfile, certfile=self.certfile)
995                sock.settimeout(MsgTimeout)
996                msg = {'version': _dispy_version, 'port': self.port, 'sign': self.sign,
997                       'node_ip_addr': info['ip_addr']}
998                msg['ip_addrs'] = [ai.ext_ip_addr for ai in self.addrinfos.values()]
999                try:
1000                    yield sock.connect((info['ip_addr'], info['port']))
1001                    yield sock.sendall(auth)
1002                    yield sock.send_msg(b'PING:' + serialize(msg))
1003                except GeneratorExit:
1004                    break
1005                except Exception:
1006                    logger.debug(traceback.format_exc())
1007                finally:
1008                    sock.close()
1009            else:
1010                pass
1011        udp_sock.close()
1012
1013    def tcp_server(self, addrinfo, port_bound_event, task=None):
1014        # generator
1015        task.set_daemon()
1016        if port_bound_event:
1017            yield port_bound_event.wait()
1018        del port_bound_event
1019        sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM),
1020                           keyfile=self.keyfile, certfile=self.certfile)
1021        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1022        try:
1023            sock.bind((addrinfo.ip, self.port))
1024        except Exception:
1025            logger.warning('Could not bind TCP server to %s:%s', addrinfo.ip, self.port)
1026            return
1027        if not self.port:
1028            self.port = sock.getsockname()[1]
1029        logger.debug('dispy client at %s:%s', addrinfo.ip, self.port)
1030        sock.listen(128)
1031
1032        while 1:
1033            try:
1034                conn, addr = yield sock.accept()
1035            except ssl.SSLError as err:
1036                logger.debug('SSL connection failed: %s', str(err))
1037                continue
1038            except GeneratorExit:
1039                break
1040            except Exception:
1041                logger.debug(traceback.format_exc())
1042                continue
1043            Task(self.tcp_req, conn, addr)
1044        sock.close()
1045
1046    def tcp_req(self, conn, addr, task=None):
1047        # generator
1048        conn.settimeout(MsgTimeout)
1049        msg = yield conn.recv_msg()
1050        if msg.startswith(b'JOB_REPLY:'):
1051            try:
1052                info = deserialize(msg[len(b'JOB_REPLY:'):])
1053            except Exception:
1054                logger.warning('Invalid job reply from %s:%s ignored', addr[0], addr[1])
1055            else:
1056                yield self.job_reply_process(info, len(msg), conn, addr)
1057            conn.close()
1058
1059        elif msg.startswith(b'PULSE:'):
1060            msg = msg[len(b'PULSE:'):]
1061            try:
1062                info = deserialize(msg)
1063                node = self._nodes[info['ip_addr']]
1064                assert 0 <= info['cpus'] <= node.cpus
1065                node.last_pulse = time.time()
1066                yield conn.send_msg(b'PULSE')
1067                if info['avail_info']:
1068                    node.avail_info = info['avail_info']
1069                    for cluster in node.clusters:
1070                        if cluster.status_callback:
1071                            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1072                            if not dispy_node:
1073                                continue
1074                            dispy_node.avail_info = info['avail_info']
1075                            dispy_node.update_time = node.last_pulse
1076                            self.worker_Q.put((cluster.status_callback,
1077                                               (DispyNode.AvailInfo, dispy_node, None)))
1078            except Exception:
1079                logger.warning('Ignoring pulse message from %s', addr[0])
1080                # logger.debug(traceback.format_exc())
1081            conn.close()
1082
1083        elif msg.startswith(b'JOB_STATUS:'):
1084            conn.close()
1085            # message from dispyscheduler
1086            try:
1087                info = deserialize(msg[len(b'JOB_STATUS:'):])
1088                _job = self._sched_jobs[info['uid']]
1089                assert _job.hash == info['hash']
1090            except Exception:
1091                logger.warning('Invalid job status from %s:%s ignored', addr[0], addr[1])
1092            else:
1093                job = _job.job
1094                job.status = info['status']
1095                job.ip_addr = info['node']
1096                node = self._nodes.get(job.ip_addr, None)
1097                # TODO: if node is None, likely missed NODE_STATUS, so create it now?
1098                if node:
1099                    if job.status == DispyJob.Running:
1100                        job.start_time = info['start_time']
1101                        node.busy += 1
1102                    else:
1103                        logger.warning('Invalid job status for shared cluster: %s', job.status)
1104                    cluster = self._clusters.get(_job.compute_id, None)
1105                    if cluster:
1106                        dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1107                        if dispy_node:
1108                            if job.status == DispyJob.Running:
1109                                dispy_node.busy += 1
1110                            dispy_node.update_time = time.time()
1111                            if cluster.status_callback:
1112                                self.worker_Q.put((cluster.status_callback,
1113                                                   (job.status, dispy_node, copy.copy(job))))
1114        elif msg.startswith(b'PONG:'):
1115            conn.close()
1116            try:
1117                info = deserialize(msg[len(b'PONG:'):])
1118                if info['version'] != _dispy_version:
1119                    logger.warning('Ignoring node %s due to version mismatch: %s != %s',
1120                                   info['ip_addr'], info['version'], _dispy_version)
1121                    return
1122                assert info['auth'] == self.auth
1123            except (AssertionError):
1124                logger.warning('Ignoring node %s ("secret" mismatch)', info['ip_addr'])
1125            except (Exception) as err:
1126                logger.warning('Ignoring node %s: %s', addr[0], err)
1127            else:
1128                self.add_node(info)
1129
1130        elif msg.startswith(b'PING:'):
1131            sock_family = conn.family
1132            conn.close()
1133            try:
1134                info = deserialize(msg[len(b'PING:'):])
1135                if info['version'] != _dispy_version:
1136                    logger.warning('Ignoring %s due to version mismatch', addr[0])
1137                    return
1138                assert info['port'] > 0
1139                assert info['ip_addr']
1140                # socket.inet_aton(status['ip_addr'])
1141            except Exception:
1142                # logger.debug(traceback.format_exc())
1143                logger.debug('Ignoring node %s', addr[0])
1144                return
1145            auth = auth_code(self.secret, info['sign'])
1146            node = self._nodes.get(info['ip_addr'], None)
1147            if node:
1148                if node.auth == auth:
1149                    return
1150            sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM),
1151                               keyfile=self.keyfile, certfile=self.certfile)
1152            sock.settimeout(MsgTimeout)
1153            msg = {'version': _dispy_version, 'port': self.port, 'sign': self.sign,
1154                   'node_ip_addr': info['ip_addr']}
1155            msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()]
1156            try:
1157                yield sock.connect((info['ip_addr'], info['port']))
1158                yield sock.sendall(auth)
1159                yield sock.send_msg(b'PING:' + serialize(msg))
1160            except Exception:
1161                logger.debug(traceback.format_exc())
1162            finally:
1163                sock.close()
1164
1165        elif msg.startswith(b'FILEXFER:'):
1166            try:
1167                xf = deserialize(msg[len(b'FILEXFER:'):])
1168                msg = yield conn.recv_msg()
1169                job_reply = deserialize(msg)
1170            except Exception:
1171                logger.debug(traceback.format_exc())
1172            else:
1173                yield self.file_xfer_process(job_reply, xf, conn, addr)
1174            conn.close()
1175
1176        elif msg.startswith(b'NODE_CPUS:'):
1177            conn.close()
1178            try:
1179                info = deserialize(msg[len(b'NODE_CPUS:'):])
1180                node = self._nodes.get(info['ip_addr'], None)
1181                if not node:
1182                    return
1183                auth = auth_code(self.secret, info['sign'])
1184                if auth != node.auth:
1185                    logger.warning('Invalid signature from %s', node.ip_addr)
1186                    return
1187                cpus = info['cpus']
1188            except Exception:
1189                return
1190            if cpus < 0:
1191                logger.warning('Node requested using %s CPUs, disabling it',
1192                               node.ip_addr, cpus)
1193                cpus = 0
1194            logger.debug('Setting cpus for %s to %s', node.ip_addr, cpus)
1195            # TODO: set node.cpus to min(cpus, node.cpus)?
1196            node.cpus = cpus
1197            if cpus > node.avail_cpus:
1198                node.avail_cpus = cpus
1199                node_computations = []
1200                for cluster in self._clusters.values():
1201                    if cluster in node.clusters:
1202                        continue
1203                    compute = cluster._compute
1204                    for node_alloc in cluster._node_allocs:
1205                        cpus = node_alloc.allocate(cluster, node.ip_addr, node.name,
1206                                                   node.avail_cpus, avail_info=node.avail_info,
1207                                                   platform=node.platform)
1208                        if cpus <= 0:
1209                            continue
1210                        node.cpus = min(node.avail_cpus, cpus)
1211                        node_computations.append(compute)
1212                        break
1213                if node_computations:
1214                    Task(self.setup_node, node, node_computations)
1215                yield self._sched_event.set()
1216            else:
1217                node.avail_cpus = cpus
1218            for cluster in node.clusters:
1219                dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1220                if dispy_node:
1221                    dispy_node.cpus = cpus
1222
1223        elif msg.startswith(b'TERMINATED:'):
1224            conn.close()
1225            try:
1226                info = deserialize(msg[len(b'TERMINATED:'):])
1227            except Exception:
1228                # logger.debug(traceback.format_exc())
1229                pass
1230            else:
1231                node = self._nodes.pop(info['ip_addr'], None)
1232                if not node:
1233                    return
1234                auth = auth_code(self.secret, info['sign'])
1235                if auth != node.auth:
1236                    logger.warning('Invalid signature from %s', node.ip_addr)
1237                    return
1238                logger.debug('Removing node %s', node.ip_addr)
1239                if node.clusters:
1240                    dead_jobs = [_job for _job in self._sched_jobs.values()
1241                                 if _job.node is not None and _job.node.ip_addr == node.ip_addr]
1242                    clusters = list(node.clusters)
1243                    node.clusters = set()
1244                    for cluster in clusters:
1245                        dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None)
1246                        if not dispy_node:
1247                            continue
1248                        dispy_node.avail_cpus = dispy_node.cpus = dispy_node.busy = 0
1249                        if cluster.status_callback:
1250                            self.worker_Q.put((cluster.status_callback,
1251                                               (DispyNode.Closed, dispy_node, None)))
1252                    self.reschedule_jobs(dead_jobs)
1253
1254        elif msg.startswith(b'NODE_STATUS:'):
1255            conn.close()
1256            # this message is from dispyscheduler for SharedJobCluster
1257            try:
1258                info = deserialize(msg[len(b'NODE_STATUS:'):])
1259                cluster = self._clusters[info['compute_id']]
1260                assert info['auth'] == cluster._compute.auth
1261            except Exception:
1262                logger.debug('Invalid node status from %s:%s ignored', addr[0], addr[1])
1263                # logger.debug(traceback.format_exc())
1264            else:
1265                if info['status'] == DispyNode.AvailInfo:
1266                    dispy_node = cluster._dispy_nodes.get(info['ip_addr'], None)
1267                    if dispy_node:
1268                        dispy_node.avail_info = info['avail_info']
1269                        dispy_node.tx = info['tx']
1270                        dispy_node.rx = info['rx']
1271                        if cluster.status_callback:
1272                            self.worker_Q.put((cluster.status_callback,
1273                                               (DispyNode.AvailInfo, dispy_node, None)))
1274                elif info['status'] == DispyNode.Initialized:
1275                    dispy_node = info['dispy_node']
1276                    dispy_node.update_time = time.time()
1277                    node = self._nodes.get(dispy_node.ip_addr, None)
1278                    if node:
1279                        node.name = dispy_node.name
1280                        node.cpus = dispy_node.cpus
1281                    else:
1282                        node = _Node(dispy_node.ip_addr, 0, dispy_node.cpus, '', '', platform='')
1283                        node.name = dispy_node.name
1284                        self._nodes[node.ip_addr] = node
1285                    cluster._dispy_nodes[dispy_node.ip_addr] = dispy_node
1286                    if cluster.status_callback:
1287                        self.worker_Q.put((cluster.status_callback,
1288                                           (DispyNode.Initialized, dispy_node, None)))
1289                elif info['status'] == DispyNode.Closed:
1290                    dispy_node = cluster._dispy_nodes.get(info['ip_addr'], None)
1291                    if dispy_node:
1292                        dispy_node.avail_cpus = dispy_node.cpus = 0
1293                        if cluster.status_callback:
1294                            self.worker_Q.put((cluster.status_callback,
1295                                               (DispyNode.Closed, dispy_node, None)))
1296                else:
1297                    logger.warning('Invalid node status %s from %s:%s ignored',
1298                                   info['status'], addr[0], addr[1])
1299
1300        elif msg.startswith(b'SCHEDULED:'):
1301            try:
1302                info = deserialize(msg[len(b'SCHEDULED:'):])
1303                assert self.shared
1304                cluster = self._clusters.get(info['compute_id'], None)
1305                assert info['pulse_interval'] is None or info['pulse_interval'] >= 1
1306                self.pulse_interval = info['pulse_interval']
1307                self.timer_task.resume(True)
1308                yield conn.send_msg(b'ACK')
1309                cluster._scheduled_event.set()
1310            except Exception:
1311                yield conn.send_msg(b'NAK')
1312            conn.close()
1313
1314        elif msg.startswith(b'RELAY_INFO:'):
1315            try:
1316                info = deserialize(msg[len(b'RELAY_INFO:'):])
1317                assert info['version'] == _dispy_version
1318                msg = {'sign': self.sign, 'ip_addrs': [info['scheduler_ip_addr']],
1319                       'port': self.port}
1320                if 'auth' in info and info['auth'] != self.auth:
1321                    msg = None
1322            except Exception:
1323                msg = None
1324            yield conn.send_msg(serialize(msg))
1325            conn.close()
1326
1327        else:
1328            logger.warning('Invalid message from %s:%s ignored', addr[0], addr[1])
1329            # logger.debug(traceback.format_exc())
1330            conn.close()
1331
1332    def timer_proc(self, task=None):
1333        task.set_daemon()
1334        reset = True
1335        last_pulse_time = last_ping_time = last_poll_time = time.time()
1336        timeout = None
1337        while 1:
1338            if reset:
1339                timeout = num_min(self.pulse_interval, self.ping_interval, self.poll_interval)
1340
1341            try:
1342                reset = yield task.suspend(timeout)
1343            except GeneratorExit:
1344                break
1345            if reset:
1346                continue
1347
1348            now = time.time()
1349            if self.pulse_interval and (now - last_pulse_time) >= self.pulse_interval:
1350                last_pulse_time = now
1351                if self.shared:
1352                    clusters = list(self._clusters.values())
1353                    for cluster in clusters:
1354                        msg = {'client_ip_addr': cluster._compute.scheduler_ip_addr,
1355                               'client_port': cluster._compute.job_result_port}
1356                        sock = socket.socket(cluster.addrinfo.family, socket.SOCK_STREAM)
1357                        sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile)
1358                        sock.settimeout(MsgTimeout)
1359                        try:
1360                            yield sock.connect((cluster.scheduler_ip_addr, cluster.scheduler_port))
1361                            yield sock.sendall(cluster._scheduler_auth)
1362                            yield sock.send_msg(b'PULSE:' + serialize(msg))
1363                        except Exception:
1364                            pass
1365                        sock.close()
1366                else:
1367                    dead_nodes = {}
1368                    for node in self._nodes.values():
1369                        if node.busy and node.last_pulse is not None and \
1370                           (node.last_pulse + (5 * self.pulse_interval)) <= now:
1371                            logger.warning('Node %s is not responding; removing it (%s, %s, %s)',
1372                                           node.ip_addr, node.busy, node.last_pulse, now)
1373                            dead_nodes[node.ip_addr] = node
1374                    if dead_nodes:
1375                        for node in dead_nodes.values():
1376                            clusters = list(node.clusters)
1377                            node.clusters = set()
1378                            for cluster in clusters:
1379                                dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None)
1380                                if not dispy_node:
1381                                    continue
1382                                dispy_node.avail_cpus = dispy_node.cpus = dispy_node.busy = 0
1383                                if cluster.status_callback:
1384                                    self.worker_Q.put((cluster.status_callback,
1385                                                       (DispyNode.Closed, dispy_node, None)))
1386                            del self._nodes[node.ip_addr]
1387                        dead_jobs = [_job for _job in self._sched_jobs.values()
1388                                     if _job.node is not None and _job.node.ip_addr in dead_nodes]
1389                        self.reschedule_jobs(dead_jobs)
1390
1391            if self.ping_interval and (now - last_ping_time) >= self.ping_interval:
1392                last_ping_time = now
1393                for cluster in self._clusters.values():
1394                    Task(self.discover_nodes, cluster, cluster._node_allocs)
1395
1396            if self.poll_interval and (now - last_poll_time) >= self.poll_interval:
1397                last_poll_time = now
1398                for cluster in self._clusters.values():
1399                    Task(self.poll_job_results, cluster)
1400
1401    def file_xfer_process(self, job_reply, xf, sock, addr):
1402        _job = self._sched_jobs.get(job_reply.uid, None)
1403        if _job is None or _job.hash != job_reply.hash:
1404            logger.warning('Ignoring invalid file transfer from job %s at %s',
1405                           job_reply.uid, addr[0])
1406            yield sock.send_msg(serialize(-1))
1407            return
1408        node = self._nodes.get(job_reply.ip_addr, None)
1409        if node:
1410            node.last_pulse = time.time()
1411        tgt = os.path.join(self.dest_path, xf.dest_path.replace(xf.sep, os.sep),
1412                           xf.name.split(xf.sep)[-1])
1413        if not os.path.isdir(os.path.dirname(tgt)):
1414            os.makedirs(os.path.dirname(tgt))
1415        with open(tgt, 'wb') as fd:
1416            recvd = 0
1417            while recvd < xf.stat_buf.st_size:
1418                yield sock.send_msg(serialize(recvd))
1419                data = yield sock.recvall(min(xf.stat_buf.st_size-recvd, 1024000))
1420                if not data:
1421                    break
1422                fd.write(data)
1423                recvd += len(data)
1424            yield sock.send_msg(serialize(recvd))
1425        if node:
1426            node.rx += recvd
1427        cluster = self._clusters.get(_job.compute_id, None)
1428        if cluster:
1429            dispy_node = cluster._dispy_nodes.get(job_reply.ip_addr, None)
1430            if dispy_node:
1431                dispy_node.rx += recvd
1432        if recvd != xf.stat_buf.st_size:
1433            logger.warning('Transfer of file "%s" failed', tgt)
1434            # TODO: remove file?
1435        os.utime(tgt, (xf.stat_buf.st_atime, xf.stat_buf.st_mtime))
1436        os.chmod(tgt, stat.S_IMODE(xf.stat_buf.st_mode))
1437
1438    def send_ping_node(self, ip_addr, port=None, task=None):
1439        ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port,
1440                    'node_ip_addr': ip_addr}
1441        ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()]
1442        if not port:
1443            port = self.node_port
1444        if re.match('\d+\.', ip_addr):
1445            sock_family = socket.AF_INET
1446        else:
1447            sock_family = socket.AF_INET6
1448        tcp_sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM),
1449                               keyfile=self.keyfile, certfile=self.certfile)
1450        tcp_sock.settimeout(MsgTimeout)
1451        try:
1452            yield tcp_sock.connect((ip_addr, port))
1453            yield tcp_sock.sendall(b'x' * len(self.auth))
1454            yield tcp_sock.send_msg(b'PING:' + serialize(ping_msg))
1455        except Exception:
1456            pass
1457        tcp_sock.close()
1458
1459    def broadcast_ping(self, addrinfos=[], port=None, task=None):
1460        # generator
1461        if not port:
1462            port = self.node_port
1463        ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port}
1464        ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()]
1465        if not addrinfos:
1466            addrinfos = self.addrinfos.values()
1467        for addrinfo in addrinfos:
1468            bc_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
1469            bc_sock.settimeout(MsgTimeout)
1470            ttl_bin = struct.pack('@i', 1)
1471            if addrinfo.family == socket.AF_INET:
1472                if self.ipv4_udp_multicast:
1473                    bc_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
1474                else:
1475                    bc_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1476            else:  # addrinfo.family == socket.AF_INET6
1477                bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)
1478                bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
1479            bc_sock.bind((addrinfo.ip, 0))
1480            try:
1481                yield bc_sock.sendto(b'PING:' + serialize(ping_msg), (addrinfo.broadcast, port))
1482            except Exception:
1483                pass
1484            bc_sock.close()
1485
1486    def discover_nodes(self, cluster, node_allocs, task=None):
1487        for node_alloc in node_allocs:
1488            # TODO: we assume subnets are indicated by '*', instead of
1489            # subnet mask; this is a limitation, but specifying with
1490            # subnet mask a bit cumbersome.
1491            if node_alloc.ip_rex.find('*') >= 0:
1492                Task(self.broadcast_ping, port=node_alloc.port)
1493            else:
1494                ip_addr = node_alloc.ip_addr
1495                if ip_addr in self._nodes:
1496                    continue
1497                port = node_alloc.port
1498                Task(self.send_ping_node, ip_addr, port)
1499        yield 0
1500
1501    def poll_job_results(self, cluster, task=None):
1502        # generator
1503        for ip_addr in cluster._dispy_nodes:
1504            node = self._nodes.get(ip_addr, None)
1505            if not node or not node.port:
1506                continue
1507            sock = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM),
1508                               keyfile=self.keyfile, certfile=self.certfile)
1509            sock.settimeout(MsgTimeout)
1510            try:
1511                req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth}
1512                reply = yield node.send(b'PENDING_JOBS:' + serialize(req))
1513                reply = deserialize(reply)
1514            except Exception:
1515                logger.debug(traceback.format_exc())
1516                continue
1517            finally:
1518                sock.close()
1519
1520            for uid in reply['done']:
1521                _job = self._sched_jobs.get(uid, None)
1522                if _job is None:
1523                    continue
1524                conn = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM),
1525                                   keyfile=self.keyfile, certfile=self.certfile)
1526                conn.settimeout(MsgTimeout)
1527                try:
1528                    yield conn.connect((node.ip_addr, node.port))
1529                    req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth,
1530                           'uid': uid, 'hash': _job.hash}
1531                    yield conn.sendall(node.auth)
1532                    yield conn.send_msg(b'RETRIEVE_JOB:' + serialize(req))
1533                    msg = yield conn.recv_msg()
1534                    reply = deserialize(msg)
1535                except Exception:
1536                    logger.debug(traceback.format_exc())
1537                    continue
1538                else:
1539                    if isinstance(reply, _JobReply):
1540                        yield self.job_reply_process(reply, len(msg), conn,
1541                                                     (node.ip_addr, node.port))
1542                    else:
1543                        logger.debug('Invalid reply for %s', uid)
1544                finally:
1545                    conn.close()
1546
1547    def add_cluster(self, cluster, task=None):
1548        compute = cluster._compute
1549        if self.shared:
1550            self._clusters[compute.id] = cluster
1551            for xf in compute.xfer_files:
1552                xf.compute_id = compute.id
1553
1554            node = _Node(cluster.scheduler_ip_addr, cluster.scheduler_port, 0, '', '',
1555                         platform='', keyfile=self.keyfile, certfile=self.certfile)
1556            node.auth = cluster._scheduler_auth
1557            self._nodes[cluster.scheduler_ip_addr] = node
1558            dispy_node = DispyNode(cluster.scheduler_ip_addr, None, 0)
1559            dispy_node.avail_info = node.avail_info
1560            cluster._dispy_nodes[dispy_node.ip_addr] = dispy_node
1561            info = self.shelf['_cluster']
1562            info['port'] = self.port
1563            self.shelf['_cluster'] = info
1564            info = {'name': compute.name, 'auth': compute.auth,
1565                    'nodes': [cluster.scheduler_ip_addr]}
1566            self.shelf['compute_%s' % compute.id] = info
1567            info = {'port': cluster.scheduler_port, 'auth': cluster._scheduler_auth,
1568                    'scheduler': True}
1569            self.shelf['node_%s' % (cluster.scheduler_ip_addr)] = info
1570            self.shelf.sync()
1571            if cluster.poll_interval:
1572                self.poll_interval = num_min(self.poll_interval, cluster.poll_interval)
1573            if self.poll_interval:
1574                self.timer_task.resume(True)
1575            return
1576
1577        # if a node is added with 'allocate_node', compute is already
1578        # initialized, so don't reinitialize it
1579        if compute.id is None:
1580            compute.id = self.compute_id
1581            self.compute_id += 1
1582            self._clusters[compute.id] = cluster
1583            for xf in compute.xfer_files:
1584                xf.compute_id = compute.id
1585            info = {'name': compute.name, 'auth': compute.auth, 'nodes': []}
1586            self.shelf['compute_%s' % compute.id] = info
1587            self.shelf.sync()
1588
1589            if compute.pulse_interval:
1590                self.pulse_interval = num_min(self.pulse_interval, compute.pulse_interval)
1591            if cluster.ping_interval:
1592                self.ping_interval = num_min(self.ping_interval, cluster.ping_interval)
1593            if cluster.poll_interval:
1594                self.poll_interval = num_min(self.poll_interval, cluster.poll_interval)
1595            if self.pulse_interval or self.ping_interval or self.poll_interval:
1596                self.timer_task.resume(True)
1597
1598        Task(self.discover_nodes, cluster, cluster._node_allocs)
1599        compute_nodes = []
1600        for ip_addr, node in self._nodes.items():
1601            if cluster in node.clusters:
1602                continue
1603            for node_alloc in cluster._node_allocs:
1604                cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus,
1605                                           avail_info=node.avail_info, platform=node.platform)
1606                if cpus <= 0:
1607                    continue
1608                node.cpus = min(node.avail_cpus, cpus)
1609                compute_nodes.append(node)
1610        for node in compute_nodes:
1611            Task(self.setup_node, node, [compute])
1612        yield None
1613
1614    def del_cluster(self, cluster, task=None):
1615        # generator
1616        if self._clusters.pop(cluster._compute.id, None) != cluster:
1617            logger.warning('Cluster %s already closed?', cluster._compute.name)
1618            return
1619
1620        if self.shared:
1621            sock = socket.socket(cluster.addrinfo.family, socket.SOCK_STREAM)
1622            sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile)
1623            sock.settimeout(MsgTimeout)
1624            yield sock.connect((cluster.scheduler_ip_addr, cluster.scheduler_port))
1625            yield sock.sendall(cluster._scheduler_auth)
1626            req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth,
1627                   'terminate_pending': cluster._complete.is_set()}
1628            yield sock.send_msg(b'CLOSE:' + serialize(req))
1629            sock.close()
1630        else:
1631            cid = cluster._compute.id
1632            cluster._jobs = []
1633            cluster._pending_jobs = 0
1634            # remove cluster from all nodes before closing (which uses
1635            # yield); otherwise, scheduler may access removed cluster
1636            # through node.clusters
1637            close_nodes = []
1638            for dispy_node in cluster._dispy_nodes.values():
1639                node = self._nodes.get(dispy_node.ip_addr, None)
1640                if not node:
1641                    continue
1642                if not cluster._complete.is_set():
1643                    node.pending_jobs = [_job for _job in node.pending_jobs
1644                                         if _job.compute_id != cid]
1645                node.clusters.discard(cluster)
1646                close_nodes.append((Task(node.close, cluster._compute,
1647                                         terminate_pending=cluster._complete.is_set()),
1648                                    dispy_node))
1649            cluster._dispy_nodes.clear()
1650            for close_task, dispy_node in close_nodes:
1651                yield close_task.finish()
1652                dispy_node.update_time = time.time()
1653                if cluster.status_callback:
1654                    self.worker_Q.put((cluster.status_callback,
1655                                       (DispyNode.Closed, dispy_node, None)))
1656        self.shelf.pop('compute_%s' % (cluster._compute.id), None)
1657        # TODO: prune nodes in shelf
1658        self.shelf.sync()
1659
1660    def setup_node(self, node, computations, task=None):
1661        # generator
1662        task.set_daemon()
1663        for compute in computations:
1664            # NB: to avoid computation being sent multiple times, we
1665            # add to cluster's _dispy_nodes before sending computation
1666            # to node
1667            cluster = self._clusters.get(compute.id, None)
1668            if not cluster or node.ip_addr in cluster._dispy_nodes:
1669                continue
1670            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1671            if not dispy_node:
1672                dispy_node = DispyNode(node.ip_addr, node.name, node.cpus)
1673                cluster._dispy_nodes[node.ip_addr] = dispy_node
1674                dispy_node.tx = node.tx
1675                dispy_node.rx = node.rx
1676            dispy_node.avail_cpus = node.avail_cpus
1677            dispy_node.avail_info = node.avail_info
1678            self.shelf['node_%s' % (node.ip_addr)] = {'port': node.port, 'auth': node.auth}
1679            shelf_compute = self.shelf['compute_%s' % compute.id]
1680            shelf_compute['nodes'].append(node.ip_addr)
1681            self.shelf['compute_%s' % compute.id] = shelf_compute
1682            self.shelf.sync()
1683            res = yield node.setup(compute, exclusive=True, task=task)
1684            if res or compute.id not in self._clusters:
1685                cluster._dispy_nodes.pop(node.ip_addr, None)
1686                logger.warning('Failed to setup %s for compute "%s": %s',
1687                               node.ip_addr, compute.name, res)
1688                # TODO: delete node from shelf's cluster._dispy_nodes
1689                del self.shelf['node_%s' % (node.ip_addr)]
1690                self.shelf.sync()
1691                yield node.close(compute, task=task)
1692            else:
1693                dispy_node.update_time = time.time()
1694                node.clusters.add(cluster)
1695                self._sched_event.set()
1696                if cluster.status_callback:
1697                    self.worker_Q.put((cluster.status_callback,
1698                                       (DispyNode.Initialized, dispy_node, None)))
1699
1700    def add_node(self, info):
1701        try:
1702            # assert info['version'] == _dispy_version
1703            assert info['port'] > 0 and info['cpus'] > 0
1704            # TODO: check if it is one of ext_ip_addr?
1705        except Exception:
1706            # logger.debug(traceback.format_exc())
1707            return
1708        node = self._nodes.get(info['ip_addr'], None)
1709        if node is None:
1710            logger.debug('Discovered %s:%s (%s) with %s cpus',
1711                         info['ip_addr'], info['port'], info['name'], info['cpus'])
1712            node = _Node(info['ip_addr'], info['port'], info['cpus'], info['sign'],
1713                         self.secret, platform=info['platform'],
1714                         keyfile=self.keyfile, certfile=self.certfile)
1715            node.name = info['name']
1716            node.avail_info = info['avail_info']
1717            self._nodes[node.ip_addr] = node
1718        else:
1719            node.last_pulse = time.time()
1720            auth = auth_code(self.secret, info['sign'])
1721            if info['cpus'] > 0:
1722                node.avail_cpus = info['cpus']
1723                node.cpus = min(node.cpus, node.avail_cpus)
1724                for cluster in node.clusters:
1725                    dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1726                    if dispy_node:
1727                        dispy_node.avail_cpus = node.avail_cpus
1728                        dispy_node.cpus = node.cpus
1729            else:
1730                logger.warning('Invalid "cpus" %s from %s ignored', info['cpus'], info['ip_addr'])
1731            if node.port == info['port'] and node.auth == auth:
1732                return
1733            logger.debug('Node %s rediscovered', info['ip_addr'])
1734            node.port = info['port']
1735            if node.auth is not None:
1736                dead_jobs = [_job for _job in self._sched_jobs.values()
1737                             if _job.node is not None and _job.node.ip_addr == node.ip_addr]
1738                self.reschedule_jobs(dead_jobs)
1739                node.busy = 0
1740                node.auth = auth
1741                clusters = list(node.clusters)
1742                node.clusters = set()
1743                for cluster in clusters:
1744                    dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None)
1745                    if dispy_node and cluster.status_callback:
1746                        self.worker_Q.put((cluster.status_callback,
1747                                           (DispyNode.Closed, dispy_node, None)))
1748            node.auth = auth
1749        node_computations = []
1750        node.name = info['name']
1751        node.scheduler_ip_addr = info['scheduler_ip_addr']
1752        for cluster in self._clusters.values():
1753            if cluster in node.clusters:
1754                continue
1755            compute = cluster._compute
1756            for node_alloc in cluster._node_allocs:
1757                cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus,
1758                                           avail_info=node.avail_info, platform=node.platform)
1759                if cpus <= 0:
1760                    continue
1761                node.cpus = min(node.avail_cpus, cpus)
1762                node_computations.append(compute)
1763                break
1764        if node_computations:
1765            Task(self.setup_node, node, node_computations)
1766
1767    def worker(self):
1768        # used for user callbacks only
1769        while 1:
1770            item = self.worker_Q.get(block=True)
1771            if item is None:
1772                self.worker_Q.task_done()
1773                break
1774            func, args = item
1775            try:
1776                func(*args)
1777            except Exception:
1778                if isinstance(func, types.FunctionType):
1779                    name = func.__name__
1780                elif isinstance(getattr(func, 'func', None), types.FunctionType):
1781                    name = func.func.__name__
1782                else:
1783                    name = ''
1784                logger.warning('Callback %s failed: %s', name, traceback.format_exc())
1785            self.worker_Q.task_done()
1786
1787    def finish_job(self, cluster, _job, status):
1788        # assert status in (DispyJob.Finished, DispyJob.Terminated, DispyJob.Abandoned)
1789        job = _job.job
1790        _job.finish(status)
1791        if cluster.callback:
1792            self.worker_Q.put((cluster.callback, (copy.copy(job),)))
1793        if status != DispyJob.ProvisionalResult:
1794            # assert cluster._pending_jobs > 0
1795            cluster._pending_jobs -= 1
1796            if cluster._pending_jobs == 0:
1797                cluster.end_time = time.time()
1798                cluster._complete.set()
1799
1800    def job_reply_process(self, reply, msg_len, sock, addr):
1801        _job = self._sched_jobs.pop(reply.uid, None)
1802        if _job:
1803            if reply.hash != _job.hash:
1804                self._sched_jobs[reply.uid] = _job
1805                logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
1806                yield sock.send_msg('NAK'.encode())
1807                return
1808        else:
1809            _job = self._abandoned_jobs.pop(reply.uid, None)
1810            if _job:
1811                if reply.hash != _job.hash:
1812                    self._abandoned_jobs[reply.uid] = _job
1813                    logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
1814                    yield sock.send_msg('NAK'.encode())
1815                    return
1816            else:
1817                logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
1818                yield sock.send_msg('NAK'.encode())
1819                return
1820
1821        job = _job.job
1822        job.ip_addr = reply.ip_addr
1823        node = self._nodes.get(reply.ip_addr, None)
1824        cluster = self._clusters.get(_job.compute_id, None)
1825        if not cluster:
1826            # job cancelled while/after closing computation
1827            if node and node.busy > 0:
1828                node.busy -= 1
1829                node.cpu_time += reply.end_time - reply.start_time
1830                node.last_pulse = time.time()
1831                self._sched_event.set()
1832            yield sock.send_msg(b'ACK')
1833            return
1834        if node:
1835            node.last_pulse = time.time()
1836        else:
1837            if self.shared:
1838                node = _Node(reply.ip_addr, 0, getattr(reply, 'cpus', 0), '', self.secret,
1839                             platform='', keyfile=None, certfile=None)
1840                self._nodes[reply.ip_addr] = node
1841                dispy_node = DispyNode(node.ip_addr, node.name, node.cpus)
1842                dispy_node.update_time = time.time()
1843                cluster._dispy_nodes[reply.ip_addr] = dispy_node
1844                if cluster.status_callback:
1845                    self.worker_Q.put((cluster.status_callback,
1846                                       (DispyNode.Initialized, dispy_node, None)))
1847            elif job.status == DispyJob.Abandoned:
1848                pass
1849            else:
1850                logger.warning('Invalid job reply? %s: %s', job.id, job.status)
1851
1852        job.result, reply.result = deserialize(reply.result), None
1853        job.start_time = reply.start_time
1854        job.status = reply.status
1855        logger.debug('Received reply for job %s / %s from %s', job.id, _job.uid, job.ip_addr)
1856        if node:
1857            node.rx += msg_len
1858        dispy_node = cluster._dispy_nodes.get(job.ip_addr, None)
1859        if dispy_node:
1860            dispy_node.rx += msg_len
1861        if reply.status == DispyJob.ProvisionalResult:
1862            self._sched_jobs[_job.uid] = _job
1863            if cluster.callback:
1864                self.worker_Q.put((cluster.callback, (copy.copy(job),)))
1865        else:
1866            if node and dispy_node:
1867                if reply.status == DispyJob.Finished or reply.status == DispyJob.Terminated:
1868                    node.busy -= 1
1869                    node.cpu_time += reply.end_time - reply.start_time
1870                    dispy_node.busy -= 1
1871                    dispy_node.cpu_time += reply.end_time - reply.start_time
1872                    dispy_node.jobs_done += 1
1873                    dispy_node.update_time = time.time()
1874                elif reply.status == DispyJob.Cancelled:
1875                    assert self.shared is True
1876                else:
1877                    logger.warning('Invalid reply status: %s for job %s', reply.status, job.id)
1878            elif job.status == DispyJob.Abandoned:
1879                pass
1880            else:
1881                logger.warning('Invalid job reply (status)? %s: %s', job.id, job.status)
1882
1883            job.stdout = reply.stdout
1884            job.stderr = reply.stderr
1885            job.exception = reply.exception
1886            job.end_time = reply.end_time
1887            self.finish_job(cluster, _job, reply.status)
1888            if cluster.status_callback:
1889                self.worker_Q.put((cluster.status_callback, (reply.status, dispy_node,
1890                                                             copy.copy(job))))
1891            self._sched_event.set()
1892        yield sock.send_msg(b'ACK')
1893
1894    def reschedule_jobs(self, dead_jobs):
1895        if not dead_jobs:
1896            return
1897        for _job in dead_jobs:
1898            cluster = self._clusters[_job.compute_id]
1899            del self._sched_jobs[_job.uid]
1900            dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None)
1901            if dispy_node:
1902                dispy_node.cpus = 0
1903                dispy_node.busy = 0
1904                dispy_node.update_time = time.time()
1905            if cluster._compute.reentrant and not _job.pinned:
1906                logger.debug('Rescheduling job %s from %s', _job.uid, _job.node.ip_addr)
1907                dispy_job = _job.job
1908                dispy_job.status = DispyJob.Created
1909                dispy_job.ip_addr = None
1910                _job.node = None
1911                # TODO: call 'status_callback'?
1912                # _job.hash = ''.join(hex(x)[2:] for x in os.urandom(10))
1913                cluster._jobs.append(_job)
1914            else:
1915                dispy_job = _job.job
1916                logger.debug('Job %s scheduled on %s abandoned', dispy_job.id, _job.node.ip_addr)
1917                dispy_job.status = DispyJob.Abandoned
1918                self._abandoned_jobs[_job.uid] = _job
1919                cluster._pending_jobs -= 1
1920                if cluster._pending_jobs == 0:
1921                    cluster.end_time = time.time()
1922                    cluster._complete.set()
1923
1924            if cluster.status_callback:
1925                self.worker_Q.put((cluster.status_callback,
1926                                   (DispyJob.Abandoned, dispy_node, copy.copy(dispy_job))))
1927        self._sched_event.set()
1928
1929    def run_job(self, _job, cluster, task=None):
1930        # generator
1931        node = _job.node
1932        dispy_node = cluster._dispy_nodes[node.ip_addr]
1933        try:
1934            tx = yield _job.run(task=task)
1935            dispy_node.tx += tx
1936        except (EnvironmentError, OSError):
1937            logger.warning('Failed to run job %s on %s for computation %s; removing this node',
1938                           _job.uid, node.ip_addr, cluster._compute.name)
1939            logger.debug(traceback.format_exc())
1940            if node.pending_jobs:
1941                # TODO: instead of discarding pending jobs, maintain them
1942                # elsewhere, while cluster is alive?
1943                for njob in node.pending_jobs:
1944                    cl = self._clusters[njob.compute_id]
1945                    self.finish_job(cl, njob, DispyJob.Cancelled)
1946                    if cl.status_callback:
1947                        dn = cl._dispy_nodes.get(node.ip_addr, None)
1948                        self.worker_Q.put((cl.status_callback,
1949                                           (DispyJob.Cancelled, dn, njob.job)))
1950                node.pending_jobs = []
1951            # TODO: need to close computations on this node?
1952            for cl in node.clusters:
1953                dn = cl._dispy_nodes.pop(node.ip_addr, None)
1954                if dn and cl.status_callback:
1955                    self.worker_Q.put((cl.status_callback, (DispyNode.Closed, dn, None)))
1956            node.clusters.clear()
1957            self._nodes.pop(node.ip_addr, None)
1958            if self._sched_jobs.pop(_job.uid, None) == _job:
1959                if not _job.pinned:
1960                    cluster._jobs.insert(0, _job)
1961                node.busy -= 1
1962            self._sched_event.set()
1963        except Exception:
1964            logger.warning('Failed to run job %s on %s for computation %s',
1965                           _job.uid, node.ip_addr, cluster._compute.name)
1966            logger.debug(traceback.format_exc())
1967            # TODO: delay executing again for some time?
1968            # this job might have been deleted already due to timeout
1969            if self._sched_jobs.pop(_job.uid, None) == _job:
1970                dispy_job = _job.job
1971                self.finish_job(cluster, _job, DispyJob.Cancelled)
1972                if cluster.status_callback and dispy_node:
1973                    dispy_node.update_time = time.time()
1974                    self.worker_Q.put((cluster.status_callback,
1975                                       (DispyJob.Cancelled, dispy_node, dispy_job)))
1976                node.busy -= 1
1977            self._sched_event.set()
1978        else:
1979            # job may have already finished (in which case _job.job would be None)
1980            if _job.job:
1981                _job.job.ip_addr = node.ip_addr
1982                logger.debug('Running job %s / %s on %s (busy: %d / %d)',
1983                             _job.job.id, _job.uid, node.ip_addr, node.busy, node.cpus)
1984                _job.job.status = DispyJob.Running
1985                _job.job.start_time = time.time()
1986                dispy_node.busy += 1
1987                dispy_node.update_time = time.time()
1988                if cluster.status_callback:
1989                    self.worker_Q.put((cluster.status_callback,
1990                                       (DispyJob.Running, dispy_node, copy.copy(_job.job))))
1991        if (not cluster._compute.reentrant) and (not cluster.status_callback) and _job.job:
1992            _job.job._args = ()
1993            _job.job._kwargs = {}
1994
1995    def wait(self, cluster, timeout):
1996        ret = cluster._complete.wait(timeout=timeout)
1997        if ret or not self._abandoned_jobs:
1998            return ret
1999        cid = cluster._compute.id
2000        for _job in self._abandoned_jobs.values():
2001            if _job.compute_id == cid:
2002                _job.finish(DispyJob.Abandoned)
2003        self._abandoned_jobs = {uid: _job for uid, _job in self._abandoned_jobs.items()
2004                                if _job.compute_id != cid}
2005        return 0
2006
2007    def load_balance_schedule(self):
2008        host = None
2009        load = 1.0
2010        for node in self._nodes.values():
2011            if node.busy >= node.cpus:
2012                continue
2013            if node.pending_jobs:
2014                host = node
2015                break
2016            if not any(cluster._jobs for cluster in node.clusters):
2017                continue
2018            if (node.busy / node.cpus) < load:
2019                load = node.busy / node.cpus
2020                host = node
2021        return host
2022
2023    def _schedule_jobs(self, task=None):
2024        # generator
2025        while not self.terminate:
2026            # n = sum(len(cluster._jobs) for cluster in self._clusters.values())
2027            node = self.select_job_node()
2028            if not node:
2029                self._sched_event.clear()
2030                yield self._sched_event.wait()
2031                continue
2032            if node.pending_jobs:
2033                _job = node.pending_jobs.pop(0)
2034                cluster = self._clusters[_job.compute_id]
2035            else:
2036                # TODO: strategy to pick a cluster?
2037                for cluster in node.clusters:
2038                    # assert node.ip_addr in cluster._dispy_nodes
2039                    if cluster._jobs:
2040                        _job = cluster._jobs.pop(0)
2041                        break
2042                else:
2043                    self._sched_event.clear()
2044                    yield self._sched_event.wait()
2045                    continue
2046            _job.node = node
2047            # assert node.busy < node.cpus
2048            self._sched_jobs[_job.uid] = _job
2049            node.busy += 1
2050            Task(self.run_job, _job, cluster)
2051
2052        logger.debug('Scheduler quitting: %s', len(self._sched_jobs))
2053        self._sched_jobs = {}
2054        for udp_task in self.udp_tasks:
2055            udp_task.terminate()
2056        for cid in list(self._clusters.keys()):
2057            cluster = self._clusters[cid]
2058            if not hasattr(cluster, '_compute'):
2059                # cluster is closed
2060                continue
2061            for _job in cluster._jobs:
2062                if _job.job.status == DispyJob.Running:
2063                    status = DispyJob.Terminated
2064                else:
2065                    status = DispyJob.Cancelled
2066                dispy_job = _job.job
2067                self.finish_job(cluster, _job, status)
2068                if cluster.status_callback:
2069                    dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None)
2070                    if dispy_node:
2071                        dispy_node.update_time = time.time()
2072                        self.worker_Q.put((cluster.status_callback,
2073                                           (status, dispy_node, copy.copy(dispy_job))))
2074            for dispy_node in cluster._dispy_nodes.values():
2075                node = self._nodes.get(dispy_node.ip_addr, None)
2076                if not node:
2077                    continue
2078                for _job in node.pending_jobs:
2079                    # TODO: delete only jobs for this cluster?
2080                    if _job.job.status == DispyJob.Running:
2081                        status = DispyJob.Terminated
2082                    else:
2083                        status = DispyJob.Cancelled
2084                    dispy_job = _job.job
2085                    self.finish_job(cluster, _job, status)
2086                    if cluster.status_callback:
2087                        dispy_node.update_time = time.time()
2088                        self.worker_Q.put((cluster.status_callback,
2089                                           (status, dispy_node, copy.copy(dispy_job))))
2090                node.pending_jobs = []
2091            cluster._jobs = []
2092            cluster._pending_jobs = 0
2093            yield self.del_cluster(cluster, task=task)
2094        self._clusters = {}
2095        self._nodes = {}
2096        logger.debug('Scheduler quit')
2097
2098    def submit_job(self, _job, ip_addr=None, task=None):
2099        # generator
2100        _job.uid = id(_job)
2101        cluster = self._clusters[_job.compute_id]
2102        if ip_addr:
2103            node = self._nodes.get(ip_addr, None)
2104            if not node or cluster not in node.clusters:
2105                return(-1)
2106            node.pending_jobs.append(_job)
2107            _job.pinned = node
2108        else:
2109            cluster._jobs.append(_job)
2110        cluster._pending_jobs += 1
2111        cluster._complete.clear()
2112        if cluster.status_callback:
2113            self.worker_Q.put((cluster.status_callback, (DispyJob.Created, None,
2114                                                         copy.copy(_job.job))))
2115        self._sched_event.set()
2116        yield 0
2117
2118    def cancel_job(self, job, task=None):
2119        # generator
2120        _job = job._dispy_job_
2121        if _job is None:
2122            logger.warning('Job %s is invalid for cancellation!', job.id)
2123            return(-1)
2124        cluster = self._clusters.get(_job.compute_id, None)
2125        if not cluster:
2126            logger.warning('Invalid job %s for cluster "%s"!', job.id, cluster._compute.name)
2127            return(-1)
2128        # assert cluster._pending_jobs >= 1
2129        if _job.job.status == DispyJob.Created:
2130            if _job.pinned:
2131                _job.pinned.pending_jobs.remove(_job)
2132            else:
2133                cluster._jobs.remove(_job)
2134            dispy_job = _job.job
2135            self.finish_job(cluster, _job, DispyJob.Cancelled)
2136            if cluster.status_callback:
2137                self.worker_Q.put((cluster.status_callback, (DispyJob.Cancelled, None, dispy_job)))
2138            logger.debug('Cancelled (removed) job %s', job.id)
2139            return(0)
2140        elif not (_job.job.status == DispyJob.Running or
2141                  _job.job.status == DispyJob.ProvisionalResult or _job.node is None):
2142            logger.warning('Job %s is not valid for cancel (%s)', job.id, _job.job.status)
2143            return(-1)
2144        _job.job.status = DispyJob.Cancelled
2145        # don't send this status - when job is terminated status/callback get called
2146        logger.debug('Job %s / %s is being terminated', _job.job.id, _job.uid)
2147        try:
2148            resp = yield _job.node.send(b'TERMINATE_JOB:' + serialize(_job), reply=False, task=task)
2149        except Exception:
2150            resp = -1
2151        if resp < 0:
2152            logger.debug('Terminating job %s / %s failed: %s', _job.job.id, _job.uid, resp)
2153        else:
2154            resp = 0
2155        return(resp)
2156
2157    def allocate_node(self, cluster, node_allocs, task=None):
2158        # generator
2159        for i in range(len(node_allocs)-1, -1, -1):
2160            node = self._nodes.get(node_allocs[i].ip_addr, None)
2161            if node:
2162                dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
2163                if dispy_node:
2164                    node.clusters.add(cluster)
2165                    self._sched_event.set()
2166                    del node_allocs[i]
2167                    continue
2168        if not node_allocs:
2169            return(0)
2170        cluster._node_allocs.extend(node_allocs)
2171        cluster._node_allocs = sorted(cluster._node_allocs,
2172                                      key=lambda node_alloc: node_alloc.ip_rex, reverse=True)
2173        present = set()
2174        cluster._node_allocs = [na for na in cluster._node_allocs
2175                                if na.ip_rex not in present and not present.add(na.ip_rex)]
2176        del present
2177        yield self.add_cluster(cluster, task=task)
2178        yield self._sched_event.set()
2179        return(0)
2180
2181    def deallocate_node(self, cluster, node, task=None):
2182        # generator
2183        if isinstance(node, DispyNode):
2184            node = cluster._dispy_nodes.get(node.ip_addr, None)
2185        elif isinstance(node, str):
2186            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
2187        else:
2188            node = None
2189        if node:
2190            node = self._nodes.get(node.ip_addr, None)
2191        if not node:
2192            return(-1)
2193        node.clusters.discard(cluster)
2194        yield 0
2195
2196    def close_node(self, cluster, node, terminate_pending, task=None):
2197        # generator
2198        if isinstance(node, DispyNode):
2199            node = cluster._dispy_nodes.get(node.ip_addr, None)
2200        elif isinstance(node, str):
2201            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
2202        else:
2203            node = None
2204        if node:
2205            node = self._nodes.get(node.ip_addr, None)
2206        if not node:
2207            return(-1)
2208        node.clusters.discard(cluster)
2209        jobs = [_job for _job in node.pending_jobs if _job.compute_id == cluster._compute.id]
2210        if cluster.status_callback:
2211            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
2212            for _job in jobs:
2213                self.worker_Q.put((cluster.status_callback,
2214                                   (DispyJob.Cancelled, dispy_node, copy.copy(_job.job))))
2215        if jobs:
2216            node.pending_jobs = [_job for _job in node.pending_jobs
2217                                 if _job.compute_id != cluster._compute.id]
2218        yield node.close(cluster._compute, terminate_pending=terminate_pending)
2219
2220    def set_node_cpus(self, cluster, node, cpus, task=None):
2221        # generator
2222        if isinstance(node, DispyNode):
2223            node = cluster._dispy_nodes.get(node.ip_addr, None)
2224        elif isinstance(node, str):
2225            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
2226        else:
2227            node = None
2228        if node:
2229            node = self._nodes.get(node.ip_addr, None)
2230        if not node:
2231            return(-1)
2232        try:
2233            cpus = int(cpus)
2234        except ValueError:
2235            return(-1)
2236        if cpus >= 0:
2237            node.cpus = min(node.avail_cpus, cpus)
2238        elif (node.avail_cpus + cpus) >= 0:
2239            node.cpus = node.avail_cpus + cpus
2240        cpus = node.cpus
2241        for cluster in node.clusters:
2242            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
2243            if dispy_node:
2244                dispy_node.cpus = cpus
2245        yield self._sched_event.set()
2246        return(cpus)
2247
2248    def send_file(self, cluster, node, xf, task=None):
2249        if isinstance(node, DispyNode):
2250            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
2251        elif isinstance(node, str):
2252            dispy_node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
2253        else:
2254            dispy_node = None
2255        if not dispy_node:
2256            return(-1)
2257        node = self._nodes.get(dispy_node.ip_addr, None)
2258        if not node:
2259            return(-1)
2260        tx = yield node.xfer_file(xf)
2261        dispy_node.tx += tx
2262
2263    def node_jobs(self, cluster, node, from_node, task=None):
2264        # generator
2265        if isinstance(node, DispyNode):
2266            node = cluster._dispy_nodes.get(node.ip_addr, None)
2267        elif isinstance(node, str):
2268            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
2269        else:
2270            node = None
2271        if node:
2272            node = self._nodes.get(node.ip_addr, None)
2273        if not node:
2274            return(-1)
2275        if cluster not in node.clusters:
2276            return([])
2277        if from_node:
2278            sock = socket.socket(node.sock_family, socket.SOCK_STREAM)
2279            sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile)
2280            sock.settimeout(MsgTimeout)
2281            try:
2282                yield sock.connect((node.ip_addr, node.port))
2283                yield sock.sendall(node.auth)
2284                req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth}
2285                yield sock.send_msg(b'JOBS:' + serialize(req))
2286                info = yield sock.recv_msg()
2287                _jobs = [self._sched_jobs.get(uid, None) for uid in deserialize(info)]
2288                jobs = [_job.job for _job in _jobs if _job]
2289            except Exception:
2290                logger.debug(traceback.format_exc())
2291                jobs = []
2292            sock.close()
2293        else:
2294            jobs = [_job.job for _job in self._sched_jobs.values()
2295                    if _job.node == node and _job.compute_id == cluster._compute.id]
2296
2297        return(jobs)
2298
2299    def shutdown(self):
2300        # non-generator
2301        if not self.shared:
2302            if self.terminate:
2303                return
2304            if any(cluster._pending_jobs for cluster in self._clusters.values()):
2305                return
2306            logger.debug('Shutting down scheduler ...')
2307            self.terminate = True
2308
2309            def _terminate_scheduler(self, task=None):
2310                yield self._sched_event.set()
2311
2312            Task(_terminate_scheduler, self).value()
2313            self.worker_Q.put(None)
2314            self._scheduler.value()
2315            self.worker_Q.join()
2316        if self.shelf:
2317            # TODO: need to check all clusters are deleted?
2318            self.shelf.close()
2319            self.shelf = None
2320            for ext in ('', '.db', '.bak', '.dat', '.dir'):
2321                if os.path.isfile(self.recover_file + ext):
2322                    try:
2323                        os.remove(self.recover_file + ext)
2324                    except Exception:
2325                        pass
2326        if self.pycos:
2327            self.pycos.finish()
2328            self.pycos = None
2329        Singleton.empty(self.__class__)
2330
2331
2332class JobCluster(object):
2333    """Create an instance of cluster for a specific computation.
2334    """
2335
2336    def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
2337                 ip_addr=None, port=None, node_port=None, ext_ip_addr=None,
2338                 ipv4_udp_multicast=False, dest_path=None, loglevel=logger.INFO,
2339                 setup=None, cleanup=True, ping_interval=None, pulse_interval=None,
2340                 poll_interval=None, reentrant=False, secret='', keyfile=None, certfile=None,
2341                 recover_file=None):
2342        """Create an instance of cluster for a specific computation.
2343
2344        @computation is either a string (which is name of program, possibly
2345        with full path) or a python function or class method.
2346
2347        @nodes is a list. Each element of @nodes is either a string
2348          (which must be either IP address or name of server node), or
2349          a tuple with up to 3 elements.  The tuple's first element
2350          must be IP address or name of server node, second element,
2351          if present, must be port number where that node is listening
2352           for ping from clients, the third element, if present, must
2353          be number of CPUs to use on that node.
2354
2355        @depends is a list. Each element of @depends is either
2356          a string or a python object. If the element is a string,
2357          it must be a file which will be transferred to the node
2358          executing a job for this cluster.
2359          If the element is a python object (a function name, class name etc.),
2360          then the code for that object is transferred to the node executing
2361          a job for this cluster.
2362
2363       @callback is a function or class method. When a job's results
2364          become available, dispy will call provided callback
2365          function/method with that job as the argument. If a job
2366          sends provisional results with 'dispy_provisional_result'
2367          multiple times, then dispy will call provided callback each
2368          such time. The (provisional) results of computation can be
2369          retrieved with 'result' field of job, etc. While
2370          computations are run on nodes in isolated environments,
2371          callbacks are run in the context of user programs from which
2372          (Shared)JobCluster is called - for example, callbacks can
2373          access global variables in user programs.
2374
2375        @cluster_status is a function or class method. When a node
2376          accepts this cluster's computation, a job is submitted, a
2377          jos is done or node is closed, given function is called with
2378          three parameters: an instance of DispyNode, node/job status
2379          (one of DispyNode.Initialized, DispyNode.Closed, or job
2380          status), and an instance of DispyJob (if job submitted,
2381          finished etc.) or None (if node started or closed). dispy
2382          queues these status messages and a worker thread calls the
2383          functions, so it is possible that actual current status of
2384          node may be different from the status indicated at the time
2385          status function is called.
2386
2387        @ip_addr and @port indicate the address where the cluster will bind to.
2388          If multiple instances of JobCluster are used, these arguments are used
2389          only in the case of first instance.
2390          If no value for @ip_addr is given (default), IP address associated
2391          with the 'hostname' is used.
2392          If no value for @port is given (default), number 51347 is used.
2393
2394        @ext_ip_addr is the IP address of NAT firewall/gateway if
2395          dispy client is behind that firewall/gateway.
2396
2397        @node_port indicates port on which node servers are listening
2398          for ping messages. The client (JobCluster instance) broadcasts
2399          ping requests to this port.
2400          If no value for @node_port is given (default), number 51348 is used.
2401
2402        @dest_path indicates path of directory to which files are
2403          transferred to a server node when executing a job.  If
2404          @computation is a string, indicating a program, then that
2405          program is also transferred to @dest_path.
2406
2407        @loglevel indicates message logging level.
2408
2409        @cleanup indicates if the files transferred should be removed when
2410          shutting down.
2411
2412        @secret is a string that is (hashed and) used for handshaking
2413          of communication with nodes.
2414
2415        @certfile is path to file containing SSL certificate (see
2416          Python 'ssl' module).
2417
2418        @keyfile is path to file containing private key for SSL
2419          communication (see Python 'ssl' module). This key may be
2420          stored in 'certfile' itself, in which case this should be
2421          None.
2422
2423        @ping_interval is number of seconds between 1 and
2424        1000. Normally dispy can find nodes running 'dispynode' by
2425        broadcasting 'ping' messages that nodes respond to. However,
2426        these packets may get lost. If ping_interval is set, then
2427        every ping_interval seconds, dispy sends ping messages to find
2428        nodes that may have missed earlier ping messages.
2429
2430        @pulse_interval is number of seconds between 1 and 1000. If
2431        pulse_interval is set, dispy directs nodes to send 'pulse'
2432        messages to indicate they are computing submitted jobs. A node
2433        is presumed dead if 5*pulse_interval elapses without a pulse
2434        message. See 'reentrant' below.
2435
2436        @poll_interval is number of seconds between 5 and 1000. If
2437        poll_interval is set, the client uses polling to check the
2438        status of jobs executed by nodes, instead of nodes connecting
2439        to the client to send the status of jobs, which is not
2440        possible if the client is behind a gateway / router which
2441        doesn't forward ports to where the client is running. Polling
2442        is not efficient, so it must be used only where necessary.
2443
2444        @reentrant must be either True or False. This value is used
2445        only if 'pulse_interval' is set for any of the clusters. If
2446        pulse_interval is given and reentrant is False (default), jobs
2447        scheduled for a dead node are automatically cancelled; if
2448        reentrant is True, then jobs scheduled for a dead node are
2449        resubmitted to other eligible nodes.
2450
2451        @recover_file must be either None (default) or file path. If
2452        this is None, dispy stores information about cluster in a file
2453        of the form '_dispy_YYYYMMDDHHMMSS' in current directory. If
2454        it is a path, dispy will use given path to store
2455        information. If user program terminates for some reason (such
2456        as raising an exception), it is possible to retrieve results
2457        of scheduled jobs later (after they are finished) by calling
2458        'recover' function (implemented in this file) with this file.
2459        """
2460
2461        logger.setLevel(loglevel)
2462        pycos.logger.setLevel(loglevel)
2463        if reentrant is not True and reentrant is not False:
2464            logger.warning('Invalid value for reentrant (%s) is ignored; '
2465                           'it must be either True or False', reentrant)
2466            reentrant = False
2467        if ping_interval is not None:
2468            try:
2469                ping_interval = float(ping_interval)
2470                assert 1.0 <= ping_interval <= 1000
2471            except Exception:
2472                raise Exception('Invalid ping_interval; must be between 1 and 1000')
2473        self.ping_interval = ping_interval
2474        if pulse_interval is not None:
2475            try:
2476                pulse_interval = float(pulse_interval)
2477                assert 1.0 <= pulse_interval <= 1000
2478            except Exception:
2479                raise Exception('Invalid pulse_interval; must be between 1 and 1000')
2480        self.pulse_interval = pulse_interval
2481
2482        if poll_interval is not None:
2483            try:
2484                poll_interval = float(poll_interval)
2485                assert 5.0 <= poll_interval <= 1000
2486            except Exception:
2487                raise Exception('Invalid poll_interval; must be between 5 and 1000')
2488        self.poll_interval = poll_interval
2489
2490        if callback:
2491            assert inspect.isfunction(callback) or inspect.ismethod(callback), \
2492                'callback must be a function or method'
2493            try:
2494                args = inspect.getargspec(callback)
2495                if inspect.isfunction(callback):
2496                    assert len(args.args) == 1
2497                else:
2498                    assert len(args.args) == 2
2499                    if args.args[0] != 'self':
2500                        logger.warning('First argument to callback method is not "self"')
2501                assert args.varargs is None
2502                assert args.keywords is None
2503                assert args.defaults is None
2504            except Exception:
2505                raise Exception('Invalid callback function; '
2506                                'it must take excatly one argument - an instance of DispyJob')
2507        self.callback = callback
2508
2509        if cluster_status:
2510            assert inspect.isfunction(cluster_status) or inspect.ismethod(cluster_status), \
2511                'cluster_status must be a function or method'
2512            try:
2513                args = inspect.getargspec(cluster_status)
2514                if inspect.isfunction(cluster_status):
2515                    assert len(args.args) == 3
2516                else:
2517                    assert len(args.args) == 4
2518                    if args.args[0] != 'self':
2519                        logger.warning('First argument to cluster_status method is not "self"')
2520                assert args.varargs is None
2521                assert args.keywords is None
2522                assert args.defaults is None
2523            except Exception:
2524                raise Exception('Invalid cluster_status function; '
2525                                'it must take excatly 3 arguments')
2526        self.status_callback = cluster_status
2527
2528        if hasattr(self, 'scheduler_ip_addr'):
2529            shared = True
2530            self._node_allocs = []
2531        else:
2532            shared = False
2533            if not nodes:
2534                nodes = ['*']
2535            elif not isinstance(nodes, list):
2536                if isinstance(nodes, str):
2537                    nodes = [nodes]
2538                else:
2539                    raise Exception('"nodes" must be list of IP addresses or host names')
2540            self._node_allocs = _parse_node_allocs(nodes)
2541            if not self._node_allocs:
2542                raise Exception('"nodes" argument is invalid')
2543            self._node_allocs = sorted(self._node_allocs,
2544                                       key=lambda node_alloc: node_alloc.ip_rex, reverse=True)
2545        self._dispy_nodes = {}
2546
2547        if inspect.isfunction(computation) or inspect.ismethod(computation):
2548            func = computation
2549            compute = _Compute(_Compute.func_type, func.__name__)
2550            lines = inspect.getsourcelines(func)[0]
2551            lines[0] = lines[0].lstrip()
2552            compute.code = ''.join(lines)
2553            if inspect.ismethod(computation):
2554                depends.append(computation.__self__.__class__)
2555        elif isinstance(computation, str):
2556            compute = _Compute(_Compute.prog_type, computation)
2557            depends.append(computation)
2558        else:
2559            raise Exception('Invalid computation type: %s' % type(computation))
2560
2561        if setup:
2562            if inspect.isfunction(setup):
2563                depends.append(setup)
2564                compute.setup = _Function(setup.__name__, (), {})
2565            elif isinstance(setup, functools.partial):
2566                depends.append(setup.func)
2567                if setup.args:
2568                    args = setup.args
2569                else:
2570                    args = ()
2571                if setup.keywords:
2572                    kwargs = setup.keywords
2573                else:
2574                    kwargs = {}
2575                compute.setup = _Function(setup.func.__name__, args, kwargs)
2576            else:
2577                raise Exception('"setup" must be Python (partial) function')
2578
2579        if inspect.isfunction(cleanup):
2580            depends.append(cleanup)
2581            compute.cleanup = _Function(cleanup.__name__, (), {})
2582        elif isinstance(cleanup, functools.partial):
2583            depends.append(cleanup.func)
2584            if cleanup.args:
2585                args = cleanup.args
2586            else:
2587                args = ()
2588            if cleanup.keywords:
2589                kwargs = cleanup.keywords
2590            else:
2591                kwargs = {}
2592            compute.cleanup = _Function(cleanup.func.__name__, args, kwargs)
2593        elif isinstance(cleanup, bool):
2594            compute.cleanup = cleanup
2595        else:
2596            raise Exception('"cleanup" must be Python (partial) function')
2597
2598        self._cluster = _Cluster(ip_addr=ip_addr, port=port, node_port=node_port,
2599                                 ext_ip_addr=ext_ip_addr, ipv4_udp_multicast=ipv4_udp_multicast,
2600                                 shared=shared, secret=secret, keyfile=keyfile, certfile=certfile,
2601                                 recover_file=recover_file)
2602        atexit.register(self.shutdown)
2603
2604        depend_ids = {}
2605        cwd = self._cluster.dest_path
2606        for dep in depends:
2607            if isinstance(dep, str) or inspect.ismodule(dep):
2608                if inspect.ismodule(dep):
2609                    name = dep.__file__
2610                    if name.endswith('.pyc'):
2611                        name = name[:-1]
2612                    if not name.endswith('.py'):
2613                        logger.warning('Invalid module "%s" - must be python source.', dep)
2614                        continue
2615                    if name.startswith(cwd):
2616                        dst = os.path.dirname(name[len(cwd):].lstrip(os.sep))
2617                    elif dep.__package__:
2618                        dst = dep.__package__.replace('.', os.sep)
2619                    else:
2620                        dst = os.path.dirname(dep.__name__.replace('.', os.sep))
2621                else:
2622                    if os.path.isfile(dep):
2623                        name = os.path.abspath(dep)
2624                    elif compute.type == _Compute.prog_type:
2625                        for p in os.environ['PATH'].split(os.pathsep):
2626                            f = os.path.join(p, dep)
2627                            if os.path.isfile(f):
2628                                logger.debug('Assuming "%s" is program "%s"', dep, f)
2629                                name = f
2630                                break
2631                    else:
2632                        raise Exception('Path "%s" is not valid' % dep)
2633                    if name.startswith(cwd):
2634                        dst = os.path.dirname(name[len(cwd):].lstrip(os.sep))
2635                    else:
2636                        dst = '.'
2637                if name in depend_ids:
2638                    continue
2639                try:
2640                    with open(name, 'rb') as fd:
2641                        pass
2642                    xf = _XferFile(name, dst, compute.id)
2643                    compute.xfer_files.add(xf)
2644                    depend_ids[name] = dep
2645                except Exception:
2646                    raise Exception('File "%s" is not valid' % name)
2647            elif (inspect.isfunction(dep) or inspect.isclass(dep) or
2648                  (hasattr(dep, '__class__') and hasattr(dep, '__module__'))):
2649                if inspect.isfunction(dep) or inspect.isclass(dep):
2650                    pass
2651                elif hasattr(dep, '__class__') and inspect.isclass(dep.__class__):
2652                    dep = dep.__class__
2653                if id(dep) in depend_ids:
2654                    continue
2655                try:
2656                    lines = inspect.getsourcelines(dep)[0]
2657                except Exception:
2658                    logger.warning('Depenendency "%s" is not valid', dep)
2659                    raise
2660                lines[0] = lines[0].lstrip()
2661                compute.code += '\n' + ''.join(lines)
2662                depend_ids[id(dep)] = id(dep)
2663            elif isinstance(dep, functools.partial):
2664                try:
2665                    lines = inspect.getsourcelines(dep.func)[0]
2666                except Exception:
2667                    logger.warning('Depenendency "%s" is not valid', dep)
2668                    raise
2669                lines[0] = lines[0].lstrip()
2670                compute.code += '\n' + ''.join(lines)
2671                depend_ids[id(dep)] = id(dep)
2672            else:
2673                raise Exception('Invalid dependency: %s' % dep)
2674        if compute.code:
2675            # make sure code can be compiled
2676            code = compile(compute.code, '<string>', 'exec')
2677            del code
2678        if dest_path:
2679            if not isinstance(dest_path, str):
2680                raise Exception('Invalid dest_path: it must be a string')
2681            dest_path = dest_path.strip()
2682            # we should check for absolute path in dispynode.py as well
2683            if dest_path.startswith(os.sep):
2684                logger.warning('dest_path must not be absolute path')
2685            dest_path = dest_path.lstrip(os.sep)
2686            compute.dest_path = dest_path
2687
2688        compute.scheduler_port = self._cluster.port
2689        compute.auth = hashlib.sha1(os.urandom(20)).hexdigest()
2690        compute.job_result_port = self._cluster.port
2691        compute.reentrant = reentrant
2692        compute.pulse_interval = pulse_interval
2693
2694        self._compute = compute
2695        self._pending_jobs = 0
2696        self._jobs = []
2697        self._complete = threading.Event()
2698        self._complete.set()
2699        self.cpu_time = 0
2700        self.start_time = time.time()
2701        self.end_time = None
2702        if not shared:
2703            Task(self._cluster.add_cluster, self).value()
2704
2705    def submit(self, *args, **kwargs):
2706        """Submit a job for execution with the given arguments.
2707
2708        Arguments should be serializable and should correspond to
2709        arguments for computation used when cluster is created.
2710        """
2711        return self.submit_job_id(None, *args, **kwargs)
2712
2713    def submit_job_id(self, job_id, *args, **kwargs):
2714        """Same as 'submit' but job's 'id' is initialized to 'job_id'.
2715        """
2716        if self._compute.type == _Compute.prog_type:
2717            args = [str(arg) for arg in args]
2718        try:
2719            _job = _DispyJob_(self._compute.id, job_id, args, kwargs)
2720        except Exception:
2721            logger.warning('Creating job for "%s", "%s" failed with "%s"',
2722                           str(args), str(kwargs), traceback.format_exc())
2723            return None
2724
2725        if Task(self._cluster.submit_job, _job).value() == 0:
2726            return _job.job
2727        else:
2728            return None
2729
2730    def submit_node(self, node, *args, **kwargs):
2731        """Submit a job for execution at 'node' with the given
2732        arguments. 'node' can be an instance of DispyNode (e.g., as
2733        received in cluster/job status callback) or IP address or host
2734        name.
2735
2736        Arguments should be serializable and should correspond to
2737        arguments for computation used when cluster is created.
2738        """
2739        return self.submit_job_id_node(None, *args, **kwargs)
2740
2741    def submit_job_id_node(self, job_id, node, *args, **kwargs):
2742        """Same as 'submit_node' but job's 'id' is initialized to 'job_id'.
2743        """
2744        if isinstance(node, DispyNode):
2745            node = self._dispy_nodes.get(node.ip_addr, None)
2746        elif isinstance(node, str):
2747            node = self._dispy_nodes.get(_node_ipaddr(node), None)
2748        else:
2749            node = None
2750        if not node:
2751            logger.warning('Invalid node')
2752            return None
2753
2754        if self._compute.type == _Compute.prog_type:
2755            args = [str(arg) for arg in args]
2756        try:
2757            _job = _DispyJob_(self._compute.id, job_id, args, kwargs)
2758        except Exception:
2759            logger.warning('Creating job for "%s", "%s" failed with "%s"',
2760                           str(args), str(kwargs), traceback.format_exc())
2761            return None
2762
2763        if Task(self._cluster.submit_job, _job, ip_addr=node.ip_addr).value() == 0:
2764            return _job.job
2765        else:
2766            return None
2767
2768    def cancel(self, job):
2769        """Cancel given job. If the job is not yet running on any
2770        node, it is simply removed from scheduler's queue. If the job
2771        is running on a node, it is terminated/killed.
2772
2773        Returns 0 if the job has been cancelled (i.e., removed from
2774        the queue or terminated).
2775        """
2776        return Task(self._cluster.cancel_job, job).value()
2777
2778    def discover_nodes(self, nodes):
2779        """Discover given list of nodes. Each node may be host name or IP address, or an
2780        instance of NodeAllocate. If a node is '*', UDP broadcast is used to
2781        detect all nodes in local network.
2782        """
2783        if not isinstance(nodes, list):
2784            nodes = [nodes]
2785        nodes = _parse_node_allocs(nodes)
2786        return Task(self._cluster.discover_nodes, self, nodes).value()
2787
2788    def allocate_node(self, node):
2789        """Allocate given node for this cluster. 'node' may be (list of) host
2790        name or IP address, or an instance of NodeAllocate.
2791        """
2792        if not isinstance(node, list):
2793            node = [node]
2794        node_allocs = _parse_node_allocs(node)
2795        if not node_allocs:
2796            return -1
2797        return Task(self._cluster.allocate_node, self, node_allocs).value()
2798
2799    def deallocate_node(self, node):
2800        """Deallocate given node for this cluster. 'node' may be host name or IP
2801        address, or an instance of NodeAllocate.
2802        """
2803        return Task(self._cluster.deallocate_node, self, node).value()
2804
2805    def close_node(self, node, terminate_pending=False):
2806        """Close given node for this cluster. 'node' may be host name or IP
2807        address, or an instance of NodeAllocate.
2808        """
2809        return Task(self._cluster.close_node, self, node, terminate_pending).value()
2810
2811    def node_jobs(self, node, from_node=False):
2812        """Returns list of jobs currently running on given node, given
2813        as host name or IP address.
2814        """
2815        return Task(self._cluster.node_jobs, self, node, from_node).value()
2816
2817    def set_node_cpus(self, node, cpus):
2818        """Sets (alters) CPUs managed by dispy on a node, given as
2819        host name or IP address, to given number of CPUs. If the
2820        number of CPUs given is negative then that many CPUs are not
2821        used (from the available CPUs).
2822        """
2823        return Task(self._cluster.set_node_cpus, self, node, cpus).value()
2824
2825    def send_file(self, path, node):
2826        """Send file with given 'path' to 'node'.  'node' can be an
2827        instance of DispyNode (e.g., as received in cluster status
2828        callback) or IP address or host name.
2829        """
2830        cwd = self._cluster.dest_path
2831        path = os.path.abspath(path)
2832        if path.startswith(cwd):
2833            dst = os.path.dirname(path[len(cwd):].lstrip(os.sep))
2834        else:
2835            dst = '.'
2836        xf = _XferFile(path, dst, self._compute.id)
2837        return Task(self._cluster.send_file, self, node, xf).value()
2838
2839    @property
2840    def name(self):
2841        """Returns name of computation. If the computation is Python
2842        function, then this would be name of the function. If the
2843        computation is a program, then this would be name of the
2844        program (without path).
2845        """
2846        return self._compute.name
2847
2848    def __enter__(self):
2849        return self
2850
2851    def __exit__(self, exc_type, exc_value, trace):
2852        self.close()
2853        return True
2854
2855    def status(self):
2856        """
2857        Return cluster status (ClusterStatus structure).
2858        """
2859        def _status(self, task=None):
2860            yield ClusterStatus(list(self._dispy_nodes.values()), self._pending_jobs)
2861        return Task(_status, self).value()
2862
2863    def print_status(self, wall_time=None):
2864        """
2865        Prints status of cluster (see 'status').
2866        """
2867
2868        KB = 1024
2869        MB = 1024 * KB
2870        GB = 1024 * MB
2871        TB = 1024 * GB
2872
2873        def byte_size(size):
2874            if size > TB:
2875                return '%.1f T' % (float(size) / TB)
2876            elif size > GB:
2877                return '%.1f G' % (float(size) / GB)
2878            elif size > MB:
2879                return '%.1f M' % (float(size) / MB)
2880            elif size > KB:
2881                return '%.1f K' % (float(size) / KB)
2882            return '%d B' % size
2883
2884        print('')
2885        heading = (' %15s | %5s | %7s | %8s | %13s | %7s | %7s' %
2886                   ('Node', 'CPUs', 'Jobs', 'Sec/Job', 'Node Time Sec', 'Sent', 'Rcvd'))
2887        print(heading)
2888        print('-' * len(heading))
2889        info = self.status()
2890        cpu_time = 0.0
2891        for dispy_node in info.nodes:
2892            cpu_time += dispy_node.cpu_time
2893            if dispy_node.name:
2894                name = dispy_node.name
2895            else:
2896                name = dispy_node.ip_addr
2897            if dispy_node.jobs_done > 0:
2898                secs_per_job = dispy_node.cpu_time / dispy_node.jobs_done
2899            else:
2900                secs_per_job = 0
2901            print(' %15.15s | %5s | %7s | %8.1f | %13.1f | %7s | %7s' %
2902                  (name, dispy_node.cpus, dispy_node.jobs_done, secs_per_job, dispy_node.cpu_time,
2903                   byte_size(dispy_node.tx), byte_size(dispy_node.rx)))
2904        print('')
2905        if info.jobs_pending:
2906            print('Jobs pending: %s' % info.jobs_pending)
2907        msg = 'Total job time: %.3f sec' % cpu_time
2908        if not wall_time:
2909            wall_time = time.time() - self.start_time
2910        msg += ', wall time: %.3f sec, speedup: %.3f' % (wall_time, cpu_time / wall_time)
2911        print(msg)
2912        print('')
2913
2914    # for backward compatibility
2915    stats = print_status
2916
2917    def wait(self, timeout=None):
2918        """Wait for scheduled jobs to complete.
2919        """
2920        return self._cluster.wait(self, timeout)
2921
2922    def __call__(self):
2923        """Wait for scheduled jobs to complete.
2924        """
2925        self.wait()
2926
2927    def close(self, timeout=None, terminate=False):
2928        """Close the cluster (jobs can no longer be submitted to it). If there
2929        are any jobs pending, this method waits until they all finish, unless
2930        'terminate' is True in which case pending jobs are cancelled (removed or
2931        terminated by nodes executing them). Additional clusters may be created
2932        after this call returns.
2933        """
2934        if getattr(self, '_compute', None):
2935            ret = self._complete.wait(timeout=timeout)
2936            if not terminate and not ret:
2937                return False
2938            self._complete.set()
2939            Task(self._cluster.del_cluster, self).value()
2940            self._compute = None
2941            return True
2942
2943    def shutdown(self):
2944        """Close the cluster and shutdown the scheduler (so additional
2945        clusters can't be created).
2946        """
2947        self.close()
2948        if self._cluster:
2949            cluster, self._cluster = self._cluster, None
2950            cluster.shutdown()
2951
2952
2953class SharedJobCluster(JobCluster):
2954    """SharedJobCluster should be used (instead of JobCluster) if two
2955    or more processes can simultaneously use dispy. In this case,
2956    'dispyscheduler' must be running on a node and 'scheduler_node'
2957    parameter should be set to that node's IP address or host name.
2958
2959    @scheduler_node is name or IP address where dispyscheduler is
2960      running to which jobs are submitted.
2961
2962    @scheduler_port is port where dispyscheduler is running at
2963    @scheduler_node.
2964
2965    @port is port where this client will get job results from
2966    dispyscheduler.
2967
2968    @pulse_interval for SharedJobCluster is not used; instead,
2969    dispyscheduler must be called with appropriate pulse_interval.
2970    The behaviour is same as for JobCluster.
2971    """
2972    def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
2973                 ip_addr=None, port=51347, scheduler_node=None, scheduler_port=None,
2974                 ext_ip_addr=None, loglevel=logger.INFO, setup=None, cleanup=True, dest_path=None,
2975                 poll_interval=None, reentrant=False, exclusive=False,
2976                 secret='', keyfile=None, certfile=None, recover_file=None):
2977
2978        self.scheduler_ip_addr = _node_ipaddr(scheduler_node)
2979        self.addrinfo = host_addrinfo(host=ip_addr)
2980        self.addrinfo.family = socket.getaddrinfo(self.scheduler_ip_addr, None)[0][0]
2981
2982        if not nodes:
2983            nodes = ['*']
2984        elif not isinstance(nodes, list):
2985            if isinstance(nodes, str):
2986                nodes = [nodes]
2987            else:
2988                raise Exception('"nodes" must be list of IP addresses or host names')
2989        node_allocs = _parse_node_allocs(nodes)
2990        if not node_allocs:
2991            raise Exception('"nodes" argument is invalid')
2992        node_allocs = [(na.ip_addr, na.port, na.cpus) for na in node_allocs]
2993        if ext_ip_addr:
2994            ext_ip_addr = host_addrinfo(host=ext_ip_addr).ip
2995
2996        JobCluster.__init__(self, computation, depends=depends,
2997                            callback=callback, cluster_status=cluster_status,
2998                            ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr,
2999                            loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path,
3000                            poll_interval=poll_interval, reentrant=reentrant,
3001                            secret=secret, keyfile=keyfile, certfile=certfile,
3002                            recover_file=recover_file)
3003
3004        def _terminate_scheduler(self, task=None):
3005            yield self._cluster._sched_event.set()
3006
3007        # wait for scheduler to terminate
3008        self._cluster.terminate = True
3009        Task(_terminate_scheduler, self).value()
3010        self._cluster._scheduler.value()
3011        self._cluster.job_uid = None
3012
3013        if not scheduler_port:
3014            scheduler_port = 51349
3015
3016        # wait until tcp server has started
3017        while not self._cluster.port:
3018            time.sleep(0.1)
3019
3020        sock = socket.socket(self.addrinfo.family, socket.SOCK_STREAM)
3021        sock = AsyncSocket(sock, blocking=True, keyfile=keyfile, certfile=certfile)
3022        sock.connect((self.scheduler_ip_addr, scheduler_port))
3023        sock.sendall(self._cluster.auth)
3024        req = {'version': _dispy_version, 'ip_addr': ext_ip_addr,
3025               'scheduler_ip_addr': self.scheduler_ip_addr}
3026        sock.send_msg(b'CLIENT:' + serialize(req))
3027        reply = sock.recv_msg()
3028        sock.close()
3029        reply = deserialize(reply)
3030        if reply['version'] != _dispy_version:
3031            raise Exception('dispyscheduler version "%s" is different from dispy version "%s"' %
3032                            reply['version'], _dispy_version)
3033        ext_ip_addr = reply['ip_addr']
3034
3035        self.scheduler_port = reply['port']
3036        self._scheduler_auth = auth_code(secret, reply['sign'])
3037        self._compute.scheduler_ip_addr = ext_ip_addr
3038        self._compute.scheduler_port = self._cluster.port
3039        self._compute.job_result_port = self._cluster.port
3040
3041        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3042                           keyfile=keyfile, certfile=certfile)
3043        sock.settimeout(MsgTimeout)
3044        try:
3045            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3046            sock.sendall(self._scheduler_auth)
3047            req = {'compute': self._compute, 'node_allocs': node_allocs,
3048                   'exclusive': bool(exclusive)}
3049            sock.send_msg(b'COMPUTE:' + serialize(req))
3050            reply = sock.recv_msg()
3051            reply = deserialize(reply)
3052            if isinstance(reply, dict):
3053                self._compute.id = reply['compute_id']
3054                self._compute.auth = reply['auth']
3055            else:
3056                raise Exception('Scheduler refused computation: %s' % reply)
3057        except Exception:
3058            raise
3059        finally:
3060            sock.close()
3061
3062        for xf in self._compute.xfer_files:
3063            xf.compute_id = self._compute.id
3064            logger.debug('Sending file "%s"', xf.name)
3065            sock = socket.socket(self.addrinfo.family, socket.SOCK_STREAM)
3066            sock = AsyncSocket(sock, blocking=True, keyfile=keyfile, certfile=certfile)
3067            sock.settimeout(MsgTimeout)
3068            try:
3069                sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3070                sock.sendall(self._scheduler_auth)
3071                sock.send_msg(b'FILEXFER:' + serialize(xf))
3072                recvd = sock.recv_msg()
3073                recvd = deserialize(recvd)
3074                sent = 0
3075                with open(xf.name, 'rb') as fd:
3076                    while sent == recvd:
3077                        data = fd.read(1024000)
3078                        if not data:
3079                            break
3080                        sock.sendall(data)
3081                        sent += len(data)
3082                        recvd = sock.recv_msg()
3083                        recvd = deserialize(recvd)
3084                assert recvd == xf.stat_buf.st_size
3085            except Exception:
3086                logger.error('Could not transfer %s to %s', xf.name, self.scheduler_ip_addr)
3087                # TODO: delete computation?
3088            sock.close()
3089
3090        Task(self._cluster.add_cluster, self).value()
3091        self._scheduled_event = threading.Event()
3092        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3093                           keyfile=keyfile, certfile=certfile)
3094        sock.settimeout(MsgTimeout)
3095        sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3096        sock.sendall(self._scheduler_auth)
3097        req = {'compute_id': self._compute.id, 'auth': self._compute.auth}
3098        sock.send_msg(b'SCHEDULE:' + serialize(req))
3099        resp = sock.recv_msg()
3100        sock.close()
3101        if resp == b'ACK':
3102            self._scheduled_event.wait()
3103            logger.debug('Computation %s created with %s', self._compute.name, self._compute.id)
3104        else:
3105            self._cluster._clusters.pop(self._compute.id, None)
3106            raise Exception('Computation "%s" could not be sent to scheduler' % self._compute.name)
3107
3108    def submit(self, *args, **kwargs):
3109        """Submit a job for execution with the given arguments.
3110
3111        Arguments should be serializable and should correspond to
3112        arguments for computation used when cluster is created.
3113        """
3114        return self.submit_job_id_node(None, None, *args, **kwargs)
3115
3116    def submit_job_id(self, job_id, *args, **kwargs):
3117        """Same as 'submit' but job's 'id' is initialized to 'job_id'.
3118        """
3119        return self.submit_job_id_node(job_id, None, *args, **kwargs)
3120
3121    def submit_node(self, node, *args, **kwargs):
3122        """Submit a job for execution at 'node' with the given
3123        arguments. 'node' can be an instance of DispyNode (e.g., as
3124        received in cluster/job status callback) or IP address or host
3125        name.
3126
3127        Arguments should be serializable and should correspond to
3128        arguments for computation used when cluster is created.
3129        """
3130        return self.submit_job_id_node(None, node, *args, **kwargs)
3131
3132    def submit_job_id_node(self, job_id, node, *args, **kwargs):
3133        """Same as 'submit_node' but job's 'id' is initialized to 'job_id'.
3134        """
3135        if node:
3136            if isinstance(node, DispyNode):
3137                node = node.ip_addr
3138            elif isinstance(node, str):
3139                pass
3140            else:
3141                node = None
3142            if not node:
3143                return None
3144
3145        if self._compute.type == _Compute.prog_type:
3146            args = [str(arg) for arg in args]
3147        try:
3148            _job = _DispyJob_(self._compute.id, job_id, args, kwargs)
3149        except Exception:
3150            logger.warning('Creating job for "%s", "%s" failed with "%s"',
3151                           str(args), str(kwargs), traceback.format_exc())
3152            return None
3153
3154        job = None
3155        try:
3156            for xf in _job.xfer_files:
3157                sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM),
3158                                   blocking=True,
3159                                   keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3160                sock.settimeout(MsgTimeout)
3161                sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3162                sock.sendall(self._scheduler_auth)
3163                sock.send_msg(b'FILEXFER:' + serialize(xf))
3164                recvd = sock.recv_msg()
3165                recvd = deserialize(recvd)
3166                sent = 0
3167                with open(xf.name, 'rb') as fd:
3168                    while sent == recvd:
3169                        data = fd.read(1024000)
3170                        if not data:
3171                            break
3172                        sock.sendall(data)
3173                        sent += len(data)
3174                        recvd = sock.recv_msg()
3175                        recvd = deserialize(recvd)
3176                assert recvd == xf.stat_buf.st_size
3177                sock.close()
3178
3179            sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM),
3180                               blocking=True,
3181                               keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3182            sock.settimeout(MsgTimeout)
3183            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3184            sock.sendall(self._scheduler_auth)
3185            req = {'node': node, 'job': _job, 'auth': self._compute.auth}
3186            sock.send_msg(b'JOB:' + serialize(req))
3187            msg = sock.recv_msg()
3188            _job.uid = deserialize(msg)
3189            if _job.uid:
3190                job = _job.job
3191                self._cluster._sched_jobs[_job.uid] = _job
3192                self._pending_jobs += 1
3193                self._complete.clear()
3194                sock.send_msg(b'ACK')
3195                if self.status_callback:
3196                    self._cluster.worker_Q.put((self.status_callback,
3197                                                (DispyJob.Created, None, copy.copy(_job.job))))
3198            else:
3199                sock.send_msg('NAK'.encode())
3200                _job.job._dispy_job_ = None
3201                del _job.job
3202        except Exception:
3203            logger.warning('Creating job for "%s", "%s" failed with "%s"',
3204                           str(args), str(kwargs), traceback.format_exc())
3205            _job.job._dispy_job_ = None
3206            del _job.job
3207            job = None
3208        finally:
3209            sock.close()
3210        return job
3211
3212    def cancel(self, job):
3213        """Similar to 'cancel' of JobCluster.
3214        """
3215        _job = job._dispy_job_
3216        if _job is None or self._cluster._clusters.get(_job.compute_id, None) != self:
3217            logger.warning('Invalid job %s for cluster "%s"!', job.id, self._compute.name)
3218            return -1
3219        if job.status not in [DispyJob.Created, DispyJob.Running, DispyJob.ProvisionalResult]:
3220            logger.warning('Job %s is not valid for cancel (%s)', job.id, job.status)
3221            return -1
3222
3223        job.status = DispyJob.Cancelled
3224        # assert self._pending_jobs >= 1
3225        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3226                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3227        sock.settimeout(MsgTimeout)
3228        try:
3229            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3230            sock.sendall(self._scheduler_auth)
3231            req = {'uid': _job.uid, 'compute_id': self._compute.id, 'auth': self._compute.auth}
3232            sock.send_msg(b'TERMINATE_JOB:' + serialize(req))
3233        except Exception:
3234            logger.warning('Could not connect to scheduler to terminate job')
3235            return -1
3236        finally:
3237            sock.close()
3238        return 0
3239
3240    def allocate_node(self, node):
3241        """Similar to 'allocate_node' of JobCluster.
3242        """
3243        if not isinstance(node, list):
3244            node = [node]
3245        node_allocs = _parse_node_allocs(node)
3246        if not node_allocs:
3247            return(-1)
3248        if len(node_allocs) != 1:
3249            return -1
3250
3251        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3252                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3253        sock.settimeout(MsgTimeout)
3254        try:
3255            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3256            sock.sendall(self._scheduler_auth)
3257            req = {'compute_id': self._compute.id, 'auth': self._compute.auth,
3258                   'node_alloc': node_allocs}
3259            sock.send_msg(b'ALLOCATE_NODE:' + serialize(req))
3260            reply = sock.recv_msg()
3261            reply = deserialize(reply)
3262        except Exception:
3263            logger.warning('Could not connect to scheduler to add node')
3264            reply = -1
3265        finally:
3266            sock.close()
3267        return reply
3268
3269    def deallocate_node(self, node):
3270        """Similar to 'allocate_node' of JobCluster.
3271        """
3272        if isinstance(node, DispyNode):
3273            node = node.ip_addr
3274        elif isinstance(node, str):
3275            pass
3276        if not node:
3277            return -1
3278        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3279                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3280        sock.settimeout(MsgTimeout)
3281        try:
3282            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3283            sock.sendall(self._scheduler_auth)
3284            req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 'node': node}
3285            sock.send_msg(b'DEALLOCATE_NODE:' + serialize(req))
3286            reply = sock.recv_msg()
3287            reply = deserialize(reply)
3288        except Exception:
3289            logger.warning('Could not connect to scheduler to add node')
3290            reply = -1
3291        finally:
3292            sock.close()
3293        return reply
3294
3295    def close_node(self, node, terminate_pending=False):
3296        """Similar to 'cloe_node' of JobCluster.
3297        """
3298        if isinstance(node, DispyNode):
3299            node = node.ip_addr
3300        elif isinstance(node, str):
3301            pass
3302        if not node:
3303            return -1
3304        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3305                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3306        sock.settimeout(MsgTimeout)
3307        try:
3308            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3309            sock.sendall(self._scheduler_auth)
3310            req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 'node': node,
3311                   'terminate_pending': terminate_pending}
3312            sock.send_msg(b'CLOSE_NODE:' + serialize(req))
3313            reply = sock.recv_msg()
3314            reply = deserialize(reply)
3315        except Exception:
3316            logger.warning('Could not connect to scheduler to add node')
3317            reply = -1
3318        finally:
3319            sock.close()
3320        return reply
3321
3322    def node_jobs(self, node, from_node=False):
3323        """Similar to 'node_jobs' of JobCluster.
3324        """
3325        if isinstance(node, DispyNode):
3326            node = node.ip_addr
3327        elif isinstance(node, str):
3328            pass
3329        if not node:
3330            return []
3331        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3332                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3333        sock.settimeout(MsgTimeout)
3334        try:
3335            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3336            sock.sendall(self._scheduler_auth)
3337            req = {'compute_id': self._compute.id, 'auth': self._compute.auth,
3338                   'node': node, 'get_uids': True, 'from_node': bool(from_node)}
3339            sock.send_msg(b'NODE_JOBS:' + serialize(req))
3340            reply = sock.recv_msg()
3341            job_uids = deserialize(reply)
3342            _jobs = [self._cluster._sched_jobs.get(uid, None) for uid in job_uids]
3343        except Exception:
3344            logger.warning('Could not connect to scheduler to get running jobs at node')
3345            _jobs = []
3346        finally:
3347            sock.close()
3348        jobs = [_job.job for _job in _jobs if _job]
3349        return jobs
3350
3351    def set_node_cpus(self, node, cpus):
3352        """Similar to 'set_node_cpus' of JobCluster.
3353        """
3354        # setting a node's cpus may affect other clients, so (for now) disable it.
3355        return -1
3356
3357        if isinstance(node, DispyNode):
3358            node = node.ip_addr
3359        elif isinstance(node, str):
3360            pass
3361        if not node:
3362            return -1
3363        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3364                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3365        sock.settimeout(MsgTimeout)
3366        try:
3367            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3368            sock.sendall(self._scheduler_auth)
3369            req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 'node': node,
3370                   'cpus': cpus}
3371            sock.send_msg(b'SET_NODE_CPUS:' + serialize(req))
3372            reply = sock.recv_msg()
3373            reply = deserialize(reply)
3374            if isinstance(reply, tuple) and reply[0]:
3375                node = self._dispy_nodes.get(reply[0], None)
3376                if node:
3377                    node.cpus = reply[1]
3378        except Exception:
3379            logger.warning('Could not connect to scheduler to add node')
3380            return -1
3381        finally:
3382            sock.close()
3383        return reply
3384
3385    def send_file(self, path, node):
3386        """Send file with given 'path' to 'node'.  'node' can be an
3387        instance of DispyNode (e.g., as received in cluster status
3388        callback) or IP address or host name.
3389        """
3390        if isinstance(node, DispyNode):
3391            node = node.ip_addr
3392        elif isinstance(node, str):
3393            pass
3394        if not node:
3395            return -1
3396
3397        cwd = self._cluster.dest_path
3398        path = os.path.abspath(path)
3399        if path.startswith(cwd):
3400            dst = os.path.dirname(path[len(cwd):].lstrip(os.sep))
3401        else:
3402            dst = '.'
3403        xf = _XferFile(path, dst, self._compute.id)
3404        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
3405                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
3406        sock.settimeout(MsgTimeout)
3407        try:
3408            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
3409            sock.sendall(self._scheduler_auth)
3410            sock.send_msg(b'SENDFILE:' + serialize({'node': node, 'xf': xf}))
3411            recvd = sock.recv_msg()
3412            recvd = deserialize(recvd)
3413            sent = 0
3414            with open(xf.name, 'rb') as fd:
3415                while sent == recvd:
3416                    data = fd.read(1024000)
3417                    if not data:
3418                        break
3419                    sock.sendall(data)
3420                    sent += len(data)
3421                    recvd = sock.recv_msg()
3422                    recvd = deserialize(recvd)
3423            assert recvd == xf.stat_buf.st_size
3424        except Exception:
3425            return -1
3426        else:
3427            return 0
3428
3429
3430def recover_jobs(recover_file=None, timeout=None, terminate_pending=False):
3431    """
3432    If dispy client crashes or loses connection to nodes, the nodes
3433    will continue to execute scheduled jobs. This 'recover_jobs'
3434    function can be used to retrieve the results of those jobs
3435    (DispyJob objects).
3436
3437    @recover_file is path to file in which dispy stored information
3438      about cluster (see 'recover_file' in JobCluster above). If
3439      incorrect 'recover_file' is used, this function issues a warning
3440      and will block.
3441
3442    @timeout is time limit in seconds for recovery. This function will
3443      return all jobs that finish before 'timeout'. Any jobs still
3444      running or couldn't be recovered before timeout will be ignored.
3445
3446    @terminate_pending indicates if any jobs currently running should
3447      be terminated (so that, for example, node can be used for
3448      computations again right away instead of having to wait until
3449      all jobs finish).
3450
3451    Returns list of DispyJob instances that will have .result,
3452    .stdout, .stderr etc.; however, the nodes don't keep track of .id,
3453    .args, .kwargs so they will be None.
3454
3455    Once all the jobs that were scheduled at the time of crash are
3456    retrieved (if the jobs are still running, this function will block
3457    until all the jobs are finished and results obtained), nodes are
3458    closed (so they can serve new clients), 'recover_file' is removed
3459    and the jobs are returned.
3460    """
3461
3462    if not recover_file:
3463        import glob
3464        recover_file = sorted(glob.glob('_dispy_*'))
3465        if recover_file:
3466            recover_file = recover_file[-1]
3467        else:
3468            print('Could not find recover file of the form "_dispy_*"')
3469            return []
3470
3471    shelf_nodes = {}
3472    computes = {}
3473    cluster = None
3474    pycos_scheduler = pycos.Pycos.instance()
3475
3476    try:
3477        shelf = shelve.open(recover_file, flag='r')
3478    except Exception:
3479        print('Could not open recover file "%s"' % recover_file)
3480        return []
3481
3482    for key, val in shelf.items():
3483        if key.startswith('node_'):
3484            shelf_nodes[key[len('node_'):]] = val
3485        elif key.startswith('compute_'):
3486            computes[int(key[len('compute_'):])] = val
3487        elif key == '_cluster':
3488            cluster = val
3489        else:
3490            logger.warning('Invalid key "%s" ignored', key)
3491    shelf.close()
3492    if not cluster or not computes or not shelf_nodes:
3493        for ext in ('', '.db', '.bak', '.dat', '.dir'):
3494            if os.path.isfile(recover_file + ext):
3495                try:
3496                    os.remove(recover_file + ext)
3497                except Exception:
3498                    pass
3499        return []
3500
3501    nodes = {}
3502    for ip_addr, info in shelf_nodes.items():
3503        node = _Node(ip_addr, info['port'], 0, '', cluster['secret'], platform='',
3504                     keyfile=cluster['keyfile'], certfile=cluster['certfile'])
3505        node.auth = info['auth']
3506        if info.get('scheduler'):
3507            node.scheduler_ip_addr = ip_addr
3508        nodes[node.ip_addr] = node
3509
3510    def tcp_req(conn, addr, pending, task=None):
3511        # generator
3512        conn.settimeout(MsgTimeout)
3513        msg = yield conn.recv_msg()
3514        if msg.startswith(b'JOB_REPLY:'):
3515            try:
3516                reply = deserialize(msg[len(b'JOB_REPLY:'):])
3517            except Exception:
3518                logger.warning('Invalid job reply from %s:%s ignored', addr[0], addr[1])
3519                conn.close()
3520                return
3521            yield conn.send_msg(b'ACK')
3522            logger.debug('Received reply for job %s', reply.uid)
3523            job = DispyJob(None, (), {})
3524            job.result = deserialize(reply.result)
3525            job.stdout = reply.stdout
3526            job.stderr = reply.stderr
3527            job.exception = reply.exception
3528            job.start_time = reply.start_time
3529            job.end_time = reply.end_time
3530            job.status = reply.status
3531            job.ip_addr = reply.ip_addr
3532            job.finish.set()
3533            pending['jobs'].append(job)
3534            pending['count'] -= 1
3535            if pending['count'] == 0 and pending['resend_req_done'] is True:
3536                pending['complete'].set()
3537        else:
3538            logger.debug('Invalid TCP message from %s ignored', addr[0])
3539        conn.close()
3540
3541    def tcp_server(ip_addr, pending, task=None):
3542        task.set_daemon()
3543        addrinfo = host_addrinfo(host=ip_addr)
3544        sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM),
3545                           keyfile=cluster['keyfile'], certfile=cluster['certfile'])
3546        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
3547        sock.bind((ip_addr, cluster['port']))
3548        sock.listen(32)
3549
3550        while 1:
3551            if pending['timeout'] is not None:
3552                timeout = pending['timeout'] - (time.time() - pending['start_time'])
3553                if timeout <= 0:
3554                    pending['complete'].set()
3555                    timeout = 2
3556                sock.settimeout(timeout)
3557
3558            try:
3559                conn, addr = yield sock.accept()
3560            except ssl.SSLError as err:
3561                logger.debug('SSL connection failed: %s', str(err))
3562                continue
3563            except GeneratorExit:
3564                sock.close()
3565                break
3566            except socket.timeout:
3567                continue
3568            except Exception:
3569                continue
3570            else:
3571                Task(tcp_req, conn, addr, pending)
3572        return
3573
3574    def resend_requests(pending, task=None):
3575        for compute_id, compute in list(computes.items()):
3576            if pending['timeout'] is not None and \
3577               ((time.time() - pending['start_time']) > pending['timeout']):
3578                break
3579            req = {'compute_id': compute_id, 'auth': compute['auth']}
3580            for ip_addr in compute['nodes']:
3581                node = nodes.get(ip_addr, None)
3582                if not node:
3583                    continue
3584                try:
3585                    reply = yield node.send(b'RESEND_JOB_RESULTS:' + serialize(req))
3586                    reply = deserialize(reply)
3587                    assert isinstance(reply, int)
3588                except Exception:
3589                    logger.warning('Invalid resend reply from %s', ip_addr)
3590                    continue
3591                logger.debug('Pending jobs from %s for %s: %s',
3592                             node.ip_addr, compute['name'], reply)
3593                if reply == 0:
3594                    req['node_ip_addr'] = ip_addr
3595                    try:
3596                        yield node.send(b'CLOSE:' + serialize(req), reply=True, task=task)
3597                    except Exception:
3598                        pass
3599                else:
3600                    pending['count'] += reply
3601        pending['resend_req_done'] = True
3602        if pending['count'] == 0:
3603            pending['complete'].set()
3604
3605    pending = {'count': 0, 'resend_req_done': False, 'jobs': [], 'complete': threading.Event(),
3606               'timeout': timeout, 'start_time': time.time()}
3607    for ip_addr in cluster['ip_addrs']:
3608        if not ip_addr:
3609            ip_addr = ''
3610        Task(tcp_server, ip_addr, pending)
3611
3612    Task(resend_requests, pending)
3613
3614    pending['complete'].wait()
3615
3616    for compute_id, compute in computes.items():
3617        req = {'compute_id': compute_id, 'auth': compute['auth'],
3618               'terminate_pending': terminate_pending}
3619        for ip_addr in compute['nodes']:
3620            node = nodes.get(ip_addr, None)
3621            if not node:
3622                continue
3623            if node.scheduler_ip_addr:
3624                continue
3625            req['node_ip_addr'] = ip_addr
3626            Task(node.send, b'CLOSE:' + serialize(req), reply=True)
3627
3628    if terminate_pending:
3629        # wait a bit to get cancelled job results
3630        for x in range(10):
3631            if pending['count'] == 0 and pending['resend_req_done'] is True:
3632                break
3633            time.sleep(0.2)
3634
3635    pycos_scheduler.finish()
3636
3637    if pending['count'] == 0 and pending['resend_req_done'] is True:
3638        for ext in ('', '.db', '.bak', '.dat', '.dir'):
3639            if os.path.isfile(recover_file + ext):
3640                try:
3641                    os.remove(recover_file + ext)
3642                except Exception:
3643                    pass
3644    return pending['jobs']
3645
3646
3647if __name__ == '__main__':
3648    import argparse
3649
3650    logger.info('dispy version %s', _dispy_version)
3651
3652    parser = argparse.ArgumentParser()
3653    parser.add_argument('computation', help='program to distribute and parallelize')
3654    parser.add_argument('-c', action='store_false', dest='cleanup', default=True,
3655                        help='if True, nodes will remove any files transferred when '
3656                        'this computation is over')
3657    parser.add_argument('-d', '--debug', action='store_true', dest='loglevel', default=False,
3658                        help='if given, debug messages are printed')
3659    parser.add_argument('-a', action='append', dest='args', default=[],
3660                        help='argument(s) to program; repeat for multiple instances')
3661    parser.add_argument('-f', action='append', dest='depends', default=[],
3662                        help='dependencies (files) needed by program')
3663    parser.add_argument('-n', '--nodes', action='append', dest='nodes', default=[],
3664                        help='list of nodes (names or IP address) acceptable for this computation')
3665    parser.add_argument('--ip_addr', dest='ip_addr', default=None,
3666                        help='IP address of this client')
3667    parser.add_argument('--secret', dest='secret', default='',
3668                        help='authentication secret for handshake with nodes')
3669    parser.add_argument('--certfile', dest='certfile', default='',
3670                        help='file containing SSL certificate')
3671    parser.add_argument('--keyfile', dest='keyfile', default='',
3672                        help='file containing SSL key')
3673    parser.add_argument('--scheduler_node', dest='scheduler_node', default=None,
3674                        help='name or IP address where dispyscheduler is running to which '
3675                        'jobs are submitted')
3676
3677    config = vars(parser.parse_args(sys.argv[1:]))
3678
3679    if config['loglevel']:
3680        logger.setLevel(logger.DEBUG)
3681        pycos.logger.setLevel(logger.DEBUG)
3682    else:
3683        logger.setLevel(logger.INFO)
3684    del config['loglevel']
3685
3686    if config['certfile']:
3687        config['certfile'] = os.path.abspath(config['certfile'])
3688    else:
3689        config['certfile'] = None
3690    if config['keyfile']:
3691        config['keyfile'] = os.path.abspath(config['keyfile'])
3692    else:
3693        config['keyfile'] = None
3694
3695    args = config.pop('args')
3696
3697    if config['scheduler_node']:
3698        cluster = SharedJobCluster(**config)
3699    else:
3700        del config['scheduler_node']
3701        cluster = JobCluster(**config)
3702
3703    jobs = []
3704    for n, arg in enumerate(args, start=1):
3705        job = cluster.submit(*(arg.split()))
3706        job.id = n
3707        jobs.append((job, arg))
3708
3709    for job, args in jobs:
3710        job()
3711        sargs = ''.join(arg for arg in args)
3712        if job.exception:
3713            print('Job %s with arguments "%s" failed with "%s"' %
3714                  (job.id, sargs, job.exception))
3715            continue
3716        if job.result:
3717            print('Job %s with arguments "%s" exited with: "%s"' %
3718                  (job.id, sargs, str(job.result)))
3719        if job.stdout:
3720            print('Job %s with arguments "%s" produced output: "%s"' %
3721                  (job.id, sargs, job.stdout))
3722        if job.stderr:
3723            print('Job %s with argumens "%s" produced error messages: "%s"' %
3724                  (job.id, sargs, job.stderr))
3725
3726    cluster.print_status()
3727    exit(0)
3728