1#!/usr/bin/env python3
2
3
4"""
5dispyscheduler: Schedule jobs to nodes running 'dispynode'; needed
6when multiple processes may use same nodes simultaneously with
7SharedJobCluster; see accompanying 'dispy' for more details.
8"""
9
10import os
11import sys
12import time
13import socket
14import stat
15import re
16import ssl
17import atexit
18import traceback
19import tempfile
20import shutil
21import glob
22import pickle
23import hashlib
24import struct
25try:
26    import netifaces
27except ImportError:
28    netifaces = None
29
30# 'httpd' module may not be available at sys.path[0] as 'dispy.py' is
31# installed in same directory as this script is; prepend directory
32# where httpd.py module is installed to sys.path.
33for path in sys.path:
34    if os.path.isfile(os.path.join(path, 'dispy', 'httpd.py')):
35        sys.path.insert(0, path)
36        break
37del path
38
39import pycos
40from pycos import Task, Pycos, AsyncSocket, Singleton, serialize, deserialize
41import dispy
42import dispy.httpd
43from dispy import _Compute, DispyJob, _DispyJob_, _Function, _Node, DispyNode, NodeAllocate, \
44    _JobReply, auth_code, num_min, _parse_node_allocs, _XferFile, _dispy_version, \
45    _same_file, MsgTimeout, _node_ipaddr
46
47__author__ = "Giridhar Pemmasani (pgiri@yahoo.com)"
48__email__ = "pgiri@yahoo.com"
49__copyright__ = "Copyright 2011, Giridhar Pemmasani"
50__contributors__ = []
51__maintainer__ = "Giridhar Pemmasani (pgiri@yahoo.com)"
52__license__ = "Apache 2.0"
53__url__ = "http://dispy.sourceforge.net"
54__status__ = "Production"
55__version__ = _dispy_version
56__all__ = []
57
58MaxFileSize = 0
59# PyPI / pip packaging adjusts assertion below for Python 3.7+
60assert sys.version_info.major == 3 and sys.version_info.minor < 7, \
61    ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' %
62     (__file__, sys.version_info.major, sys.version_info.minor))
63
64
65class _Cluster(object):
66    """Internal use only.
67    """
68    def __init__(self, compute, node_allocs, scheduler):
69        self._compute = compute
70        # self.name = compute.name
71        self.name = '%s @ %s' % (compute.name, compute.scheduler_ip_addr)
72        self._node_allocs = _parse_node_allocs(node_allocs)
73        self._node_allocs = sorted(self._node_allocs,
74                                   key=lambda node_alloc: node_alloc.ip_rex, reverse=True)
75        self.scheduler = scheduler
76        self.status_callback = None
77        self.pending_jobs = 0
78        self.pending_results = 0
79        self._jobs = []
80        self._dispy_nodes = {}
81        self.cpu_time = 0
82        self.start_time = time.time()
83        self.end_time = None
84        self.job_sched_time = 0
85        self.zombie = False
86        self.exclusive = False
87        self.last_pulse = time.time()
88        self.client_ip_addr = None
89        self.client_port = None
90        self.client_sock_family = None
91        self.client_job_result_port = None
92        self.client_auth = None
93        self.ip_addr = None
94        self.dest_path = None
95        self.file_uses = {}
96
97    def __getstate__(self):
98        state = dict(self.__dict__)
99        for var in ('_node_allocs', 'scheduler', 'status_callback', '_jobs', '_dispy_nodes'):
100            state.pop(var, None)
101        return state
102
103    def node_jobs(self, node, from_node=False, task=None):
104        jobs = Task(self.scheduler.node_jobs, self, node, from_node, get_uids=False).value()
105        return jobs
106
107    def cancel(self, job):
108        return self.scheduler.cancel_job(self, job.id)
109
110    def allocate_node(self, node_alloc):
111        if not isinstance(node_alloc, list):
112            node_alloc = [node_alloc]
113        node_allocs = _parse_node_allocs(node_alloc)
114        Task(self.scheduler.allocate_node, self, node_allocs)
115
116    def set_node_cpus(self, node, cpus):
117        return Task(self.scheduler.set_node_cpus, node, cpus).value()
118
119
120class _Scheduler(object, metaclass=Singleton):
121    """Internal use only.
122
123    See dispy's JobCluster and SharedJobCluster for documentation.
124    """
125
126    def __init__(self, nodes=[], ip_addrs=[], ext_ip_addrs=[], port=None, node_port=None,
127                 scheduler_port=None, ipv4_udp_multicast=False, scheduler_alg=None,
128                 pulse_interval=None, ping_interval=None, cooperative=False, cleanup_nodes=False,
129                 node_secret='', cluster_secret='', node_keyfile=None, node_certfile=None,
130                 cluster_keyfile=None, cluster_certfile=None, dest_path_prefix=None, clean=False,
131                 zombie_interval=60, http_server=False):
132        self.ipv4_udp_multicast = bool(ipv4_udp_multicast)
133        self.addrinfos = {}
134        if not ip_addrs:
135            ip_addrs = [None]
136        for i in range(len(ip_addrs)):
137            ip_addr = ip_addrs[i]
138            if i < len(ext_ip_addrs):
139                ext_ip_addr = ext_ip_addrs[i]
140            else:
141                ext_ip_addr = None
142            addrinfo = dispy.host_addrinfo(host=ip_addr, ipv4_multicast=self.ipv4_udp_multicast)
143            if not addrinfo:
144                logger.warning('Ignoring invalid ip_addr %s', ip_addr)
145                continue
146            if ext_ip_addr:
147                ext_ip_addr = dispy._node_ipaddr(ext_ip_addr)
148                if not ext_ip_addr:
149                    logger.warning('Ignoring invalid ext_ip_addr %s', ext_ip_addrs[i])
150            if not ext_ip_addr:
151                ext_ip_addr = addrinfo.ip
152            addrinfo.ext_ip_addr = ext_ip_addr
153            self.addrinfos[addrinfo.ext_ip_addr] = addrinfo
154        if not self.addrinfos:
155            raise Exception('No valid IP address found')
156
157        if not port:
158            port = 51347
159        if not node_port:
160            node_port = 51348
161        if not scheduler_port:
162            scheduler_port = 51349
163        if not nodes:
164            nodes = ['*']
165
166        self.port = port
167        self.node_port = node_port
168        self.scheduler_port = scheduler_port
169        self._node_allocs = _parse_node_allocs(nodes)
170        self._nodes = {}
171        self.node_secret = node_secret
172        self.node_keyfile = node_keyfile
173        self.node_certfile = node_certfile
174        self.cluster_secret = cluster_secret
175        self.cluster_keyfile = cluster_keyfile
176        self.cluster_certfile = cluster_certfile
177        if not dest_path_prefix:
178            dest_path_prefix = os.path.join(tempfile.gettempdir(), 'dispy', 'scheduler')
179        self.dest_path_prefix = os.path.abspath(dest_path_prefix.strip()).rstrip(os.sep)
180        if clean:
181            shutil.rmtree(self.dest_path_prefix, ignore_errors=True)
182        if not os.path.isdir(self.dest_path_prefix):
183            os.makedirs(self.dest_path_prefix)
184            os.chmod(self.dest_path_prefix, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
185
186        self.cooperative = bool(cooperative)
187        self.cleanup_nodes = bool(cleanup_nodes)
188        if pulse_interval:
189            try:
190                self.pulse_interval = float(pulse_interval)
191                assert 1.0 <= self.pulse_interval <= 1000
192            except Exception:
193                raise Exception('Invalid pulse_interval; must be between 1 and 1000')
194        else:
195            self.pulse_interval = None
196
197        if ping_interval:
198            try:
199                self.ping_interval = float(ping_interval)
200                assert 1.0 <= self.ping_interval <= 1000
201            except Exception:
202                raise Exception('Invalid ping_interval; must be between 1 and 1000')
203        else:
204            self.ping_interval = None
205
206        if zombie_interval:
207            self.zombie_interval = 60 * zombie_interval
208            if self.pulse_interval:
209                self.pulse_interval = min(self.pulse_interval, self.zombie_interval / 5.0)
210            else:
211                self.pulse_interval = self.zombie_interval / 5.0
212        else:
213            self.zombie_interval = None
214
215        self.pycos = Pycos()
216        atexit.register(self.shutdown)
217
218        self._clusters = {}
219        self.unsched_clusters = []
220        self.pending_clusters = {}
221        self._sched_jobs = {}
222        self._sched_event = pycos.Event()
223        # once a _job is done (i.e., final result for it is
224        # received from node), it is added to done_jobs, so same
225        # object is not reused by Python (when a new job is
226        # submitted) until the result is sent back to client
227        # (otherwise, 'id' may be duplicate)
228        self.done_jobs = {}
229        self.terminate = False
230        self.sign = hashlib.sha1(os.urandom(20))
231        for ext_ip_addr in self.addrinfos:
232            self.sign.update(ext_ip_addr.encode())
233        self.sign = self.sign.hexdigest()
234        self.cluster_auth = auth_code(self.cluster_secret, self.sign)
235        self.node_auth = auth_code(self.node_secret, self.sign)
236
237        with open(os.path.join(self.dest_path_prefix, 'config'), 'wb') as fd:
238            config = {
239                'ip_addrs': [ai.ip for ai in self.addrinfos.values()],
240                'ext_ip_addrs': [ai.ext_ip_addr for ai in self.addrinfos.values()],
241                'port': self.port, 'sign': self.sign,
242                'cluster_secret': self.cluster_secret, 'cluster_auth': self.cluster_auth,
243                'node_secret': self.node_secret, 'node_auth': self.node_auth
244                }
245            pickle.dump(config, fd)
246
247        if scheduler_alg == 'fair_cluster':
248            self.select_job_node_cluster = self.fair_cluster_schedule
249        elif scheduler_alg == 'fcfs_cluster':
250            self.select_job_node_cluster = self.fcfs_cluster_schedule
251        else:
252            self.select_job_node_cluster = self.fsfs_job_schedule
253
254        self.start_time = time.time()
255        if http_server:
256            self.httpd = dispy.httpd.DispyHTTPServer(None)
257        else:
258            self.httpd = None
259
260        self.timer_task = Task(self.timer_proc)
261        self.job_scheduler_task = Task(self._schedule_jobs)
262
263        self.tcp_tasks = []
264        self.udp_tasks = []
265        self.scheduler_tasks = []
266        udp_addrinfos = {}
267        for addrinfo in self.addrinfos.values():
268            self.tcp_tasks.append(Task(self.tcp_server, addrinfo))
269            self.scheduler_tasks.append(Task(self.scheduler_server, addrinfo))
270            if os.name == 'nt':
271                bind_addr = addrinfo.ip
272            elif sys.platform == 'darwin':
273                if addrinfo.family == socket.AF_INET and (not self.ipv4_udp_multicast):
274                    bind_addr = ''
275                else:
276                    bind_addr = addrinfo.broadcast
277            else:
278                bind_addr = addrinfo.broadcast
279            udp_addrinfos[bind_addr] = addrinfo
280
281        for bind_addr, addrinfo in udp_addrinfos.items():
282            self.udp_tasks.append(Task(self.udp_server, bind_addr, addrinfo))
283
284    def udp_server(self, bind_addr, addrinfo, task=None):
285        task.set_daemon()
286
287        udp_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
288        udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
289        if hasattr(socket, 'SO_REUSEPORT'):
290            try:
291                udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
292            except Exception:
293                pass
294
295        while not self.port:
296            yield task.sleep(0.2)
297        udp_sock.bind((bind_addr, self.port))
298        if addrinfo.family == socket.AF_INET:
299            if self.ipv4_udp_multicast:
300                mreq = socket.inet_aton(addrinfo.broadcast) + socket.inet_aton(addrinfo.ip)
301                udp_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
302        else:  # addrinfo.family == socket.AF_INET6:
303            mreq = socket.inet_pton(addrinfo.family, addrinfo.broadcast)
304            mreq += struct.pack('@I', addrinfo.ifn)
305            udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
306            try:
307                udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
308            except Exception:
309                pass
310
311        Task(self.broadcast_ping, addrinfos=[addrinfo])
312        self.send_ping_cluster(self._node_allocs, set())
313
314        while 1:
315            msg, addr = yield udp_sock.recvfrom(1000)
316            if msg.startswith(b'PING:'):
317                try:
318                    info = deserialize(msg[len(b'PING:'):])
319                    if info['version'] != _dispy_version:
320                        logger.warning('Ignoring %s due to version mismatch', addr[0])
321                        continue
322                    assert info['port'] > 0
323                    assert info['ip_addr']
324                    # socket.inet_aton(status['ip_addr'])
325                except Exception:
326                    logger.debug('Ignoring node %s', addr[0])
327                    logger.debug(traceback.format_exc())
328                    continue
329                if info['port'] == self.port:
330                    continue
331                auth = auth_code(self.node_secret, info['sign'])
332                node = self._nodes.get(info['ip_addr'], None)
333                if node:
334                    if node.auth == auth:
335                        continue
336                sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM),
337                                   keyfile=self.node_keyfile, certfile=self.node_certfile)
338                sock.settimeout(MsgTimeout)
339                msg = {'port': self.port, 'sign': self.sign, 'version': _dispy_version}
340                msg['ip_addrs'] = [ai.ext_ip_addr for ai in self.addrinfos.values()]
341                try:
342                    yield sock.connect((info['ip_addr'], info['port']))
343                    yield sock.sendall(auth)
344                    yield sock.send_msg(b'PING:' + serialize(msg))
345                except Exception:
346                    logger.debug(traceback.format_exc())
347                finally:
348                    sock.close()
349            else:
350                pass
351
352    def tcp_server(self, addrinfo, task=None):
353        # generator
354        task.set_daemon()
355        sock = socket.socket(addrinfo.family, socket.SOCK_STREAM)
356        sock = AsyncSocket(sock, keyfile=self.node_keyfile, certfile=self.node_certfile)
357        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
358        try:
359            sock.bind((addrinfo.ip, self.port))
360        except Exception:
361            logger.debug('Could not bind TCP to %s:%s', addrinfo.ip, self.port)
362            raise StopIteration
363        logger.debug('TCP server at %s:%s', addrinfo.ip, self.port)
364        sock.listen(32)
365
366        while 1:
367            try:
368                conn, addr = yield sock.accept()
369            except ssl.SSLError as err:
370                logger.debug('SSL connection failed: %s', str(err))
371                continue
372            except GeneratorExit:
373                break
374            except Exception:
375                logger.debug(traceback.format_exc())
376                continue
377            Task(self.tcp_req, conn, addr)
378
379    def tcp_req(self, conn, addr, task=None):
380        # generator
381        conn.settimeout(MsgTimeout)
382        msg = yield conn.recv_msg()
383        if msg.startswith(b'JOB_REPLY:'):
384            try:
385                info = deserialize(msg[len(b'JOB_REPLY:'):])
386            except Exception:
387                logger.warning('Invalid job reply from %s:%s ignored', addr[0], addr[1])
388            else:
389                yield self.job_reply_process(info, len(msg), conn, addr)
390            conn.close()
391
392        elif msg.startswith(b'PULSE:'):
393            msg = msg[len(b'PULSE:'):]
394            try:
395                info = deserialize(msg)
396            except Exception:
397                logger.warning('Ignoring pulse message from %s', addr[0])
398                conn.close()
399                raise StopIteration
400            node = self._nodes.get(info['ip_addr'], None)
401            if node:
402                # assert 0 <= info['cpus'] <= node.cpus
403                node.last_pulse = time.time()
404                yield conn.send_msg(b'PULSE')
405                if info['avail_info']:
406                    node.avail_info = info['avail_info']
407                    for cluster in node.clusters:
408                        dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
409                        if dispy_node:
410                            dispy_node.avail_info = info['avail_info']
411                            dispy_node.update_time = node.last_pulse
412                            Task(self.send_node_status, cluster, dispy_node, DispyNode.AvailInfo)
413                            if cluster.status_callback:
414                                cluster.status_callback(DispyNode.AvailInfo, dispy_node, None)
415            conn.close()
416
417        elif msg.startswith(b'PONG:'):
418            conn.close()
419            try:
420                info = deserialize(msg[len(b'PONG:'):])
421                assert info['auth'] == self.node_auth
422            except Exception:
423                logger.warning('Ignoring node %s due to "secret" mismatch', addr[0])
424            else:
425                self.add_node(info)
426
427        elif msg.startswith(b'PING:'):
428            sock_family = conn.family
429            conn.close()
430            try:
431                info = deserialize(msg[len(b'PING:'):])
432                if info['version'] != _dispy_version:
433                    logger.warning('Ignoring node %s due to version mismatch', addr[0])
434                    raise Exception('')
435                assert info['port'] > 0
436                assert info['ip_addr']
437            except Exception:
438                logger.debug('Ignoring node %s', addr[0])
439                logger.debug(traceback.format_exc())
440                raise StopIteration
441            if info['port'] != self.port:
442                auth = auth_code(self.node_secret, info['sign'])
443                node = self._nodes.get(info['ip_addr'], None)
444                if node:
445                    if node.auth == auth:
446                        raise StopIteration
447                sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM),
448                                   keyfile=self.node_keyfile, certfile=self.node_certfile)
449                sock.settimeout(MsgTimeout)
450                msg = {'port': self.port, 'sign': self.sign, 'version': _dispy_version}
451                msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()]
452                try:
453                    yield sock.connect((info['ip_addr'], info['port']))
454                    yield sock.sendall(auth)
455                    yield sock.send_msg(b'PING:' + serialize(msg))
456                except Exception:
457                    logger.debug(traceback.format_exc())
458                finally:
459                    sock.close()
460
461        elif msg.startswith(b'FILEXFER:'):
462            try:
463                xf = deserialize(msg[len(b'FILEXFER:'):])
464                msg = yield conn.recv_msg()
465                job_reply = deserialize(msg)
466                yield self.xfer_to_client(job_reply, xf, conn, addr)
467            except Exception:
468                logger.debug(traceback.format_exc())
469            conn.close()
470
471        elif msg.startswith(b'TERMINATED:'):
472            conn.close()
473            try:
474                info = deserialize(msg[len(b'TERMINATED:'):])
475                node = self._nodes.get(info['ip_addr'], None)
476                if not node:
477                    raise StopIteration
478                auth = auth_code(self.node_secret, info['sign'])
479                if auth != node.auth:
480                    logger.warning('Invalid signature from %s', node.ip_addr)
481                    raise StopIteration
482                logger.debug('Removing node %s', node.ip_addr)
483                del self._nodes[node.ip_addr]
484                if node.clusters:
485                    dead_jobs = [_job for _job in self._sched_jobs.values()
486                                 if _job.node is not None and _job.node.ip_addr == node.ip_addr]
487                    clusters = list(node.clusters)
488                    node.clusters.clear()
489                    for cluster in clusters:
490                        dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None)
491                        if not dispy_node:
492                            continue
493                        Task(self.send_node_status, cluster, dispy_node, DispyNode.Closed)
494                    self.reschedule_jobs(dead_jobs)
495            except Exception:
496                # logger.debug(traceback.format_exc())
497                pass
498
499        elif msg.startswith(b'NODE_CPUS:'):
500            conn.close()
501            try:
502                info = deserialize(msg[len(b'NODE_CPUS:'):])
503                node = self._nodes.get(info['ip_addr'], None)
504                if not node:
505                    raise StopIteration
506                auth = auth_code(self.node_secret, info['sign'])
507                if auth != node.auth:
508                    logger.warning('Invalid signature from %s', node.ip_addr)
509                    raise StopIteration
510                cpus = info['cpus']
511            except Exception:
512                logger.debug(traceback.format_exc())
513                raise StopIteration
514            if cpus < 0:
515                logger.warning('Node requested using %s CPUs, disabling it', node.ip_addr, cpus)
516                cpus = 0
517            logger.debug('Setting cpus for %s to %s', node.ip_addr, cpus)
518            # TODO: set node.cpus to min(cpus, node.cpus)?
519            node.cpus = cpus
520            if cpus > node.avail_cpus:
521                node.avail_cpus = cpus
522                node_computations = []
523                for cluster in self._clusters.values():
524                    if cluster in node.clusters:
525                        continue
526                    compute = cluster._compute
527                    for node_alloc in cluster._node_allocs:
528                        cpus = node_alloc.allocate(cluster, node.ip_addr, node.name,
529                                                   node.avail_cpus)
530                        if cpus <= 0:
531                            continue
532                        node.cpus = min(node.avail_cpus, cpus)
533                        node_computations.append(compute)
534                        break
535                if node_computations:
536                    Task(self.setup_node, node, node_computations)
537                yield self._sched_event.set()
538            else:
539                node.avail_cpus = cpus
540            for cluster in node.clusters:
541                dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
542                if dispy_node:
543                    dispy_node.cpus = cpus
544
545        elif msg.startswith(b'RELAY_INFO:'):
546            try:
547                info = deserialize(msg[len(b'RELAY_INFO:'):])
548                assert info['version'] == _dispy_version
549                msg = {'sign': self.sign, 'ip_addrs': [info['scheduler_ip_addr']],
550                       'port': self.port}
551                if 'auth' in info and info['auth'] != self.node_auth:
552                    msg = None
553            except Exception:
554                msg = None
555            yield conn.send_msg(serialize(msg))
556            conn.close()
557
558        else:
559            logger.warning('Invalid message from %s:%s ignored', addr[0], addr[1])
560            conn.close()
561
562    def schedule_cluster(self, task=None):
563        while self.unsched_clusters:
564            cluster = self.unsched_clusters[0]
565            if self._clusters:
566                if cluster.exclusive:
567                    raise StopIteration
568                for cur_cluster in self._clusters.values():
569                    if cur_cluster.exclusive:
570                        raise StopIteration
571                    break
572            self.unsched_clusters.pop(0)
573            reply_sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM)
574            reply_sock = AsyncSocket(reply_sock, keyfile=self.cluster_keyfile,
575                                     certfile=self.cluster_certfile)
576            reply_sock.settimeout(MsgTimeout)
577            reply = {'compute_id': cluster._compute.id, 'pulse_interval': self.pulse_interval}
578            self._clusters[cluster._compute.id] = cluster
579            try:
580                yield reply_sock.connect((cluster.client_ip_addr, cluster.client_job_result_port))
581                yield reply_sock.send_msg('SCHEDULED:'.encode() + serialize(reply))
582                msg = yield reply_sock.recv_msg()
583                assert msg == 'ACK'.encode()
584                self.add_cluster(cluster)
585            except Exception:
586                self._clusters.pop(cluster._compute.id, None)
587                logger.debug('Ignoring computation %s / %s from %s:%s',
588                             cluster._compute.name, cluster._compute.id,
589                             cluster.client_ip_addr, cluster.client_job_result_port)
590                continue
591            finally:
592                reply_sock.close()
593
594    def scheduler_server(self, addrinfo, task=None):
595        task.set_daemon()
596        sock = socket.socket(addrinfo.family, socket.SOCK_STREAM)
597        sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile)
598        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
599        try:
600            sock.bind((addrinfo.ip, self.scheduler_port))
601        except Exception:
602            logger.warning('Could not bind scheduler server to %s:%s',
603                           addrinfo.ip, self.scheduler_port)
604            raise StopIteration
605        logger.debug('Scheduler at %s:%s', addrinfo.ip, self.scheduler_port)
606        sock.listen(32)
607        while 1:
608            conn, addr = yield sock.accept()
609            Task(self.scheduler_req, conn, addr)
610
611    def scheduler_req(self, conn, addr, task=None):
612        # generator
613        def _job_request(self, cluster, node, _job):
614            # generator
615            _job.uid = id(_job)
616            for xf in _job.xfer_files:
617                xf.name = os.path.join(cluster.dest_path, xf.dest_path.replace(xf.sep, os.sep),
618                                       xf.name.split(xf.sep)[-1])
619                xf.sep = os.sep
620
621            job = DispyJob(None, (), {})
622            job.id = _job.uid
623            _job.job = job
624            yield conn.send_msg(serialize(_job.uid))
625            ack = yield conn.recv_msg()
626            if ack != b'ACK':
627                raise StopIteration
628            if node:
629                _job.pinned = node
630                node.pending_jobs.append(_job)
631            else:
632                _job.pinned = None
633                cluster._jobs.append(_job)
634            logger.debug('Submitted job %s / %s', _job.uid, job.submit_time)
635            cluster.pending_jobs += 1
636            cluster.last_pulse = job.submit_time
637            self._sched_event.set()
638            if cluster.status_callback:
639                cluster.status_callback(DispyJob.Created, None, job)
640
641        def _compute_req(self, msg):
642            # function
643            try:
644                req = deserialize(msg)
645                compute = req['compute']
646                node_allocs = req['node_allocs']
647                exclusive = req['exclusive']
648            except Exception:
649                return serialize(('Invalid computation').encode())
650            for xf in compute.xfer_files:
651                if MaxFileSize and xf.stat_buf.st_size > MaxFileSize:
652                    return serialize(('File "%s" is too big; limit is %s' %
653                                      (xf.name, MaxFileSize)).encode())
654            if self.terminate:
655                return serialize(('Scheduler is closing').encode())
656            if self.cleanup_nodes and not compute.cleanup:
657                compute.cleanup = True
658            cluster = _Cluster(compute, node_allocs, self)
659            cluster.ip_addr = conn.getsockname()[0]
660            cluster.exclusive = exclusive
661            dest = compute.scheduler_ip_addr
662            if os.name == 'nt':
663                dest = dest.replace(':', '_')
664            dest = os.path.join(self.dest_path_prefix, dest)
665            if not os.path.isdir(dest):
666                try:
667                    os.mkdir(dest)
668                except Exception:
669                    return serialize(('Could not create destination directory').encode())
670            if compute.dest_path and isinstance(compute.dest_path, str):
671                # TODO: get os.sep from client and convert (in case of mixed environments)?
672                if compute.dest_path.startswith(os.sep):
673                    cluster.dest_path = compute.dest_path
674                else:
675                    cluster.dest_path = os.path.join(dest, compute.dest_path)
676                if not os.path.isdir(cluster.dest_path):
677                    try:
678                        os.makedirs(cluster.dest_path)
679                    except Exception:
680                        return serialize(('Could not create destination directory').encode())
681            else:
682                cluster.dest_path = tempfile.mkdtemp(prefix=compute.name + '_', dir=dest)
683
684            compute.id = id(compute)
685            cluster.client_job_result_port = compute.job_result_port
686            cluster.client_ip_addr = compute.scheduler_ip_addr
687            cluster.client_port = compute.scheduler_port
688            cluster.client_sock_family = conn.family
689            cluster.client_auth = compute.auth
690            compute.job_result_port = self.port
691            compute.scheduler_port = self.port
692            compute.auth = hashlib.sha1(os.urandom(10)).hexdigest()
693            cluster.last_pulse = time.time()
694            for xf in compute.xfer_files:
695                xf.compute_id = compute.id
696                xf.name = os.path.join(cluster.dest_path, xf.dest_path.replace(xf.sep, os.sep),
697                                       xf.name.split(xf.sep)[-1])
698                xf.sep = os.sep
699
700            with open(os.path.join(self.dest_path_prefix,
701                                   '%s_%s' % (compute.id, cluster.client_auth)), 'wb') as fd:
702                pickle.dump(cluster, fd)
703            self.pending_clusters[cluster._compute.id] = cluster
704            logger.debug('New computation %s: %s, %s',
705                         compute.id, compute.name, cluster.dest_path)
706            return serialize({'compute_id': cluster._compute.id, 'auth': cluster.client_auth})
707
708        def xfer_from_client(self, msg):
709            # generator
710            try:
711                xf = deserialize(msg)
712            except Exception:
713                logger.debug('Ignoring file trasnfer request from %s', addr[0])
714                raise StopIteration(serialize(-1))
715            cluster = self.pending_clusters.get(xf.compute_id, None)
716            if not cluster:
717                # if file is transfered for 'dispy_job_depends', cluster would be active
718                cluster = self._clusters.get(xf.compute_id, None)
719                if not cluster:
720                    logger.error('Computation "%s" is invalid', xf.compute_id)
721                    raise StopIteration(serialize(-1))
722            tgt = os.path.join(cluster.dest_path, xf.dest_path.replace(xf.sep, os.sep),
723                               xf.name.split(xf.sep)[-1])
724            if os.path.isfile(tgt) and _same_file(tgt, xf):
725                if tgt in cluster.file_uses:
726                    cluster.file_uses[tgt] += 1
727                else:
728                    cluster.file_uses[tgt] = 2
729                raise StopIteration(serialize(xf.stat_buf.st_size))
730            logger.debug('Copying file %s to %s (%s)', xf.name, tgt, xf.stat_buf.st_size)
731            try:
732                if not os.path.isdir(os.path.dirname(tgt)):
733                    os.makedirs(os.path.dirname(tgt))
734                with open(tgt, 'wb') as fd:
735                    recvd = 0
736                    while recvd < xf.stat_buf.st_size:
737                        yield conn.send_msg(serialize(recvd))
738                        data = yield conn.recvall(min(xf.stat_buf.st_size-recvd, 1024000))
739                        if not data:
740                            break
741                        fd.write(data)
742                        recvd += len(data)
743                assert recvd == xf.stat_buf.st_size
744                os.utime(tgt, (xf.stat_buf.st_atime, xf.stat_buf.st_mtime))
745                os.chmod(tgt, stat.S_IMODE(xf.stat_buf.st_mode))
746                if tgt in cluster.file_uses:
747                    cluster.file_uses[tgt] += 1
748                else:
749                    cluster.file_uses[tgt] = 1
750                logger.debug('Copied file %s', tgt)
751            except Exception:
752                logger.warning('Copying file "%s" failed with "%s"',
753                               xf.name, traceback.format_exc())
754                recvd = -1
755                try:
756                    os.remove(tgt)
757                    if len(os.listdir(cluster.dest_path)) == 0:
758                        os.rmdir(cluster.dest_path)
759                except Exception:
760                    pass
761            raise StopIteration(serialize(recvd))
762
763        def send_file(self, msg):
764            # generator
765            try:
766                msg = deserialize(msg)
767                node = self._nodes.get(msg['node'], None)
768                xf = msg['xf']
769            except Exception:
770                logger.debug('Ignoring file trasnfer request from %s', addr[0])
771                raise StopIteration(serialize(-1))
772            cluster = self._clusters.get(xf.compute_id, None)
773            if not cluster or not node:
774                logger.error('send_file "%s" is invalid', xf.name)
775                raise StopIteration(serialize(-1))
776            dispy_node = cluster._dispy_nodes.get(node.ip_addr)
777            if not dispy_node:
778                logger.error('send_file "%s" is invalid', xf.name)
779                raise StopIteration(serialize(-1))
780            if _same_file(xf.name, xf):
781                resp = yield node.xfer_file(xf)
782                if resp < 0:
783                    raise StopIteration(serialize(-1))
784                else:
785                    node.tx += resp
786                    dispy_node.tx += resp
787                    raise StopIteration(serialize(xf.stat_buf.st_size))
788
789            node_sock = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM),
790                                    keyfile=self.node_keyfile, certfile=self.node_certfile)
791            node_sock.settimeout(MsgTimeout)
792            try:
793                yield node_sock.connect((node.ip_addr, node.port))
794                yield node_sock.sendall(node.auth)
795                yield node_sock.send_msg('FILEXFER:'.encode() + serialize(xf))
796                recvd = yield node_sock.recv_msg()
797                recvd = deserialize(recvd)
798                while recvd < xf.stat_buf.st_size:
799                    yield conn.send_msg(serialize(recvd))
800                    data = yield conn.recvall(min(xf.stat_buf.st_size-recvd, 1024000))
801                    if not data:
802                        break
803                    yield node_sock.sendall(data)
804                    recvd = yield node_sock.recv_msg()
805                    recvd = deserialize(recvd)
806                node.tx += recvd
807                dispy_node.tx += recvd
808            except Exception:
809                logger.error('Could not transfer %s to %s: %s', xf.name, node.ip_addr, recvd)
810                logger.debug(traceback.format_exc())
811                # TODO: mark this node down, reschedule on different node?
812                recvd = -1
813            finally:
814                node_sock.close()
815            raise StopIteration(serialize(recvd))
816
817        # scheduler_req begins here
818        conn.settimeout(MsgTimeout)
819        resp = None
820        try:
821            req = yield conn.recvall(len(self.cluster_auth))
822        except Exception:
823            logger.warning('Failed to read message from %s: %s', str(addr), traceback.format_exc())
824            conn.close()
825            raise StopIteration
826
827        if req != self.cluster_auth:
828            msg = yield conn.recv_msg()
829            if msg.startswith(b'CLIENT:'):
830                try:
831                    req = deserialize(msg[len(b'CLIENT:'):])
832                    if req['version'] != _dispy_version:
833                        logger.warning('Ignoring %s due to version mismatch', addr[0])
834                        raise Exception('')
835                    if not req['ip_addr']:
836                        req['ip_addr'] = addr[0]
837                    reply = {'ip_addr': req['ip_addr'], 'port': self.scheduler_port,
838                             'sign': self.sign, 'version': _dispy_version}
839                    yield conn.send_msg(serialize(reply))
840                except Exception:
841                    pass
842            else:
843                logger.warning('Invalid/unauthorized request ignored')
844            conn.close()
845            raise StopIteration
846        msg = yield conn.recv_msg()
847        if not msg:
848            logger.info('Closing connection')
849            conn.close()
850            raise StopIteration
851
852        if msg.startswith(b'JOB:'):
853            msg = msg[len(b'JOB:'):]
854            try:
855                req = deserialize(msg)
856                _job = req['job']
857                cluster = self._clusters[_job.compute_id]
858                assert cluster.client_auth == req['auth']
859                node = req['node']
860                if node:
861                    node = self._nodes[node]
862            except Exception:
863                pass
864            else:
865                yield _job_request(self, cluster, node, _job)
866            resp = None
867
868        elif msg.startswith(b'PULSE:'):
869            msg = msg[len(b'PULSE:'):]
870            try:
871                info = deserialize(msg)
872            except Exception:
873                logger.warning('Ignoring pulse message from %s', addr[0])
874                conn.close()
875                raise StopIteration
876            if 'client_port' in info:
877                for cluster in self._clusters.values():
878                    if (cluster.client_ip_addr == addr[0] and
879                        cluster.client_port == info['client_port']):
880                        cluster.last_pulse = time.time()
881            conn.close()
882
883        elif msg.startswith(b'COMPUTE:'):
884            msg = msg[len(b'COMPUTE:'):]
885            resp = _compute_req(self, msg)
886
887        elif msg.startswith(b'SCHEDULE:'):
888            msg = msg[len(b'SCHEDULE:'):]
889            try:
890                req = deserialize(msg)
891                cluster = self.pending_clusters[req['compute_id']]
892                assert cluster.client_auth == req['auth']
893                for xf in cluster._compute.xfer_files:
894                    assert os.path.isfile(xf.name)
895                self.unsched_clusters.append(cluster)
896                self.pending_clusters.pop(cluster._compute.id)
897            except Exception:
898                logger.debug('Ignoring schedule request from %s', addr[0])
899                resp = 'NAK'.encode()
900            else:
901                resp = 'ACK'.encode()
902                Task(self.schedule_cluster)
903
904        elif msg.startswith(b'CLOSE:'):
905            msg = msg[len(b'CLOSE:'):]
906            try:
907                req = deserialize(msg)
908                auth = req['auth']
909            except Exception:
910                logger.warning('Invalid compuation for deleting')
911                conn.close()
912                raise StopIteration
913            cluster = self._clusters.get(req['compute_id'], None)
914            if cluster is None or cluster.client_auth != auth:
915                # this cluster is closed
916                conn.close()
917                raise StopIteration
918            cluster.zombie = True
919            terminate_pending = req.get('terminate_pending', False)
920            Task(self.cleanup_computation, cluster, terminate_pending=bool(terminate_pending))
921
922        elif msg.startswith(b'FILEXFER:'):
923            msg = msg[len(b'FILEXFER:'):]
924            resp = yield xfer_from_client(self, msg)
925
926        elif msg.startswith(b'SENDFILE:'):
927            msg = msg[len(b'SENDFILE:'):]
928            resp = yield send_file(self, msg)
929
930        elif msg.startswith(b'NODE_JOBS:'):
931            msg = msg[len(b'NODE_JOBS:'):]
932            try:
933                req = deserialize(msg)
934                cluster = self._clusters.get(req['compute_id'], None)
935                if cluster is None or cluster.client_auth != req['auth']:
936                    job_uids = []
937                else:
938                    node = req['node']
939                    from_node = req['from_node']
940                    # assert req['get_uids'] == True
941                    job_uids = yield self.node_jobs(cluster, node, from_node,
942                                                    get_uids=True, task=task)
943            except Exception:
944                job_uids = []
945            resp = serialize(job_uids)
946
947        elif msg.startswith(b'TERMINATE_JOB:'):
948            msg = msg[len(b'TERMINATE_JOB:'):]
949            try:
950                req = deserialize(msg)
951                uid = req['uid']
952                cluster = self._clusters[req['compute_id']]
953                assert cluster.client_auth == req['auth']
954            except Exception:
955                logger.warning('Invalid job cancel message')
956                conn.close()
957                raise StopIteration
958            self.cancel_job(cluster, uid)
959
960        elif msg.startswith(b'RESEND_JOB_RESULTS:'):
961            msg = msg[len(b'RESEND_JOB_RESULTS:'):]
962            try:
963                info = deserialize(msg)
964                compute_id = info['compute_id']
965                auth = info['auth']
966            except Exception:
967                resp = serialize(0)
968            else:
969                cluster = self._clusters.get(compute_id, None)
970                if cluster is None or cluster.client_auth != auth:
971                    try:
972                        with open(os.path.join(self.dest_path_prefix,
973                                               '%s_%s' % (compute_id, auth)), 'rb') as fd:
974                            cluster = pickle.load(fd)
975                    except Exception:
976                        pass
977                if cluster is None:
978                    resp = 0
979                else:
980                    resp = cluster.pending_results + cluster.pending_jobs
981            yield conn.send_msg(serialize(resp))
982            conn.close()
983            if resp > 0:
984                yield self.resend_job_results(cluster, task=task)
985            raise StopIteration
986
987        elif msg.startswith(b'PENDING_JOBS:'):
988            msg = msg[len(b'PENDING_JOBS:'):]
989            reply = {'done': [], 'pending': 0}
990            try:
991                info = deserialize(msg)
992                compute_id = info['compute_id']
993                auth = info['auth']
994            except Exception:
995                pass
996            else:
997                cluster = self._clusters.get(compute_id, None)
998                if cluster is None or cluster.client_auth != auth:
999                    with open(os.path.join(self.dest_path_prefix,
1000                                           '%s_%s' % (compute_id, auth)), 'rb') as fd:
1001                        cluster = pickle.load(fd)
1002                if cluster is not None and cluster.client_auth == auth:
1003                    done = []
1004                    if cluster.pending_results:
1005                        for result_file in glob.glob(os.path.join(cluster.dest_path,
1006                                                                  '_dispy_job_reply_*')):
1007                            result_file = os.path.basename(result_file)
1008                            try:
1009                                uid = int(result_file[len('_dispy_job_reply_'):])
1010                            except Exception:
1011                                pass
1012                            else:
1013                                done.append(uid)
1014                                # limit so as not to take up too much time
1015                                if len(done) > 50:
1016                                    break
1017                    reply['done'] = done
1018                    reply['pending'] = cluster.pending_jobs
1019            resp = serialize(reply)
1020
1021        elif msg.startswith(b'RETRIEVE_JOB:'):
1022            msg = msg[len(b'RETRIEVE_JOB:'):]
1023            yield self.retrieve_job_req(conn, msg)
1024
1025        elif msg.startswith(b'ALLOCATE_NODE:'):
1026            req = msg[len(b'ALLOCATE_NODE:'):]
1027            try:
1028                req = deserialize(req)
1029                cluster = self._clusters[req['compute_id']]
1030                assert cluster.client_auth == req['auth']
1031                resp = yield self.allocate_node(cluster, req['node_alloc'], task=task)
1032                resp = serialize(resp)
1033            except Exception:
1034                resp = serialize(-1)
1035
1036        elif msg.startswith(b'DEALLOCATE_NODE:'):
1037            req = msg[len(b'DEALLOCATE_NODE:'):]
1038            try:
1039                req = deserialize(req)
1040                cluster = self._clusters[req['compute_id']]
1041                assert cluster.client_auth == req['auth']
1042                resp = yield self.deallocate_node(cluster, req['node'], task=task)
1043                resp = serialize(resp)
1044            except Exception:
1045                resp = serialize(-1)
1046
1047        elif msg.startswith(b'CLOSE_NODE:'):
1048            req = msg[len(b'CLOSE_NODE:'):]
1049            try:
1050                req = deserialize(req)
1051                cluster = self._clusters[req['compute_id']]
1052                assert cluster.client_auth == req['auth']
1053                resp = yield self.close_node(cluster, req['node'],
1054                                             terminate_pending=req['terminate_pending'], task=task)
1055                resp = serialize(resp)
1056            except Exception:
1057                resp = serialize(-1)
1058
1059        elif msg.startswith(b'SET_NODE_CPUS:'):
1060            req = msg[len(b'SET_NODE_CPUS:'):]
1061            try:
1062                req = deserialize(req)
1063                cluster = self._clusters[req['compute_id']]
1064                assert cluster.client_auth == req['auth']
1065                # for shared cluster, changing cpus may not be valid, as we
1066                # don't maintain cpus per cluster
1067                resp = yield self.set_node_cpus(req['node'], req['cpus'])
1068            except Exception:
1069                logger.debug(traceback.format_exc())
1070                resp = (None, -1)
1071            resp = serialize(resp)
1072
1073        else:
1074            logger.debug('Ignoring invalid command')
1075
1076        if resp is not None:
1077            try:
1078                yield conn.send_msg(resp)
1079            except Exception:
1080                logger.warning('Failed to send response to %s: %s',
1081                               str(addr), traceback.format_exc())
1082        conn.close()
1083        # end of scheduler_req
1084
1085    def resend_job_results(self, cluster, task=None):
1086        # TODO: limit number queued so as not to take up too much space/time
1087        result_files = [f for f in os.listdir(cluster.dest_path)
1088                        if f.startswith('_dispy_job_reply_')]
1089        result_files = result_files[:min(len(result_files), 64)]
1090        for result_file in result_files:
1091            result_file = os.path.join(cluster.dest_path, result_file)
1092            try:
1093                with open(result_file, 'rb') as fd:
1094                    result = pickle.load(fd)
1095            except Exception:
1096                logger.debug('Could not load "%s"', result_file)
1097            else:
1098                status = yield self.send_job_result(
1099                    result.uid, cluster, result, resending=True, task=task)
1100                if status:
1101                    break
1102
1103    def timer_proc(self, task=None):
1104        task.set_daemon()
1105        reset = True
1106        last_ping_time = last_pulse_time = last_zombie_time = time.time()
1107        while 1:
1108            if reset:
1109                timeout = num_min(self.pulse_interval, self.ping_interval, self.zombie_interval)
1110
1111            reset = yield task.suspend(timeout)
1112            if reset:
1113                continue
1114
1115            now = time.time()
1116            if self.pulse_interval and (now - last_pulse_time) >= self.pulse_interval:
1117                last_pulse_time = now
1118                dead_nodes = {}
1119                for node in self._nodes.values():
1120                    if node.busy and (node.last_pulse + (5 * self.pulse_interval)) < now:
1121                        logger.warning('Node %s is not responding; removing it (%s, %s, %s)',
1122                                       node.ip_addr, node.busy, node.last_pulse, now)
1123                        dead_nodes[node.ip_addr] = node
1124                for ip_addr in dead_nodes:
1125                    node = self._nodes.pop(ip_addr, None)
1126                    clusters = list(node.clusters)
1127                    node.clusters.clear()
1128                    for cluster in clusters:
1129                        dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None)
1130                        if not dispy_node:
1131                            continue
1132                        Task(self.send_node_status, cluster, dispy_node, DispyNode.Closed)
1133
1134                dead_jobs = [_job for _job in self._sched_jobs.values()
1135                             if _job.node is not None and _job.node.ip_addr in dead_nodes]
1136                self.reschedule_jobs(dead_jobs)
1137                resend = [resend_cluster for resend_cluster in self._clusters.values()
1138                          if resend_cluster.pending_results and not resend_cluster.zombie]
1139                for cluster in resend:
1140                    Task(self.resend_job_results, cluster)
1141
1142            if self.ping_interval and (now - last_ping_time) >= self.ping_interval:
1143                last_ping_time = now
1144                for cluster in self._clusters.values():
1145                    self.send_ping_cluster(cluster._node_allocs,
1146                                           set(cluster._dispy_nodes.keys()))
1147                self.send_ping_cluster(self._node_allocs, set())
1148
1149            if self.zombie_interval and (now - last_zombie_time) >= self.zombie_interval:
1150                last_zombie_time = now
1151                for cluster in self._clusters.values():
1152                    if (now - cluster.last_pulse) > self.zombie_interval:
1153                        cluster.zombie = True
1154                zombies = [cluster for cluster in self._clusters.values()
1155                           if cluster.zombie and cluster.pending_jobs == 0]
1156                for cluster in zombies:
1157                    logger.debug('Deleting zombie computation "%s" / %s',
1158                                 cluster._compute.name, cluster._compute.id)
1159                    Task(self.cleanup_computation, cluster)
1160                zombies = [cluster for cluster in self.pending_clusters.values()
1161                           if (now - cluster.last_pulse) > self.zombie_interval]
1162                for cluster in zombies:
1163                    logger.debug('Deleting zombie computation "%s" / %s',
1164                                 cluster._compute.name, cluster._compute.id)
1165                    path = os.path.join(self.dest_path_prefix,
1166                                        '%s_%s' % (cluster._compute.id, cluster.client_auth))
1167                    if os.path.isfile(path):
1168                        os.remove(path)
1169                    try:
1170                        shutil.rmtree(cluster.dest_path)
1171                    except Exception:
1172                        logger.debug(traceback.format_exc())
1173                    self.pending_clusters.pop(cluster._compute.id, None)
1174
1175    def xfer_to_client(self, job_reply, xf, conn, addr):
1176        _job = self._sched_jobs.get(job_reply.uid, None)
1177        if _job is None or _job.hash != job_reply.hash:
1178            logger.warning('Ignoring invalid file transfer from job %s at %s',
1179                           job_reply.uid, addr[0])
1180            yield conn.send_msg(serialize(-1))
1181            raise StopIteration
1182        node = self._nodes.get(job_reply.ip_addr, None)
1183        cluster = self._clusters.get(_job.compute_id, None)
1184        if not node or not cluster:
1185            logger.warning('Ignoring invalid file transfer from job %s at %s',
1186                           job_reply.uid, addr[0])
1187            yield conn.send_msg(serialize(-1))
1188            raise StopIteration
1189        node.last_pulse = time.time()
1190        client_sock = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM),
1191                                  keyfile=self.cluster_keyfile, certfile=self.cluster_certfile)
1192        client_sock.settimeout(MsgTimeout)
1193        try:
1194            yield client_sock.connect((cluster.client_ip_addr, cluster.client_job_result_port))
1195            yield client_sock.send_msg('FILEXFER:'.encode() + serialize(xf))
1196            yield client_sock.send_msg(serialize(job_reply))
1197
1198            recvd = yield client_sock.recv_msg()
1199            recvd = deserialize(recvd)
1200            while recvd < xf.stat_buf.st_size:
1201                yield conn.send_msg(serialize(recvd))
1202                data = yield conn.recvall(min(xf.stat_buf.st_size-recvd, 1024000))
1203                if not data:
1204                    break
1205                yield client_sock.sendall(data)
1206                recvd = yield client_sock.recv_msg()
1207                recvd = deserialize(recvd)
1208            yield conn.send_msg(serialize(recvd))
1209        except Exception:
1210            yield conn.send_msg(serialize(-1))
1211        finally:
1212            client_sock.close()
1213            conn.close()
1214
1215    def send_ping_node(self, ip_addr, port=None, task=None):
1216        ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port,
1217                    'node_ip_addr': ip_addr}
1218        ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()]
1219        if not port:
1220            port = self.node_port
1221        if re.match('\d+\.', ip_addr):
1222            sock_family = socket.AF_INET
1223        else:
1224            sock_family = socket.AF_INET6
1225        tcp_sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM),
1226                               keyfile=self.node_keyfile, certfile=self.node_certfile)
1227        tcp_sock.settimeout(MsgTimeout)
1228        try:
1229            yield tcp_sock.connect((ip_addr, port))
1230            yield tcp_sock.sendall(b'x' * len(self.node_auth))
1231            yield tcp_sock.send_msg(b'PING:' + serialize(ping_msg))
1232        except Exception:
1233            pass
1234        tcp_sock.close()
1235
1236    def broadcast_ping(self, addrinfos=[], port=None, task=None):
1237        # generator
1238        if not port:
1239            port = self.node_port
1240        ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port}
1241        ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()]
1242        if not addrinfos:
1243            addrinfos = list(self.addrinfos.values())
1244        for addrinfo in addrinfos:
1245            bc_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
1246            bc_sock.settimeout(MsgTimeout)
1247            ttl_bin = struct.pack('@i', 1)
1248            if addrinfo.family == socket.AF_INET:
1249                if self.ipv4_udp_multicast:
1250                    bc_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
1251                else:
1252                    bc_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1253            else:  # addrinfo.family == socket.AF_INET6
1254                bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)
1255                bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
1256            bc_sock.bind((addrinfo.ip, 0))
1257            try:
1258                yield bc_sock.sendto(b'PING:' + serialize(ping_msg), (addrinfo.broadcast, port))
1259            except Exception:
1260                pass
1261            bc_sock.close()
1262
1263    def send_ping_cluster(self, node_allocs, present_ip_addrs, task=None):
1264        for node_alloc in node_allocs:
1265            # TODO: we assume subnets are indicated by '*', instead of
1266            # subnet mask; this is a limitation, but specifying with
1267            # subnet mask a bit cumbersome.
1268            if node_alloc.ip_rex.find('*') >= 0:
1269                Task(self.broadcast_ping, addrinfos=[], port=node_alloc.port)
1270            else:
1271                ip_addr = node_alloc.ip_addr
1272                if ip_addr in present_ip_addrs:
1273                    continue
1274                port = node_alloc.port
1275                Task(self.send_ping_node, ip_addr, port)
1276
1277    def add_cluster(self, cluster):
1278        compute = cluster._compute
1279        compute.pulse_interval = self.pulse_interval
1280        if self.httpd and cluster.status_callback is None:
1281            self.httpd.add_cluster(cluster)
1282        # TODO: should we allow clients to add new nodes, or use only
1283        # the nodes initially created with command-line?
1284        self.send_ping_cluster(cluster._node_allocs, set(cluster._dispy_nodes.keys()))
1285        compute_nodes = []
1286        for ip_addr, node in self._nodes.items():
1287            if cluster in node.clusters:
1288                continue
1289            for node_alloc in cluster._node_allocs:
1290                cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus)
1291                if cpus <= 0:
1292                    continue
1293                if cluster.exclusive or self.cooperative:
1294                    node.cpus = min(node.avail_cpus, cpus)
1295                compute_nodes.append(node)
1296        for node in compute_nodes:
1297            Task(self.setup_node, node, [compute])
1298
1299    def cleanup_computation(self, cluster, terminate_pending=False, task=None):
1300        # generator
1301        if not cluster.zombie:
1302            raise StopIteration
1303
1304        compute = cluster._compute
1305        cid = compute.id
1306        pkl_path = os.path.join(self.dest_path_prefix,
1307                                '%s_%s' % (cid, cluster.client_auth))
1308        if self._clusters.pop(cid, None) is None:
1309            if not cluster.pending_results:
1310                try:
1311                    os.remove(pkl_path)
1312                except Exception:
1313                    logger.debug(traceback.format_exc())
1314                    pass
1315            raise StopIteration
1316        cluster._jobs = []
1317        cluster.pending_jobs = 0
1318
1319        if cluster.pending_results == 0:
1320            try:
1321                os.remove(pkl_path)
1322            except Exception:
1323                logger.warning('Could not remove "%s"', pkl_path)
1324        else:
1325            with open(pkl_path, 'wb') as fd:
1326                pickle.dump(cluster, fd)
1327
1328        for path, use_count in cluster.file_uses.items():
1329            if use_count == 1:
1330                try:
1331                    os.remove(path)
1332                except Exception:
1333                    logger.warning('Could not remove "%s"', path)
1334        cluster.file_uses.clear()
1335
1336        if os.path.isdir(cluster.dest_path):
1337            for dirpath, dirnames, filenames in os.walk(cluster.dest_path, topdown=False):
1338                if not filenames or dirpath.endswith('__pycache__'):
1339                    try:
1340                        shutil.rmtree(dirpath)
1341                    except Exception:
1342                        logger.warning('Could not remove "%s"', dirpath)
1343                        break
1344
1345        # remove cluster from all nodes before closing (which uses
1346        # yield); otherwise, scheduler may access removed cluster
1347        # through node.clusters
1348        close_nodes = []
1349        for dispy_node in cluster._dispy_nodes.values():
1350            node = self._nodes.get(dispy_node.ip_addr, None)
1351            if not node:
1352                continue
1353            drop_jobs = [i for i, _job in enumerate(node.pending_jobs)
1354                         if _job.compute_id == cid]
1355            for i in reversed(drop_jobs):
1356                node.pending_jobs.remove(i)
1357            node.clusters.discard(cluster)
1358            if cluster.exclusive:
1359                node.cpus = node.avail_cpus
1360            close_nodes.append((Task(node.close, compute, terminate_pending=terminate_pending),
1361                                dispy_node))
1362        cluster._dispy_nodes.clear()
1363        for close_task, dispy_node in close_nodes:
1364            yield close_task.finish()
1365            yield self.send_node_status(cluster, dispy_node, DispyNode.Closed)
1366        if self.httpd:
1367            self.httpd.del_cluster(cluster)
1368        Task(self.schedule_cluster)
1369
1370    def setup_node(self, node, computes, task=None):
1371        # generator
1372        task.set_daemon()
1373        for compute in computes:
1374            # NB: to avoid computation being sent multiple times, we
1375            # add to cluster's _dispy_nodes before sending computation
1376            # to node
1377            cluster = self._clusters[compute.id]
1378            if node.ip_addr in cluster._dispy_nodes:
1379                continue
1380            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1381            if not dispy_node:
1382                dispy_node = DispyNode(node.ip_addr, node.name, node.cpus)
1383                cluster._dispy_nodes[node.ip_addr] = dispy_node
1384                dispy_node.tx = node.tx
1385                dispy_node.rx = node.rx
1386            dispy_node.avail_cpus = node.avail_cpus
1387            dispy_node.avail_info = node.avail_info
1388            r = yield node.setup(compute, exclusive=cluster.exclusive, task=task)
1389            if r or compute.id not in self._clusters:
1390                cluster._dispy_nodes.pop(node.ip_addr, None)
1391                logger.warning('Failed to setup %s for computation "%s": %s',
1392                               node.ip_addr, compute.name, r)
1393                Task(node.close, compute)
1394            else:
1395                dispy_node.update_time = time.time()
1396                node.clusters.add(cluster)
1397                self._sched_event.set()
1398                Task(self.send_node_status, cluster, dispy_node, DispyNode.Initialized)
1399
1400    def add_node(self, info):
1401        try:
1402            # assert info['version'] == _dispy_version
1403            assert info['port'] > 0 and info['cpus'] > 0
1404            # TODO: check if it is one of ext_ip_addr?
1405        except Exception:
1406            # logger.debug(traceback.format_exc())
1407            return
1408        node = self._nodes.get(info['ip_addr'], None)
1409        if node is None:
1410            logger.debug('Discovered %s:%s (%s) with %s cpus',
1411                         info['ip_addr'], info['port'], info['name'], info['cpus'])
1412            node = _Node(info['ip_addr'], info['port'], info['cpus'], info['sign'],
1413                         self.node_secret, platform=info['platform'],
1414                         keyfile=self.node_keyfile, certfile=self.node_certfile)
1415            node.name = info['name']
1416            node.avail_info = info['avail_info']
1417            self._nodes[node.ip_addr] = node
1418        else:
1419            node.last_pulse = time.time()
1420            auth = auth_code(self.node_secret, info['sign'])
1421            if info['cpus'] > 0:
1422                node.avail_cpus = info['cpus']
1423                node.cpus = min(node.cpus, node.avail_cpus)
1424            else:
1425                logger.warning('Invalid "cpus" %s from %s ignored', info['cpus'], info['ip_addr'])
1426            if node.port == info['port'] and node.auth == auth:
1427                return
1428            logger.debug('Node %s rediscovered', info['ip_addr'])
1429            node.port = info['port']
1430            if node.auth is not None:
1431                dead_jobs = [_job for _job in self._sched_jobs.values()
1432                             if _job.node is not None and _job.node.ip_addr == node.ip_addr]
1433                node.busy = 0
1434                node.auth = auth
1435                clusters = list(node.clusters)
1436                node.clusters.clear()
1437                for cluster in clusters:
1438                    dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None)
1439                    if not dispy_node:
1440                        continue
1441                    Task(self.send_node_status, cluster, dispy_node, DispyNode.Closed)
1442                self.reschedule_jobs(dead_jobs)
1443            node.auth = auth
1444        node_computations = []
1445        node.name = info['name']
1446        node.scheduler_ip_addr = info['scheduler_ip_addr']
1447        for cluster in self._clusters.values():
1448            if cluster in node.clusters:
1449                continue
1450            compute = cluster._compute
1451            for node_alloc in cluster._node_allocs:
1452                cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus)
1453                if cpus > 0:
1454                    node_computations.append(compute)
1455                    break
1456        if node_computations:
1457            Task(self.setup_node, node, node_computations)
1458
1459    def send_job_result(self, uid, cluster, result, resending=False, task=None):
1460        # generator
1461        sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM)
1462        sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile)
1463        sock.settimeout(MsgTimeout)
1464        try:
1465            yield sock.connect((cluster.client_ip_addr, cluster.client_job_result_port))
1466            yield sock.send_msg(b'JOB_REPLY:' + serialize(result))
1467            ack = yield sock.recv_msg()
1468            assert ack == b'ACK'
1469        except Exception:
1470            status = -1
1471            if not resending:
1472                # store job result even if computation has not enabled
1473                # fault recovery; user may be able to access node and
1474                # retrieve result manually
1475                f = os.path.join(cluster.dest_path, '_dispy_job_reply_%s' % uid)
1476                logger.error('Could not send reply for job %s to %s:%s; saving it in "%s"',
1477                             uid, cluster.client_ip_addr, cluster.client_job_result_port, f)
1478                try:
1479                    with open(f, 'wb') as fd:
1480                        pickle.dump(result, fd)
1481                except Exception:
1482                    logger.debug('Could not save reply for job %s', uid)
1483                else:
1484                    cluster.pending_results += 1
1485                    cluster.file_uses[f] = 2
1486        else:
1487            status = 0
1488            cluster.last_pulse = time.time()
1489            if result.status != DispyJob.ProvisionalResult:
1490                if resending:
1491                    cluster.pending_results -= 1
1492                    f = os.path.join(cluster.dest_path, '_dispy_job_reply_%s' % uid)
1493                    if os.path.isfile(f):
1494                        cluster.file_uses.pop(f, None)
1495                        try:
1496                            os.remove(f)
1497                        except Exception:
1498                            logger.warning('Could not remove "%s"', f)
1499                else:
1500                    self.done_jobs.pop(uid, None)
1501                    if cluster.pending_results:
1502                        Task(self.resend_job_results, cluster)
1503            if cluster.pending_jobs == 0 and cluster.pending_results == 0 and cluster.zombie:
1504                Task(self.cleanup_computation, cluster)
1505            dispy_node = cluster._dispy_nodes.get(result.ip_addr, None)
1506            if dispy_node:
1507                Task(self.send_node_status, cluster, dispy_node, DispyNode.AvailInfo)
1508        finally:
1509            sock.close()
1510
1511        raise StopIteration(status)
1512
1513    def send_job_status(self, cluster, _job, task=None):
1514        if cluster.status_callback:
1515            dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None)
1516            # assert _job.job.status == DispyJob.Running
1517            if dispy_node:
1518                dispy_node.busy += 1
1519                dispy_node.update_time = time.time()
1520                cluster.status_callback(_job.job.status, dispy_node, _job.job)
1521        sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM)
1522        sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile)
1523        sock.settimeout(MsgTimeout)
1524        try:
1525            yield sock.connect((cluster.client_ip_addr, cluster.client_job_result_port))
1526            status = {'uid': _job.uid, 'status': _job.job.status, 'node': _job.node.ip_addr,
1527                      'hash': _job.hash}
1528            status['start_time'] = _job.job.start_time
1529            yield sock.send_msg(b'JOB_STATUS:' + serialize(status))
1530        except Exception:
1531            logger.warning('Could not send job status to %s:%s',
1532                           cluster.client_ip_addr, cluster.client_job_result_port)
1533        sock.close()
1534
1535    def send_node_status(self, cluster, dispy_node, status, task=None):
1536        if cluster.status_callback:
1537            dispy_node.update_time = time.time()
1538            cluster.status_callback(status, dispy_node, None)
1539        sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM)
1540        sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile)
1541        sock.settimeout(MsgTimeout)
1542        status_info = {'compute_id': cluster._compute.id,
1543                       'status': status, 'auth': cluster.client_auth}
1544        if status == DispyNode.Initialized:
1545            status_info['dispy_node'] = dispy_node
1546        else:
1547            status_info['ip_addr'] = dispy_node.ip_addr
1548            if status == DispyNode.AvailInfo:
1549                status_info['avail_info'] = dispy_node.avail_info
1550                status_info['tx'] = dispy_node.tx
1551                status_info['rx'] = dispy_node.rx
1552        try:
1553            yield sock.connect((cluster.client_ip_addr, cluster.client_job_result_port))
1554            yield sock.send_msg(b'NODE_STATUS:' + serialize(status_info))
1555        except Exception:
1556            logger.debug('Could not send node status to %s:%s',
1557                         cluster.client_ip_addr, cluster.client_job_result_port)
1558        sock.close()
1559
1560    def job_reply_process(self, reply, msg_len, sock, addr):
1561        _job = self._sched_jobs.get(reply.uid, None)
1562        if not _job or reply.hash != _job.hash:
1563            logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
1564            yield sock.send_msg(b'ACK')
1565            raise StopIteration
1566        job = _job.job
1567        _job._args = _job._kwargs = None
1568        node = self._nodes.get(reply.ip_addr, None)
1569        cluster = self._clusters.get(_job.compute_id, None)
1570        if not cluster:
1571            # job cancelled while/after closing computation
1572            if node:
1573                node.rx += msg_len
1574                if node.busy > 0:
1575                    node.busy -= 1
1576                    node.cpu_time += reply.end_time - reply.start_time
1577                    node.last_pulse = time.time()
1578                    self._sched_event.set()
1579            yield sock.send_msg(b'ACK')
1580            raise StopIteration
1581        if not node:
1582            logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
1583            yield sock.send_msg(b'ACK')
1584            raise StopIteration
1585        # assert reply.ip_addr == node.ip_addr
1586        node.last_pulse = time.time()
1587        logger.debug('Received reply for job %s from %s', _job.uid, addr[0])
1588        # assert _job.job.status not in [DispyJob.Created, DispyJob.Finished]
1589        setattr(reply, 'cpus', node.cpus)
1590        dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None)
1591        if dispy_node:
1592            dispy_node.rx += msg_len
1593
1594        yield sock.send_msg(b'ACK')
1595        job.start_time = reply.start_time
1596        job.end_time = reply.end_time
1597        if reply.status != DispyJob.ProvisionalResult:
1598            self.done_jobs[_job.uid] = _job
1599            del self._sched_jobs[_job.uid]
1600            node.busy -= 1
1601            node.cpu_time += reply.end_time - reply.start_time
1602            if cluster.status_callback:
1603                if dispy_node:
1604                    dispy_node.busy -= 1
1605                    dispy_node.jobs_done += 1
1606                    dispy_node.cpu_time += reply.end_time - reply.start_time
1607                    dispy_node.update_time = time.time()
1608                    cluster.status_callback(reply.status, dispy_node, job)
1609
1610            cluster.pending_jobs -= 1
1611            if cluster.pending_jobs == 0:
1612                cluster.end_time = time.time()
1613                if cluster.zombie:
1614                    Task(self.cleanup_computation, cluster)
1615            self._sched_event.set()
1616            for xf in _job.xfer_files:
1617                try:
1618                    cluster.file_uses[xf.name] -= 1
1619                    if cluster.file_uses[xf.name] == 0:
1620                        cluster.file_uses.pop(xf.name)
1621                        os.remove(xf.name)
1622                except Exception:
1623                    logger.warning('Could not remove "%s"', xf.name)
1624        Task(self.send_job_result, _job.uid, cluster, reply, resending=False)
1625
1626    def reschedule_jobs(self, dead_jobs):
1627        if not dead_jobs:
1628            return
1629        for _job in dead_jobs:
1630            cluster = self._clusters[_job.compute_id]
1631            del self._sched_jobs[_job.uid]
1632            if cluster._compute.reentrant and not _job.pinned:
1633                logger.debug('Rescheduling job %s from %s', _job.uid, _job.node.ip_addr)
1634                _job.job.status = DispyJob.Created
1635                _job.hash = ''.join(hex(x)[2:] for x in os.urandom(10))
1636                cluster._jobs.append(_job)
1637            else:
1638                logger.debug('Terminating job %s scheduled on %s', _job.uid, _job.node.ip_addr)
1639                reply = _JobReply(_job, _job.node.ip_addr, status=DispyJob.Abandoned)
1640                reply.result = serialize(None)
1641                cluster.pending_jobs -= 1
1642                if cluster.pending_jobs == 0:
1643                    cluster.end_time = time.time()
1644                self.done_jobs[_job.uid] = _job
1645                Task(self.send_job_result, _job.uid, cluster, reply, resending=False)
1646        self._sched_event.set()
1647
1648    def load_balance_node(self):
1649        """Return node with least load
1650        """
1651        # TODO: maintain "available" sequence of nodes for better performance
1652        node = None
1653        load = 1.0
1654        for host in self._nodes.values():
1655            if host.busy >= host.cpus:
1656                continue
1657            if host.pending_jobs:
1658                return host
1659            if not any(cluster._jobs for cluster in host.clusters):
1660                continue
1661            if (host.busy / host.cpus) < load:
1662                node = host
1663                load = host.busy / host.cpus
1664        return node
1665
1666    def fsfs_job_schedule(self):
1667        """Return tuple (_job, node, cluster) such that _job is earliest
1668        submitted in all clusters.
1669        """
1670        node = self.load_balance_node()
1671        if not node:
1672            return (None, None, None)
1673        _job = cluster = lrs = None
1674        for cluster in node.clusters:
1675            if cluster._jobs and (not lrs or
1676                                  cluster._jobs[0].job.submit_time < lrs._jobs[0].job.submit_time):
1677                lrs = cluster
1678        if lrs:
1679            if node.pending_jobs:
1680                if node.pending_jobs[0].job.submit_time < lrs._jobs[0].job.submit_time:
1681                    _job = node.pending_jobs.pop(0)
1682                    cluster = self._clusters[_job.compute_id]
1683            if not _job:
1684                cluster = lrs
1685                _job = cluster._jobs.pop(0)
1686        elif node.pending_jobs:
1687            _job = node.pending_jobs.pop(0)
1688            cluster = self._clusters[_job.compute_id]
1689        return (_job, node, cluster)
1690
1691    def fair_cluster_schedule(self):
1692        """Return tuple (_job, node, cluster) such that cluster is earliest
1693        scheduled last time.
1694        """
1695        node = self.load_balance_node()
1696        if not node:
1697            return (None, None, None)
1698        _job = cluster = lrs = None
1699        for cluster in node.clusters:
1700            if cluster._jobs and (not lrs or cluster.job_sched_time < lrs.job_sched_time):
1701                lrs = cluster
1702        if lrs:
1703            if node.pending_jobs:
1704                _job = node.pending_jobs[0]
1705                cluster = self._clusters[_job.compute_id]
1706                if cluster.job_sched_time < lrs.job_sched_time:
1707                    node.pending_jobs.pop(0)
1708                else:
1709                    cluster = lrs
1710                    _job = cluster._jobs.pop(0)
1711            if not _job:
1712                cluster = lrs
1713                _job = cluster._jobs.pop(0)
1714        elif node.pending_jobs:
1715            _job = node.pending_jobs.pop(0)
1716            cluster = self._clusters[_job.compute_id]
1717        if _job:
1718            cluster.job_sched_time = time.time()
1719        return (_job, node, cluster)
1720
1721    def fcfs_cluster_schedule(self):
1722        """Return tuple (_job, node, cluster) such that cluster is created
1723        earliest.
1724        """
1725        node = self.load_balance_node()
1726        if not node:
1727            return (None, None, None)
1728        _job = cluster = lrs = None
1729        for cluster in node.clusters:
1730            if cluster._jobs and (not lrs or cluster.start_time < lrs.start_time):
1731                lrs = cluster
1732        if lrs:
1733            if node.pending_jobs:
1734                _job = node.pending_jobs[0]
1735                cluster = self._clusters[_job.compute_id]
1736                if cluster.start_time < lrs.start_time:
1737                    node.pending_jobs.pop(0)
1738                else:
1739                    cluster = lrs
1740                    _job = cluster._jobs.pop(0)
1741            if not _job:
1742                cluster = lrs
1743                _job = cluster._jobs.pop(0)
1744        elif node.pending_jobs:
1745            _job = node.pending_jobs.pop(0)
1746            cluster = self._clusters[_job.compute_id]
1747        return (_job, node, cluster)
1748
1749    def run_job(self, _job, cluster, task=None):
1750        # generator
1751        # assert task is not None
1752        node = _job.node
1753        dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1754        try:
1755            tx = yield _job.run(task=task)
1756            if dispy_node:
1757                dispy_node.tx += tx
1758        except (EnvironmentError, OSError):
1759            logger.warning('Failed to run job %s on %s for computation %s; removing this node',
1760                           _job.uid, node.ip_addr, cluster._compute.name)
1761            if node.pending_jobs:
1762                # TODO: instead of discarding pending jobs, maintain them
1763                # elsewhere, while cluster is alive?
1764                for njob in node.pending_jobs:
1765                    self.done_jobs[_njob.uid] = _njob
1766                    cl = self._clusters[njob.compute_id]
1767                    cl.pending_jobs -= 1
1768                    reply = _JobReply(_njob, cl.ip_addr, status=DispyJob.Cancelled)
1769                    reply.result = serialize(None)
1770                    Task(self.send_job_result, _njob.uid, cl, reply, resending=False)
1771                node.pending_jobs = []
1772            # TODO: need to close computations on this node?
1773            for cl in node.clusters:
1774                dn = cl._dispy_nodes.pop(node.ip_addr, None)
1775                if dn and cl.status_callback:
1776                    Task(self.send_node_status, cl, dn, DispyNode.Closed)
1777            node.clusters.clear()
1778            self._nodes.pop(node.ip_addr, None)
1779            if self._sched_jobs.pop(_job.uid, None) == _job:
1780                if not _job.pinned:
1781                    cluster._jobs.insert(0, _job)
1782                node.busy -= 1
1783            self._sched_event.set()
1784        except Exception:
1785            logger.warning('Failed to run job %s on %s for computation %s',
1786                           _job.uid, node.ip_addr, cluster._compute.name)
1787            # logger.debug(traceback.format_exc())
1788            # TODO: delay executing again for some time?
1789            # this job might have been deleted already due to timeout
1790            if self._sched_jobs.pop(_job.uid, None) == _job:
1791                if cluster.status_callback:
1792                    if dispy_node:
1793                        dispy_node.update_time = time.time()
1794                        cluster.status_callback(DispyJob.Cancelled, dispy_node, _job.job)
1795                node.busy -= 1
1796            self._sched_event.set()
1797        else:
1798            # job may have already finished (in which case _job.job would be None)
1799            if _job.job:
1800                logger.debug('Running job %s on %s (busy: %d / %d)',
1801                             _job.uid, node.ip_addr, node.busy, node.cpus)
1802                _job.job.status = DispyJob.Running
1803                _job.job.start_time = time.time()
1804                cluster = self._clusters[_job.compute_id]
1805                # TODO/Note: It is likely that this job status may arrive at
1806                # the client before the job is done and the node's status
1807                # arrives. Either use queing for messages (ideally with
1808                # pycos's message passing) or tag messages with timestamps
1809                # so recipient can use temporal ordering to ignore prior
1810                # messages
1811                Task(self.send_job_status, cluster, _job)
1812        if not cluster._compute.reentrant:
1813            _job._args = _job._kwargs = None
1814
1815    def _schedule_jobs(self, task=None):
1816        # generator
1817        assert task is not None
1818        while not self.terminate:
1819            # n = sum(len(cluster._jobs) for cluster in self._clusters.values())
1820            _job, node, cluster = self.select_job_node_cluster()
1821            if not _job:
1822                self._sched_event.clear()
1823                yield self._sched_event.wait()
1824                continue
1825            _job.node = node
1826            # assert node.busy < node.cpus
1827            self._sched_jobs[_job.uid] = _job
1828            node.busy += 1
1829            Task(self.run_job, _job, cluster)
1830
1831        logger.debug('Scheduler quitting: %s', len(self._sched_jobs))
1832        for uid, _job in self._sched_jobs.items():
1833            cluster = self._clusters.get(_job.compute_id, None)
1834            if cluster:
1835                reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Terminated)
1836                reply.result = serialize(None)
1837                Task(self.send_job_result, _job.uid, cluster, reply, resending=False)
1838        for cluster in self._clusters.values():
1839            for _job in cluster._jobs:
1840                reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Terminated)
1841                reply.result = serialize(None)
1842                Task(self.send_job_result, _job.uid, cluster, reply, resending=False)
1843            cluster._jobs = []
1844
1845        for cluster in list(self._clusters.values()):
1846            cluster.pending_jobs = 0
1847            cluster.zombie = True
1848            yield self.cleanup_computation(cluster)
1849        self._clusters = {}
1850        self._sched_jobs = {}
1851        self.done_jobs = {}
1852        logger.debug('Scheduler quit')
1853
1854    def retrieve_job_req(self, conn, msg):
1855        # generator
1856
1857        def send_reply(reply):
1858            try:
1859                yield conn.send_msg(serialize(reply))
1860            except Exception:
1861                raise StopIteration(-1)
1862            raise StopIteration(0)
1863
1864        try:
1865            req = deserialize(msg)
1866            uid = req['uid']
1867            compute_id = req['compute_id']
1868            auth = req['auth']
1869            job_hash = req['hash']
1870        except Exception:
1871            yield send_reply(None)
1872            raise StopIteration
1873
1874        pkl_path = os.path.join(self.dest_path_prefix, '%s_%s' % (compute_id, auth))
1875        cluster = self._clusters.get(compute_id, None)
1876        if not cluster or cluster.client_auth != auth:
1877            with open(pkl_path, 'rb') as fd:
1878                cluster = pickle.load(fd)
1879        if not cluster or cluster.client_auth != auth:
1880            yield send_reply(None)
1881            raise StopIteration
1882
1883        info_file = os.path.join(cluster.dest_path, '_dispy_job_reply_%s' % uid)
1884        if not os.path.isfile(info_file):
1885            yield send_reply(None)
1886            raise StopIteration
1887
1888        try:
1889            with open(info_file, 'rb') as fd:
1890                job_reply = pickle.load(fd)
1891            assert job_reply.hash == job_hash
1892        except Exception:
1893            yield send_reply(None)
1894            raise StopIteration
1895
1896        try:
1897            yield conn.send_msg(serialize(job_reply))
1898            ack = yield conn.recv_msg()
1899            assert ack == 'ACK'.encode()
1900            cluster.pending_results -= 1
1901            with open(pkl_path, 'wb') as fd:
1902                pickle.dump(cluster, fd)
1903        except Exception:
1904            pass
1905        else:
1906            cluster.file_uses.pop(info_file, None)
1907            try:
1908                os.remove(info_file)
1909            except Exception:
1910                pass
1911
1912    def cancel_job(self, cluster, uid):
1913        # function
1914        cluster.last_pulse = time.time()
1915        _job = self._sched_jobs.get(uid, None)
1916        if _job:
1917            _job.job.status = DispyJob.Cancelled
1918            Task(_job.node.send, b'TERMINATE_JOB:' + serialize(_job), reply=False)
1919            return 0
1920        else:
1921            for i, _job in enumerate(cluster._jobs):
1922                if _job.uid == uid:
1923                    del cluster._jobs[i]
1924                    self.done_jobs[_job.uid] = _job
1925                    cluster.pending_jobs -= 1
1926                    reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Cancelled)
1927                    reply.result = serialize(None)
1928                    Task(self.send_job_result, _job.uid, cluster, reply, resending=False)
1929                    return 0
1930
1931            for ip_addr in cluster._dispy_nodes:
1932                node = self._nodes.get(ip_addr, None)
1933                if not node:
1934                    continue
1935                for i, _job in enumerate(node.pending_jobs):
1936                    if _job.uid == uid:
1937                        del node.pending_jobs[i]
1938                        self.done_jobs[_job.uid] = _job
1939                        cluster.pending_jobs -= 1
1940                        reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Cancelled)
1941                        reply.result = serialize(None)
1942                        Task(self.send_job_result, _job.uid, cluster, reply, resending=False)
1943                        return 0
1944            logger.debug('Invalid job %s!', uid)
1945            return -1
1946
1947    def allocate_node(self, cluster, node_alloc, task=None):
1948        # generator
1949        if not isinstance(node_alloc, list):
1950            node_alloc = [node_alloc]
1951        for i in range(len(node_alloc)-1, -1, -1):
1952            node = self._nodes.get(node_alloc[i].ip_addr, None)
1953            if node:
1954                dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
1955                if dispy_node:
1956                    node.clusters.add(cluster)
1957                    self._sched_event.set()
1958                    del node_alloc[i]
1959                    continue
1960        if not node_alloc:
1961            raise StopIteration(0)
1962        cluster._node_allocs.extend(node_alloc)
1963        cluster._node_allocs = sorted(cluster._node_allocs,
1964                                      key=lambda node_alloc: node_alloc.ip_rex, reverse=True)
1965        present = set()
1966        cluster._node_allocs = [na for na in cluster._node_allocs
1967                                if na.ip_rex not in present and not present.add(na.ip_rex)]
1968        del present
1969        self.add_cluster(cluster)
1970        yield 0
1971
1972    def deallocate_node(self, cluster, node, task=None):
1973        # generator
1974        node = self._nodes.get(_node_ipaddr(node), None)
1975        if node is None:
1976            raise StopIteration(-1)
1977        node.clusters.discard(cluster)
1978        yield 0
1979
1980    def close_node(self, cluster, node, terminate_pending, task=None):
1981        # generator
1982        node = self._nodes.get(_node_ipaddr(node), None)
1983        if node is None:
1984            raise StopIteration(-1)
1985        node.clusters.discard(cluster)
1986        yield node.close(cluster._compute, terminate_pending=terminate_pending)
1987
1988    def node_jobs(self, cluster, node, from_node=False, get_uids=True, task=None):
1989        # generator
1990        node = self._nodes.get(_node_ipaddr(node), None)
1991        if not node or cluster not in node.clusters:
1992            raise StopIteration([])
1993        if from_node:
1994            sock = socket.socket(node.sock_family, socket.SOCK_STREAM)
1995            sock = AsyncSocket(sock, keyfile=self.node_keyfile, certfile=self.node_certfile)
1996            sock.settimeout(MsgTimeout)
1997            try:
1998                yield sock.connect((node.ip_addr, node.port))
1999                yield sock.sendall(node.auth)
2000                req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth}
2001                yield sock.send_msg(b'JOBS:' + serialize(req))
2002                msg = yield sock.recv_msg()
2003                uids = [info['uid'] for info in deserialize(msg)]
2004            except Exception:
2005                logger.debug(traceback.format_exc())
2006                uids = []
2007            sock.close()
2008            if get_uids:
2009                jobs = uids
2010            else:
2011                _jobs = [self._sched_jobs.get(uid, None) for uid in uids]
2012                jobs = [_job.job for _job in _jobs if _job is not None
2013                        and _job.compute_id == cluster._compute.id
2014                        ]
2015        else:
2016            if get_uids:
2017                jobs = [_job.uid for _job in self._sched_jobs.values() if _job.node == node
2018                        and _job.compute_id == cluster._compute.id]
2019            else:
2020                jobs = [_job.job for _job in self._sched_jobs.values() if _job.node == node
2021                        and _job.compute_id == cluster._compute.id]
2022
2023        raise StopIteration(jobs)
2024
2025    def set_node_cpus(self, node, cpus):
2026        # generator
2027        try:
2028            cpus = int(cpus)
2029        except ValueError:
2030            raise StopIteration(-1)
2031        node = self._nodes.get(_node_ipaddr(node), None)
2032        if node is None:
2033            reply = (None, -1)
2034        else:
2035            if cpus >= 0:
2036                node.cpus = min(node.avail_cpus, cpus)
2037            elif (node.avail_cpus + cpus) >= 0:
2038                node.cpus = node.avail_cpus + cpus
2039            cpus = node.cpus
2040            for cluster in node.clusters:
2041                dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
2042                if dispy_node:
2043                    dispy_node.cpus = cpus
2044            yield self._sched_event.set()
2045            reply = (node.ip_addr, node.cpus)
2046        raise StopIteration(reply)
2047
2048    def shutdown(self):
2049        def _shutdown(self, task=None):
2050            logger.debug('Shutting down scheduler ...')
2051            for cluster in list(self.pending_clusters.values()) + self.unsched_clusters:
2052                path = os.path.join(self.dest_path_prefix,
2053                                    '%s_%s' % (cluster._compute.id, cluster.client_auth))
2054                if os.path.isfile(path):
2055                    os.remove(path)
2056                try:
2057                    shutil.rmtree(cluster.dest_path)
2058                except Exception:
2059                    logger.debug(traceback.format_exc())
2060                # TODO: inform cluster
2061            self.pending_clusters.clear()
2062            self.unsched_clusters = []
2063            while (any(cluster.pending_jobs for cluster in self._clusters.values())):
2064                logger.warning('Waiting for %s clusters to finish', len(self._clusters))
2065                yield task.sleep(5)
2066
2067            self._sched_event.set()
2068            yield self.job_scheduler_task.finish()
2069            yield self.print_status()
2070
2071        if self.terminate:
2072            return
2073        self.terminate = True
2074        Task(_shutdown, self).value()
2075        if self.pycos:
2076            self.pycos.finish()
2077            self.pycos = None
2078        Singleton.empty(self.__class__)
2079
2080    def print_status(self):
2081        print('')
2082        heading = ' %30s | %5s | %13s' % ('Node', 'CPUs', 'Node Time Sec')
2083        print(heading)
2084        print('-' * len(heading))
2085        tot_cpu_time = 0
2086        for ip_addr in sorted(self._nodes, key=lambda addr: self._nodes[addr].cpu_time,
2087                              reverse=True):
2088            node = self._nodes[ip_addr]
2089            tot_cpu_time += node.cpu_time
2090            if node.name:
2091                name = ip_addr + ' (' + node.name + ')'
2092            else:
2093                name = ip_addr
2094            print(' %-30.30s | %5s | %13.3f' % (name, node.cpus, node.cpu_time))
2095        print('')
2096        print('Total job time: %.3f sec\n' % (tot_cpu_time))
2097        if self._clusters:
2098            print('Current clients: %s (%s)' % (len(self._clusters),
2099                                                ', '.join(cluster.ip_addr for cluster in
2100                                                          self._clusters.values())))
2101        if self.unsched_clusters:
2102            print('Pending clients: %s' % (len(self.unsched_clusters)))
2103        print('')
2104        yield 0
2105
2106
2107if __name__ == '__main__':
2108    import argparse
2109
2110    logger = pycos.Logger('dispyscheduler')
2111
2112    parser = argparse.ArgumentParser()
2113    parser.add_argument('--config', dest='config', default='',
2114                        help='use configuration in given file')
2115    parser.add_argument('--save_config', dest='save_config', default='',
2116                        help='save configuration in given file and exit')
2117    parser.add_argument('-d', '--debug', action='store_true', dest='loglevel', default=False,
2118                        help='if given, debug messages are printed')
2119    parser.add_argument('-n', '--nodes', action='append', dest='nodes', default=[],
2120                        help='name or IP address used for all computations; '
2121                        'repeat for multiple nodes')
2122    parser.add_argument('-i', '--ip_addr', action='append', dest='ip_addrs', default=[],
2123                        help='IP address to use; repeat for multiple interfaces')
2124    parser.add_argument('--ext_ip_addr', action='append', dest='ext_ip_addrs', default=[],
2125                        help='External IP address to use (needed in case of NAT firewall/gateway);'
2126                        ' repeat for multiple interfaces')
2127    parser.add_argument('-p', '--port', dest='port', type=int, default=51347,
2128                        help='port number for UDP data and job results')
2129    parser.add_argument('--node_port', dest='node_port', type=int, default=51348,
2130                        help='port number used by nodes')
2131    parser.add_argument('--scheduler_port', dest='scheduler_port', type=int, default=51349,
2132                        help='port number for scheduler')
2133    parser.add_argument('--ipv4_udp_multicast', dest='ipv4_udp_multicast', action='store_true',
2134                        default=False, help='use multicast for IPv4 UDP instead of broadcast')
2135    parser.add_argument('--node_secret', dest='node_secret', default='',
2136                        help='authentication secret for handshake with dispy clients')
2137    parser.add_argument('--node_keyfile', dest='node_keyfile', default='',
2138                        help='file containing SSL key to be used with nodes')
2139    parser.add_argument('--node_certfile', dest='node_certfile', default='',
2140                        help='file containing SSL certificate to be used with nodes')
2141    parser.add_argument('--cluster_secret', dest='cluster_secret', default='',
2142                        help='file containing SSL certificate to be used with dispy clients')
2143    parser.add_argument('--cluster_certfile', dest='cluster_certfile', default='',
2144                        help='file containing SSL certificate to be used with dispy clients')
2145    parser.add_argument('--cluster_keyfile', dest='cluster_keyfile', default='',
2146                        help='file containing SSL key to be used with dispy clients')
2147    parser.add_argument('--pulse_interval', dest='pulse_interval', type=float, default=0,
2148                        help='number of seconds between pulse messages to indicate '
2149                        'whether node is alive')
2150    parser.add_argument('--ping_interval', dest='ping_interval', type=float, default=0,
2151                        help='number of seconds between ping messages to discover nodes')
2152    parser.add_argument('--zombie_interval', dest='zombie_interval', default=60, type=float,
2153                        help='interval in minutes to presume unresponsive scheduler is zombie')
2154    parser.add_argument('--msg_timeout', dest='msg_timeout', default=MsgTimeout, type=float,
2155                        help='timeout used for messages to/from client/nodes in seconds')
2156    parser.add_argument('--dest_path_prefix', dest='dest_path_prefix', default=None,
2157                        help='path prefix where files sent by dispy are stored')
2158    parser.add_argument('--max_file_size', dest='max_file_size', default=str(MaxFileSize), type=str,
2159                        help='maximum file size of any file transferred')
2160    parser.add_argument('--clean', action='store_true', dest='clean', default=False,
2161                        help='if given, files copied from or generated by clients '
2162                        'will be removed before scheduler starts')
2163    parser.add_argument('--httpd', action='store_true', dest='http_server', default=False,
2164                        help='if given, HTTP server is created so clusters can be '
2165                        'monitored and managed')
2166    parser.add_argument('--fair_cluster_scheduler', dest='scheduler_alg', action='store_const',
2167                        const='fair_cluster',
2168                        help='Choose job from cluster that was least recently scheduled')
2169    parser.add_argument('--early_cluster_scheduler', dest='scheduler_alg', action='store_const',
2170                        const='fcfs_cluster',
2171                        help='Choose job from cluster created earliest')
2172    parser.add_argument('--cooperative', action='store_true', dest='cooperative', default=False,
2173                        help='if given, clients (clusters) can update CPUs')
2174    parser.add_argument('--cleanup_nodes', action='store_true', dest='cleanup_nodes', default=False,
2175                        help='if given, nodes always remove files even if '
2176                        '"cleanup=False" is used by clients')
2177    parser.add_argument('--daemon', action='store_true', dest='daemon', default=False,
2178                        help='if given, input is not read from terminal')
2179
2180    config = vars(parser.parse_args(sys.argv[1:]))
2181
2182    if config['config']:
2183        import configparser
2184        cfg = configparser.ConfigParser()
2185        cfg.read(config['config'])
2186        cfg = dict(cfg.items('DEFAULT'))
2187        cfg['nodes'] = [] if cfg['nodes'] == '[]' else \
2188                       [_.strip() for _ in cfg['nodes'][1:-1].split(',')]
2189        cfg['ip_addrs'] = [] if cfg['ip_addrs'] == '[]' else \
2190                         [_.strip() for _ in cfg['ip_addrs'][1:-1].split(',')]
2191        cfg['ext_ip_addrs'] = [] if cfg['ext_ip_addrs'] == '[]' else \
2192                             [_.strip() for _ in cfg['ext_ip_addrs'][1:-1].split(',')]
2193        cfg['port'] = int(cfg['port'])
2194        cfg['node_port'] = int(cfg['node_port'])
2195        cfg['scheduler_port'] = int(cfg['scheduler_port'])
2196        cfg['ipv4_udp_multicast'] = cfg['ipv4_udp_multicast'] == 'True'
2197        cfg['pulse_interval'] = float(cfg['pulse_interval'])
2198        cfg['ping_interval'] = float(cfg['ping_interval'])
2199        cfg['zombie_interval'] = float(cfg['zombie_interval'])
2200        cfg['msg_timeout'] = float(cfg['msg_timeout'])
2201        cfg['loglevel'] = cfg['loglevel'] == 'True'
2202        cfg['clean'] = cfg['clean'] == 'True'
2203        cfg['http_server'] = cfg['http_server'] == 'True'
2204        cfg['cooperative'] = cfg['cooperative'] == 'True'
2205        cfg['cleanup_nodes'] = cfg['cleanup_nodes'] == 'True'
2206        cfg['daemon'] = cfg['daemon'] == 'True'
2207        if cfg['dest_path_prefix'] == 'None':
2208            cfg['dest_path_prefix'] = None
2209        if cfg['scheduler_alg'] == 'None':
2210            cfg['scheduler_alg'] = None
2211        config = cfg
2212    config.pop('config', None)
2213
2214    cfg = config.pop('save_config', None)
2215    if cfg:
2216        import configparser
2217        config = configparser.ConfigParser(config)
2218        cfg = open(cfg, 'w')
2219        config.write(cfg)
2220        cfg.close()
2221        exit(0)
2222    del parser, cfg
2223
2224    if config['loglevel']:
2225        logger.setLevel(logger.DEBUG)
2226        pycos.logger.setLevel(pycos.logger.DEBUG)
2227    else:
2228        logger.setLevel(logger.INFO)
2229    del config['loglevel']
2230
2231    if config['zombie_interval']:
2232        config['zombie_interval'] = float(config['zombie_interval'])
2233        if config['zombie_interval'] < 1:
2234            raise Exception('zombie_interval must be at least 1')
2235
2236    MsgTimeout = config['msg_timeout']
2237    del config['msg_timeout']
2238
2239    m = re.match(r'(\d+)([kKmMgGtT]?)', config['max_file_size'])
2240    if m:
2241        MaxFileSize = int(m.group(1))
2242        if m.group(2):
2243            m = m.group(2).lower()
2244            if m == 'k':
2245                MaxFileSize *= 1024
2246            elif m == 'm':
2247                MaxFileSize *= 1024**2
2248            elif m == 'g':
2249                MaxFileSize *= 1024**3
2250            elif m == 't':
2251                MaxFileSize *= 1024**4
2252            else:
2253                raise Exception('invalid max_file_size option')
2254    else:
2255        raise Exception('max_file_size must be >= 0')
2256    del config['max_file_size']
2257
2258    if config['node_certfile']:
2259        config['node_certfile'] = os.path.abspath(config['node_certfile'])
2260    else:
2261        config['node_certfile'] = None
2262    if config['node_keyfile']:
2263        config['node_keyfile'] = os.path.abspath(config['node_keyfile'])
2264    else:
2265        config['node_keyfile'] = None
2266
2267    if config['cluster_certfile']:
2268        config['cluster_certfile'] = os.path.abspath(config['cluster_certfile'])
2269    else:
2270        config['cluster_certfile'] = None
2271    if config['cluster_keyfile']:
2272        config['cluster_keyfile'] = os.path.abspath(config['cluster_keyfile'])
2273    else:
2274        config['cluster_keyfile'] = None
2275
2276    daemon = config.pop('daemon', False)
2277    if not daemon:
2278        try:
2279            if os.getpgrp() != os.tcgetpgrp(sys.stdin.fileno()):
2280                daemon = True
2281        except (Exception, KeyboardInterrupt):
2282            pass
2283
2284    logger.info('dispyscheduler version %s', _dispy_version)
2285    scheduler = _Scheduler(**config)
2286    if daemon:
2287        scheduler.job_scheduler_task.value()
2288    else:
2289        while 1:
2290            try:
2291                cmd = input('Enter "quit" or "exit" to terminate scheduler, '
2292                            'anything else to get status: ')
2293                cmd = cmd.strip().lower()
2294                if cmd == 'quit' or cmd == 'exit':
2295                    break
2296                else:
2297                    Task(scheduler.print_status)
2298                    time.sleep(0.2)
2299            except KeyboardInterrupt:
2300                # TODO: terminate even if jobs are scheduled?
2301                logger.info('Interrupted; terminating')
2302                break
2303            except (Exception, KeyboardInterrupt):
2304                logger.debug(traceback.format_exc())
2305    scheduler.shutdown()
2306    exit(0)
2307