1#!/usr/bin/python3
2
3"""
4This file is part of pycos project. See https://pycos.sourceforge.io for details.
5
This module provides an API for creating distributed communicating
7processes. 'Computation' class should be used to package computation components
8(Python generator functions, Python functions, files, classes, modules) and then
9schedule runs that create remote tasks at remote server processes running
10'dispycosnode.py'.
11
12See 'dispycos_client*.py' files in 'examples' directory for various use cases.
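
A minimal sketch of typical usage (assuming 'dispycosnode.py' is running on a
node and 'compute' is a generator function defined in the client program);
'schedule', 'run_result' and 'close' must be used in a task with 'yield':

    def client_proc(computation, task=None):
        # schedule computation with the dispycos scheduler
        if (yield computation.schedule()) < 0:
            raise Exception('schedule failed')
        # run 'compute' with argument 10 at any available server and wait
        # for its result
        result = yield computation.run_result(compute, 10)
        yield computation.close()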
13"""
14
15import os
16import sys
17import inspect
18import hashlib
19import collections
20import time
21import shutil
22import operator
23import functools
24import re
25import copy
26import stat
27
28import pycos
29import pycos.netpycos
30from pycos import Task, SysTask, logger
31
32__author__ = "Giridhar Pemmasani (pgiri@yahoo.com)"
33__copyright__ = "Copyright (c) 2014-2015 Giridhar Pemmasani"
34__license__ = "Apache 2.0"
35__url__ = "https://pycos.sourceforge.io"
36
37__all__ = ['Scheduler', 'Computation', 'DispycosStatus', 'DispycosTaskInfo',
38           'DispycosNodeInfo', 'DispycosNodeAvailInfo', 'DispycosNodeAllocate']
39
40MsgTimeout = pycos.MsgTimeout
41MinPulseInterval = 10
42MaxPulseInterval = 10 * MinPulseInterval
43
# status of nodes / servers is sent with these structures
45DispycosStatus = collections.namedtuple('DispycosStatus', ['status', 'info'])
46DispycosTaskInfo = collections.namedtuple('DispycosTaskInfo', ['task', 'args', 'kwargs',
47                                                               'start_time'])
48DispycosNodeInfo = collections.namedtuple('DispycosNodeInfo', ['name', 'addr', 'cpus', 'platform',
49                                                               'avail_info'])
50logger.name = 'dispycos'
51# PyPI / pip packaging adjusts assertion below for Python 3.7+
52assert sys.version_info.major == 3 and sys.version_info.minor >= 7, \
53    ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' %
54     (__file__, sys.version_info.major, sys.version_info.minor))
55
56
57class DispycosNodeAvailInfo(object):
58    """Node availability status is indicated with this class.  'cpu' is
59    available CPU in percent in the range 0 to 100. 0 indicates node is busy
60    executing tasks on all CPUs and 100 indicates node is not busy at all.
61    """
62
63    def __init__(self, location, cpu, memory, disk, swap):
64        self.location = location
65        self.cpu = cpu
66        self.memory = memory
67        self.disk = disk
68        self.swap = swap
69
70
71class DispycosNodeAllocate(object):
72    """Allocation of nodes can be customized by specifying 'nodes' of Computation
73    with DispycosNodeAllocate instances.
74
    'node' must be hostname or IP address (possibly with '*' to match rest of
    IP address), 'port' must be TCP port used by node (only if 'node' doesn't
    have '*'), 'cpus', if given, must be number of servers to run on that node
    if positive, or number of CPUs not to use if negative, 'memory' is minimum
    available memory in bytes, 'disk' is minimum available disk space (on the
    partition where dispycosnode servers are running from), and 'platform' is
    a regular expression to match output of "platform.platform()" on that
    node, e.g., "Linux.*x86_64" to accept only nodes that run 64-bit Linux.
83    """
84
85    def __init__(self, node, port=None, platform='', cpus=None, memory=None, disk=None):
86        if node.find('*') < 0:
87            self.ip_rex = pycos.Pycos.host_ipaddr(node)
88        else:
89            self.ip_rex = node
90
91        if self.ip_rex:
92            self.ip_rex = self.ip_rex.replace('.', '\\.').replace('*', '.*')
93        else:
94            logger.warning('node "%s" is invalid', node)
95            self.ip_rex = ''
96
97        self.port = port
98        self.platform = platform.lower()
99        self.cpus = cpus
100        self.memory = memory
101        self.disk = disk
102
103    def allocate(self, ip_addr, name, platform, cpus, memory, disk):
104        """When a node is found, scheduler calls this method with IP address, name,
105        CPUs, memory and disk available on that node. This method should return
106        a number indicating number of CPUs to use. If return value is 0, the
107        node is not used; if the return value is < 0, this allocation is ignored
108        (next allocation in the 'nodes' list, if any, is applied).
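
        As a sketch, a (hypothetical) subclass could customize allocation,
        e.g., use at most 2 CPUs on nodes running 64-bit Linux and leave other
        nodes to later allocations (note that scheduler passes 'platform'
        lowercased):

            class LinuxAllocate(DispycosNodeAllocate):
                def allocate(self, ip_addr, name, platform, cpus, memory, disk):
                    if re.search('linux.*x86_64', platform):
                        return min(cpus, 2)
                    return -1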
109        """
110        if (self.platform and not re.search(self.platform, platform)):
111            return -1
112        if ((self.memory and memory and self.memory > memory) or
113            (self.disk and disk and self.disk > disk)):
114            return 0
115        if not isinstance(self.cpus, int):
116            return cpus
117        if self.cpus == 0:
118            return 0
119        elif self.cpus > 0:
120            if self.cpus > cpus:
121                return 0
122            return self.cpus
123        else:
124            cpus += self.cpus
125            if cpus < 0:
126                return 0
127            return cpus
128
129    def __getstate__(self):
130        state = {}
131        for attr in ['ip_rex', 'port', 'platform', 'cpus', 'memory', 'disk']:
132            state[attr] = getattr(self, attr)
133        return state
134
135
136class Computation(object):
137    """Packages components to distribute to remote pycos schedulers to create
138    (remote) tasks.
139    """
140
141    def __init__(self, components, pulse_interval=(5*MinPulseInterval), node_allocations=[],
142                 nodes=[], status_task=None, node_setup=None, server_setup=None,
143                 disable_nodes=False, disable_servers=False, peers_communicate=False,
144                 zombie_period=None):
145        """'components' should be a list, each element of which is either a
146        module, a (generator or normal) function, path name of a file, a class
147        or an object (in which case the code for its class is sent).
148
        'pulse_interval' is the interval (number of seconds) used for heartbeat
        messages to check if client / scheduler / server is alive. If the other
        side doesn't reply to 5 heartbeat messages, it is treated as dead.
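
        A minimal sketch (assuming 'compute' is a generator function defined
        in the client program); 'schedule' and 'run' must be used in a task
        with 'yield':

            computation = Computation([compute])
            if (yield computation.schedule()) == 0:
                rtask = yield computation.run(compute, 10)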
152        """
153
154        if pulse_interval < MinPulseInterval or pulse_interval > MaxPulseInterval:
155            raise Exception('"pulse_interval" must be at least %s and at most %s' %
156                            (MinPulseInterval, MaxPulseInterval))
157        if not isinstance(nodes, list):
158            raise Exception('"nodes" must be list of strings or DispycosNodeAllocate instances')
159        if node_allocations:
            logger.warning('"node_allocations" is deprecated; use "nodes" instead')
161            if not isinstance(node_allocations, list):
162                raise Exception('"node_allocations" must be list of DispycosNodeAllocate instances')
163            nodes.extend(node_allocations)
164        if any(not isinstance(_, (DispycosNodeAllocate, str)) for _ in nodes):
165            raise Exception('"nodes" must be list of strings or DispycosNodeAllocate instances')
166        if status_task and not isinstance(status_task, Task):
167            raise Exception('status_task must be Task instance')
168        if node_setup and not inspect.isgeneratorfunction(node_setup):
169            raise Exception('"node_setup" must be a task (generator function)')
170        if server_setup and not inspect.isgeneratorfunction(server_setup):
171            raise Exception('"server_setup" must be a task (generator function)')
172        if (disable_nodes or disable_servers) and not status_task:
173            raise Exception('status_task must be given when nodes or servers are disabled')
174        if zombie_period:
175            if zombie_period < 5*pulse_interval:
176                raise Exception('"zombie_period" must be at least 5*pulse_interval')
177        elif zombie_period is None:
178            zombie_period = 10*pulse_interval
179
180        if not isinstance(components, list):
181            components = [components]
182
183        self._code = ''
184        self._xfer_funcs = set()
185        self._xfer_files = []
186        self._auth = None
187        self.scheduler = None
188        self._pulse_task = None
189        if zombie_period:
190            self._pulse_interval = min(pulse_interval, zombie_period / 3)
191        else:
192            self._pulse_interval = pulse_interval
193        self._zombie_period = zombie_period
194        if nodes:
195            self._node_allocations = [node if isinstance(node, DispycosNodeAllocate)
196                                      else DispycosNodeAllocate(node) for node in nodes]
197            self._node_allocations.append(DispycosNodeAllocate('*', cpus=0))
198        else:
199            self._node_allocations = [DispycosNodeAllocate('*')]
200        self.status_task = status_task
201        if node_setup:
202            components.append(node_setup)
203            self._node_setup = node_setup.__name__
204        else:
205            self._node_setup = None
206        if server_setup:
207            components.append(server_setup)
208            self._server_setup = server_setup.__name__
209        else:
210            self._server_setup = None
211        self._peers_communicate = bool(peers_communicate)
212        self._disable_nodes = bool(disable_nodes)
213        self._disable_servers = bool(disable_servers)
214
215        depends = set()
216        cwd = os.getcwd()
217        for dep in components:
218            if isinstance(dep, str) or inspect.ismodule(dep):
219                if inspect.ismodule(dep):
220                    name = dep.__file__
221                    if name.endswith('.pyc'):
222                        name = name[:-1]
223                    if not name.endswith('.py'):
                        raise Exception('Invalid module "%s" - must be Python source.' % dep)
225                    if name.startswith(cwd):
226                        dst = os.path.dirname(name[len(cwd):].lstrip(os.sep))
227                    elif dep.__package__:
228                        dst = dep.__package__.replace('.', os.sep)
229                    else:
230                        dst = os.path.dirname(dep.__name__.replace('.', os.sep))
231                else:
232                    name = os.path.abspath(dep)
233                    if name.startswith(cwd):
234                        dst = os.path.dirname(name[len(cwd):].lstrip(os.sep))
235                    else:
236                        dst = '.'
237                if name in depends:
238                    continue
239                try:
240                    with open(name, 'rb') as fd:
241                        pass
242                except Exception:
243                    raise Exception('File "%s" is not valid' % name)
244                self._xfer_files.append((name, dst, os.sep))
245                depends.add(name)
246            elif (inspect.isgeneratorfunction(dep) or inspect.isfunction(dep) or
247                  inspect.isclass(dep) or hasattr(dep, '__class__')):
248                if inspect.isgeneratorfunction(dep) or inspect.isfunction(dep):
249                    name = dep.__name__
250                elif inspect.isclass(dep):
251                    name = dep.__name__
252                elif hasattr(dep, '__class__') and inspect.isclass(dep.__class__):
253                    dep = dep.__class__
254                    name = dep.__name__
255                if name in depends:
256                    continue
257                depends.add(name)
258                self._xfer_funcs.add(name)
259                self._code += '\n' + inspect.getsource(dep).lstrip()
260            else:
261                raise Exception('Invalid computation: %s' % dep)
262        # check code can be compiled
263        compile(self._code, '<string>', 'exec')
264        # Under Windows dispycos server may send objects with '__mp_main__'
265        # scope, so make an alias to '__main__'.  Do so even if scheduler is not
266        # running on Windows; it is possible the client is not Windows, but a
267        # node is.
268        if os.name == 'nt' and '__mp_main__' not in sys.modules:
269            sys.modules['__mp_main__'] = sys.modules['__main__']
270
271    def schedule(self, location=None, timeout=None):
272        """Schedule computation for execution. Must be used with 'yield' as
273        'result = yield compute.schedule()'. If scheduler is executing other
274        computations, this will block until scheduler processes them
275        (computations are processed in the order submitted).
276        """
277
278        if self._auth is not None:
279            return(0)
280        self._auth = ''
281        if self.status_task is not None and not isinstance(self.status_task, Task):
282            return(-1)
283
284        self.scheduler = yield SysTask.locate('dispycos_scheduler', location=location,
285                                              timeout=MsgTimeout)
286        if not isinstance(self.scheduler, Task):
287            return(-1)
288
289        def _schedule(self, task=None):
290            self._pulse_task = SysTask(self._pulse_proc)
291            msg = {'req': 'schedule', 'computation': pycos.serialize(self), 'client': task}
292            self.scheduler.send(msg)
293            self._auth = yield task.receive(timeout=MsgTimeout)
294            if not isinstance(self._auth, str):
295                logger.debug('Could not send computation to scheduler %s: %s',
296                             self.scheduler, self._auth)
297                return(-1)
298            SysTask.scheduler().atexit(10, lambda: SysTask(self.close))
299            if task.location != self.scheduler.location:
300                for xf, dst, sep in self._xfer_files:
301                    drive, xf = os.path.splitdrive(xf)
302                    if xf.startswith(sep):
303                        xf = os.path.join(os.sep, *(xf.split(sep)))
304                    else:
305                        xf = os.path.join(*(xf.split(sep)))
306                    xf = drive + xf
307                    dst = os.path.join(self._auth, os.path.join(*(dst.split(sep))))
308                    if (yield pycos.Pycos.instance().send_file(
309                       self.scheduler.location, xf, dir=dst, timeout=MsgTimeout)) < 0:
310                        logger.warning('Could not send file "%s" to scheduler', xf)
311                        yield self.close()
312                        return(-1)
313            msg = {'req': 'await', 'auth': self._auth, 'client': task}
314            self.scheduler.send(msg)
315            resp = yield task.receive(timeout=timeout)
316            if (isinstance(resp, dict) and resp.get('auth') == self._auth and
317               resp.get('resp') == 'scheduled'):
318                return(0)
319            else:
320                yield self.close()
321                return(-1)
322
323        yield Task(_schedule, self).finish()
324
325    def nodes(self):
326        """Get list of addresses of nodes initialized for this computation. Must
327        be used with 'yield' as 'yield compute.nodes()'.
328        """
329
330        def _nodes(self, task=None):
331            msg = {'req': 'nodes', 'auth': self._auth, 'client': task}
332            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                yield task.receive(timeout=MsgTimeout)
334            else:
335                return([])
336
337        yield Task(_nodes, self).finish()
338
339    def servers(self):
340        """Get list of Location instances of servers initialized for this
341        computation. Must be used with 'yield' as 'yield compute.servers()'.
342        """
343
344        def _servers(self, task=None):
345            msg = {'req': 'servers', 'auth': self._auth, 'client': task}
346            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
                yield task.receive(timeout=MsgTimeout)
348            else:
349                return([])
350
351        yield Task(_servers, self).finish()
352
353    def close(self, await_async=False, timeout=None):
354        """Close computation. Must be used with 'yield' as 'yield
355        compute.close()'.
356        """
357
358        def _close(self, done, task=None):
359            msg = {'req': 'close_computation', 'auth': self._auth, 'client': task,
360                   'await_async': bool(await_async)}
361            self.scheduler.send(msg)
362            msg = yield task.receive(timeout=timeout)
363            if msg != 'closed':
364                logger.warning('%s: closing computation failed?', self._auth)
365            self._auth = None
366            if self._pulse_task:
367                yield self._pulse_task.send('quit')
368                self._pulse_task = None
369            done.set()
370
371        if self._auth:
372            done = pycos.Event()
373            SysTask(_close, self, done)
374            yield done.wait()
375
376    def run_at(self, where, gen, *args, **kwargs):
377        """Must be used with 'yield' as
378
379        'rtask = yield computation.run_at(where, gen, ...)'
380
381        Run given generator function 'gen' with arguments 'args' and 'kwargs' at
382        remote server 'where'.  If the request is successful, 'rtask' will be a
383        (remote) task; check result with 'isinstance(rtask,
        pycos.Task)'. The generator is expected to be (mostly) CPU bound; until
        this task finishes, no other CPU bound task will be submitted at the
        same server.
387
388        If 'where' is a string, it is assumed to be IP address of a node, in
389        which case the task is scheduled at that node on a server at that
390        node. If 'where' is a Location instance, it is assumed to be server
391        location in which case the task is scheduled at that server.
392
393        'gen' must be generator function, as it is used to run task at
394        remote location.
395        """
396        yield self._run_request('run_async', where, 1, gen, *args, **kwargs)
397
398    def run(self, gen, *args, **kwargs):
399        """Run CPU bound task at any remote server; see 'run_at'
400        above.
401        """
402        yield self._run_request('run_async', None, 1, gen, *args, **kwargs)
403
404    def run_result_at(self, where, gen, *args, **kwargs):
405        """Must be used with 'yield' as
406
407        'rtask = yield computation.run_result_at(where, gen, ...)'
408
409        Whereas 'run_at' and 'run' return remote task instance,
410        'run_result_at' and 'run_result' wait until remote task is
411        finished and return the result of that remote task (i.e., either
412        the value of 'StopIteration' or the last value 'yield'ed).
413
414        'where', 'gen', 'args', 'kwargs' are as explained in 'run_at'.
415        """
416        yield self._run_request('run_result', where, 1, gen, *args, **kwargs)
417
418    def run_result(self, gen, *args, **kwargs):
419        """Run CPU bound task at any remote server and return result of
420        that task; see 'run_result_at' above.
421        """
422        yield self._run_request('run_result', None, 1, gen, *args, **kwargs)
423
424    def run_async_at(self, where, gen, *args, **kwargs):
425        """Must be used with 'yield' as
426
427        'rtask = yield computation.run_async_at(where, gen, ...)'
428
429        Run given generator function 'gen' with arguments 'args' and 'kwargs' at
430        remote server 'where'.  If the request is successful, 'rtask' will be a
431        (remote) task; check result with 'isinstance(rtask,
432        pycos.Task)'. The generator is supposed to be (mostly) I/O bound and
433        not consume CPU time. Unlike other 'run' variants, tasks created
434        with 'async' are not "tracked" by scheduler (see online documentation for
435        more details).
436
437        If 'where' is a string, it is assumed to be IP address of a node, in
438        which case the task is scheduled at that node on a server at that
439        node. If 'where' is a Location instance, it is assumed to be server
440        location in which case the task is scheduled at that server.
441
442        'gen' must be generator function, as it is used to run task at
443        remote location.
444        """
445        yield self._run_request('run_async', where, 0, gen, *args, **kwargs)
446
447    def run_async(self, gen, *args, **kwargs):
448        """Run I/O bound task at any server; see 'run_async_at'
449        above.
450        """
451        yield self._run_request('run_async', None, 0, gen, *args, **kwargs)
452
453    def run_results(self, gen, iter):
454        """Must be used with 'yield', as for example,
455        'results = yield scheduler.map_results(generator, list_of_tuples)'.
456
457        Execute generator 'gen' with arguments from given iterable. The return
458        value is list of results that correspond to executing 'gen' with
459        arguments in iterable in the same order.
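
        For example, a sketch assuming generator function 'compute' takes two
        parameters; each tuple in the iterable is unpacked as arguments:

            args = [(i, i ** 2) for i in range(10)]
            results = yield computation.run_results(compute, args)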
460        """
461        tasks = []
462        append_task = tasks.append
463        for params in iter:
464            if not isinstance(params, tuple):
465                if hasattr(params, '__iter__'):
466                    params = tuple(params)
467                else:
468                    params = (params,)
469            append_task(Task(self.run_result, gen, *params))
470        results = [None] * len(tasks)
471        for i, task in enumerate(tasks):
472            results[i] = yield task.finish()
473        return(results)
474
475    def enable_node(self, ip_addr, *setup_args):
476        """If computation disabled nodes (with 'disabled_nodes=True' when
477        Computation is constructed), nodes are not automatically used by the
478        scheduler until nodes are enabled with 'enable_node'.
479
480        'ip_addr' must be either IP address or host name of the node to be
481        enabled.
482
        'setup_args' are the arguments passed to 'node_setup' function specific to
484        that node. If 'node_setup' succeeds (i.e., finishes with value 0), the
485        node is used for computations.
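
        A sketch (assuming Computation was constructed with
        'disable_nodes=True' and a 'status_task' that receives DispycosStatus
        messages 'msg'):

            if msg.status == Scheduler.NodeDiscovered:
                computation.enable_node(msg.info.addr)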
486        """
487        if self.scheduler:
488            if isinstance(ip_addr, pycos.Location):
489                ip_addr = ip_addr.addr
490            self.scheduler.send({'req': 'enable_node', 'auth': self._auth, 'addr': ip_addr,
491                                 'setup_args': setup_args})
492
493    def enable_server(self, location, *setup_args):
494        """If computation disabled servers (with 'disabled_servers=True' when
495        Computation is constructed), servers are not automatically used by the
496        scheduler until they are enabled with 'enable_server'.
497
498        'location' must be Location instance of the server to be enabled.
499
        'setup_args' are the arguments passed to 'server_setup' function specific to
501        that server. If 'server_setup' succeeds (i.e., finishes with value 0), the
502        server is used for computations.
503        """
504        if self.scheduler:
505            self.scheduler.send({'req': 'enable_server', 'auth': self._auth, 'server': location,
506                                 'setup_args': setup_args})
507
508    def suspend_node(self, location):
509        """Suspend submitting jobs (tasks) at this node. Any currently running
510        tasks are left running.
511        """
512        if self.scheduler:
513            self.scheduler.send({'req': 'suspend_node', 'auth': self._auth, 'server': location})
514
515    def resume_node(self, location):
516        """Resume submitting jobs (tasks) at this node.
517        """
518        if self.scheduler:
            self.scheduler.send({'req': 'resume_node', 'auth': self._auth, 'server': location})
520
521    def suspend_server(self, location):
522        """Suspend submitting jobs (tasks) at this server. Any currently running
523        tasks are left running.
524        """
525        if self.scheduler:
526            self.scheduler.send({'req': 'suspend_server', 'auth': self._auth, 'server': location})
527
528    def resume_server(self, location):
529        """Resume submitting jobs (tasks) at this server.
530        """
531        if self.scheduler:
            self.scheduler.send({'req': 'resume_server', 'auth': self._auth, 'server': location})
533
534    def node_allocate(self, node_allocate):
535        """Request scheduler to add 'node_allocate' to any previously sent
536        'node_allocations'.
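
        A sketch, e.g., to allow a (hypothetical) node to be used with 2 CPUs:

            computation.node_allocate(DispycosNodeAllocate('192.168.1.50', cpus=2))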
537        """
538        if not isinstance(node_allocate, DispycosNodeAllocate):
539            return -1
540        if not self._pulse_task:
541            return -1
542        if (node_allocate.__class__ != DispycosNodeAllocate and
543            self._pulse_task.location != self.scheduler.location):
544            node_allocate = copy.copy(node_allocate)
545            node_allocate.__class__ = DispycosNodeAllocate
546        return self.scheduler.send({'req': 'node_allocate', 'auth': self._auth,
547                                    'node': node_allocate})
548
549    def _run_request(self, request, where, cpu, gen, *args, **kwargs):
550        """Internal use only.
551        """
552        if isinstance(gen, str):
553            name = gen
554        else:
555            name = gen.__name__
556
557        if name in self._xfer_funcs:
558            code = None
559        else:
560            # if not inspect.isgeneratorfunction(gen):
561            #     logger.warning('"%s" is not a valid generator function', name)
562            #     return([])
563            code = inspect.getsource(gen).lstrip()
564
565        def _run_req(task=None):
566            msg = {'req': 'job', 'auth': self._auth,
567                   'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
568            if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
569                reply = yield task.receive()
570                if isinstance(reply, Task):
571                    if self.status_task:
572                        msg = DispycosTaskInfo(reply, args, kwargs, time.time())
573                        self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
574                if not request.endswith('async'):
575                    reply = yield task.receive()
576            else:
577                reply = None
578            return(reply)
579
580        yield Task(_run_req).finish()
581
582    def _pulse_proc(self, task=None):
583        """For internal use only.
584        """
585        task.set_daemon()
586        last_pulse = time.time()
587        timeout = 2 * self._pulse_interval
588        while 1:
589            msg = yield task.receive(timeout=timeout)
590
591            if msg == 'pulse':
592                last_pulse = time.time()
593
594            elif isinstance(msg, dict):
595                if msg.get('auth', None) != self._auth:
596                    continue
597                if msg.get('req', None) == 'allocate':
598                    reply = msg.get('reply', None)
599                    args = msg.get('args', ())
600                    if not isinstance(reply, Task) or not args:
601                        logger.warning('Ignoring allocate request: %s', type(reply))
602                        continue
603                    ip_addr = args[0]
604                    try:
605                        node_allocation = self._node_allocations[int(msg['alloc_id'])]
606                        assert re.match(node_allocation.ip_rex, ip_addr)
607                        cpus = node_allocation.allocate(*args)
608                    except Exception:
609                        cpus = 0
610                    reply.send({'auth': self._auth, 'req': 'allocate',
611                                'ip_addr': ip_addr, 'cpus': cpus})
612
613            elif msg == 'quit':
614                break
615
616            elif msg is None:
617                logger.warning('scheduler may have gone away!')
618                if (time.time() - last_pulse) > (10 * self._pulse_interval):
619                    # TODO: inform status and / or "close"?
620                    pass
621
622            else:
623                logger.debug('ignoring invalid pulse message')
624
625        self._pulse_task = None
626
627    def __getstate__(self):
628        state = {}
        for attr in ['_code', '_xfer_funcs', '_xfer_files', '_auth', 'scheduler', 'status_task',
630                     '_pulse_interval', '_pulse_task', '_node_setup', '_server_setup',
631                     '_disable_nodes', '_disable_servers', '_peers_communicate', '_zombie_period']:
632            state[attr] = getattr(self, attr)
633        if self._pulse_task.location == self.scheduler.location:
634            node_allocations = self._node_allocations
635        else:
636            node_allocations = []
637            for i in range(len(self._node_allocations)):
638                obj = self._node_allocations[i]
639                if obj.__class__ != DispycosNodeAllocate:
640                    ip_rex = obj.ip_rex
641                    obj = DispycosNodeAllocate('*', port=obj.port)
642                    obj.ip_rex = ip_rex
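                    # str 'cpus' marks this allocation as client-side so the
                    # scheduler relays 'allocate' requests to the client's
                    # pulse task (see '__node_allocate' and '_pulse_proc')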
643                    obj.cpus = str(i)
644                node_allocations.append(obj)
645        state['_node_allocations'] = node_allocations
646        return state
647
648    def __setstate__(self, state):
649        for attr, value in state.items():
650            setattr(self, attr, value)
651
652
653class _DispycosJob_(object):
654    """Internal use only.
655    """
656    __slots__ = ('request', 'client', 'name', 'where', 'cpu', 'code', 'args', 'kwargs', 'done')
657
658    def __init__(self, request, client, name, where, cpu, code, args=None, kwargs=None):
659        self.request = request
660        self.client = client
661        self.name = name
662        self.where = where
663        self.cpu = cpu
664        self.code = code
665        self.args = pycos.serialize(args)
666        self.kwargs = pycos.serialize(kwargs)
667        self.done = None
668
669
670class Scheduler(object, metaclass=pycos.Singleton):
671
672    # status indications ('status' attribute of DispycosStatus)
673    NodeDiscovered = 1
674    NodeInitialized = 2
675    NodeClosed = 3
676    NodeIgnore = 4
677    NodeDisconnected = 5
678    NodeAbandoned = 6
679    NodeSuspended = 7
680    NodeResumed = 8
681
682    ServerDiscovered = 11
683    ServerInitialized = 12
684    ServerClosed = 13
685    ServerIgnore = 14
686    ServerDisconnected = 15
687    ServerAbandoned = 16
688    ServerSuspended = 17
689    ServerResumed = 18
690
691    TaskCreated = 20
692    TaskAbandoned = 21
693    ComputationScheduled = 23
694    ComputationClosed = 25
695
696    """This class is for use by Computation class (see below) only.  Other than
697    the status indications above, none of its attributes are to be accessed
698    directly.
699    """
700
701    class _Node(object):
702
703        def __init__(self, name, addr):
704            self.name = name
705            self.addr = addr
706            self.cpus_used = 0
707            self.cpus = 0
708            self.platform = None
709            self.avail_info = None
710            self.servers = {}
711            self.disabled_servers = {}
712            self.load = 0.0
713            self.status = Scheduler.NodeClosed
714            self.task = None
715            self.last_pulse = time.time()
716            self.lock = pycos.Lock()
717            self.cpu_avail = pycos.Event()
718            self.cpu_avail.clear()
719
720    class _Server(object):
721
722        def __init__(self, name, location, scheduler):
723            self.name = name
724            self.task = None
725            self.status = Scheduler.ServerClosed
726            self.rtasks = {}
727            self.xfer_files = []
728            self.askew_results = {}
729            self.cpu_avail = pycos.Event()
730            self.cpu_avail.clear()
731            self.scheduler = scheduler
732
733        def run(self, job, computation, node):
734            def _run(self, task=None):
735                self.task.send({'req': 'run', 'auth': computation._auth, 'job': job, 'client': task})
736                rtask = yield task.receive(timeout=MsgTimeout)
                # currently fault-tolerance is not supported, so clear job's
                # args to save space
739                job.args = job.kwargs = None
740                if isinstance(rtask, Task):
741                    # TODO: keep func too for fault-tolerance
742                    job.done = pycos.Event()
743                    self.rtasks[rtask] = (rtask, job)
744                    if self.askew_results:
745                        msg = self.askew_results.pop(rtask, None)
746                        if msg:
                            # '__status_task' is mangled with Scheduler's name
                            self.scheduler._Scheduler__status_task.send(msg)
748                else:
749                    logger.debug('failed to create rtask: %s', rtask)
750                    if job.cpu:
751                        self.cpu_avail.set()
752                        if self.status == Scheduler.ServerInitialized:
753                            node.cpu_avail.set()
754                            self.scheduler._cpu_nodes.add(node)
755                            self.scheduler._cpus_avail.set()
756                            node.cpus_used -= 1
757                            node.load = float(node.cpus_used) / len(node.servers)
758                return(rtask)
759
760            rtask = yield SysTask(_run, self).finish()
761            job.client.send(rtask)
762
763    def __init__(self, **kwargs):
764        self._nodes = {}
765        self._disabled_nodes = {}
766        self._cpu_nodes = set()
767        self._cpus_avail = pycos.Event()
768        self._cpus_avail.clear()
769        self._remote = False
770
771        self._cur_computation = None
772        self.__cur_client_auth = None
773        self.__cur_node_allocations = []
774        self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
775        self.__ping_interval = kwargs.pop('ping_interval', 0)
776        self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
777        self._node_port = kwargs.pop('dispycosnode_port', 9706)
778        self.__server_locations = set()
779
780        kwargs['name'] = 'dispycos_scheduler'
781        clean = kwargs.pop('clean', False)
782        nodes = kwargs.pop('nodes', [])
783        relay_nodes = kwargs.pop('relay_nodes', False)
784        self.pycos = pycos.Pycos.instance(**kwargs)
785        self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'scheduler')
        if clean:
            shutil.rmtree(self.__dest_path, ignore_errors=True)
788        self.pycos.dest_path = self.__dest_path
789        os.chmod(self.__dest_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
790
791        self.__computation_sched_event = pycos.Event()
792        self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc)
793        self.__status_task = SysTask(self.__status_proc)
794        self.__timer_task = SysTask(self.__timer_proc)
795        self.__client_task = SysTask(self.__client_proc)
796        self.__client_task.register('dispycos_scheduler')
797        for node in nodes:
798            if not isinstance(node, pycos.Location):
799                node = pycos.Location(node, self._node_port)
800            Task(self.pycos.peer, node, relay=relay_nodes)
801
802    def status(self):
803        pending_cpu = sum(node.cpus_used for node in self._nodes.values())
804        pending = sum(len(server.rtasks) for node in self._nodes.values()
805                      for server in node.servers.values())
806        servers = functools.reduce(operator.add, [list(node.servers.keys())
807                                                  for node in self._nodes.values()], [])
808        return {'Client': self._cur_computation._pulse_task.location if self._cur_computation else '',
809                'Pending': pending, 'PendingCPU': pending_cpu,
810                'Nodes': list(self._nodes.keys()), 'Servers': servers
811                }
812
813    def print_status(self):
814        status = self.status()
815        print('')
816        print('  Client: %s' % status['Client'])
817        print('  Pending: %s' % status['Pending'])
818        print('  Pending CPU: %s' % status['PendingCPU'])
        print('  Nodes: %s' % len(status['Nodes']))
        print('  Servers: %s' % len(status['Servers']))
821
822    def __status_proc(self, task=None):
823        task.set_daemon()
824        task.register('dispycos_status')
825        self.pycos.peer_status(self.__status_task)
826        while 1:
827            msg = yield task.receive()
828            now = time.time()
829            if isinstance(msg, pycos.MonitorException):
830                rtask = msg.args[0]
831                if not isinstance(rtask, Task):
832                    logger.warning('ignoring invalid rtask %s', type(rtask))
833                    continue
834                node = self._nodes.get(rtask.location.addr, None)
835                if not node:
836                    node = self._disabled_nodes.get(rtask.location.addr, None)
837                    if not node:
838                        logger.warning('node %s is invalid', rtask.location.addr)
839                        continue
840                server = node.servers.get(rtask.location, None)
841                if not server:
842                    server = node.disabled_servers.get(rtask.location, None)
843                    if not server:
844                        logger.warning('server "%s" is invalid', rtask.location)
845                        continue
846                node.last_pulse = now
847                info = server.rtasks.pop(rtask, None)
848                if not info:
849                    # Due to 'yield' used to create rtask, scheduler may not
850                    # have updated self._rtasks before the task's
851                    # MonitorException is received, so put it in
852                    # 'askew_results'. The scheduling task will resend it
853                    # when it receives rtask
854                    server.askew_results[rtask] = msg
855                    continue
856                # assert isinstance(info[1], _DispycosJob_)
857                job = info[1]
858                if job.cpu:
859                    server.cpu_avail.set()
860                    if server.status == Scheduler.ServerInitialized:
861                        node.cpu_avail.set()
862                        self._cpu_nodes.add(node)
863                        self._cpus_avail.set()
864                        node.cpus_used -= 1
865                        node.load = float(node.cpus_used) / len(node.servers)
866                if job.request.endswith('async'):
867                    if job.done:
868                        job.done.set()
869                else:
870                    job.client.send(msg.args[1][1])
871                if self._cur_computation and self._cur_computation.status_task:
872                    if len(msg.args) > 2:
873                        msg.args = (msg.args[0], msg.args[1])
874                    self._cur_computation.status_task.send(msg)
875
876            elif isinstance(msg, pycos.PeerStatus):
877                if msg.status == pycos.PeerStatus.Online:
878                    if msg.name.endswith('_server-0'):
879                        SysTask(self.__discover_node, msg)
880                else:
881                    # msg.status == pycos.PeerStatus.Offline
882                    node = server = None
883                    node = self._nodes.get(msg.location.addr, None)
884                    if not node:
885                        node = self._disabled_nodes.get(msg.location.addr, None)
886                    if node:
887                        server = node.servers.pop(msg.location, None)
888                        if server:
889                            if node.servers:
890                                node.disabled_servers[msg.location] = server
891                        else:
892                            server = node.disabled_servers.get(msg.location, None)
893
894                        if server:
895                            server.status = Scheduler.ServerDisconnected
896                            SysTask(self.__close_server, server, node)
897                        elif node.task and node.task.location == msg.location:
898                            # TODO: inform scheduler / client
899                            if not self._nodes.pop(node.addr, None):
                                self._disabled_nodes.pop(node.addr, None)
901                            node.status = Scheduler.NodeDisconnected
902                            SysTask(self.__close_node, node)
903
904                    if ((not server and not node) and self._remote and self._cur_computation and
905                        self._cur_computation._pulse_task.location == msg.location):
906                        logger.warning('Client %s terminated; closing computation %s',
907                                       msg.location, self.__cur_client_auth)
908                        SysTask(self.__close_computation)
909
910            elif isinstance(msg, dict):  # message from a node's server
911                status = msg.get('status', None)
912                if status == 'pulse':
913                    location = msg.get('location', None)
914                    if not isinstance(location, pycos.Location):
915                        continue
916                    node = self._nodes.get(location.addr, None)
917                    if node:
918                        node.last_pulse = now
919                        node_status = msg.get('node_status', None)
920                        if (node_status and self._cur_computation and
921                           self._cur_computation.status_task):
922                            self._cur_computation.status_task.send(node_status)
923
924                elif status == Scheduler.ServerDiscovered:
925                    rtask = msg.get('task', None)
926                    if not isinstance(rtask, pycos.Task):
927                        continue
                    if (not self._cur_computation or
                        self._cur_computation._auth != msg.get('auth', None)):
                        logger.warning('Status %s for server %s is ignored', status, rtask.location)
                        continue
931                    node = self._nodes.get(rtask.location.addr, None)
932                    if not node:
933                        node = self._disabled_nodes.get(rtask.location.addr, None)
934                    if not node or (node.status != Scheduler.NodeInitialized and
935                                    node.status != Scheduler.NodeDiscovered and
936                                    node.status != Scheduler.NodeSuspended):
937                        if node:
938                            logger.warning('Node at %s is not initialized for server %s: %s',
939                                           node.addr, rtask.location, node.status)
940                        else:
941                            logger.warning('Node is not valid for server %s', rtask.location)
942                        continue
947                    server = node.servers.get(rtask.location, None)
948                    if server:
949                        continue
950                    server = Scheduler._Server(msg.get('name', None), rtask.location, self)
951                    server.task = rtask
952                    server.status = Scheduler.ServerDiscovered
953                    node.disabled_servers[rtask.location] = server
954                    if self._cur_computation and self._cur_computation.status_task:
955                        info = DispycosStatus(server.status, server.task.location)
956                        self._cur_computation.status_task.send(info)
957
958                elif status == Scheduler.ServerInitialized:
959                    rtask = msg.get('task', None)
960                    if not isinstance(rtask, pycos.Task):
961                        continue
                    if (not self._cur_computation or
                        self._cur_computation._auth != msg.get('auth', None)):
                        logger.warning('Status %s for server %s is ignored', status, rtask.location)
                        continue
965                    node = self._nodes.get(rtask.location.addr, None)
966                    if not node:
967                        node = self._disabled_nodes.get(rtask.location.addr, None)
968                    if not node or (node.status != Scheduler.NodeInitialized and
969                                    node.status != Scheduler.NodeDiscovered and
970                                    node.status != Scheduler.NodeSuspended):
971                        if node:
972                            logger.warning('Node at %s is not initialized for server %s: %s',
973                                           node.addr, rtask.location, node.status)
974                        else:
975                            logger.warning('Node is not valid for server %s', rtask.location)
976                        continue
981                    server = node.disabled_servers.pop(rtask.location, None)
982                    if server:
983                        if (server.status != Scheduler.ServerDiscovered and
984                            server.status != Scheduler.ServerSuspended):
985                            continue
986                    else:
987                        server = Scheduler._Server(msg.get('name', None), rtask.location, self)
988                        server.task = rtask
989
990                    node.last_pulse = now
991                    server.status = Scheduler.ServerInitialized
992                    if node.status == Scheduler.NodeInitialized:
993                        if not node.servers:
994                            if self._cur_computation and self._cur_computation.status_task:
995                                info = DispycosNodeInfo(node.name, node.addr, node.cpus,
996                                                        node.platform, node.avail_info)
997                                info = DispycosStatus(node.status, info)
998                                self._cur_computation.status_task.send(info)
999                            self._disabled_nodes.pop(rtask.location.addr, None)
1000                            self._nodes[rtask.location.addr] = node
1001                        node.servers[rtask.location] = server
1002                        server.cpu_avail.set()
1003                        self._cpu_nodes.add(node)
1004                        self._cpus_avail.set()
1005                        node.cpu_avail.set()
1006                        node.load = float(node.cpus_used) / len(node.servers)
1007                        if self._cur_computation and self._cur_computation.status_task:
1008                            self._cur_computation.status_task.send(
1009                                DispycosStatus(server.status, server.task.location))
1010                    else:
1011                        node.disabled_servers[rtask.location] = server
1012
1013                    if self._cur_computation._peers_communicate:
1014                        server.task.send({'req': 'peers', 'auth': self._cur_computation._auth,
1015                                          'peers': list(self.__server_locations)})
1016                        self.__server_locations.add(server.task.location)
1017
1018                elif status in (Scheduler.ServerClosed, Scheduler.ServerDisconnected):
1019                    location = msg.get('location', None)
1020                    if not isinstance(location, pycos.Location):
1021                        continue
1022                    node = self._nodes.get(location.addr, None)
1023                    if not node:
1024                        node = self._disabled_nodes.get(location.addr, None)
1025                        if not node:
1026                            continue
1027                    if (not self._cur_computation or
1028                        self._cur_computation._auth != msg.get('auth', None)):
                        logger.warning('Status %s for server %s is ignored', status, location)
1030                        continue
1031                    server = node.servers.pop(location, None)
1032                    if not server:
1033                        server = node.disabled_servers.get(location, None)
1034                        if not server:
1035                            continue
1036                        if status == Scheduler.ServerDisconnected:
1037                            server.status = status
1038                            SysTask(self.__close_server, server, node)
1039                    server.status = Scheduler.ServerClosed
1040                    node.disabled_servers[location] = server
1041                    if not node.servers:
1042                        node.status = Scheduler.NodeClosed
1043                        self._nodes.pop(node.addr, None)
1044                        self._disabled_nodes[location.addr] = node
1045                        self._cpu_nodes.discard(node)
1046                        if not self._cpu_nodes:
1047                            self._cpus_avail.clear()
1048                    if self._cur_computation:
1049                        if self._cur_computation._peers_communicate:
1050                            self.__server_locations.discard(server.task.location)
1051                            # TODO: inform other servers
1052                            # if not node.servers:
1053                            #     info = DispycosNodeInfo(node.name, node.addr, node.cpus,
1054                            #                            node.platform, node.avail_info)
1055                            #     self._cur_computation.status_task.send(
1056                            #         Scheduler.NodeClosed, DispycosStatus(node.status, info))
1057
1058                elif status == Scheduler.NodeClosed:
1059                    location = msg.get('location', None)
1060                    if not isinstance(location, pycos.Location):
1061                        continue
1062                    node = self._nodes.pop(location.addr, None)
1063                    if not node:
1064                        node = self._disabled_nodes.get(location.addr, None)
1065                        if not node:
1066                            continue
1067                    if (not self._cur_computation or
1068                        self._cur_computation._auth != msg.get('auth', None)):
1069                        logger.warning('Status %s for node %s is ignored', status, location)
1070                        continue
1071                    node.status = Scheduler.NodeDisconnected
1072                    SysTask(self.__close_node, node)
1073
1074                else:
1075                    logger.warning('Ignoring invalid status message: %s', status)
1076            else:
1077                logger.warning('invalid status message ignored')
1078
1079    def __node_allocate(self, node, task=None):
1080        if not task:
1081            task = pycos.Pycos.cur_task()
1082        for node_allocate in self.__cur_node_allocations:
1083            if not re.match(node_allocate.ip_rex, node.addr):
1084                continue
1085            if self._remote and isinstance(node_allocate.cpus, str):
1086                req = {'req': 'allocate', 'auth': self.__cur_client_auth,
1087                       'alloc_id': node_allocate.cpus, 'reply': task,
1088                       'args': (node.addr, node.name, node.platform, node.cpus,
1089                                node.avail_info.memory, node.avail_info.disk)}
1090                self._cur_computation._pulse_task.send(req)
                reply = yield task.receive(timeout=MsgTimeout)
1092                if (isinstance(reply, dict) and reply.get('auth', None) == self.__cur_client_auth and
1093                    reply.get('req', None) == 'allocate' and reply.get('ip_addr', '') == node.addr):
1094                    cpus = reply.get('cpus', 0)
1095                else:
1096                    cpus = 0
1097            else:
1098                cpus = node_allocate.allocate(node.addr, node.name, node.platform, node.cpus,
1099                                              node.avail_info.memory, node.avail_info.disk)
1100            if cpus < 0:
1101                continue
1102            return(min(cpus, node.cpus))
1103        return(node.cpus)
1104
1105    def __get_node_info(self, node, task=None):
1106        assert node.addr in self._disabled_nodes
1107        node.task.send({'req': 'dispycos_node_info', 'client': task})
1108        node_info = yield task.receive(timeout=MsgTimeout)
1109        if not node_info:
1110            node.status = Scheduler.NodeIgnore
1111            return
1112        node.name = node_info.name
1113        node.cpus = node_info.cpus
1114        node.platform = node_info.platform.lower()
1115        node.avail_info = node_info.avail_info
1116        if self._cur_computation:
1117            yield self.__init_node(node, task=task)
1118
1119    def __init_node(self, node, setup_args=(), task=None):
1120        computation = self._cur_computation
1121        if not computation or not node.task:
1122            return(-1)
1123        # this task may be invoked in two different paths (when a node is
1124        # found right after computation is already scheduled, and when
1125        # computation is scheduled right after a node is found). To prevent
1126        # concurrent execution (that may reserve / initialize same node more
1127        # than once), lock is used
1128        yield node.lock.acquire()
1129        # assert node.addr in self._disabled_nodes
1130        if node.status not in (Scheduler.NodeDiscovered, Scheduler.NodeClosed):
1131            logger.warning('Ignoring node initialization for %s: %s', node.addr, node.status)
1132            node.lock.release()
1133            return(0)
1134
1135        if node.status == Scheduler.NodeClosed:
1136            cpus = yield self.__node_allocate(node, task=task)
1137            if not cpus:
1138                node.status = Scheduler.NodeIgnore
1139                node.lock.release()
1140                return(0)
1141
1142            node.task.send({'req': 'reserve', 'cpus': cpus, 'auth': computation._auth,
1143                            'status_task': self.__status_task, 'client': task,
1144                            'computation_location': computation._pulse_task.location})
1145            cpus = yield task.receive(timeout=MsgTimeout)
1146            if not isinstance(cpus, int) or cpus <= 0:
1147                logger.debug('Reserving %s failed', node.addr)
1148                self._disabled_nodes.pop(node.addr, None)
                # node.status = Scheduler.NodeDiscovered
1150                node.lock.release()
1151                yield pycos.Pycos.instance().close_peer(node.task.location)
1152                return(-1)
1153            if computation != self._cur_computation:
1154                node.status = Scheduler.NodeClosed
1155                node.task.send({'req': 'release', 'auth': computation._auth, 'client': None})
1156                node.lock.release()
1157                return(-1)
1158
1159            node.status = Scheduler.NodeDiscovered
1160            node.cpus = cpus
1161            if self._cur_computation and self._cur_computation.status_task:
1162                info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
1163                                        node.avail_info)
1164                self._cur_computation.status_task.send(DispycosStatus(node.status, info))
1165
1166            if self._cur_computation._disable_nodes:
1167                node.lock.release()
1168                return(0)
1169        else:
1170            assert node.addr in self._disabled_nodes
1171
1172        for name, dst, sep in computation._xfer_files:
1173            reply = yield self.pycos.send_file(node.task.location, name, dir=dst, timeout=MsgTimeout,
1174                                               overwrite=True)
1175            if reply < 0 or computation != self._cur_computation:
1176                logger.debug('Failed to transfer file %s: %s', name, reply)
1177                node.status = Scheduler.NodeClosed
1178                node.task.send({'req': 'release', 'auth': computation._auth, 'client': None})
1179                node.lock.release()
1180                return(-1)
1181
1182        node.task.send({'req': 'computation', 'computation': computation, 'auth': computation._auth,
1183                        'setup_args': setup_args, 'client': task})
1184        cpus = yield task.receive(timeout=MsgTimeout)
1185        if not cpus or computation != self._cur_computation:
1186            node.status = Scheduler.NodeClosed
1187            node.task.send({'req': 'release', 'auth': computation._auth, 'client': None})
1188            node.lock.release()
1189            return(-1)
1190
1191        node.cpus = cpus
1192        node.status = Scheduler.NodeInitialized
1193        node.lock.release()
1194        servers = [server for server in node.disabled_servers.values()
1195                   if server.status == Scheduler.ServerInitialized]
1196        if servers:
1197            if computation.status_task:
1198                info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
1199                                        node.avail_info)
1200                computation.status_task.send(DispycosStatus(node.status, info))
1201            self._disabled_nodes.pop(node.addr, None)
1202            self._nodes[node.addr] = node
1203            node.cpu_avail.set()
1204            self._cpu_nodes.add(node)
1205            self._cpus_avail.set()
1206            for server in servers:
1207                node.disabled_servers.pop(server.task.location)
1208                node.servers[server.task.location] = server
1209                server.cpu_avail.set()
1210                if computation.status_task:
1211                    computation.status_task.send(DispycosStatus(server.status,
1212                                                                server.task.location))
1213
1214    def __discover_node(self, peer_status, task=None):
1215        for _ in range(10):
1216            node_task = yield Task.locate('dispycos_node', location=peer_status.location,
1217                                          timeout=MsgTimeout)
1218            if not isinstance(node_task, Task):
1219                yield task.sleep(0.1)
1220                continue
1221            node = self._nodes.pop(peer_status.location.addr, None)
1222            if not node:
1223                node = self._disabled_nodes.pop(peer_status.location.addr, None)
1224            if node:
1225                logger.warning('Rediscovered dispycosnode at %s; discarding previous incarnation!',
1226                               peer_status.location.addr)
1227                node.status = Scheduler.NodeDisconnected
1228                yield SysTask(self.__close_node, node).finish()
1229            node = Scheduler._Node(peer_status.name, peer_status.location.addr)
1230            self._disabled_nodes[peer_status.location.addr] = node
1231            node.task = node_task
1232            yield self.__get_node_info(node, task=task)
1233            return
1234
1235    def __timer_proc(self, task=None):
1236        task.set_daemon()
1237        node_check = client_pulse = last_ping = time.time()
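        # this loop multiplexes three periodic checks: delivery of 'pulse'
        # messages to the client (to detect a zombie computation), each
        # node's 'last_pulse' (to detect zombie nodes) and an optional 'ping'
        # broadcast to discover new peers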
1238        while 1:
1239            try:
1240                yield task.sleep(self.__pulse_interval)
1241            except GeneratorExit:
1242                break
1243            now = time.time()
1244            if self.__cur_client_auth:
1245                if (yield self._cur_computation._pulse_task.deliver('pulse')) == 1:
1246                    client_pulse = now
1247                if self.__zombie_period:
                    if (now - client_pulse) > self.__zombie_period:
1249                        logger.warning('Closing zombie computation %s', self.__cur_client_auth)
1250                        SysTask(self.__close_computation)
1251
1252                    if (now - node_check) > self.__zombie_period:
1253                        node_check = now
1254                        for node in self._nodes.values():
                            if node.status not in (Scheduler.NodeInitialized,
                                                   Scheduler.NodeDiscovered,
                                                   Scheduler.NodeSuspended):
                                continue
1259                            if (now - node.last_pulse) > self.__zombie_period:
1260                                logger.warning('dispycos node %s is zombie!', node.addr)
1261                                self._nodes.pop(node.addr, None)
1262                                self._disabled_nodes[node.addr] = node
1263                                # TODO: assuming servers are zombies as well
1264                                node.status = Scheduler.NodeDisconnected
1265                                SysTask(self.__close_node, node)
1266
1267                        if not self._cur_computation._disable_nodes:
1268                            for node in self._disabled_nodes.values():
1269                                if node.task and node.status == Scheduler.NodeDiscovered:
1270                                    SysTask(self.__init_node, node)
1271
1272            if self.__ping_interval and ((now - last_ping) > self.__ping_interval):
1273                last_ping = now
1274                if not self.pycos.ignore_peers:
1275                    self.pycos.discover_peers(port=self._node_port)
1276
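    # one computation is run at a time: __client_proc forwards each accepted
    # ('schedule' followed by 'await') computation to this task, which
    # generates a new scheduler-side auth token and re-initializes known
    # nodes before resuming the timer task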
1277    def __computation_scheduler_proc(self, task=None):
1278        task.set_daemon()
1279        while 1:
1280            if self._cur_computation:
1281                self.__computation_sched_event.clear()
1282                yield self.__computation_sched_event.wait()
1283                continue
1284
1285            self._cur_computation, client = yield task.receive()
1286            self.__pulse_interval = self._cur_computation._pulse_interval
1287            if not self._remote:
1288                self.__zombie_period = self._cur_computation._zombie_period
1289
1290            self.__cur_client_auth = self._cur_computation._auth
1291            self._cur_computation._auth = hashlib.sha1(os.urandom(20)).hexdigest()
1292            self.__cur_node_allocations = self._cur_computation._node_allocations
1293            self._cur_computation._node_allocations = []
1294
1295            self._disabled_nodes.update(self._nodes)
1296            self._nodes.clear()
1297            self._cpu_nodes.clear()
1298            self._cpus_avail.clear()
1299            for node in self._disabled_nodes.values():
1300                node.status = Scheduler.NodeClosed
1301                node.disabled_servers.clear()
1302                node.servers.clear()
1303                node.cpu_avail.clear()
1304            logger.debug('Computation %s / %s scheduled', self.__cur_client_auth,
1305                         self._cur_computation._auth)
1306            msg = {'resp': 'scheduled', 'auth': self.__cur_client_auth}
1307            if (yield client.deliver(msg, timeout=MsgTimeout)) != 1:
1308                logger.warning('client not reachable?')
                self.__cur_client_auth = None
1310                self._cur_computation = None
1311                continue
1312            for node in self.__cur_node_allocations:
1313                if node.ip_rex.find('*') >= 0:
1314                    continue
1315                loc = pycos.Location(node.ip_rex.replace('\\.', '.'),
1316                                     node.port if node.port else self._node_port)
1317                SysTask(self.pycos.peer, loc)
1318            for node in self._disabled_nodes.values():
1319                SysTask(self.__get_node_info, node)
1320            if not self.pycos.ignore_peers:
1321                self.pycos.discover_peers(port=self._node_port)
1322            self.__timer_task.resume()
1323        self.__computation_scheduler_task = None
1324
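    # job placement policy (as implemented below): if a job does not name a
    # destination ('where'), the node with the lowest load is picked first
    # and then the server on it with the fewest remote tasks; 'where' may
    # instead be a node's IP address (str) or a specific server's location
    # (pycos.Location)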
1325    def __submit_job(self, msg, task=None):
1326        task.set_daemon()
1327        job = msg['job']
1328        auth = msg.get('auth', None)
        if not isinstance(job, _DispycosJob_) or not isinstance(job.client, Task):
            logger.warning('Ignoring invalid client job request: %s', type(job))
1331            return
1332        cpu = job.cpu
1333        where = job.where
1334        if not where:
1335            while 1:
1336                node = None
1337                load = None
1338                if cpu:
1339                    for host in self._cpu_nodes:
1340                        if host.cpu_avail.is_set() and (load is None or host.load < load):
1341                            node = host
1342                            load = host.load
1343                else:
1344                    for host in self._nodes.values():
1345                        if load is None or host.load < load:
1346                            node = host
1347                            load = host.load
1348                if not node:
1349                    self._cpus_avail.clear()
1350                    yield self._cpus_avail.wait()
1351                    if self.__cur_client_auth != auth:
1352                        return
1353                    continue
1354                server = None
1355                load = None
1356                for proc in node.servers.values():
1357                    if cpu:
1358                        if proc.cpu_avail.is_set() and (load is None or len(proc.rtasks) < load):
1359                            server = proc
1360                            load = len(proc.rtasks)
1361                    elif (load is None or len(proc.rtasks) < load):
1362                        server = proc
1363                        load = len(proc.rtasks)
1364                if server:
1365                    break
1366                else:
1367                    self._cpus_avail.clear()
1368                    yield self._cpus_avail.wait()
1369                    if self.__cur_client_auth != auth:
1370                        return
1371                    continue
1372
1373            if cpu:
1374                server.cpu_avail.clear()
1375                node.cpus_used += 1
1376                node.load = float(node.cpus_used) / len(node.servers)
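                # e.g., 2 CPUs used across 8 servers gives load 0.25; nodes
                # with the lowest load are preferred when picking 'node' above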
1377                if node.cpus_used == len(node.servers):
1378                    node.cpu_avail.clear()
1379                    self._cpu_nodes.discard(node)
1380                    if not self._cpu_nodes:
1381                        self._cpus_avail.clear()
1382            yield server.run(job, self._cur_computation, node)
1383
1384        elif isinstance(where, str):
1385            node = self._nodes.get(where, None)
1386            if not node:
1387                job.client.send(None)
1388                return
1389            while 1:
1390                server = None
1391                load = None
1392                for proc in node.servers.values():
1393                    if cpu:
1394                        if proc.cpu_avail.is_set() and (load is None or len(proc.rtasks) < load):
1395                            server = proc
1396                            load = len(proc.rtasks)
1397                    elif (load is None or len(proc.rtasks) < load):
1398                        server = proc
1399                        load = len(proc.rtasks)
1400
1401                if server:
1402                    break
1403                else:
1404                    yield node.cpu_avail.wait()
1405                    if self.__cur_client_auth != auth:
1406                        return
1407                    continue
1408
1409            if cpu:
1410                server.cpu_avail.clear()
1411                node.cpus_used += 1
1412                node.load = float(node.cpus_used) / len(node.servers)
1413                if node.cpus_used >= len(node.servers):
1414                    node.cpu_avail.clear()
1415                    self._cpu_nodes.discard(node)
1416                    if not self._cpu_nodes:
1417                        self._cpus_avail.clear()
1418            yield server.run(job, self._cur_computation, node)
1419
1420        elif isinstance(where, pycos.Location):
1421            node = self._nodes.get(where.addr)
1422            if not node:
1423                job.client.send(None)
1424                return
1425            server = node.servers.get(where)
1426            if not server:
1427                job.client.send(None)
1428                return
1429            if cpu:
1430                while (not node.cpu_avail.is_set() or not server.cpu_avail.is_set()):
1431                    yield server.cpu_avail.wait()
1432                    if self.__cur_client_auth != auth:
1433                        return
1434                server.cpu_avail.clear()
1435                node.cpus_used += 1
1436                node.load = float(node.cpus_used) / len(node.servers)
1437                if node.cpus_used >= len(node.servers):
1438                    node.cpu_avail.clear()
1439                    self._cpu_nodes.discard(node)
1440                    if not self._cpu_nodes:
1441                        self._cpus_avail.clear()
1442            yield server.run(job, self._cur_computation, node)
1443
1444        else:
1445            job.client.send(None)
1446
1447    def __client_proc(self, task=None):
1448        task.set_daemon()
1449        computations = {}
1450
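        # requests handled below (anything else is ignored with a warning):
        # 'job', 'enable_server', 'enable_node', 'suspend_server',
        # 'suspend_node', 'node_allocate', 'nodes', 'servers', 'schedule',
        # 'await', 'close_computation'; a typical request is a dict such as
        # (sketch) {'req': 'nodes', 'auth': auth, 'client': client_task}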
1451        while 1:
1452            msg = yield task.receive()
1453            if not isinstance(msg, dict):
1454                continue
1455            req = msg.get('req', None)
1456            auth = msg.get('auth', None)
1457            if self.__cur_client_auth != auth:
                if req not in ('schedule', 'await'):
                    continue
1462
1463            if req == 'job':
1464                SysTask(self.__submit_job, msg)
1465                continue
1466
1467            client = msg.get('client', None)
1468            if not isinstance(client, pycos.Task):
1469                client = None
1470
1471            if req == 'enable_server':
1472                loc = msg.get('server', None)
1473                if not isinstance(loc, pycos.Location):
1474                    continue
1475                node = self._nodes.get(loc.addr, None)
1476                if not node:
1477                    node = self._disabled_nodes.get(loc.addr, None)
1478                if not node or node.status not in (Scheduler.NodeInitialized,
1479                                                   Scheduler.NodeSuspended):
1480                    continue
1481                server = node.disabled_servers.get(loc, None)
1482                if not server or server.status not in (Scheduler.ServerDiscovered,
1483                                                       Scheduler.ServerSuspended):
1484                    continue
1485                if server.status == Scheduler.ServerDiscovered:
1486                    args = msg.get('setup_args', ())
1487                    server.task.send({'req': 'enable_server', 'setup_args': args,
1488                                      'auth': self._cur_computation._auth})
1489                elif server.status == Scheduler.ServerSuspended:
1490                    node.disabled_servers.pop(loc)
1491                    node.servers[loc] = server
1492                    server.status = Scheduler.ServerInitialized
1493                    server.cpu_avail.set()
1494                    if len(node.servers) == 1:
1495                        node.cpu_avail.set()
1496                        self._cpu_nodes.add(node)
1497                        self._cpus_avail.set()
1498                    if self._cur_computation.status_task:
1499                        info = DispycosStatus(Scheduler.ServerResumed, loc)
1500                        self._cur_computation.status_task.send(info)
1501                    if node.status == Scheduler.NodeSuspended:
1502                        self._disabled_nodes.pop(node.addr, None)
1503                        self._nodes[node.addr] = node
1504                        node.status = Scheduler.NodeInitialized
1505                        if self._cur_computation.status_task:
1506                            info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
1507                                                    node.avail_info)
1508                            info = DispycosStatus(Scheduler.NodeResumed, info)
1509                            self._cur_computation.status_task.send(info)
1510
1511            elif req == 'enable_node':
1512                addr = msg.get('addr', None)
1513                if not addr:
1514                    continue
1515                node = self._disabled_nodes.get(addr, None)
1516                if not node or node.status not in (Scheduler.NodeDiscovered,
1517                                                   Scheduler.NodeSuspended):
1518                    continue
1519                if node.status == Scheduler.NodeDiscovered:
1520                    setup_args = msg.get('setup_args', ())
1521                    SysTask(self.__init_node, node, setup_args=setup_args)
1522                elif node.status == Scheduler.NodeSuspended:
1523                    if node.servers:
1524                        node.status = Scheduler.NodeInitialized
1525                        self._disabled_nodes.pop(addr, None)
1526                        self._nodes[node.addr] = node
1527                        node.cpu_avail.set()
1528                        self._cpu_nodes.add(node)
1529                        self._cpus_avail.set()
1530                        if self._cur_computation.status_task:
1531                            info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
1532                                                    node.avail_info)
1533                            info = DispycosStatus(Scheduler.NodeResumed, info)
1534                            self._cur_computation.status_task.send(info)
1535
1536            elif req == 'suspend_server':
1537                loc = msg.get('server', None)
1538                if not isinstance(loc, pycos.Location):
1539                    continue
1540                node = self._nodes.get(loc.addr, None)
1541                if not node:
1542                    continue
1543                server = node.servers.pop(loc, None)
1544                if not server:
1545                    continue
1546                if server.status not in (Scheduler.ServerInitialized, Scheduler.ServerDiscovered):
1547                    node.servers[loc] = server
1548                    continue
1549                node.disabled_servers[loc] = server
1550                if server.status == Scheduler.ServerInitialized:
1551                    server.status = Scheduler.ServerSuspended
1552                server.cpu_avail.clear()
1553                if self._cur_computation.status_task:
1554                    info = DispycosStatus(server.status, loc)
1555                    self._cur_computation.status_task.send(info)
1556                if not node.servers:
1557                    self._nodes.pop(node.addr)
1558                    self._disabled_nodes[node.addr] = node
1559                    node.status = Scheduler.NodeSuspended
1560                    node.cpu_avail.clear()
1561                    self._cpu_nodes.discard(node)
1562                    if not self._cpu_nodes:
1563                        self._cpus_avail.clear()
1564                    if self._cur_computation.status_task:
1565                        info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
1566                                                node.avail_info)
1567                        info = DispycosStatus(node.status, info)
1568                        self._cur_computation.status_task.send(info)
1569
1570            elif req == 'suspend_node':
1571                addr = msg.get('addr', None)
1572                if not addr:
1573                    continue
1574                node = self._nodes.pop(addr, None)
1575                if not node:
1576                    continue
1577                if node.status not in (Scheduler.NodeInitialized, Scheduler.NodeDiscovered):
1578                    self._nodes[addr] = node
1579                    continue
1580                self._disabled_nodes[node.addr] = node
1581                if node.status == Scheduler.NodeInitialized:
1582                    node.status = Scheduler.NodeSuspended
1583                node.cpu_avail.clear()
1584                self._cpu_nodes.discard(node)
1585                if not self._cpu_nodes:
1586                    self._cpus_avail.clear()
1587                if self._cur_computation.status_task:
1588                    info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
1589                                            node.avail_info)
1590                    info = DispycosStatus(node.status, info)
1591                    self._cur_computation.status_task.send(info)
1592
1593            elif req == 'node_allocate':
                node = msg.get('node', None)
1595                if not isinstance(node, DispycosNodeAllocate):
1596                    continue
1597                self.__cur_node_allocations = [node] + [na for na in self.__cur_node_allocations
1598                                                        if na.ip_rex != node.ip_rex]
1599                if node.ip_rex.find('*') >= 0:
1600                    continue
1601                loc = pycos.Location(node.ip_rex.replace('\\.', '.'),
1602                                     node.port if node.port else self._node_port)
1603                SysTask(self.pycos.peer, loc)
1604
1605            elif req == 'nodes':
1606                if isinstance(client, Task):
1607                    nodes = [node.addr for node in self._nodes.values()
1608                             if node.status == Scheduler.NodeInitialized]
1609                    client.send(nodes)
1610
1611            elif req == 'servers':
1612                if isinstance(client, Task):
1613                    servers = [server.task.location for node in self._nodes.values()
1614                               if node.status == Scheduler.NodeInitialized
1615                               for server in node.servers.values()
1616                               # if server.status == Scheduler.ServerInitialized
1617                               ]
1618                    client.send(servers)
1619
1620            elif req == 'schedule':
1621                if not isinstance(client, Task):
1622                    logger.warning('Ignoring invalid client request "%s"', req)
1623                    continue
1624                try:
1625                    computation = pycos.deserialize(msg['computation'])
1626                    assert isinstance(computation, Computation) or \
1627                        computation.__class__.__name__ == 'Computation'
1628                    assert isinstance(computation._pulse_task, Task)
1629                    if computation._pulse_task.location == self.pycos.location:
1630                        computation._pulse_task._id = int(computation._pulse_task._id)
1631                        if computation.status_task:
1632                            computation.status_task._id = int(computation.status_task._id)
1633                    assert isinstance(computation._pulse_interval, (float, int))
1634                    assert (MinPulseInterval <= computation._pulse_interval <= MaxPulseInterval)
1635                except Exception:
1636                    logger.warning('ignoring invalid computation request')
1637                    client.send(None)
1638                    continue
1639                while 1:
1640                    computation._auth = hashlib.sha1(os.urandom(20)).hexdigest()
1641                    if not os.path.exists(os.path.join(self.__dest_path, computation._auth)):
1642                        break
1643                try:
1644                    os.mkdir(os.path.join(self.__dest_path, computation._auth))
1645                except Exception:
1646                    logger.debug('Could not create "%s"',
1647                                 os.path.join(self.__dest_path, computation._auth))
1648                    client.send(None)
1649                    continue
1650                # TODO: save it on disk instead
1651                computations[computation._auth] = computation
1652                client.send(computation._auth)
1653
1654            elif req == 'await':
1655                if not isinstance(client, Task):
1656                    logger.warning('Ignoring invalid client request "%s"', req)
1657                    continue
1658                computation = computations.pop(auth, None)
1659                if not computation:
1660                    client.send(None)
1661                    continue
1662                if computation._pulse_task.location.addr != self.pycos.location.addr:
1663                    computation._xfer_files = [(os.path.join(self.__dest_path, computation._auth,
1664                                                             os.path.join(*(dst.split(sep))),
1665                                                             xf.split(sep)[-1]),
1666                                                os.path.join(*(dst.split(sep))), os.sep)
1667                                               for xf, dst, sep in computation._xfer_files]
1668                for xf, dst, sep in computation._xfer_files:
1669                    if not os.path.isfile(xf):
1670                        logger.warning('File "%s" for computation %s is not valid',
1671                                       xf, computation._auth)
1672                        computation = None
1673                        break
1674                if computation is None:
1675                    client.send(None)
1676                else:
1677                    self.__computation_scheduler_task.send((computation, client))
1678                    self.__computation_sched_event.set()
1679
1680            elif req == 'close_computation':
1681                if not isinstance(client, Task):
1682                    logger.warning('Ignoring invalid client request "%s"', req)
1683                    continue
1684                SysTask(self.__close_computation, client=client,
1685                        await_async=msg.get('await_async', False))
1686
1687            else:
1688                logger.warning('Ignoring invalid client request "%s"', req)
1689
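    # teardown happens bottom-up (as implemented below): __close_computation
    # closes every node, __close_node closes each of the node's servers first
    # and then sends 'release' to the node, and __close_server waits for (or
    # abandons) remote tasks before sending 'close' to the server process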
1690    def __close_node(self, node, await_async=False, task=None):
1691        if not node.task:
1692            logger.debug('Closing node %s ignored: %s', node.addr, node.status)
1693            return(-1)
1694
1695        node.cpu_avail.clear()
1696        self._cpu_nodes.discard(node)
1697        if not self._cpu_nodes:
1698            self._cpus_avail.clear()
1699        self._nodes.pop(node.addr, None)
1700        self._disabled_nodes[node.addr] = node
1701        node.disabled_servers.update(node.servers)
1702        node.servers.clear()
1703        computation = self._cur_computation
1704        status_info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
1705                                       node.avail_info)
1706        if node.status == Scheduler.NodeDisconnected:
1707            # TODO: safe to assume servers are disconnected as well?
1708            for server in node.disabled_servers.values():
1709                if server.task:
1710                    server.status = Scheduler.ServerDisconnected
1711            if (computation and computation.status_task):
1712                computation.status_task.send(DispycosStatus(Scheduler.NodeAbandoned, status_info))
1713
1714        close_tasks = [SysTask(self.__close_server, server, node, await_async=await_async)
1715                       for server in node.disabled_servers.values() if server.task]
1716        for close_task in close_tasks:
1717            yield close_task.finish()
1718        node_task = node.task
1719        if not node_task:
1720            return(0)
1721        if node.status == Scheduler.NodeDisconnected:
1722            self._disabled_nodes.pop(node.addr, None)
1723        if node_task and node.status != Scheduler.NodeDisconnected:
1724            node_task.send({'req': 'release', 'auth': computation._auth})
1725
1726    def __close_server(self, server, node, await_async=False, task=None):
1727        if not server.task:
1728            return(-1)
1729        computation = self._cur_computation
1730        if computation:
1731            status_task = computation.status_task
1732        else:
1733            status_task = None
1734
1735        if node.servers.pop(server.task.location, None):
1736            node.disabled_servers[server.task.location] = server
1737
1738        if server.status == Scheduler.ServerDisconnected:
1739            if server.rtasks:
1740                for _ in range(10):
1741                    yield task.sleep(0.1)
1742                    if not server.rtasks:
1743                        break
1744            if server.rtasks:
1745                logger.warning('%s tasks abandoned at %s', len(server.rtasks), server.task.location)
1746                if status_task:
1747                    status_task.send(DispycosStatus(Scheduler.ServerAbandoned,
1748                                                    server.task.location))
                for rtask, job in server.rtasks.values():
                    if status_task:
                        status_task.send(pycos.MonitorException(
                            rtask, (Scheduler.TaskAbandoned, None)))
1752                    if job.request.endswith('async'):
1753                        if job.done:
1754                            job.done.set()
1755                    else:
1756                        job.client.send(None)
1757        else:
1758            if (server.status == Scheduler.ServerInitialized or
1759                server.status == Scheduler.ServerSuspended):
1760                server.status = Scheduler.ServerClosed
1761                if not server.cpu_avail.is_set():
1762                    logger.debug('Waiting for remote tasks at %s to finish', server.task.location)
1763                    yield server.cpu_avail.wait()
1764                if await_async:
1765                    while server.rtasks:
1766                        rtask, job = server.rtasks[next(iter(server.rtasks))]
1767                        logger.debug('Remote task %s has not finished yet', rtask)
1768                        if job.done:
1769                            yield job.done.wait()
1770            if server.task:
1771                server.task.send({'req': 'close', 'auth': computation._auth, 'client': task})
1772                yield task.receive(timeout=MsgTimeout)
1773        if server.rtasks:  # wait a bit for server to terminate tasks
1774            for _ in range(10):
1775                yield task.sleep(0.1)
1776                if not server.rtasks:
1777                    break
1778        if server.rtasks:
1779            logger.warning('%s tasks running at %s', len(server.rtasks), server.task.location)
1780            for rtask, job in server.rtasks.values():
1781                if job.request.endswith('async'):
1782                    if job.done:
1783                        job.done.set()
1784                else:
1785                    job.client.send(None)
1786                if status_task:
1787                    status_task.send(pycos.MonitorException(rtask, (Scheduler.ServerClosed, None)))
1788            server.rtasks.clear()
1789
1790        server_task, server.task = server.task, None
1791        if not server_task:
1792            return(0)
1793        node.disabled_servers.pop(server_task.location, None)
1794        server.xfer_files = []
1795        server.askew_results.clear()
1796        server.status = Scheduler.ServerClosed
1797        if status_task:
1798            status_task.send(DispycosStatus(server.status, server_task.location))
1799
1800        if not server.cpu_avail.is_set():
1801            server.cpu_avail.set()
1802            node.cpus_used -= 1
1803            if node.cpus_used == len(node.servers):
1804                self._cpu_nodes.discard(node)
1805                if not self._cpu_nodes:
1806                    self._cpus_avail.clear()
1807                node.cpu_avail.clear()
1808            if node.servers:
1809                node.load = float(node.cpus_used) / len(node.servers)
1810            else:
1811                node.load = 0.0
1812
1813        if self.__server_locations:
1814            self.__server_locations.discard(server_task.location)
1815            # TODO: inform other servers
1816
1817        return(0)
1818
1819    def __close_computation(self, client=None, await_async=False, task=None):
1820        self.__server_locations.clear()
1821        if self._cur_computation:
1822            close_tasks = [SysTask(self.__close_node, node, await_async=await_async)
1823                           for node in self._nodes.values()]
1824            close_tasks.extend([SysTask(self.__close_node, node)
1825                                for node in self._disabled_nodes.values()])
1826            for close_task in close_tasks:
1827                yield close_task.finish()
1828        if self.__cur_client_auth:
1829            computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
1830            if os.path.isdir(computation_path):
1831                shutil.rmtree(computation_path, ignore_errors=True)
1832        if self._cur_computation and self._cur_computation.status_task:
1833            self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
1834                                                                  id(self._cur_computation)))
1835        self.__cur_client_auth = self._cur_computation = None
1836        self.__computation_sched_event.set()
1837        if client:
1838            client.send('closed')
1839        return(0)
1840
1841    def close(self, task=None):
1842        """Close current computation and quit scheduler.
1843
        Must be called with 'yield' as 'yield scheduler.close()' or scheduled
        as a task.
1846        """
1847        yield self.__close_computation(task=task)
1848        return(0)
1849
1850
1851if __name__ == '__main__':
    """The scheduler can be started either within a client program (if no other
    client programs use the nodes simultaneously), or run on a node with the
    options described below (usually no options are necessary, so the scheduler
    can be started with just 'dispycos.py').
    """
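    # example invocations (illustrative):
    #   python3 dispycos.py                      # run with default options
    #   python3 dispycos.py -d --daemon          # debug logging, no console input
    #   python3 dispycos.py --node 192.168.1.5   # also contact a known node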
1857
1858    import logging
1859    import argparse
1860    import signal
1861    try:
1862        import readline
1863    except ImportError:
1864        pass
1865
1866    import pycos.dispycos
1867    setattr(sys.modules['pycos.dispycos'], '_DispycosJob_', _DispycosJob_)
1868
1869    parser = argparse.ArgumentParser()
1870    parser.add_argument('-i', '--ip_addr', dest='node', action='append', default=[],
1871                        help='IP address or host name of this node')
1872    parser.add_argument('--ext_ip_addr', dest='ext_ip_addr', action='append', default=[],
1873                        help='External IP address to use (needed in case of NAT firewall/gateway)')
1874    parser.add_argument('-u', '--udp_port', dest='udp_port', type=int, default=9705,
1875                        help='UDP port number to use')
1876    parser.add_argument('-t', '--tcp_port', dest='tcp_port', type=int, default=9705,
1877                        help='TCP port number to use')
1878    parser.add_argument('--ipv4_udp_multicast', dest='ipv4_udp_multicast', action='store_true',
1879                        default=False, help='use multicast for IPv4 UDP instead of broadcast')
1880    parser.add_argument('-n', '--name', dest='name', default=None,
                        help='(symbolic) name given to scheduler')
1882    parser.add_argument('--dest_path', dest='dest_path', default=None,
1883                        help='path prefix to where files sent by peers are stored')
1884    parser.add_argument('--max_file_size', dest='max_file_size', default=None, type=int,
1885                        help='maximum file size of any file transferred')
1886    parser.add_argument('-s', '--secret', dest='secret', default='',
1887                        help='authentication secret for handshake with peers')
1888    parser.add_argument('--certfile', dest='certfile', default='',
1889                        help='file containing SSL certificate')
1890    parser.add_argument('--keyfile', dest='keyfile', default='',
1891                        help='file containing SSL key')
1892    parser.add_argument('--node', action='append', dest='nodes', default=[],
1893                        help='additional remote nodes (names or IP address) to use')
1894    parser.add_argument('--relay_nodes', action='store_true', dest='relay_nodes', default=False,
1895                        help='request each node to relay scheduler info on its network')
1896    parser.add_argument('--pulse_interval', dest='pulse_interval', type=float,
1897                        default=MaxPulseInterval,
1898                        help='interval in seconds to send "pulse" messages to check nodes '
1899                        'and client are connected')
1900    parser.add_argument('--ping_interval', dest='ping_interval', type=float, default=0,
1901                        help='interval in seconds to broadcast "ping" message to discover nodes')
1902    parser.add_argument('--zombie_period', dest='zombie_period', type=int,
1903                        default=(100 * MaxPulseInterval),
1904                        help='maximum time in seconds computation is idle')
1905    parser.add_argument('-d', '--debug', action='store_true', dest='loglevel', default=False,
1906                        help='if given, debug messages are printed')
1907    parser.add_argument('--clean', action='store_true', dest='clean', default=False,
1908                        help='if given, files copied from or generated by clients will be removed')
1909    parser.add_argument('--dispycosnode_port', dest='dispycosnode_port', type=int, default=9706,
1910                        help='UDP port number used by dispycosnode')
1911    parser.add_argument('--daemon', action='store_true', dest='daemon', default=False,
1912                        help='if given, input is not read from terminal')
1913    config = vars(parser.parse_args(sys.argv[1:]))
1914    del parser
1915
1916    if config['zombie_period'] and config['zombie_period'] < MaxPulseInterval:
1917        raise Exception('zombie_period must be >= %s' % MaxPulseInterval)
1918
1919    if not config['name']:
1920        config['name'] = 'dispycos_scheduler'
1921
1922    if config['loglevel']:
1923        logger.setLevel(logging.DEBUG)
1924    else:
1925        logger.setLevel(logging.INFO)
1926    del config['loglevel']
1927
1928    if config['certfile']:
1929        config['certfile'] = os.path.abspath(config['certfile'])
1930    else:
1931        config['certfile'] = None
1932    if config['keyfile']:
1933        config['keyfile'] = os.path.abspath(config['keyfile'])
1934    else:
1935        config['keyfile'] = None
1936
1937    daemon = config.pop('daemon', False)
1938
1939    _dispycos_scheduler = Scheduler(**config)
1940    _dispycos_scheduler._remote = True
1941    del config
1942
1943    def sighandler(signum, frame):
1944        # Task(_dispycos_scheduler.close).value()
1945        raise KeyboardInterrupt
1946
1947    try:
1948        signal.signal(signal.SIGHUP, sighandler)
1949        signal.signal(signal.SIGQUIT, sighandler)
1950    except Exception:
1951        pass
1952    signal.signal(signal.SIGINT, sighandler)
1953    signal.signal(signal.SIGABRT, sighandler)
1954    signal.signal(signal.SIGTERM, sighandler)
1955    del sighandler
1956
1957    if not daemon:
1958        try:
1959            if os.getpgrp() != os.tcgetpgrp(sys.stdin.fileno()):
1960                daemon = True
1961        except Exception:
1962            pass
1963
1964    if daemon:
1965        del daemon
1966        while 1:
1967            try:
1968                time.sleep(3600)
1969            except (Exception, KeyboardInterrupt):
1970                break
1971    else:
1972        del daemon
1973        while 1:
1974            try:
1975                _dispycos_cmd = input(
1976                    '\n\nEnter "quit" or "exit" to terminate dispycos scheduler\n'
1977                    '      "status" to show status of scheduler: '
1978                    )
1979            except KeyboardInterrupt:
1980                break
1981            _dispycos_cmd = _dispycos_cmd.strip().lower()
1982            if _dispycos_cmd in ('quit', 'exit'):
1983                break
1984            if _dispycos_cmd == 'status':
1985                _dispycos_scheduler.print_status()
1986
1987    logger.info('terminating dispycos scheduler')
1988    try:
1989        Task(_dispycos_scheduler.close).value()
1990    except KeyboardInterrupt:
1991        pass
1992