1#!/usr/bin/python3 2 3""" 4dispy: Distribute computations among CPUs/cores on a single machine or machines 5in cluster(s), grid, cloud etc. for parallel execution. See 6http://dispy.sourceforge.net or https://pgiri.github.com/dispy for details. 7""" 8 9import os 10import sys 11import time 12import socket 13import inspect 14import stat 15import threading 16import re 17import ssl 18import hashlib 19import traceback 20import shelve 21import datetime 22import atexit 23import functools 24import queue 25import numbers 26import collections 27import struct 28import errno 29import platform 30import itertools 31import copy 32import types 33try: 34 import netifaces 35except ImportError: 36 netifaces = None 37 38import pycos 39from pycos import Task, Pycos, AsyncSocket, Singleton, serialize, deserialize 40 41__author__ = "Giridhar Pemmasani (pgiri@yahoo.com)" 42__email__ = "pgiri@yahoo.com" 43__copyright__ = "Copyright 2011, Giridhar Pemmasani" 44__contributors__ = [] 45__maintainer__ = "Giridhar Pemmasani (pgiri@yahoo.com)" 46__license__ = "Apache 2.0" 47__url__ = "http://dispy.sourceforge.net" 48__status__ = "Production" 49__version__ = "4.10.6" 50 51__all__ = ['logger', 'DispyJob', 'DispyNode', 'NodeAllocate', 'JobCluster', 'SharedJobCluster'] 52 53_dispy_version = __version__ 54MsgTimeout = 10 55IPV6_MULTICAST_GROUP = 'ff05::b409:3171:9705:6159:5134:70' 56IPV4_MULTICAST_GROUP = '239.255.51.34' 57# PyPI / pip packaging adjusts assertion below for Python 3.7+ 58assert sys.version_info.major == 3 and sys.version_info.minor >= 7, \ 59 ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' % 60 (__file__, sys.version_info.major, sys.version_info.minor)) 61 62 63class DispyJob(object): 64 """Job scheduled for execution with dispy. 65 66 Once a job is scheduled (with a tuple of arguments), the __call__ 67 method can be invoked. This will wait until the job is 68 complete. 
The result of the call (either the return value in case 69 of python methods or the exit value in case of programs) will be 70 returned; the result is also available as .result member if 71 needed. In addition, any output, error, exception messages from 72 the job will be available as .stdout, .stderr and .exception 73 members. The time when the job was submitted for execution on a 74 node will be available as .start_time and when the job results 75 became available as .end_time. 76 77 .id field is initially set to None and may be assigned by user to 78 any value that is appropriate. This may be useful, for example, 79 to distinguish one job from another. 80 81 .status is read-only field; it is set to one of Created, Running, 82 Finished, Cancelled, Terminated and ProvisionalResult, indicating 83 current status of job. If job is created for SharedJobCluster, 84 status is not updated to Running when job is actually running. 85 86 .ip_addr is read-inly field; it is set to IP address of node that 87 executed job. 88 89 .finish is a read-only event that is set when a job's results are 90 available. 91 92 """ 93 94 __slots__ = ('id', 'result', 'stdout', 'stderr', 'exception', 95 'submit_time', 'start_time', 'end_time', 'status', 96 'ip_addr', 'finish', '_args', '_kwargs', '_dispy_job_', '_uid') 97 98 Created = 5 99 Running = 6 100 ProvisionalResult = 7 101 # NB: Cancelled, Terminated and Finished status should have 102 # values in that order, as PriorityQueue sorts data. 103 # Thus, if a job with provisional result is already in the queue 104 # and a job is finished, finished/terminated job is processed (in 105 # callback) last. 
106 Cancelled = 8 107 Terminated = 9 108 Abandoned = 10 109 Finished = 11 110 111 id_iter = itertools.count(start=1) 112 113 def __init__(self, job_id, args, kwargs): 114 # id can be assigned by user as appropriate (e.g., to distinguish jobs) 115 if job_id: 116 self.id = job_id 117 else: 118 self.id = next(DispyJob.id_iter) 119 # rest are read-only 120 self.result = None 121 self.stdout = None 122 self.stderr = None 123 self.exception = None 124 self.submit_time = time.time() 125 self.start_time = None 126 self.end_time = None 127 self.status = DispyJob.Created 128 self.ip_addr = None 129 self.finish = threading.Event() 130 131 # rest are for dispy implementation only - these are opaque to clients 132 self._args = args 133 self._kwargs = kwargs 134 self._dispy_job_ = None 135 self._uid = id(self) 136 137 def __call__(self, clear=False): 138 self.finish.wait() 139 if clear: 140 self.finish.clear() 141 return self.result 142 143 def __lt__(self, other): 144 if isinstance(self._dispy_job_, _DispyJob_): 145 if isinstance(other._dispy_job_, _DispyJob_): 146 return self._dispy_job_ < other._dispy_job_ 147 else: 148 return True 149 else: 150 return False 151 152 153class DispyNodeAvailInfo(object): 154 """A node's status is represented as available CPU as percent, memory in 155 bytes and disk as bytes. This information is passed to NodeAllocte.allocate 156 method and in cluster status callback with status DispyNode.AvailInfo. 157 """ 158 def __init__(self, cpu, memory, disk, swap): 159 self.cpu = cpu 160 self.memory = memory 161 self.disk = disk 162 self.swap = swap 163 164 165class DispyNode(object): 166 """If 'cluster_status' is used when creating cluster, that function 167 is called with an instance of this class as first argument. 168 See 'cluster_status' in JobCluster below. 
169 """ 170 171 Initialized = DispyJob.Created - 1 172 Closed = DispyJob.Finished + 5 173 AvailInfo = Closed + 1 174 175 def __init__(self, ip_addr, name, cpus): 176 self.ip_addr = ip_addr 177 self.name = name 178 self.cpus = cpus 179 self.avail_cpus = cpus 180 self.busy = 0 181 self.jobs_done = 0 182 self.cpu_time = 0.0 183 self.tx = 0 184 self.rx = 0 185 self.update_time = 0 186 self.avail_info = None 187 188 189class NodeAllocate(object): 190 """Objects of this class describe if / how many CPUs in a node are 191 allocated to clusters. 192 193 Each element of 'nodes' passed to JobCluster or SharedJobCluster 194 is an object of this class; if the element passed is a string 195 (host name or IP address), a tuple (see documentation for 196 details), it is converted to NodeAllocate object with 197 '_parse_node_allocs' function. 198 199 This class can be specialized (inherited) to override, for 200 example, 'allocate' method. 201 """ 202 def __init__(self, host, port=None, cpus=0): 203 self.ip_addr = _node_ipaddr(host) 204 if not self.ip_addr: 205 logger.warning('host "%s" is invalid', host) 206 self.ip_rex = '' 207 else: 208 self.ip_rex = self.ip_addr.replace('.', '\\.').replace('*', '.*') 209 if port: 210 try: 211 port = int(port) 212 assert port > 0 213 except Exception: 214 logger.warning('port must be > 0 for node "%s"', host) 215 port = None 216 self.port = port 217 if cpus: 218 try: 219 cpus = int(cpus) 220 except Exception: 221 logger.warning('Invalid cpus for "%s" ignored', host) 222 cpus = 0 223 self.cpus = cpus 224 225 def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform=''): 226 """When a node is found, dispy calls this method with the 227 cluster for which the node is being allocated, IP address, 228 name and CPUs available on that node. This method should 229 return a number indicating number of CPUs to use. If return 230 value is 0, the node is not used for that cluster. 
231 """ 232 if re.match(self.ip_rex, ip_addr): 233 if self.cpus > 0: 234 cpus = min(cpus, self.cpus) 235 elif (cpus + self.cpus) > 0: 236 cpus = cpus + self.cpus 237 return cpus 238 return 0 239 240 241# a cluster's "status" function (not "cluster_status" callback) 242# returns this structure; "nodes" is list of DispyNode objects and 243# "jobs_pending" is number of jobs that are not done yet 244ClusterStatus = collections.namedtuple('ClusterStatus', ['nodes', 'jobs_pending']) 245 246 247def num_min(*args): 248 items = [arg for arg in args if isinstance(arg, numbers.Number)] 249 if not items: 250 return None 251 return min(items) 252 253 254def num_max(*args): 255 items = [arg for arg in args if isinstance(arg, numbers.Number)] 256 if not items: 257 return None 258 return max(items) 259 260 261def _same_file(tgt, xf): 262 """Internal use only. 263 """ 264 # TODO: compare checksum? 265 try: 266 stat_buf = os.stat(tgt) 267 if (stat_buf.st_size == xf.stat_buf.st_size and 268 abs(stat_buf.st_mtime - xf.stat_buf.st_mtime) <= 1): 269 return True 270 except Exception: 271 return False 272 273 274def auth_code(secret, sign): 275 return hashlib.sha1((secret + sign).encode()).hexdigest().encode() 276 277 278def _node_ipaddr(node): 279 """Internal use only. 280 """ 281 if not node: 282 return None 283 if node.find('*') >= 0: 284 return node 285 try: 286 info = socket.getaddrinfo(node, None)[0] 287 except Exception: 288 return None 289 290 ip_addr = info[-1][0] 291 if info[0] == socket.AF_INET6: 292 # canonicalize so different platforms resolve to same string 293 ip_addr = ip_addr.split('%')[0] 294 ip_addr = re.sub(r'^0+', '', ip_addr) 295 ip_addr = re.sub(r':0+', ':', ip_addr) 296 ip_addr = re.sub(r'::+', '::', ip_addr) 297 return ip_addr 298 299 300def _parse_node_allocs(nodes): 301 """Internal use only. 
302 """ 303 node_allocs = [] 304 for node in nodes: 305 if isinstance(node, NodeAllocate): 306 node_allocs.append(node) 307 elif isinstance(node, str): 308 node_allocs.append(NodeAllocate(node)) 309 elif isinstance(node, dict): 310 node_allocs.append(NodeAllocate(node.get('host', '*'), node.get('port', None), 311 node.get('cpus', 0))) 312 elif isinstance(node, tuple): 313 node_allocs.append(NodeAllocate(*node)) 314 elif isinstance(node, list): 315 node_allocs.append(NodeAllocate(*tuple(node))) 316 elif isinstance(node, DispyNode): 317 node_allocs.append(NodeAllocate(node.ip_addr)) 318 else: 319 logger.warning('Ignoring node specification %s', type(node)) 320 return [node_alloc for node_alloc in node_allocs if node_alloc.ip_addr] 321 322 323def host_addrinfo(host=None, socket_family=None, ipv4_multicast=False): 324 """If 'host' is given (as either host name or IP address), resolve it and 325 fill AddrInfo structure. If 'host' is not given, netifaces module is used to 326 find appropriate IP address. If 'socket_family' is given, IP address with that 327 'socket_family' is used. It should be either 'socket.AF_INET' (for IPv4) or 328 'socket.AF_INET6' (for IPv6). 
329 """ 330 331 class AddrInfo(object): 332 def __init__(self, family, ip, ifn, broadcast, netmask): 333 self.family = family 334 self.ip = ip 335 self.ifn = ifn 336 if family == socket.AF_INET and ipv4_multicast: 337 self.broadcast = IPV4_MULTICAST_GROUP 338 else: 339 self.broadcast = broadcast 340 self.netmask = netmask 341 self.ext_ip_addr = None 342 if os.name == 'nt': 343 self.bind_addr = ip 344 elif platform.system() in ('Darwin', 'DragonFlyBSD', 'FreeBSD', 'OpenBSD', 'NetBSD'): 345 if family == socket.AF_INET and (not ipv4_multicast): 346 self.bind_addr = '' 347 else: 348 self.bind_addr = self.broadcast 349 else: 350 self.bind_addr = self.broadcast 351 352 def canonical_ipv6(ip_addr): 353 # canonicalize so different platforms resolve to same string 354 ip_addr = ip_addr.split('%')[0] 355 ip_addr = re.sub(r'^0+', '', ip_addr) 356 ip_addr = re.sub(r':0+', ':', ip_addr) 357 ip_addr = re.sub(r'::+', '::', ip_addr) 358 return ip_addr 359 360 if socket_family: 361 if socket_family not in (socket.AF_INET, socket.AF_INET6): 362 return None 363 hosts = [] 364 if host: 365 best = None 366 for addr in socket.getaddrinfo(host, None): 367 if socket_family and addr[0] != socket_family: 368 continue 369 if not best or addr[0] == socket.AF_INET: 370 best = addr 371 if best: 372 socket_family = best[0] 373 if best[0] == socket.AF_INET6: 374 addr = canonical_ipv6(best[-1][0]) 375 else: 376 addr = best[-1][0] 377 hosts.append(addr) 378 else: 379 return None 380 381 if socket_family: 382 socket_families = [socket_family] 383 else: 384 socket_families = [socket.AF_INET, socket.AF_INET6] 385 386 addrinfos = [] 387 if netifaces: 388 for iface in netifaces.interfaces(): 389 ifn = 0 390 iface_infos = [] 391 for sock_family in socket_families: 392 for link in netifaces.ifaddresses(iface).get(sock_family, []): 393 netmask = link.get('netmask', None) 394 if sock_family == socket.AF_INET: 395 addr = str(link['addr']) 396 broadcast = link.get('broadcast', '<broadcast>') 397 # Windows 
seems to have broadcast same as addr 398 if broadcast.startswith(addr): 399 broadcast = '<broadcast>' 400 try: 401 addrs = socket.getaddrinfo(addr, None, sock_family, socket.SOCK_STREAM) 402 except Exception: 403 addrs = [] 404 for addr in addrs: 405 if hosts and addr[-1][0] not in hosts: 406 continue 407 addrinfo = AddrInfo(sock_family, addr[-1][0], addr[-1][-1], 408 broadcast, netmask) 409 iface_infos.append(addrinfo) 410 else: # sock_family == socket.AF_INET6 411 addr = str(link['addr']) 412 broadcast = link.get('broadcast', IPV6_MULTICAST_GROUP) 413 if broadcast.startswith(addr): 414 broadcast = IPV6_MULTICAST_GROUP 415 if_sfx = [''] 416 if not ifn and ('%' not in addr.split(':')[-1]): 417 if_sfx.append('%' + iface) 418 for sfx in if_sfx: 419 if ifn and sfx: 420 break 421 try: 422 addrs = socket.getaddrinfo(addr + sfx, None, sock_family, 423 socket.SOCK_STREAM) 424 except Exception: 425 continue 426 for addr in addrs: 427 if addr[-1][-1]: 428 if ifn and addr[-1][-1] != ifn: 429 logger.warning('inconsistent scope IDs for %s: %s != %s', 430 iface, ifn, addr[-1][-1]) 431 ifn = addr[-1][-1] 432 if sfx: 433 continue 434 addr = canonical_ipv6(addr[-1][0]) 435 if hosts and addr not in hosts: 436 continue 437 addrinfo = AddrInfo(sock_family, addr, ifn, broadcast, netmask) 438 iface_infos.append(addrinfo) 439 if ifn: 440 for addrinfo in iface_infos: 441 if not addrinfo.ifn: 442 addrinfo.ifn = ifn 443 addrinfos.extend(iface_infos) 444 445 else: 446 if not host: 447 host = socket.gethostname() 448 netmask = None 449 for sock_family in socket_families: 450 try: 451 addrs = socket.getaddrinfo(host, None, sock_family, socket.SOCK_STREAM) 452 except Exception: 453 continue 454 for addr in addrs: 455 ifn = addr[-1][-1] 456 if sock_family == socket.AF_INET: 457 broadcast = '<broadcast>' 458 addr = addr[-1][0] 459 else: # sock_family == socket.AF_INET6 460 addr = canonical_ipv6(addr[-1][0]) 461 broadcast = IPV6_MULTICAST_GROUP 462 logger.warning('IPv6 may not work without 
"netifaces" package!') 463 addrinfo = AddrInfo(sock_family, addr, ifn, broadcast, netmask) 464 if hosts: 465 if addrinfo.ip in hosts: 466 return addrinfo 467 else: 468 continue 469 addrinfos.append(addrinfo) 470 471 best = None 472 for sock_family in socket_families: 473 for addrinfo in addrinfos: 474 if addrinfo.ip in hosts: 475 return addrinfo 476 if addrinfo.family != sock_family: 477 continue 478 if addrinfo.family == socket.AF_INET: 479 if not best or (len(best.ip) < len(addrinfo.ip)) or best.ip.startswith('127'): 480 best = addrinfo 481 else: 482 if addrinfo.ip.startswith('fd'): 483 # TODO: How to detect / avoid temporary addresses (privacy extensions)? 484 if addrinfo.ifn: 485 return addrinfo 486 if not best or (len(best.ip) < len(addrinfo.ip)) or best.ip.startswith('fe80'): 487 best = addrinfo 488 if best and best.family == socket.AF_INET and (not best.ip.startswith('127')): 489 return best 490 return best 491 492 493# This tuple stores information about partial functions; for 494# now 'setup' and 'cleanup' functions can be partial functions. 495# TODO: useful to have 'compute' as partial function as well? 496_Function = collections.namedtuple('_Function', ['name', 'args', 'kwargs']) 497logger = pycos.Logger('dispy') 498 499 500class _Compute(object): 501 """Internal use only. 502 """ 503 func_type = 1 504 prog_type = 2 505 506 def __init__(self, compute_type, name): 507 assert compute_type == _Compute.func_type or compute_type == _Compute.prog_type 508 self.type = compute_type 509 self.name = name 510 self.id = None 511 self.code = '' 512 self.dest_path = None 513 self.xfer_files = set() 514 self.reentrant = False 515 self.exclusive = True 516 self.setup = None 517 self.cleanup = None 518 self.scheduler_ip_addr = None 519 self.scheduler_port = None 520 self.node_ip_addr = None 521 self.auth = None 522 self.job_result_port = None 523 self.pulse_interval = None 524 525 526class _XferFile(object): 527 """Internal use only. 
528 """ 529 def __init__(self, name, dest_path, compute_id=None): 530 self.name = name 531 self.dest_path = dest_path 532 self.compute_id = compute_id 533 self.stat_buf = os.stat(name) 534 self.sep = os.sep 535 536 537class _Node(object): 538 """Internal use only. 539 """ 540 __slots__ = ['ip_addr', 'port', 'name', 'cpus', 'avail_cpus', 'busy', 'cpu_time', 'clusters', 541 'auth', 'secret', 'keyfile', 'certfile', 'last_pulse', 'scheduler_ip_addr', 542 'pending_jobs', 'avail_info', 'platform', 'sock_family', 'tx', 'rx'] 543 544 def __init__(self, ip_addr, port, cpus, sign, secret, platform='', 545 keyfile=None, certfile=None): 546 self.ip_addr = ip_addr 547 if re.match('\d+\.', ip_addr): 548 self.sock_family = socket.AF_INET 549 else: 550 self.sock_family = socket.AF_INET6 551 self.port = port 552 self.name = None 553 self.cpus = cpus 554 self.avail_cpus = cpus 555 self.busy = 0.0 556 self.cpu_time = 0.0 557 self.clusters = set() 558 self.auth = auth_code(secret, sign) 559 self.secret = secret 560 self.keyfile = keyfile 561 self.certfile = certfile 562 self.last_pulse = None 563 self.scheduler_ip_addr = None 564 self.pending_jobs = [] 565 self.avail_info = None 566 self.platform = platform 567 self.tx = 0 568 self.rx = 0 569 570 def setup(self, compute, exclusive=True, task=None): 571 # generator 572 compute.scheduler_ip_addr = self.scheduler_ip_addr 573 compute.node_ip_addr = self.ip_addr 574 compute.exclusive = exclusive 575 reply = yield self.send(b'COMPUTE:' + serialize(compute), task=task) 576 try: 577 cpus = deserialize(reply) 578 assert isinstance(cpus, int) and cpus > 0 579 except Exception: 580 logger.warning('Transfer of computation "%s" to %s failed', compute.name, self.ip_addr) 581 return(-1) 582 if not self.cpus: 583 self.cpus = cpus 584 for xf in compute.xfer_files: 585 resp = yield self.xfer_file(xf, task=task) 586 if resp < 0: 587 logger.error('Could not transfer file "%s"', xf.name) 588 yield self.close(compute, task=task) 589 return(resp) 590 591 
resp = yield self.send(b'SETUP:' + serialize(compute.id), timeout=0, task=task) 592 if not isinstance(resp, int) or resp < 0: 593 if isinstance(resp, bytearray): 594 resp = resp.decode() 595 logger.warning('Setup of computation "%s" on %s failed: %s', 596 compute.name, self.ip_addr, resp) 597 yield self.close(compute, task=task) 598 return(resp) 599 self.last_pulse = time.time() 600 return(0) 601 602 def send(self, msg, reply=True, timeout=MsgTimeout, task=None): 603 # generator 604 sock = socket.socket(self.sock_family, socket.SOCK_STREAM) 605 sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile) 606 sock.settimeout(timeout) 607 try: 608 yield sock.connect((self.ip_addr, self.port)) 609 yield sock.sendall(self.auth) 610 yield sock.send_msg(msg) 611 if reply: 612 resp = yield sock.recv_msg() 613 else: 614 resp = len(msg) 615 except Exception: 616 logger.error('Could not connect to %s:%s, %s', 617 self.ip_addr, self.port, traceback.format_exc()) 618 # TODO: mark this node down, reschedule on different node? 
619 raise 620 finally: 621 sock.close() 622 623 if resp == b'ACK': 624 resp = len(msg) 625 self.tx += len(msg) 626 return(resp) 627 628 def xfer_file(self, xf, task=None): 629 # generator 630 sock = socket.socket(self.sock_family, socket.SOCK_STREAM) 631 sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile) 632 sock.settimeout(MsgTimeout) 633 try: 634 yield sock.connect((self.ip_addr, self.port)) 635 yield sock.sendall(self.auth) 636 yield sock.send_msg(b'FILEXFER:' + serialize(xf)) 637 recvd = yield sock.recv_msg() 638 recvd = deserialize(recvd) 639 with open(xf.name, 'rb') as fd: 640 sent = 0 641 while sent == recvd: 642 data = fd.read(1024000) 643 if not data: 644 break 645 yield sock.sendall(data) 646 sent += len(data) 647 recvd = yield sock.recv_msg() 648 recvd = deserialize(recvd) 649 self.tx += sent 650 if recvd == xf.stat_buf.st_size: 651 resp = sent 652 else: 653 resp = -1 654 except Exception: 655 logger.error('Could not transfer %s to %s', xf.name, self.ip_addr) 656 # TODO: mark this node down, reschedule on different node? 657 resp = -1 658 finally: 659 sock.close() 660 return(resp) 661 662 def close(self, compute, terminate_pending=False, task=None): 663 # generator 664 logger.debug('Closing node %s for %s / %s', self.ip_addr, compute.name, compute.id) 665 req = {'compute_id': compute.id, 'auth': compute.auth, 'node_ip_addr': self.ip_addr, 666 'terminate_pending': terminate_pending} 667 try: 668 yield self.send(b'CLOSE:' + serialize(req), reply=True, task=task) 669 except Exception: 670 logger.debug('Deleting computation %s/%s from %s failed', 671 compute.id, compute.name, self.ip_addr) 672 673 674class _DispyJob_(object): 675 """Internal use only. 
676 """ 677 678 __slots__ = ('job', 'uid', 'compute_id', 'hash', 'node', 'pinned', 679 'xfer_files', '_args', '_kwargs', 'code') 680 681 def __init__(self, compute_id, job_id, args, kwargs): 682 job_deps = kwargs.pop('dispy_job_depends', []) 683 self.job = DispyJob(job_id, args, kwargs) 684 self.job._dispy_job_ = self 685 self._args = self.job._args 686 self._kwargs = self.job._kwargs 687 self.uid = None 688 self.compute_id = compute_id 689 self.hash = ''.join(hex(_)[2:] for _ in os.urandom(10)) 690 self.node = None 691 self.pinned = None 692 self.xfer_files = [] 693 self.code = '' 694 depend_ids = set() 695 cwd = os.getcwd() 696 for dep in job_deps: 697 if isinstance(dep, str) or inspect.ismodule(dep): 698 if inspect.ismodule(dep): 699 name = dep.__file__ 700 if name.endswith('.pyc'): 701 name = name[:-1] 702 if not name.endswith('.py'): 703 logger.warning('Invalid module "%s" - must be python source.', dep) 704 continue 705 if name.startswith(cwd): 706 dst = os.path.dirname(name[len(cwd):].lstrip(os.sep)) 707 elif dep.__package__: 708 dst = dep.__package__.replace('.', os.sep) 709 else: 710 dst = os.path.dirname(dep.__name__.replace('.', os.sep)) 711 else: 712 name = os.path.abspath(dep) 713 if name.startswith(cwd): 714 dst = os.path.dirname(name[len(cwd):].lstrip(os.sep)) 715 else: 716 dst = '.' 
717 if name in depend_ids: 718 continue 719 self.xfer_files.append(_XferFile(name, dst, compute_id)) 720 depend_ids.add(name) 721 elif (inspect.isfunction(dep) or inspect.isclass(dep) or 722 (hasattr(dep, '__class__') and hasattr(dep, '__module__'))): 723 if inspect.isfunction(dep) or inspect.isclass(dep): 724 pass 725 elif hasattr(dep, '__class__') and inspect.isclass(dep.__class__): 726 dep = dep.__class__ 727 if id(dep) in depend_ids: 728 continue 729 try: 730 lines = inspect.getsourcelines(dep)[0] 731 except Exception: 732 logger.warning('Invalid job depends eleement "%s"; ignoring it.', dep) 733 continue 734 lines[0] = lines[0].lstrip() 735 self.code += '\n' + ''.join(lines) 736 depend_ids.add(id(dep)) 737 else: 738 logger.warning('Invalid job depends element "%s"; ignoring it.', dep) 739 740 def __getstate__(self): 741 state = {'uid': self.uid, 'hash': self.hash, 'compute_id': self.compute_id, 742 '_args': self._args if isinstance(self._args, bytes) else serialize(self._args), 743 '_kwargs': self._kwargs if isinstance(self._kwargs, bytes) 744 else serialize(self._kwargs), 745 'xfer_files': self.xfer_files, 'code': self.code} 746 return state 747 748 def __setstate__(self, state): 749 for k, v in state.items(): 750 setattr(self, k, v) 751 752 def __lt__(self, other): 753 return self.uid < other.uid 754 755 def __eq__(self, other): 756 return isinstance(other, _DispyJob_) and self.uid == other.uid 757 758 def run(self, task=None): 759 # generator 760 logger.debug('Running job %s on %s', self.uid, self.node.ip_addr) 761 self.job.start_time = time.time() 762 tx = 0 763 for xf in self.xfer_files: 764 sent = yield self.node.xfer_file(xf, task=task) 765 if sent < 0: 766 logger.warning('Transfer of file "%s" to %s failed', xf.name, self.node.ip_addr) 767 raise Exception(-1) 768 tx += sent 769 resp = yield self.node.send(b'JOB:' + serialize(self), task=task) 770 # TODO: deal with NAKs (reschedule?) 
771 if isinstance(resp, int) and resp >= 0: 772 tx += resp 773 else: 774 logger.warning('Failed to run %s on %s: %s', self.job.id, self.node.ip_addr, resp) 775 raise Exception(str(resp)) 776 return(tx) 777 778 def finish(self, status): 779 job = self.job 780 job.status = status 781 if status != DispyJob.ProvisionalResult: 782 job._args = () 783 job._kwargs = {} 784 self.job._dispy_job_ = None 785 self.job = None 786 job.finish.set() 787 788 789class _JobReply(object): 790 """Internal use only. 791 """ 792 def __init__(self, _job, ip_addr, status=None, keyfile=None, certfile=None): 793 self.uid = _job.uid 794 self.hash = _job.hash 795 self.ip_addr = ip_addr 796 self.status = status 797 self.result = None 798 self.stdout = None 799 self.stderr = None 800 self.exception = None 801 self.start_time = 0 802 self.end_time = 0 803 804 805class _Cluster(object, metaclass=Singleton): 806 """Internal use only. 807 """ 808 809 def __init__(self, ip_addr=None, ext_ip_addr=None, port=None, node_port=None, 810 ipv4_udp_multicast=False, shared=False, secret='', keyfile=None, certfile=None, 811 recover_file=None): 812 if not hasattr(self, 'pycos'): 813 self.pycos = Pycos() 814 logger.info('dispy client version: %s', __version__) 815 self.ipv4_udp_multicast = bool(ipv4_udp_multicast) 816 self.addrinfos = {} 817 if isinstance(ip_addr, list): 818 ip_addrs = ip_addr 819 else: 820 ip_addrs = [ip_addr] 821 if isinstance(ext_ip_addr, list): 822 ext_ip_addrs = ext_ip_addr 823 else: 824 ext_ip_addrs = [ext_ip_addr] 825 826 for i in range(len(ip_addrs)): 827 ip_addr = ip_addrs[i] 828 if i < len(ext_ip_addrs): 829 ext_ip_addr = ext_ip_addrs[i] 830 else: 831 ext_ip_addr = None 832 addrinfo = host_addrinfo(host=ip_addr, ipv4_multicast=self.ipv4_udp_multicast) 833 if not addrinfo: 834 logger.warning('Ignoring invalid ip_addr %s', ip_addr) 835 continue 836 if ext_ip_addr: 837 ext_ip_addr = _node_ipaddr(ext_ip_addr) 838 if not ext_ip_addr: 839 logger.warning('Ignoring invalid ext_ip_addr %s', 
ext_ip_addrs[i]) 840 if not ext_ip_addr: 841 ext_ip_addr = addrinfo.ip 842 addrinfo.ext_ip_addr = ext_ip_addr 843 self.addrinfos[addrinfo.ext_ip_addr] = addrinfo 844 if not self.addrinfos: 845 raise Exception('No valid IP address found') 846 847 if port: 848 port = int(port) 849 else: 850 if shared: 851 port = 0 852 else: 853 port = 51347 854 self.port = port 855 if node_port: 856 node_port = int(node_port) 857 else: 858 node_port = 51348 859 self.node_port = node_port 860 self._nodes = {} 861 self.secret = secret 862 self.keyfile = keyfile 863 self.certfile = certfile 864 self.shared = shared 865 self.pulse_interval = None 866 self.ping_interval = None 867 self.poll_interval = None 868 self.dest_path = os.getcwd() # TODO: make it an option? 869 870 self._clusters = {} 871 self._sched_jobs = {} 872 self._sched_event = pycos.Event() 873 self._abandoned_jobs = {} 874 self.terminate = False 875 self.sign = hashlib.sha1(os.urandom(20)) 876 for ext_ip_addr in self.addrinfos: 877 self.sign.update(ext_ip_addr.encode()) 878 self.sign = self.sign.hexdigest() 879 self.auth = auth_code(self.secret, self.sign) 880 881 if isinstance(recover_file, str): 882 self.recover_file = recover_file 883 else: 884 now = datetime.datetime.now() 885 self.recover_file = '_dispy_%.4i%.2i%.2i%.2i%.2i%.2i' % \ 886 (now.year, now.month, now.day, 887 now.hour, now.minute, now.second) 888 atexit.register(self.shutdown) 889 self.timer_task = Task(self.timer_proc) 890 891 try: 892 self.shelf = shelve.open(self.recover_file, flag='c', writeback=True) 893 self.shelf['_cluster'] = {'ip_addrs': ip_addrs, 'ext_ip_addrs': ext_ip_addrs, 894 'port': self.port, 'sign': self.sign, 895 'secret': self.secret, 'auth': self.auth, 896 'keyfile': self.keyfile, 'certfile': self.certfile} 897 self.shelf.sync() 898 except Exception: 899 raise Exception('Could not create fault recover file "%s"' % 900 self.recover_file) 901 logger.info('Storing fault recovery information in "%s"', self.recover_file) 902 903 
self.select_job_node = self.load_balance_schedule 904 self._scheduler = Task(self._schedule_jobs) 905 self.start_time = time.time() 906 self.compute_id = int(1000 * self.start_time) 907 908 self.worker_Q = queue.Queue() 909 self.worker_thread = threading.Thread(target=self.worker) 910 self.worker_thread.daemon = True 911 self.worker_thread.start() 912 913 if self.shared: 914 port_bound_event = None 915 else: 916 port_bound_event = pycos.Event() 917 self.tcp_tasks = [] 918 self.udp_tasks = [] 919 udp_addrinfos = {} 920 for addrinfo in self.addrinfos.values(): 921 self.tcp_tasks.append(Task(self.tcp_server, addrinfo, port_bound_event)) 922 if self.shared: 923 continue 924 udp_addrinfos[addrinfo.bind_addr] = addrinfo 925 926 for bind_addr, addrinfo in udp_addrinfos.items(): 927 self.udp_tasks.append(Task(self.udp_server, bind_addr, addrinfo, port_bound_event)) 928 929 # Under Windows dispynode may send objects with 930 # '__mp_main__' scope, so make an alias to '__main__'. 931 # TODO: Make alias even if client is not Windows? It is 932 # possible the client is not Windows, but a node is. 
933 if os.name == 'nt' and '__mp_main__' not in sys.modules: 934 sys.modules['__mp_main__'] = sys.modules['__main__'] 935 936 def udp_server(self, bind_addr, addrinfo, port_bound_event, task=None): 937 # generator 938 task.set_daemon() 939 udp_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM)) 940 # udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 941 942 while 1: 943 try: 944 udp_sock.bind((bind_addr, self.port)) 945 except socket.error as exc: 946 if exc.errno == errno.EADDRINUSE: 947 logger.warning('Port %s seems to be used by another program ...', self.port) 948 else: 949 logger.warning('Error binding to port %s: %s ...', self.port, exc.errno) 950 yield task.sleep(5) 951 except Exception: 952 logger.warning('Could not bind to port %s: %s', self.port, traceback.format_exc()) 953 yield task.sleep(5) 954 else: 955 break 956 957 if addrinfo.family == socket.AF_INET: 958 if self.ipv4_udp_multicast: 959 mreq = socket.inet_aton(addrinfo.broadcast) + socket.inet_aton(addrinfo.ip) 960 udp_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 961 else: # addrinfo.family == socket.AF_INET6 962 mreq = socket.inet_pton(addrinfo.family, addrinfo.broadcast) 963 mreq += struct.pack('@I', addrinfo.ifn) 964 udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) 965 try: 966 udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) 967 except Exception: 968 pass 969 port_bound_event.set() 970 del port_bound_event 971 while 1: 972 try: 973 msg, addr = yield udp_sock.recvfrom(1000) 974 except GeneratorExit: 975 break 976 if msg.startswith(b'PING:'): 977 try: 978 info = deserialize(msg[len(b'PING:'):]) 979 if info['version'] != _dispy_version: 980 logger.warning('Ignoring %s due to version mismatch', addr[0]) 981 continue 982 assert info['port'] > 0 983 assert info['ip_addr'] 984 # socket.inet_aton(status['ip_addr']) 985 except Exception: 986 # logger.debug(traceback.format_exc()) 987 logger.debug('Ignoring node 
%s', addr[0]) 988 continue 989 auth = auth_code(self.secret, info['sign']) 990 node = self._nodes.get(info['ip_addr'], None) 991 if node and node.auth == auth: 992 continue 993 sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM), 994 keyfile=self.keyfile, certfile=self.certfile) 995 sock.settimeout(MsgTimeout) 996 msg = {'version': _dispy_version, 'port': self.port, 'sign': self.sign, 997 'node_ip_addr': info['ip_addr']} 998 msg['ip_addrs'] = [ai.ext_ip_addr for ai in self.addrinfos.values()] 999 try: 1000 yield sock.connect((info['ip_addr'], info['port'])) 1001 yield sock.sendall(auth) 1002 yield sock.send_msg(b'PING:' + serialize(msg)) 1003 except GeneratorExit: 1004 break 1005 except Exception: 1006 logger.debug(traceback.format_exc()) 1007 finally: 1008 sock.close() 1009 else: 1010 pass 1011 udp_sock.close() 1012 1013 def tcp_server(self, addrinfo, port_bound_event, task=None): 1014 # generator 1015 task.set_daemon() 1016 if port_bound_event: 1017 yield port_bound_event.wait() 1018 del port_bound_event 1019 sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM), 1020 keyfile=self.keyfile, certfile=self.certfile) 1021 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1022 try: 1023 sock.bind((addrinfo.ip, self.port)) 1024 except Exception: 1025 logger.warning('Could not bind TCP server to %s:%s', addrinfo.ip, self.port) 1026 return 1027 if not self.port: 1028 self.port = sock.getsockname()[1] 1029 logger.debug('dispy client at %s:%s', addrinfo.ip, self.port) 1030 sock.listen(128) 1031 1032 while 1: 1033 try: 1034 conn, addr = yield sock.accept() 1035 except ssl.SSLError as err: 1036 logger.debug('SSL connection failed: %s', str(err)) 1037 continue 1038 except GeneratorExit: 1039 break 1040 except Exception: 1041 logger.debug(traceback.format_exc()) 1042 continue 1043 Task(self.tcp_req, conn, addr) 1044 sock.close() 1045 1046 def tcp_req(self, conn, addr, task=None): 1047 # generator 1048 conn.settimeout(MsgTimeout) 1049 
        msg = yield conn.recv_msg()
        # dispatch on message prefix; each branch closes 'conn' itself
        if msg.startswith(b'JOB_REPLY:'):
            try:
                info = deserialize(msg[len(b'JOB_REPLY:'):])
            except Exception:
                logger.warning('Invalid job reply from %s:%s ignored', addr[0], addr[1])
            else:
                yield self.job_reply_process(info, len(msg), conn, addr)
            conn.close()

        elif msg.startswith(b'PULSE:'):
            # heartbeat from a node; refresh last_pulse and availability info
            msg = msg[len(b'PULSE:'):]
            try:
                info = deserialize(msg)
                node = self._nodes[info['ip_addr']]
                assert 0 <= info['cpus'] <= node.cpus
                node.last_pulse = time.time()
                yield conn.send_msg(b'PULSE')
                if info['avail_info']:
                    node.avail_info = info['avail_info']
                    for cluster in node.clusters:
                        if cluster.status_callback:
                            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
                            if not dispy_node:
                                continue
                            dispy_node.avail_info = info['avail_info']
                            dispy_node.update_time = node.last_pulse
                            self.worker_Q.put((cluster.status_callback,
                                               (DispyNode.AvailInfo, dispy_node, None)))
            except Exception:
                logger.warning('Ignoring pulse message from %s', addr[0])
                # logger.debug(traceback.format_exc())
            conn.close()

        elif msg.startswith(b'JOB_STATUS:'):
            conn.close()
            # message from dispyscheduler
            try:
                info = deserialize(msg[len(b'JOB_STATUS:'):])
                _job = self._sched_jobs[info['uid']]
                assert _job.hash == info['hash']
            except Exception:
                logger.warning('Invalid job status from %s:%s ignored', addr[0], addr[1])
            else:
                job = _job.job
                job.status = info['status']
                job.ip_addr = info['node']
                node = self._nodes.get(job.ip_addr, None)
                # TODO: if node is None, likely missed NODE_STATUS, so create it now?
                if node:
                    if job.status == DispyJob.Running:
                        job.start_time = info['start_time']
                        node.busy += 1
                    else:
                        logger.warning('Invalid job status for shared cluster: %s', job.status)
                    cluster = self._clusters.get(_job.compute_id, None)
                    if cluster:
                        dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
                        if dispy_node:
                            if job.status == DispyJob.Running:
                                dispy_node.busy += 1
                            dispy_node.update_time = time.time()
                            if cluster.status_callback:
                                self.worker_Q.put((cluster.status_callback,
                                                   (job.status, dispy_node, copy.copy(job))))
        elif msg.startswith(b'PONG:'):
            # node's answer to our ping: register it if versions/secrets match
            conn.close()
            try:
                info = deserialize(msg[len(b'PONG:'):])
                if info['version'] != _dispy_version:
                    logger.warning('Ignoring node %s due to version mismatch: %s != %s',
                                   info['ip_addr'], info['version'], _dispy_version)
                    return
                assert info['auth'] == self.auth
            except (AssertionError):
                logger.warning('Ignoring node %s ("secret" mismatch)', info['ip_addr'])
            except (Exception) as err:
                logger.warning('Ignoring node %s: %s', addr[0], err)
            else:
                self.add_node(info)

        elif msg.startswith(b'PING:'):
            # a node announcing itself; validate then ping it back over TCP
            sock_family = conn.family
            conn.close()
            try:
                info = deserialize(msg[len(b'PING:'):])
                if info['version'] != _dispy_version:
                    logger.warning('Ignoring %s due to version mismatch', addr[0])
                    return
                assert info['port'] > 0
                assert info['ip_addr']
                # socket.inet_aton(status['ip_addr'])
            except Exception:
                # logger.debug(traceback.format_exc())
                logger.debug('Ignoring node %s', addr[0])
                return
            auth = auth_code(self.secret, info['sign'])
            node = self._nodes.get(info['ip_addr'], None)
            if node:
                if node.auth == auth:
                    # already known with same credentials; no need to re-ping
                    return
            sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM),
                               keyfile=self.keyfile, certfile=self.certfile)
            sock.settimeout(MsgTimeout)
            msg = {'version':
_dispy_version, 'port': self.port, 'sign': self.sign, 1154 'node_ip_addr': info['ip_addr']} 1155 msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()] 1156 try: 1157 yield sock.connect((info['ip_addr'], info['port'])) 1158 yield sock.sendall(auth) 1159 yield sock.send_msg(b'PING:' + serialize(msg)) 1160 except Exception: 1161 logger.debug(traceback.format_exc()) 1162 finally: 1163 sock.close() 1164 1165 elif msg.startswith(b'FILEXFER:'): 1166 try: 1167 xf = deserialize(msg[len(b'FILEXFER:'):]) 1168 msg = yield conn.recv_msg() 1169 job_reply = deserialize(msg) 1170 except Exception: 1171 logger.debug(traceback.format_exc()) 1172 else: 1173 yield self.file_xfer_process(job_reply, xf, conn, addr) 1174 conn.close() 1175 1176 elif msg.startswith(b'NODE_CPUS:'): 1177 conn.close() 1178 try: 1179 info = deserialize(msg[len(b'NODE_CPUS:'):]) 1180 node = self._nodes.get(info['ip_addr'], None) 1181 if not node: 1182 return 1183 auth = auth_code(self.secret, info['sign']) 1184 if auth != node.auth: 1185 logger.warning('Invalid signature from %s', node.ip_addr) 1186 return 1187 cpus = info['cpus'] 1188 except Exception: 1189 return 1190 if cpus < 0: 1191 logger.warning('Node requested using %s CPUs, disabling it', 1192 node.ip_addr, cpus) 1193 cpus = 0 1194 logger.debug('Setting cpus for %s to %s', node.ip_addr, cpus) 1195 # TODO: set node.cpus to min(cpus, node.cpus)? 
            node.cpus = cpus
            if cpus > node.avail_cpus:
                # node grew: offer it to clusters that don't use it yet
                node.avail_cpus = cpus
                node_computations = []
                for cluster in self._clusters.values():
                    if cluster in node.clusters:
                        continue
                    compute = cluster._compute
                    for node_alloc in cluster._node_allocs:
                        cpus = node_alloc.allocate(cluster, node.ip_addr, node.name,
                                                   node.avail_cpus, avail_info=node.avail_info,
                                                   platform=node.platform)
                        if cpus <= 0:
                            continue
                        node.cpus = min(node.avail_cpus, cpus)
                        node_computations.append(compute)
                        break
                if node_computations:
                    Task(self.setup_node, node, node_computations)
                yield self._sched_event.set()
            else:
                node.avail_cpus = cpus
            # propagate new cpu count to per-cluster DispyNode views
            for cluster in node.clusters:
                dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
                if dispy_node:
                    dispy_node.cpus = cpus

        elif msg.startswith(b'TERMINATED:'):
            # node is shutting down: drop it and reschedule its jobs
            conn.close()
            try:
                info = deserialize(msg[len(b'TERMINATED:'):])
            except Exception:
                # logger.debug(traceback.format_exc())
                pass
            else:
                node = self._nodes.pop(info['ip_addr'], None)
                if not node:
                    return
                auth = auth_code(self.secret, info['sign'])
                if auth != node.auth:
                    logger.warning('Invalid signature from %s', node.ip_addr)
                    return
                logger.debug('Removing node %s', node.ip_addr)
                if node.clusters:
                    dead_jobs = [_job for _job in self._sched_jobs.values()
                                 if _job.node is not None and _job.node.ip_addr == node.ip_addr]
                    clusters = list(node.clusters)
                    node.clusters = set()
                    for cluster in clusters:
                        dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None)
                        if not dispy_node:
                            continue
                        dispy_node.avail_cpus = dispy_node.cpus = dispy_node.busy = 0
                        if cluster.status_callback:
                            self.worker_Q.put((cluster.status_callback,
                                               (DispyNode.Closed, dispy_node, None)))
                    self.reschedule_jobs(dead_jobs)

        elif msg.startswith(b'NODE_STATUS:'):
            conn.close()
            # this message is from dispyscheduler for SharedJobCluster
            try:
                info = deserialize(msg[len(b'NODE_STATUS:'):])
                cluster = self._clusters[info['compute_id']]
                assert info['auth'] == cluster._compute.auth
            except Exception:
                logger.debug('Invalid node status from %s:%s ignored', addr[0], addr[1])
                # logger.debug(traceback.format_exc())
            else:
                if info['status'] == DispyNode.AvailInfo:
                    dispy_node = cluster._dispy_nodes.get(info['ip_addr'], None)
                    if dispy_node:
                        dispy_node.avail_info = info['avail_info']
                        dispy_node.tx = info['tx']
                        dispy_node.rx = info['rx']
                        if cluster.status_callback:
                            self.worker_Q.put((cluster.status_callback,
                                               (DispyNode.AvailInfo, dispy_node, None)))
                elif info['status'] == DispyNode.Initialized:
                    # scheduler allocated a node for this cluster
                    dispy_node = info['dispy_node']
                    dispy_node.update_time = time.time()
                    node = self._nodes.get(dispy_node.ip_addr, None)
                    if node:
                        node.name = dispy_node.name
                        node.cpus = dispy_node.cpus
                    else:
                        node = _Node(dispy_node.ip_addr, 0, dispy_node.cpus, '', '', platform='')
                        node.name = dispy_node.name
                        self._nodes[node.ip_addr] = node
                    cluster._dispy_nodes[dispy_node.ip_addr] = dispy_node
                    if cluster.status_callback:
                        self.worker_Q.put((cluster.status_callback,
                                           (DispyNode.Initialized, dispy_node, None)))
                elif info['status'] == DispyNode.Closed:
                    dispy_node = cluster._dispy_nodes.get(info['ip_addr'], None)
                    if dispy_node:
                        dispy_node.avail_cpus = dispy_node.cpus = 0
                        if cluster.status_callback:
                            self.worker_Q.put((cluster.status_callback,
                                               (DispyNode.Closed, dispy_node, None)))
                else:
                    logger.warning('Invalid node status %s from %s:%s ignored',
                                   info['status'], addr[0], addr[1])

        elif msg.startswith(b'SCHEDULED:'):
            # dispyscheduler accepted this (shared) cluster; adopt its pulse interval
            try:
                info = deserialize(msg[len(b'SCHEDULED:'):])
                assert self.shared
                cluster = self._clusters.get(info['compute_id'], None)
                assert info['pulse_interval'] is None or info['pulse_interval'] >= 1
                self.pulse_interval = info['pulse_interval']
                self.timer_task.resume(True)
                yield conn.send_msg(b'ACK')
                cluster._scheduled_event.set()
            except Exception:
                yield conn.send_msg(b'NAK')
            conn.close()

        elif msg.startswith(b'RELAY_INFO:'):
            # relay/discovery query: reply with our sign/port, or None on mismatch
            try:
                info = deserialize(msg[len(b'RELAY_INFO:'):])
                assert info['version'] == _dispy_version
                msg = {'sign': self.sign, 'ip_addrs': [info['scheduler_ip_addr']],
                       'port': self.port}
                if 'auth' in info and info['auth'] != self.auth:
                    msg = None
            except Exception:
                msg = None
            yield conn.send_msg(serialize(msg))
            conn.close()

        else:
            logger.warning('Invalid message from %s:%s ignored', addr[0], addr[1])
            # logger.debug(traceback.format_exc())
            conn.close()

    def timer_proc(self, task=None):
        # daemon task: drives periodic pulse / ping / poll work; suspending
        # with a timeout acts as the timer, and resume(True) resets intervals
        task.set_daemon()
        reset = True
        last_pulse_time = last_ping_time = last_poll_time = time.time()
        timeout = None
        while 1:
            if reset:
                # shortest of the configured intervals determines sleep time
                timeout = num_min(self.pulse_interval, self.ping_interval, self.poll_interval)

            try:
                reset = yield task.suspend(timeout)
            except GeneratorExit:
                break
            if reset:
                # woken explicitly to recompute timeout, not by expiry
                continue

            now = time.time()
            if self.pulse_interval and (now - last_pulse_time) >= self.pulse_interval:
                last_pulse_time = now
                if self.shared:
                    # shared mode: send PULSE to scheduler for each cluster
                    clusters = list(self._clusters.values())
                    for cluster in clusters:
                        msg = {'client_ip_addr': cluster._compute.scheduler_ip_addr,
                               'client_port': cluster._compute.job_result_port}
                        sock = socket.socket(cluster.addrinfo.family, socket.SOCK_STREAM)
                        sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile)
                        sock.settimeout(MsgTimeout)
                        try:
                            yield sock.connect((cluster.scheduler_ip_addr, cluster.scheduler_port))
                            yield sock.sendall(cluster._scheduler_auth)
                            yield sock.send_msg(b'PULSE:' + serialize(msg))
except Exception: 1364 pass 1365 sock.close() 1366 else: 1367 dead_nodes = {} 1368 for node in self._nodes.values(): 1369 if node.busy and node.last_pulse is not None and \ 1370 (node.last_pulse + (5 * self.pulse_interval)) <= now: 1371 logger.warning('Node %s is not responding; removing it (%s, %s, %s)', 1372 node.ip_addr, node.busy, node.last_pulse, now) 1373 dead_nodes[node.ip_addr] = node 1374 if dead_nodes: 1375 for node in dead_nodes.values(): 1376 clusters = list(node.clusters) 1377 node.clusters = set() 1378 for cluster in clusters: 1379 dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None) 1380 if not dispy_node: 1381 continue 1382 dispy_node.avail_cpus = dispy_node.cpus = dispy_node.busy = 0 1383 if cluster.status_callback: 1384 self.worker_Q.put((cluster.status_callback, 1385 (DispyNode.Closed, dispy_node, None))) 1386 del self._nodes[node.ip_addr] 1387 dead_jobs = [_job for _job in self._sched_jobs.values() 1388 if _job.node is not None and _job.node.ip_addr in dead_nodes] 1389 self.reschedule_jobs(dead_jobs) 1390 1391 if self.ping_interval and (now - last_ping_time) >= self.ping_interval: 1392 last_ping_time = now 1393 for cluster in self._clusters.values(): 1394 Task(self.discover_nodes, cluster, cluster._node_allocs) 1395 1396 if self.poll_interval and (now - last_poll_time) >= self.poll_interval: 1397 last_poll_time = now 1398 for cluster in self._clusters.values(): 1399 Task(self.poll_job_results, cluster) 1400 1401 def file_xfer_process(self, job_reply, xf, sock, addr): 1402 _job = self._sched_jobs.get(job_reply.uid, None) 1403 if _job is None or _job.hash != job_reply.hash: 1404 logger.warning('Ignoring invalid file transfer from job %s at %s', 1405 job_reply.uid, addr[0]) 1406 yield sock.send_msg(serialize(-1)) 1407 return 1408 node = self._nodes.get(job_reply.ip_addr, None) 1409 if node: 1410 node.last_pulse = time.time() 1411 tgt = os.path.join(self.dest_path, xf.dest_path.replace(xf.sep, os.sep), 1412 xf.name.split(xf.sep)[-1]) 1413 if 
not os.path.isdir(os.path.dirname(tgt)): 1414 os.makedirs(os.path.dirname(tgt)) 1415 with open(tgt, 'wb') as fd: 1416 recvd = 0 1417 while recvd < xf.stat_buf.st_size: 1418 yield sock.send_msg(serialize(recvd)) 1419 data = yield sock.recvall(min(xf.stat_buf.st_size-recvd, 1024000)) 1420 if not data: 1421 break 1422 fd.write(data) 1423 recvd += len(data) 1424 yield sock.send_msg(serialize(recvd)) 1425 if node: 1426 node.rx += recvd 1427 cluster = self._clusters.get(_job.compute_id, None) 1428 if cluster: 1429 dispy_node = cluster._dispy_nodes.get(job_reply.ip_addr, None) 1430 if dispy_node: 1431 dispy_node.rx += recvd 1432 if recvd != xf.stat_buf.st_size: 1433 logger.warning('Transfer of file "%s" failed', tgt) 1434 # TODO: remove file? 1435 os.utime(tgt, (xf.stat_buf.st_atime, xf.stat_buf.st_mtime)) 1436 os.chmod(tgt, stat.S_IMODE(xf.stat_buf.st_mode)) 1437 1438 def send_ping_node(self, ip_addr, port=None, task=None): 1439 ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port, 1440 'node_ip_addr': ip_addr} 1441 ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()] 1442 if not port: 1443 port = self.node_port 1444 if re.match('\d+\.', ip_addr): 1445 sock_family = socket.AF_INET 1446 else: 1447 sock_family = socket.AF_INET6 1448 tcp_sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM), 1449 keyfile=self.keyfile, certfile=self.certfile) 1450 tcp_sock.settimeout(MsgTimeout) 1451 try: 1452 yield tcp_sock.connect((ip_addr, port)) 1453 yield tcp_sock.sendall(b'x' * len(self.auth)) 1454 yield tcp_sock.send_msg(b'PING:' + serialize(ping_msg)) 1455 except Exception: 1456 pass 1457 tcp_sock.close() 1458 1459 def broadcast_ping(self, addrinfos=[], port=None, task=None): 1460 # generator 1461 if not port: 1462 port = self.node_port 1463 ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port} 1464 ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()] 1465 if 
not addrinfos: 1466 addrinfos = self.addrinfos.values() 1467 for addrinfo in addrinfos: 1468 bc_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM)) 1469 bc_sock.settimeout(MsgTimeout) 1470 ttl_bin = struct.pack('@i', 1) 1471 if addrinfo.family == socket.AF_INET: 1472 if self.ipv4_udp_multicast: 1473 bc_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin) 1474 else: 1475 bc_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1476 else: # addrinfo.family == socket.AF_INET6 1477 bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin) 1478 bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn) 1479 bc_sock.bind((addrinfo.ip, 0)) 1480 try: 1481 yield bc_sock.sendto(b'PING:' + serialize(ping_msg), (addrinfo.broadcast, port)) 1482 except Exception: 1483 pass 1484 bc_sock.close() 1485 1486 def discover_nodes(self, cluster, node_allocs, task=None): 1487 for node_alloc in node_allocs: 1488 # TODO: we assume subnets are indicated by '*', instead of 1489 # subnet mask; this is a limitation, but specifying with 1490 # subnet mask a bit cumbersome. 
1491 if node_alloc.ip_rex.find('*') >= 0: 1492 Task(self.broadcast_ping, port=node_alloc.port) 1493 else: 1494 ip_addr = node_alloc.ip_addr 1495 if ip_addr in self._nodes: 1496 continue 1497 port = node_alloc.port 1498 Task(self.send_ping_node, ip_addr, port) 1499 yield 0 1500 1501 def poll_job_results(self, cluster, task=None): 1502 # generator 1503 for ip_addr in cluster._dispy_nodes: 1504 node = self._nodes.get(ip_addr, None) 1505 if not node or not node.port: 1506 continue 1507 sock = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM), 1508 keyfile=self.keyfile, certfile=self.certfile) 1509 sock.settimeout(MsgTimeout) 1510 try: 1511 req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth} 1512 reply = yield node.send(b'PENDING_JOBS:' + serialize(req)) 1513 reply = deserialize(reply) 1514 except Exception: 1515 logger.debug(traceback.format_exc()) 1516 continue 1517 finally: 1518 sock.close() 1519 1520 for uid in reply['done']: 1521 _job = self._sched_jobs.get(uid, None) 1522 if _job is None: 1523 continue 1524 conn = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM), 1525 keyfile=self.keyfile, certfile=self.certfile) 1526 conn.settimeout(MsgTimeout) 1527 try: 1528 yield conn.connect((node.ip_addr, node.port)) 1529 req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth, 1530 'uid': uid, 'hash': _job.hash} 1531 yield conn.sendall(node.auth) 1532 yield conn.send_msg(b'RETRIEVE_JOB:' + serialize(req)) 1533 msg = yield conn.recv_msg() 1534 reply = deserialize(msg) 1535 except Exception: 1536 logger.debug(traceback.format_exc()) 1537 continue 1538 else: 1539 if isinstance(reply, _JobReply): 1540 yield self.job_reply_process(reply, len(msg), conn, 1541 (node.ip_addr, node.port)) 1542 else: 1543 logger.debug('Invalid reply for %s', uid) 1544 finally: 1545 conn.close() 1546 1547 def add_cluster(self, cluster, task=None): 1548 compute = cluster._compute 1549 if self.shared: 1550 self._clusters[compute.id] = 
cluster 1551 for xf in compute.xfer_files: 1552 xf.compute_id = compute.id 1553 1554 node = _Node(cluster.scheduler_ip_addr, cluster.scheduler_port, 0, '', '', 1555 platform='', keyfile=self.keyfile, certfile=self.certfile) 1556 node.auth = cluster._scheduler_auth 1557 self._nodes[cluster.scheduler_ip_addr] = node 1558 dispy_node = DispyNode(cluster.scheduler_ip_addr, None, 0) 1559 dispy_node.avail_info = node.avail_info 1560 cluster._dispy_nodes[dispy_node.ip_addr] = dispy_node 1561 info = self.shelf['_cluster'] 1562 info['port'] = self.port 1563 self.shelf['_cluster'] = info 1564 info = {'name': compute.name, 'auth': compute.auth, 1565 'nodes': [cluster.scheduler_ip_addr]} 1566 self.shelf['compute_%s' % compute.id] = info 1567 info = {'port': cluster.scheduler_port, 'auth': cluster._scheduler_auth, 1568 'scheduler': True} 1569 self.shelf['node_%s' % (cluster.scheduler_ip_addr)] = info 1570 self.shelf.sync() 1571 if cluster.poll_interval: 1572 self.poll_interval = num_min(self.poll_interval, cluster.poll_interval) 1573 if self.poll_interval: 1574 self.timer_task.resume(True) 1575 return 1576 1577 # if a node is added with 'allocate_node', compute is already 1578 # initialized, so don't reinitialize it 1579 if compute.id is None: 1580 compute.id = self.compute_id 1581 self.compute_id += 1 1582 self._clusters[compute.id] = cluster 1583 for xf in compute.xfer_files: 1584 xf.compute_id = compute.id 1585 info = {'name': compute.name, 'auth': compute.auth, 'nodes': []} 1586 self.shelf['compute_%s' % compute.id] = info 1587 self.shelf.sync() 1588 1589 if compute.pulse_interval: 1590 self.pulse_interval = num_min(self.pulse_interval, compute.pulse_interval) 1591 if cluster.ping_interval: 1592 self.ping_interval = num_min(self.ping_interval, cluster.ping_interval) 1593 if cluster.poll_interval: 1594 self.poll_interval = num_min(self.poll_interval, cluster.poll_interval) 1595 if self.pulse_interval or self.ping_interval or self.poll_interval: 1596 
self.timer_task.resume(True) 1597 1598 Task(self.discover_nodes, cluster, cluster._node_allocs) 1599 compute_nodes = [] 1600 for ip_addr, node in self._nodes.items(): 1601 if cluster in node.clusters: 1602 continue 1603 for node_alloc in cluster._node_allocs: 1604 cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus, 1605 avail_info=node.avail_info, platform=node.platform) 1606 if cpus <= 0: 1607 continue 1608 node.cpus = min(node.avail_cpus, cpus) 1609 compute_nodes.append(node) 1610 for node in compute_nodes: 1611 Task(self.setup_node, node, [compute]) 1612 yield None 1613 1614 def del_cluster(self, cluster, task=None): 1615 # generator 1616 if self._clusters.pop(cluster._compute.id, None) != cluster: 1617 logger.warning('Cluster %s already closed?', cluster._compute.name) 1618 return 1619 1620 if self.shared: 1621 sock = socket.socket(cluster.addrinfo.family, socket.SOCK_STREAM) 1622 sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile) 1623 sock.settimeout(MsgTimeout) 1624 yield sock.connect((cluster.scheduler_ip_addr, cluster.scheduler_port)) 1625 yield sock.sendall(cluster._scheduler_auth) 1626 req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth, 1627 'terminate_pending': cluster._complete.is_set()} 1628 yield sock.send_msg(b'CLOSE:' + serialize(req)) 1629 sock.close() 1630 else: 1631 cid = cluster._compute.id 1632 cluster._jobs = [] 1633 cluster._pending_jobs = 0 1634 # remove cluster from all nodes before closing (which uses 1635 # yield); otherwise, scheduler may access removed cluster 1636 # through node.clusters 1637 close_nodes = [] 1638 for dispy_node in cluster._dispy_nodes.values(): 1639 node = self._nodes.get(dispy_node.ip_addr, None) 1640 if not node: 1641 continue 1642 if not cluster._complete.is_set(): 1643 node.pending_jobs = [_job for _job in node.pending_jobs 1644 if _job.compute_id != cid] 1645 node.clusters.discard(cluster) 1646 close_nodes.append((Task(node.close, 
                                         cluster._compute,
                                         terminate_pending=cluster._complete.is_set()),
                                    dispy_node))
            cluster._dispy_nodes.clear()
            # now wait for each node close to finish and notify callbacks
            for close_task, dispy_node in close_nodes:
                yield close_task.finish()
                dispy_node.update_time = time.time()
                if cluster.status_callback:
                    self.worker_Q.put((cluster.status_callback,
                                       (DispyNode.Closed, dispy_node, None)))
        self.shelf.pop('compute_%s' % (cluster._compute.id), None)
        # TODO: prune nodes in shelf
        self.shelf.sync()

    def setup_node(self, node, computations, task=None):
        # generator (daemon): send each computation to 'node' and record the
        # allocation in the recovery shelf
        task.set_daemon()
        for compute in computations:
            # NB: to avoid computation being sent multiple times, we
            # add to cluster's _dispy_nodes before sending computation
            # to node
            cluster = self._clusters.get(compute.id, None)
            if not cluster or node.ip_addr in cluster._dispy_nodes:
                continue
            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
            if not dispy_node:
                dispy_node = DispyNode(node.ip_addr, node.name, node.cpus)
                cluster._dispy_nodes[node.ip_addr] = dispy_node
            dispy_node.tx = node.tx
            dispy_node.rx = node.rx
            dispy_node.avail_cpus = node.avail_cpus
            dispy_node.avail_info = node.avail_info
            self.shelf['node_%s' % (node.ip_addr)] = {'port': node.port, 'auth': node.auth}
            shelf_compute = self.shelf['compute_%s' % compute.id]
            shelf_compute['nodes'].append(node.ip_addr)
            self.shelf['compute_%s' % compute.id] = shelf_compute
            self.shelf.sync()
            res = yield node.setup(compute, exclusive=True, task=task)
            if res or compute.id not in self._clusters:
                # setup failed (or cluster deleted meanwhile): roll back
                cluster._dispy_nodes.pop(node.ip_addr, None)
                logger.warning('Failed to setup %s for compute "%s": %s',
                               node.ip_addr, compute.name, res)
                # TODO: delete node from shelf's cluster._dispy_nodes
                del self.shelf['node_%s' % (node.ip_addr)]
                self.shelf.sync()
                yield node.close(compute, task=task)
            else:
                dispy_node.update_time = time.time()
                node.clusters.add(cluster)
                # wake scheduler so jobs can be placed on this node
                self._sched_event.set()
                if cluster.status_callback:
                    self.worker_Q.put((cluster.status_callback,
                                       (DispyNode.Initialized, dispy_node, None)))

    def add_node(self, info):
        # register a newly discovered node, or refresh / re-authenticate a
        # known one (called with validated PONG payload)
        try:
            # assert info['version'] == _dispy_version
            assert info['port'] > 0 and info['cpus'] > 0
            # TODO: check if it is one of ext_ip_addr?
        except Exception:
            # logger.debug(traceback.format_exc())
            return
        node = self._nodes.get(info['ip_addr'], None)
        if node is None:
            logger.debug('Discovered %s:%s (%s) with %s cpus',
                         info['ip_addr'], info['port'], info['name'], info['cpus'])
            node = _Node(info['ip_addr'], info['port'], info['cpus'], info['sign'],
                         self.secret, platform=info['platform'],
                         keyfile=self.keyfile, certfile=self.certfile)
            node.name = info['name']
            node.avail_info = info['avail_info']
            self._nodes[node.ip_addr] = node
        else:
            # known node: refresh pulse / cpu counts; if port or auth changed
            # the node was restarted, so abandon its jobs and re-add it
            node.last_pulse = time.time()
            auth = auth_code(self.secret, info['sign'])
            if info['cpus'] > 0:
                node.avail_cpus = info['cpus']
                node.cpus = min(node.cpus, node.avail_cpus)
                for cluster in node.clusters:
                    dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
                    if dispy_node:
                        dispy_node.avail_cpus = node.avail_cpus
                        dispy_node.cpus = node.cpus
            else:
                logger.warning('Invalid "cpus" %s from %s ignored', info['cpus'], info['ip_addr'])
            if node.port == info['port'] and node.auth == auth:
                return
            logger.debug('Node %s rediscovered', info['ip_addr'])
            node.port = info['port']
            if node.auth is not None:
                dead_jobs = [_job for _job in self._sched_jobs.values()
                             if _job.node is not None and _job.node.ip_addr == node.ip_addr]
                self.reschedule_jobs(dead_jobs)
                node.busy = 0
                node.auth = auth
                clusters = list(node.clusters)
                node.clusters = set()
                for cluster in clusters:
                    dispy_node = cluster._dispy_nodes.pop(node.ip_addr,
                                                          None)
                    if dispy_node and cluster.status_callback:
                        self.worker_Q.put((cluster.status_callback,
                                           (DispyNode.Closed, dispy_node, None)))
                node.auth = auth
        # (re)offer the node to clusters that can use it
        node_computations = []
        node.name = info['name']
        node.scheduler_ip_addr = info['scheduler_ip_addr']
        for cluster in self._clusters.values():
            if cluster in node.clusters:
                continue
            compute = cluster._compute
            for node_alloc in cluster._node_allocs:
                cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus,
                                           avail_info=node.avail_info, platform=node.platform)
                if cpus <= 0:
                    continue
                node.cpus = min(node.avail_cpus, cpus)
                node_computations.append(compute)
                break
        if node_computations:
            Task(self.setup_node, node, node_computations)

    def worker(self):
        # used for user callbacks only; runs in its own thread, consuming
        # (func, args) items from worker_Q until a None sentinel is seen
        while 1:
            item = self.worker_Q.get(block=True)
            if item is None:
                self.worker_Q.task_done()
                break
            func, args = item
            try:
                func(*args)
            except Exception:
                # best-effort name extraction for the log (func may be partial)
                if isinstance(func, types.FunctionType):
                    name = func.__name__
                elif isinstance(getattr(func, 'func', None), types.FunctionType):
                    name = func.func.__name__
                else:
                    name = ''
                logger.warning('Callback %s failed: %s', name, traceback.format_exc())
            self.worker_Q.task_done()

    def finish_job(self, cluster, _job, status):
        # mark _job done with 'status', fire user callback and, when the
        # cluster has no pending jobs left, signal completion
        # assert status in (DispyJob.Finished, DispyJob.Terminated, DispyJob.Abandoned)
        job = _job.job
        _job.finish(status)
        if cluster.callback:
            self.worker_Q.put((cluster.callback, (copy.copy(job),)))
        if status != DispyJob.ProvisionalResult:
            # assert cluster._pending_jobs > 0
            cluster._pending_jobs -= 1
            if cluster._pending_jobs == 0:
                cluster.end_time = time.time()
                cluster._complete.set()

    def job_reply_process(self, reply, msg_len, sock, addr):
        # generator: validate and apply a job reply received from a node;
        # answers ACK when accepted, NAK when uid/hash don't match
        _job = self._sched_jobs.pop(reply.uid, None)
        if _job:
            if reply.hash != _job.hash:
                self._sched_jobs[reply.uid] = _job
                logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
                yield sock.send_msg('NAK'.encode())
                return
        else:
            # may be a reply for a job abandoned when its node went away
            _job = self._abandoned_jobs.pop(reply.uid, None)
            if _job:
                if reply.hash != _job.hash:
                    self._abandoned_jobs[reply.uid] = _job
                    logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
                    yield sock.send_msg('NAK'.encode())
                    return
            else:
                logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0])
                yield sock.send_msg('NAK'.encode())
                return

        job = _job.job
        job.ip_addr = reply.ip_addr
        node = self._nodes.get(reply.ip_addr, None)
        cluster = self._clusters.get(_job.compute_id, None)
        if not cluster:
            # job cancelled while/after closing computation
            if node and node.busy > 0:
                node.busy -= 1
                node.cpu_time += reply.end_time - reply.start_time
                node.last_pulse = time.time()
                self._sched_event.set()
            yield sock.send_msg(b'ACK')
            return
        if node:
            node.last_pulse = time.time()
        else:
            if self.shared:
                # shared mode: first reply from a node we haven't seen; create it
                node = _Node(reply.ip_addr, 0, getattr(reply, 'cpus', 0), '', self.secret,
                             platform='', keyfile=None, certfile=None)
                self._nodes[reply.ip_addr] = node
                dispy_node = DispyNode(node.ip_addr, node.name, node.cpus)
                dispy_node.update_time = time.time()
                cluster._dispy_nodes[reply.ip_addr] = dispy_node
                if cluster.status_callback:
                    self.worker_Q.put((cluster.status_callback,
                                       (DispyNode.Initialized, dispy_node, None)))
            elif job.status == DispyJob.Abandoned:
                pass
            else:
                logger.warning('Invalid job reply? %s: %s', job.id, job.status)

        job.result, reply.result = deserialize(reply.result), None
        job.start_time = reply.start_time
        job.status = reply.status
        logger.debug('Received reply for job %s / %s from %s', job.id, _job.uid, job.ip_addr)
        if node:
            node.rx += msg_len
        dispy_node = cluster._dispy_nodes.get(job.ip_addr, None)
        if dispy_node:
            dispy_node.rx += msg_len
        if reply.status == DispyJob.ProvisionalResult:
            # job still running; keep it scheduled and deliver partial result
            self._sched_jobs[_job.uid] = _job
            if cluster.callback:
                self.worker_Q.put((cluster.callback, (copy.copy(job),)))
        else:
            if node and dispy_node:
                if reply.status == DispyJob.Finished or reply.status == DispyJob.Terminated:
                    node.busy -= 1
                    node.cpu_time += reply.end_time - reply.start_time
                    dispy_node.busy -= 1
                    dispy_node.cpu_time += reply.end_time - reply.start_time
                    dispy_node.jobs_done += 1
                    dispy_node.update_time = time.time()
                elif reply.status == DispyJob.Cancelled:
                    # only shared scheduler sends Cancelled replies
                    assert self.shared is True
                else:
                    logger.warning('Invalid reply status: %s for job %s', reply.status, job.id)
            elif job.status == DispyJob.Abandoned:
                pass
            else:
                logger.warning('Invalid job reply (status)? %s: %s', job.id, job.status)

            job.stdout = reply.stdout
            job.stderr = reply.stderr
            job.exception = reply.exception
            job.end_time = reply.end_time
            self.finish_job(cluster, _job, reply.status)
            if cluster.status_callback:
                self.worker_Q.put((cluster.status_callback, (reply.status, dispy_node,
                                                             copy.copy(job))))
        self._sched_event.set()
        yield sock.send_msg(b'ACK')

    def reschedule_jobs(self, dead_jobs):
        # re-queue (reentrant) or abandon jobs whose node died
        if not dead_jobs:
            return
        for _job in dead_jobs:
            cluster = self._clusters[_job.compute_id]
            del self._sched_jobs[_job.uid]
            dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None)
            if dispy_node:
                dispy_node.cpus = 0
                dispy_node.busy = 0
                dispy_node.update_time = time.time()
            if cluster._compute.reentrant and not _job.pinned:
                # safe to re-run elsewhere
                logger.debug('Rescheduling job %s from %s', _job.uid, _job.node.ip_addr)
                dispy_job = _job.job
                dispy_job.status = DispyJob.Created
                dispy_job.ip_addr = None
                _job.node = None
                # TODO: call 'status_callback'?
1912 # _job.hash = ''.join(hex(x)[2:] for x in os.urandom(10)) 1913 cluster._jobs.append(_job) 1914 else: 1915 dispy_job = _job.job 1916 logger.debug('Job %s scheduled on %s abandoned', dispy_job.id, _job.node.ip_addr) 1917 dispy_job.status = DispyJob.Abandoned 1918 self._abandoned_jobs[_job.uid] = _job 1919 cluster._pending_jobs -= 1 1920 if cluster._pending_jobs == 0: 1921 cluster.end_time = time.time() 1922 cluster._complete.set() 1923 1924 if cluster.status_callback: 1925 self.worker_Q.put((cluster.status_callback, 1926 (DispyJob.Abandoned, dispy_node, copy.copy(dispy_job)))) 1927 self._sched_event.set() 1928 1929 def run_job(self, _job, cluster, task=None): 1930 # generator 1931 node = _job.node 1932 dispy_node = cluster._dispy_nodes[node.ip_addr] 1933 try: 1934 tx = yield _job.run(task=task) 1935 dispy_node.tx += tx 1936 except (EnvironmentError, OSError): 1937 logger.warning('Failed to run job %s on %s for computation %s; removing this node', 1938 _job.uid, node.ip_addr, cluster._compute.name) 1939 logger.debug(traceback.format_exc()) 1940 if node.pending_jobs: 1941 # TODO: instead of discarding pending jobs, maintain them 1942 # elsewhere, while cluster is alive? 1943 for njob in node.pending_jobs: 1944 cl = self._clusters[njob.compute_id] 1945 self.finish_job(cl, njob, DispyJob.Cancelled) 1946 if cl.status_callback: 1947 dn = cl._dispy_nodes.get(node.ip_addr, None) 1948 self.worker_Q.put((cl.status_callback, 1949 (DispyJob.Cancelled, dn, njob.job))) 1950 node.pending_jobs = [] 1951 # TODO: need to close computations on this node? 
            # drop this (failed) node from every cluster that was using it
            for cl in node.clusters:
                dn = cl._dispy_nodes.pop(node.ip_addr, None)
                if dn and cl.status_callback:
                    self.worker_Q.put((cl.status_callback, (DispyNode.Closed, dn, None)))
            node.clusters.clear()
            self._nodes.pop(node.ip_addr, None)
            if self._sched_jobs.pop(_job.uid, None) == _job:
                if not _job.pinned:
                    # put the job back at the head of the queue for another node
                    cluster._jobs.insert(0, _job)
                node.busy -= 1
            self._sched_event.set()
        except Exception:
            logger.warning('Failed to run job %s on %s for computation %s',
                           _job.uid, node.ip_addr, cluster._compute.name)
            logger.debug(traceback.format_exc())
            # TODO: delay executing again for some time?
            # this job might have been deleted already due to timeout
            if self._sched_jobs.pop(_job.uid, None) == _job:
                dispy_job = _job.job
                self.finish_job(cluster, _job, DispyJob.Cancelled)
                if cluster.status_callback and dispy_node:
                    dispy_node.update_time = time.time()
                    self.worker_Q.put((cluster.status_callback,
                                       (DispyJob.Cancelled, dispy_node, dispy_job)))
                node.busy -= 1
            self._sched_event.set()
        else:
            # job may have already finished (in which case _job.job would be None)
            if _job.job:
                _job.job.ip_addr = node.ip_addr
                logger.debug('Running job %s / %s on %s (busy: %d / %d)',
                             _job.job.id, _job.uid, node.ip_addr, node.busy, node.cpus)
                _job.job.status = DispyJob.Running
                _job.job.start_time = time.time()
                dispy_node.busy += 1
                dispy_node.update_time = time.time()
                if cluster.status_callback:
                    self.worker_Q.put((cluster.status_callback,
                                       (DispyJob.Running, dispy_node, copy.copy(_job.job))))
        if (not cluster._compute.reentrant) and (not cluster.status_callback) and _job.job:
            # arguments are not needed for rescheduling / callbacks; free them early
            _job.job._args = ()
            _job.job._kwargs = {}

    def wait(self, cluster, timeout):
        """Wait (up to 'timeout') for cluster's pending jobs to finish; jobs
        abandoned due to dead nodes are finished with Abandoned status before
        returning.
        """
        ret = cluster._complete.wait(timeout=timeout)
        if ret or not self._abandoned_jobs:
            return ret
        cid = cluster._compute.id
        for _job in self._abandoned_jobs.values():
            if _job.compute_id == cid:
                _job.finish(DispyJob.Abandoned)
        self._abandoned_jobs = {uid: _job for uid, _job in self._abandoned_jobs.items()
                                if _job.compute_id != cid}
        return 0

    def load_balance_schedule(self):
        """Pick node to run next job: a node with pinned (pending) jobs gets
        priority; otherwise the eligible node with lowest busy/cpus ratio.
        """
        host = None
        load = 1.0
        for node in self._nodes.values():
            if node.busy >= node.cpus:
                continue
            if node.pending_jobs:
                host = node
                break
            if not any(cluster._jobs for cluster in node.clusters):
                continue
            if (node.busy / node.cpus) < load:
                load = node.busy / node.cpus
                host = node
        return host

    def _schedule_jobs(self, task=None):
        # generator
        # Main scheduler loop: pick a node, pick a job for it and start a
        # 'run_job' task; on termination, finish / cancel whatever remains.
        while not self.terminate:
            # n = sum(len(cluster._jobs) for cluster in self._clusters.values())
            node = self.select_job_node()
            if not node:
                self._sched_event.clear()
                yield self._sched_event.wait()
                continue
            if node.pending_jobs:
                _job = node.pending_jobs.pop(0)
                cluster = self._clusters[_job.compute_id]
            else:
                # TODO: strategy to pick a cluster?
                for cluster in node.clusters:
                    # assert node.ip_addr in cluster._dispy_nodes
                    if cluster._jobs:
                        _job = cluster._jobs.pop(0)
                        break
                else:
                    self._sched_event.clear()
                    yield self._sched_event.wait()
                    continue
            _job.node = node
            # assert node.busy < node.cpus
            self._sched_jobs[_job.uid] = _job
            node.busy += 1
            Task(self.run_job, _job, cluster)

        logger.debug('Scheduler quitting: %s', len(self._sched_jobs))
        self._sched_jobs = {}
        for udp_task in self.udp_tasks:
            udp_task.terminate()
        for cid in list(self._clusters.keys()):
            cluster = self._clusters[cid]
            if not hasattr(cluster, '_compute'):
                # cluster is closed
                continue
            for _job in cluster._jobs:
                if _job.job.status == DispyJob.Running:
                    status = DispyJob.Terminated
                else:
                    status = DispyJob.Cancelled
                dispy_job = _job.job
                self.finish_job(cluster, _job, status)
                if cluster.status_callback:
                    dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None)
                    if dispy_node:
                        dispy_node.update_time = time.time()
                    self.worker_Q.put((cluster.status_callback,
                                       (status, dispy_node, copy.copy(dispy_job))))
            for dispy_node in cluster._dispy_nodes.values():
                node = self._nodes.get(dispy_node.ip_addr, None)
                if not node:
                    continue
                for _job in node.pending_jobs:
                    # TODO: delete only jobs for this cluster?
                    if _job.job.status == DispyJob.Running:
                        status = DispyJob.Terminated
                    else:
                        status = DispyJob.Cancelled
                    dispy_job = _job.job
                    self.finish_job(cluster, _job, status)
                    if cluster.status_callback:
                        dispy_node.update_time = time.time()
                        self.worker_Q.put((cluster.status_callback,
                                           (status, dispy_node, copy.copy(dispy_job))))
                node.pending_jobs = []
            cluster._jobs = []
            cluster._pending_jobs = 0
            yield self.del_cluster(cluster, task=task)
        self._clusters = {}
        self._nodes = {}
        logger.debug('Scheduler quit')

    def submit_job(self, _job, ip_addr=None, task=None):
        # generator
        # Queue _job for scheduling; if 'ip_addr' is given, the job is pinned
        # to that node's pending queue, otherwise to the cluster's queue.
        _job.uid = id(_job)
        cluster = self._clusters[_job.compute_id]
        if ip_addr:
            node = self._nodes.get(ip_addr, None)
            if not node or cluster not in node.clusters:
                return(-1)
            node.pending_jobs.append(_job)
            _job.pinned = node
        else:
            cluster._jobs.append(_job)
        cluster._pending_jobs += 1
        cluster._complete.clear()
        if cluster.status_callback:
            self.worker_Q.put((cluster.status_callback, (DispyJob.Created, None,
                                                         copy.copy(_job.job))))
        self._sched_event.set()
        yield 0

    def cancel_job(self, job, task=None):
        # generator
        # Remove a job that is not yet running from its queue, or ask the
        # node to terminate a running job; returns 0 on success, -1 otherwise.
        _job = job._dispy_job_
        if _job is None:
            logger.warning('Job %s is invalid for cancellation!', job.id)
            return(-1)
        cluster = self._clusters.get(_job.compute_id, None)
        if not cluster:
            # NOTE(review): 'cluster' is None in this branch, so accessing
            # cluster._compute.name below raises AttributeError instead of
            # logging the intended warning
            logger.warning('Invalid job %s for cluster "%s"!', job.id, cluster._compute.name)
            return(-1)
        # assert cluster._pending_jobs >= 1
        if _job.job.status == DispyJob.Created:
            if _job.pinned:
                _job.pinned.pending_jobs.remove(_job)
            else:
                cluster._jobs.remove(_job)
            dispy_job = _job.job
            self.finish_job(cluster, _job, DispyJob.Cancelled)
            if cluster.status_callback:
                self.worker_Q.put((cluster.status_callback, (DispyJob.Cancelled, None, dispy_job)))
            logger.debug('Cancelled (removed) job %s', job.id)
            return(0)
        elif not (_job.job.status == DispyJob.Running or
                  _job.job.status == DispyJob.ProvisionalResult or _job.node is None):
            logger.warning('Job %s is not valid for cancel (%s)', job.id, _job.job.status)
            return(-1)
        _job.job.status = DispyJob.Cancelled
        # don't send this status - when job is terminated status/callback get called
        logger.debug('Job %s / %s is being terminated', _job.job.id, _job.uid)
        try:
            resp = yield _job.node.send(b'TERMINATE_JOB:' + serialize(_job), reply=False, task=task)
        except Exception:
            resp = -1
        if resp < 0:
            logger.debug('Terminating job %s / %s failed: %s', _job.job.id, _job.uid, resp)
        else:
            resp = 0
        return(resp)

    def allocate_node(self, cluster, node_allocs, task=None):
        # generator
        # Add new node allocations to cluster; allocations matching nodes
        # already known to this cluster are activated immediately.
        for i in range(len(node_allocs)-1, -1, -1):
            node = self._nodes.get(node_allocs[i].ip_addr, None)
            if node:
                dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
                if dispy_node:
                    node.clusters.add(cluster)
                    self._sched_event.set()
                    del node_allocs[i]
                    continue
        if not node_allocs:
            return(0)
        cluster._node_allocs.extend(node_allocs)
        cluster._node_allocs = sorted(cluster._node_allocs,
                                      key=lambda node_alloc: node_alloc.ip_rex, reverse=True)
        # remove duplicate allocations (by 'ip_rex'), keeping first occurrence
        present = set()
        cluster._node_allocs = [na for na in cluster._node_allocs
                                if na.ip_rex not in present and not present.add(na.ip_rex)]
        del present
        yield self.add_cluster(cluster, task=task)
        yield self._sched_event.set()
        return(0)

    def deallocate_node(self, cluster, node, task=None):
        # generator
        # Detach 'node' (DispyNode, host name or IP) from cluster so no new
        # jobs of this cluster are scheduled on it.
        if isinstance(node, DispyNode):
            node = cluster._dispy_nodes.get(node.ip_addr, None)
        elif isinstance(node, str):
            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
        else:
            node = None
        if node:
            node = self._nodes.get(node.ip_addr, None)
        if not node:
            return(-1)
        node.clusters.discard(cluster)
        yield 0

    def close_node(self, cluster, node, terminate_pending, task=None):
        # generator
        # Detach 'node' from cluster, cancel its pending jobs for this
        # computation and close the computation on the node.
        if isinstance(node, DispyNode):
            node = cluster._dispy_nodes.get(node.ip_addr, None)
        elif isinstance(node, str):
            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
        else:
            node = None
        if node:
            node = self._nodes.get(node.ip_addr, None)
        if not node:
            return(-1)
        node.clusters.discard(cluster)
        jobs = [_job for _job in node.pending_jobs if _job.compute_id == cluster._compute.id]
        if cluster.status_callback:
            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
            for _job in jobs:
                self.worker_Q.put((cluster.status_callback,
                                   (DispyJob.Cancelled, dispy_node, copy.copy(_job.job))))
        if jobs:
            node.pending_jobs = [_job for _job in node.pending_jobs
                                 if _job.compute_id != cluster._compute.id]
        yield node.close(cluster._compute, terminate_pending=terminate_pending)

    def set_node_cpus(self, cluster, node, cpus, task=None):
        # generator
        # Set number of CPUs used on 'node': non-negative 'cpus' is an
        # absolute count (capped at available CPUs); negative leaves that
        # many CPUs unused. Returns resulting CPU count, or -1 on error.
        if isinstance(node, DispyNode):
            node = cluster._dispy_nodes.get(node.ip_addr, None)
        elif isinstance(node, str):
            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
        else:
            node = None
        if node:
            node = self._nodes.get(node.ip_addr, None)
        if not node:
            return(-1)
        try:
            cpus = int(cpus)
        except ValueError:
            return(-1)
        if cpus >= 0:
            node.cpus = min(node.avail_cpus, cpus)
        elif (node.avail_cpus + cpus) >= 0:
            node.cpus = node.avail_cpus + cpus
        cpus = node.cpus
        for cluster in node.clusters:
            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
            if dispy_node:
                dispy_node.cpus = cpus
        yield self._sched_event.set()
        return(cpus)

    def send_file(self, cluster, node, xf, task=None):
        # Transfer file 'xf' to 'node' and account transmitted bytes.
        if isinstance(node, DispyNode):
            dispy_node = cluster._dispy_nodes.get(node.ip_addr, None)
        elif isinstance(node, str):
            dispy_node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
        else:
            dispy_node = None
        if not dispy_node:
            return(-1)
        node = self._nodes.get(dispy_node.ip_addr, None)
        if not node:
            return(-1)
        tx = yield node.xfer_file(xf)
        dispy_node.tx += tx

    def node_jobs(self, cluster, node, from_node, task=None):
        # generator
        # List jobs of this cluster on 'node'; if 'from_node' is set, the
        # node itself is asked over TCP, otherwise local book-keeping is used.
        if isinstance(node, DispyNode):
            node = cluster._dispy_nodes.get(node.ip_addr, None)
        elif isinstance(node, str):
            node = cluster._dispy_nodes.get(_node_ipaddr(node), None)
        else:
            node = None
        if node:
            node = self._nodes.get(node.ip_addr, None)
        if not node:
            return(-1)
        if cluster not in node.clusters:
            return([])
        if from_node:
            sock = socket.socket(node.sock_family, socket.SOCK_STREAM)
            sock = AsyncSocket(sock, keyfile=self.keyfile, certfile=self.certfile)
            sock.settimeout(MsgTimeout)
            try:
                yield sock.connect((node.ip_addr, node.port))
                yield sock.sendall(node.auth)
                req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth}
                yield sock.send_msg(b'JOBS:' + serialize(req))
                info = yield sock.recv_msg()
                _jobs = [self._sched_jobs.get(uid, None) for uid in deserialize(info)]
                jobs = [_job.job for _job in _jobs if _job]
            except Exception:
                logger.debug(traceback.format_exc())
                jobs = []
            sock.close()
        else:
            jobs = [_job.job for _job in self._sched_jobs.values()
                    if _job.node == node and _job.compute_id == cluster._compute.id]

        return(jobs)

    def shutdown(self):
        # non-generator
        # Terminate the scheduler (only when no jobs are pending), drain the
        # worker queue and remove the recovery shelf files.
        if not self.shared:
            if self.terminate:
                return
            if any(cluster._pending_jobs for cluster in self._clusters.values()):
                return
            logger.debug('Shutting down scheduler ...')
            self.terminate = True

            def _terminate_scheduler(self, task=None):
                yield self._sched_event.set()

            Task(_terminate_scheduler, self).value()
            self.worker_Q.put(None)
            self._scheduler.value()
            self.worker_Q.join()
        if self.shelf:
            # TODO: need to check all clusters are deleted?
            self.shelf.close()
            self.shelf = None
            for ext in ('', '.db', '.bak', '.dat', '.dir'):
                if os.path.isfile(self.recover_file + ext):
                    try:
                        os.remove(self.recover_file + ext)
                    except Exception:
                        pass
        if self.pycos:
            self.pycos.finish()
            self.pycos = None
        Singleton.empty(self.__class__)


class JobCluster(object):
    """Create an instance of cluster for a specific computation.
    """

    # NOTE(review): 'depends' default is a mutable list and the constructor
    # appends to it (e.g. 'depends.append(...)' below), so the default object
    # accumulates entries across JobCluster instances; 'depends=None' with a
    # fresh local list would be safer.
    def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
                 ip_addr=None, port=None, node_port=None, ext_ip_addr=None,
                 ipv4_udp_multicast=False, dest_path=None, loglevel=logger.INFO,
                 setup=None, cleanup=True, ping_interval=None, pulse_interval=None,
                 poll_interval=None, reentrant=False, secret='', keyfile=None, certfile=None,
                 recover_file=None):
        """Create an instance of cluster for a specific computation.

        @computation is either a string (which is name of program, possibly
        with full path) or a python function or class method.

        @nodes is a list. Each element of @nodes is either a string
        (which must be either IP address or name of server node), or
        a tuple with up to 3 elements. The tuple's first element
        must be IP address or name of server node, second element,
        if present, must be port number where that node is listening
        for ping from clients, the third element, if present, must
        be number of CPUs to use on that node.

        @depends is a list. Each element of @depends is either
        a string or a python object. If the element is a string,
        it must be a file which will be transferred to the node
        executing a job for this cluster.
        If the element is a python object (a function name, class name etc.),
        then the code for that object is transferred to the node executing
        a job for this cluster.

        @callback is a function or class method. When a job's results
        become available, dispy will call provided callback
        function/method with that job as the argument. If a job
        sends provisional results with 'dispy_provisional_result'
        multiple times, then dispy will call provided callback each
        such time. The (provisional) results of computation can be
        retrieved with 'result' field of job, etc. While
        computations are run on nodes in isolated environments,
        callbacks are run in the context of user programs from which
        (Shared)JobCluster is called - for example, callbacks can
        access global variables in user programs.

        @cluster_status is a function or class method. When a node
        accepts this cluster's computation, a job is submitted, a
        job is done or node is closed, given function is called with
        three parameters: an instance of DispyNode, node/job status
        (one of DispyNode.Initialized, DispyNode.Closed, or job
        status), and an instance of DispyJob (if job submitted,
        finished etc.) or None (if node started or closed). dispy
        queues these status messages and a worker thread calls the
        functions, so it is possible that actual current status of
        node may be different from the status indicated at the time
        status function is called.

        @ip_addr and @port indicate the address where the cluster will bind to.
        If multiple instances of JobCluster are used, these arguments are used
        only in the case of first instance.
        If no value for @ip_addr is given (default), IP address associated
        with the 'hostname' is used.
        If no value for @port is given (default), number 51347 is used.

        @ext_ip_addr is the IP address of NAT firewall/gateway if
        dispy client is behind that firewall/gateway.

        @node_port indicates port on which node servers are listening
        for ping messages. The client (JobCluster instance) broadcasts
        ping requests to this port.
        If no value for @node_port is given (default), number 51348 is used.

        @dest_path indicates path of directory to which files are
        transferred to a server node when executing a job. If
        @computation is a string, indicating a program, then that
        program is also transferred to @dest_path.

        @loglevel indicates message logging level.

        @cleanup indicates if the files transferred should be removed when
        shutting down.

        @secret is a string that is (hashed and) used for handshaking
        of communication with nodes.

        @certfile is path to file containing SSL certificate (see
        Python 'ssl' module).

        @keyfile is path to file containing private key for SSL
        communication (see Python 'ssl' module). This key may be
        stored in 'certfile' itself, in which case this should be
        None.

        @ping_interval is number of seconds between 1 and
        1000. Normally dispy can find nodes running 'dispynode' by
        broadcasting 'ping' messages that nodes respond to. However,
        these packets may get lost. If ping_interval is set, then
        every ping_interval seconds, dispy sends ping messages to find
        nodes that may have missed earlier ping messages.

        @pulse_interval is number of seconds between 1 and 1000. If
        pulse_interval is set, dispy directs nodes to send 'pulse'
        messages to indicate they are computing submitted jobs. A node
        is presumed dead if 5*pulse_interval elapses without a pulse
        message. See 'reentrant' below.

        @poll_interval is number of seconds between 5 and 1000. If
        poll_interval is set, the client uses polling to check the
        status of jobs executed by nodes, instead of nodes connecting
        to the client to send the status of jobs, which is not
        possible if the client is behind a gateway / router which
        doesn't forward ports to where the client is running. Polling
        is not efficient, so it must be used only where necessary.

        @reentrant must be either True or False. This value is used
        only if 'pulse_interval' is set for any of the clusters. If
        pulse_interval is given and reentrant is False (default), jobs
        scheduled for a dead node are automatically cancelled; if
        reentrant is True, then jobs scheduled for a dead node are
        resubmitted to other eligible nodes.

        @recover_file must be either None (default) or file path. If
        this is None, dispy stores information about cluster in a file
        of the form '_dispy_YYYYMMDDHHMMSS' in current directory. If
        it is a path, dispy will use given path to store
        information. If user program terminates for some reason (such
        as raising an exception), it is possible to retrieve results
        of scheduled jobs later (after they are finished) by calling
        'recover' function (implemented in this file) with this file.
2459 """ 2460 2461 logger.setLevel(loglevel) 2462 pycos.logger.setLevel(loglevel) 2463 if reentrant is not True and reentrant is not False: 2464 logger.warning('Invalid value for reentrant (%s) is ignored; ' 2465 'it must be either True or False', reentrant) 2466 reentrant = False 2467 if ping_interval is not None: 2468 try: 2469 ping_interval = float(ping_interval) 2470 assert 1.0 <= ping_interval <= 1000 2471 except Exception: 2472 raise Exception('Invalid ping_interval; must be between 1 and 1000') 2473 self.ping_interval = ping_interval 2474 if pulse_interval is not None: 2475 try: 2476 pulse_interval = float(pulse_interval) 2477 assert 1.0 <= pulse_interval <= 1000 2478 except Exception: 2479 raise Exception('Invalid pulse_interval; must be between 1 and 1000') 2480 self.pulse_interval = pulse_interval 2481 2482 if poll_interval is not None: 2483 try: 2484 poll_interval = float(poll_interval) 2485 assert 5.0 <= poll_interval <= 1000 2486 except Exception: 2487 raise Exception('Invalid poll_interval; must be between 5 and 1000') 2488 self.poll_interval = poll_interval 2489 2490 if callback: 2491 assert inspect.isfunction(callback) or inspect.ismethod(callback), \ 2492 'callback must be a function or method' 2493 try: 2494 args = inspect.getargspec(callback) 2495 if inspect.isfunction(callback): 2496 assert len(args.args) == 1 2497 else: 2498 assert len(args.args) == 2 2499 if args.args[0] != 'self': 2500 logger.warning('First argument to callback method is not "self"') 2501 assert args.varargs is None 2502 assert args.keywords is None 2503 assert args.defaults is None 2504 except Exception: 2505 raise Exception('Invalid callback function; ' 2506 'it must take excatly one argument - an instance of DispyJob') 2507 self.callback = callback 2508 2509 if cluster_status: 2510 assert inspect.isfunction(cluster_status) or inspect.ismethod(cluster_status), \ 2511 'cluster_status must be a function or method' 2512 try: 2513 args = inspect.getargspec(cluster_status) 2514 
if inspect.isfunction(cluster_status): 2515 assert len(args.args) == 3 2516 else: 2517 assert len(args.args) == 4 2518 if args.args[0] != 'self': 2519 logger.warning('First argument to cluster_status method is not "self"') 2520 assert args.varargs is None 2521 assert args.keywords is None 2522 assert args.defaults is None 2523 except Exception: 2524 raise Exception('Invalid cluster_status function; ' 2525 'it must take excatly 3 arguments') 2526 self.status_callback = cluster_status 2527 2528 if hasattr(self, 'scheduler_ip_addr'): 2529 shared = True 2530 self._node_allocs = [] 2531 else: 2532 shared = False 2533 if not nodes: 2534 nodes = ['*'] 2535 elif not isinstance(nodes, list): 2536 if isinstance(nodes, str): 2537 nodes = [nodes] 2538 else: 2539 raise Exception('"nodes" must be list of IP addresses or host names') 2540 self._node_allocs = _parse_node_allocs(nodes) 2541 if not self._node_allocs: 2542 raise Exception('"nodes" argument is invalid') 2543 self._node_allocs = sorted(self._node_allocs, 2544 key=lambda node_alloc: node_alloc.ip_rex, reverse=True) 2545 self._dispy_nodes = {} 2546 2547 if inspect.isfunction(computation) or inspect.ismethod(computation): 2548 func = computation 2549 compute = _Compute(_Compute.func_type, func.__name__) 2550 lines = inspect.getsourcelines(func)[0] 2551 lines[0] = lines[0].lstrip() 2552 compute.code = ''.join(lines) 2553 if inspect.ismethod(computation): 2554 depends.append(computation.__self__.__class__) 2555 elif isinstance(computation, str): 2556 compute = _Compute(_Compute.prog_type, computation) 2557 depends.append(computation) 2558 else: 2559 raise Exception('Invalid computation type: %s' % type(computation)) 2560 2561 if setup: 2562 if inspect.isfunction(setup): 2563 depends.append(setup) 2564 compute.setup = _Function(setup.__name__, (), {}) 2565 elif isinstance(setup, functools.partial): 2566 depends.append(setup.func) 2567 if setup.args: 2568 args = setup.args 2569 else: 2570 args = () 2571 if setup.keywords: 
2572 kwargs = setup.keywords 2573 else: 2574 kwargs = {} 2575 compute.setup = _Function(setup.func.__name__, args, kwargs) 2576 else: 2577 raise Exception('"setup" must be Python (partial) function') 2578 2579 if inspect.isfunction(cleanup): 2580 depends.append(cleanup) 2581 compute.cleanup = _Function(cleanup.__name__, (), {}) 2582 elif isinstance(cleanup, functools.partial): 2583 depends.append(cleanup.func) 2584 if cleanup.args: 2585 args = cleanup.args 2586 else: 2587 args = () 2588 if cleanup.keywords: 2589 kwargs = cleanup.keywords 2590 else: 2591 kwargs = {} 2592 compute.cleanup = _Function(cleanup.func.__name__, args, kwargs) 2593 elif isinstance(cleanup, bool): 2594 compute.cleanup = cleanup 2595 else: 2596 raise Exception('"cleanup" must be Python (partial) function') 2597 2598 self._cluster = _Cluster(ip_addr=ip_addr, port=port, node_port=node_port, 2599 ext_ip_addr=ext_ip_addr, ipv4_udp_multicast=ipv4_udp_multicast, 2600 shared=shared, secret=secret, keyfile=keyfile, certfile=certfile, 2601 recover_file=recover_file) 2602 atexit.register(self.shutdown) 2603 2604 depend_ids = {} 2605 cwd = self._cluster.dest_path 2606 for dep in depends: 2607 if isinstance(dep, str) or inspect.ismodule(dep): 2608 if inspect.ismodule(dep): 2609 name = dep.__file__ 2610 if name.endswith('.pyc'): 2611 name = name[:-1] 2612 if not name.endswith('.py'): 2613 logger.warning('Invalid module "%s" - must be python source.', dep) 2614 continue 2615 if name.startswith(cwd): 2616 dst = os.path.dirname(name[len(cwd):].lstrip(os.sep)) 2617 elif dep.__package__: 2618 dst = dep.__package__.replace('.', os.sep) 2619 else: 2620 dst = os.path.dirname(dep.__name__.replace('.', os.sep)) 2621 else: 2622 if os.path.isfile(dep): 2623 name = os.path.abspath(dep) 2624 elif compute.type == _Compute.prog_type: 2625 for p in os.environ['PATH'].split(os.pathsep): 2626 f = os.path.join(p, dep) 2627 if os.path.isfile(f): 2628 logger.debug('Assuming "%s" is program "%s"', dep, f) 2629 name = f 2630 
break 2631 else: 2632 raise Exception('Path "%s" is not valid' % dep) 2633 if name.startswith(cwd): 2634 dst = os.path.dirname(name[len(cwd):].lstrip(os.sep)) 2635 else: 2636 dst = '.' 2637 if name in depend_ids: 2638 continue 2639 try: 2640 with open(name, 'rb') as fd: 2641 pass 2642 xf = _XferFile(name, dst, compute.id) 2643 compute.xfer_files.add(xf) 2644 depend_ids[name] = dep 2645 except Exception: 2646 raise Exception('File "%s" is not valid' % name) 2647 elif (inspect.isfunction(dep) or inspect.isclass(dep) or 2648 (hasattr(dep, '__class__') and hasattr(dep, '__module__'))): 2649 if inspect.isfunction(dep) or inspect.isclass(dep): 2650 pass 2651 elif hasattr(dep, '__class__') and inspect.isclass(dep.__class__): 2652 dep = dep.__class__ 2653 if id(dep) in depend_ids: 2654 continue 2655 try: 2656 lines = inspect.getsourcelines(dep)[0] 2657 except Exception: 2658 logger.warning('Depenendency "%s" is not valid', dep) 2659 raise 2660 lines[0] = lines[0].lstrip() 2661 compute.code += '\n' + ''.join(lines) 2662 depend_ids[id(dep)] = id(dep) 2663 elif isinstance(dep, functools.partial): 2664 try: 2665 lines = inspect.getsourcelines(dep.func)[0] 2666 except Exception: 2667 logger.warning('Depenendency "%s" is not valid', dep) 2668 raise 2669 lines[0] = lines[0].lstrip() 2670 compute.code += '\n' + ''.join(lines) 2671 depend_ids[id(dep)] = id(dep) 2672 else: 2673 raise Exception('Invalid dependency: %s' % dep) 2674 if compute.code: 2675 # make sure code can be compiled 2676 code = compile(compute.code, '<string>', 'exec') 2677 del code 2678 if dest_path: 2679 if not isinstance(dest_path, str): 2680 raise Exception('Invalid dest_path: it must be a string') 2681 dest_path = dest_path.strip() 2682 # we should check for absolute path in dispynode.py as well 2683 if dest_path.startswith(os.sep): 2684 logger.warning('dest_path must not be absolute path') 2685 dest_path = dest_path.lstrip(os.sep) 2686 compute.dest_path = dest_path 2687 2688 compute.scheduler_port = 
self._cluster.port 2689 compute.auth = hashlib.sha1(os.urandom(20)).hexdigest() 2690 compute.job_result_port = self._cluster.port 2691 compute.reentrant = reentrant 2692 compute.pulse_interval = pulse_interval 2693 2694 self._compute = compute 2695 self._pending_jobs = 0 2696 self._jobs = [] 2697 self._complete = threading.Event() 2698 self._complete.set() 2699 self.cpu_time = 0 2700 self.start_time = time.time() 2701 self.end_time = None 2702 if not shared: 2703 Task(self._cluster.add_cluster, self).value() 2704 2705 def submit(self, *args, **kwargs): 2706 """Submit a job for execution with the given arguments. 2707 2708 Arguments should be serializable and should correspond to 2709 arguments for computation used when cluster is created. 2710 """ 2711 return self.submit_job_id(None, *args, **kwargs) 2712 2713 def submit_job_id(self, job_id, *args, **kwargs): 2714 """Same as 'submit' but job's 'id' is initialized to 'job_id'. 2715 """ 2716 if self._compute.type == _Compute.prog_type: 2717 args = [str(arg) for arg in args] 2718 try: 2719 _job = _DispyJob_(self._compute.id, job_id, args, kwargs) 2720 except Exception: 2721 logger.warning('Creating job for "%s", "%s" failed with "%s"', 2722 str(args), str(kwargs), traceback.format_exc()) 2723 return None 2724 2725 if Task(self._cluster.submit_job, _job).value() == 0: 2726 return _job.job 2727 else: 2728 return None 2729 2730 def submit_node(self, node, *args, **kwargs): 2731 """Submit a job for execution at 'node' with the given 2732 arguments. 'node' can be an instance of DispyNode (e.g., as 2733 received in cluster/job status callback) or IP address or host 2734 name. 2735 2736 Arguments should be serializable and should correspond to 2737 arguments for computation used when cluster is created. 2738 """ 2739 return self.submit_job_id_node(None, *args, **kwargs) 2740 2741 def submit_job_id_node(self, job_id, node, *args, **kwargs): 2742 """Same as 'submit_node' but job's 'id' is initialized to 'job_id'. 
2743 """ 2744 if isinstance(node, DispyNode): 2745 node = self._dispy_nodes.get(node.ip_addr, None) 2746 elif isinstance(node, str): 2747 node = self._dispy_nodes.get(_node_ipaddr(node), None) 2748 else: 2749 node = None 2750 if not node: 2751 logger.warning('Invalid node') 2752 return None 2753 2754 if self._compute.type == _Compute.prog_type: 2755 args = [str(arg) for arg in args] 2756 try: 2757 _job = _DispyJob_(self._compute.id, job_id, args, kwargs) 2758 except Exception: 2759 logger.warning('Creating job for "%s", "%s" failed with "%s"', 2760 str(args), str(kwargs), traceback.format_exc()) 2761 return None 2762 2763 if Task(self._cluster.submit_job, _job, ip_addr=node.ip_addr).value() == 0: 2764 return _job.job 2765 else: 2766 return None 2767 2768 def cancel(self, job): 2769 """Cancel given job. If the job is not yet running on any 2770 node, it is simply removed from scheduler's queue. If the job 2771 is running on a node, it is terminated/killed. 2772 2773 Returns 0 if the job has been cancelled (i.e., removed from 2774 the queue or terminated). 2775 """ 2776 return Task(self._cluster.cancel_job, job).value() 2777 2778 def discover_nodes(self, nodes): 2779 """Discover given list of nodes. Each node may be host name or IP address, or an 2780 instance of NodeAllocate. If a node is '*', UDP broadcast is used to 2781 detect all nodes in local network. 2782 """ 2783 if not isinstance(nodes, list): 2784 nodes = [nodes] 2785 nodes = _parse_node_allocs(nodes) 2786 return Task(self._cluster.discover_nodes, self, nodes).value() 2787 2788 def allocate_node(self, node): 2789 """Allocate given node for this cluster. 'node' may be (list of) host 2790 name or IP address, or an instance of NodeAllocate. 
2791 """ 2792 if not isinstance(node, list): 2793 node = [node] 2794 node_allocs = _parse_node_allocs(node) 2795 if not node_allocs: 2796 return -1 2797 return Task(self._cluster.allocate_node, self, node_allocs).value() 2798 2799 def deallocate_node(self, node): 2800 """Deallocate given node for this cluster. 'node' may be host name or IP 2801 address, or an instance of NodeAllocate. 2802 """ 2803 return Task(self._cluster.deallocate_node, self, node).value() 2804 2805 def close_node(self, node, terminate_pending=False): 2806 """Close given node for this cluster. 'node' may be host name or IP 2807 address, or an instance of NodeAllocate. 2808 """ 2809 return Task(self._cluster.close_node, self, node, terminate_pending).value() 2810 2811 def node_jobs(self, node, from_node=False): 2812 """Returns list of jobs currently running on given node, given 2813 as host name or IP address. 2814 """ 2815 return Task(self._cluster.node_jobs, self, node, from_node).value() 2816 2817 def set_node_cpus(self, node, cpus): 2818 """Sets (alters) CPUs managed by dispy on a node, given as 2819 host name or IP address, to given number of CPUs. If the 2820 number of CPUs given is negative then that many CPUs are not 2821 used (from the available CPUs). 2822 """ 2823 return Task(self._cluster.set_node_cpus, self, node, cpus).value() 2824 2825 def send_file(self, path, node): 2826 """Send file with given 'path' to 'node'. 'node' can be an 2827 instance of DispyNode (e.g., as received in cluster status 2828 callback) or IP address or host name. 2829 """ 2830 cwd = self._cluster.dest_path 2831 path = os.path.abspath(path) 2832 if path.startswith(cwd): 2833 dst = os.path.dirname(path[len(cwd):].lstrip(os.sep)) 2834 else: 2835 dst = '.' 2836 xf = _XferFile(path, dst, self._compute.id) 2837 return Task(self._cluster.send_file, self, node, xf).value() 2838 2839 @property 2840 def name(self): 2841 """Returns name of computation. 
If the computation is Python 2842 function, then this would be name of the function. If the 2843 computation is a program, then this would be name of the 2844 program (without path). 2845 """ 2846 return self._compute.name 2847 2848 def __enter__(self): 2849 return self 2850 2851 def __exit__(self, exc_type, exc_value, trace): 2852 self.close() 2853 return True 2854 2855 def status(self): 2856 """ 2857 Return cluster status (ClusterStatus structure). 2858 """ 2859 def _status(self, task=None): 2860 yield ClusterStatus(list(self._dispy_nodes.values()), self._pending_jobs) 2861 return Task(_status, self).value() 2862 2863 def print_status(self, wall_time=None): 2864 """ 2865 Prints status of cluster (see 'status'). 2866 """ 2867 2868 KB = 1024 2869 MB = 1024 * KB 2870 GB = 1024 * MB 2871 TB = 1024 * GB 2872 2873 def byte_size(size): 2874 if size > TB: 2875 return '%.1f T' % (float(size) / TB) 2876 elif size > GB: 2877 return '%.1f G' % (float(size) / GB) 2878 elif size > MB: 2879 return '%.1f M' % (float(size) / MB) 2880 elif size > KB: 2881 return '%.1f K' % (float(size) / KB) 2882 return '%d B' % size 2883 2884 print('') 2885 heading = (' %15s | %5s | %7s | %8s | %13s | %7s | %7s' % 2886 ('Node', 'CPUs', 'Jobs', 'Sec/Job', 'Node Time Sec', 'Sent', 'Rcvd')) 2887 print(heading) 2888 print('-' * len(heading)) 2889 info = self.status() 2890 cpu_time = 0.0 2891 for dispy_node in info.nodes: 2892 cpu_time += dispy_node.cpu_time 2893 if dispy_node.name: 2894 name = dispy_node.name 2895 else: 2896 name = dispy_node.ip_addr 2897 if dispy_node.jobs_done > 0: 2898 secs_per_job = dispy_node.cpu_time / dispy_node.jobs_done 2899 else: 2900 secs_per_job = 0 2901 print(' %15.15s | %5s | %7s | %8.1f | %13.1f | %7s | %7s' % 2902 (name, dispy_node.cpus, dispy_node.jobs_done, secs_per_job, dispy_node.cpu_time, 2903 byte_size(dispy_node.tx), byte_size(dispy_node.rx))) 2904 print('') 2905 if info.jobs_pending: 2906 print('Jobs pending: %s' % info.jobs_pending) 2907 msg = 'Total job 
time: %.3f sec' % cpu_time 2908 if not wall_time: 2909 wall_time = time.time() - self.start_time 2910 msg += ', wall time: %.3f sec, speedup: %.3f' % (wall_time, cpu_time / wall_time) 2911 print(msg) 2912 print('') 2913 2914 # for backward compatibility 2915 stats = print_status 2916 2917 def wait(self, timeout=None): 2918 """Wait for scheduled jobs to complete. 2919 """ 2920 return self._cluster.wait(self, timeout) 2921 2922 def __call__(self): 2923 """Wait for scheduled jobs to complete. 2924 """ 2925 self.wait() 2926 2927 def close(self, timeout=None, terminate=False): 2928 """Close the cluster (jobs can no longer be submitted to it). If there 2929 are any jobs pending, this method waits until they all finish, unless 2930 'terminate' is True in which case pending jobs are cancelled (removed or 2931 terminated by nodes executing them). Additional clusters may be created 2932 after this call returns. 2933 """ 2934 if getattr(self, '_compute', None): 2935 ret = self._complete.wait(timeout=timeout) 2936 if not terminate and not ret: 2937 return False 2938 self._complete.set() 2939 Task(self._cluster.del_cluster, self).value() 2940 self._compute = None 2941 return True 2942 2943 def shutdown(self): 2944 """Close the cluster and shutdown the scheduler (so additional 2945 clusters can't be created). 2946 """ 2947 self.close() 2948 if self._cluster: 2949 cluster, self._cluster = self._cluster, None 2950 cluster.shutdown() 2951 2952 2953class SharedJobCluster(JobCluster): 2954 """SharedJobCluster should be used (instead of JobCluster) if two 2955 or more processes can simultaneously use dispy. In this case, 2956 'dispyscheduler' must be running on a node and 'scheduler_node' 2957 parameter should be set to that node's IP address or host name. 2958 2959 @scheduler_node is name or IP address where dispyscheduler is 2960 running to which jobs are submitted. 2961 2962 @scheduler_port is port where dispyscheduler is running at 2963 @scheduler_node. 
2964 2965 @port is port where this client will get job results from 2966 dispyscheduler. 2967 2968 @pulse_interval for SharedJobCluster is not used; instead, 2969 dispyscheduler must be called with appropriate pulse_interval. 2970 The behaviour is same as for JobCluster. 2971 """ 2972 def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, 2973 ip_addr=None, port=51347, scheduler_node=None, scheduler_port=None, 2974 ext_ip_addr=None, loglevel=logger.INFO, setup=None, cleanup=True, dest_path=None, 2975 poll_interval=None, reentrant=False, exclusive=False, 2976 secret='', keyfile=None, certfile=None, recover_file=None): 2977 2978 self.scheduler_ip_addr = _node_ipaddr(scheduler_node) 2979 self.addrinfo = host_addrinfo(host=ip_addr) 2980 self.addrinfo.family = socket.getaddrinfo(self.scheduler_ip_addr, None)[0][0] 2981 2982 if not nodes: 2983 nodes = ['*'] 2984 elif not isinstance(nodes, list): 2985 if isinstance(nodes, str): 2986 nodes = [nodes] 2987 else: 2988 raise Exception('"nodes" must be list of IP addresses or host names') 2989 node_allocs = _parse_node_allocs(nodes) 2990 if not node_allocs: 2991 raise Exception('"nodes" argument is invalid') 2992 node_allocs = [(na.ip_addr, na.port, na.cpus) for na in node_allocs] 2993 if ext_ip_addr: 2994 ext_ip_addr = host_addrinfo(host=ext_ip_addr).ip 2995 2996 JobCluster.__init__(self, computation, depends=depends, 2997 callback=callback, cluster_status=cluster_status, 2998 ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr, 2999 loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path, 3000 poll_interval=poll_interval, reentrant=reentrant, 3001 secret=secret, keyfile=keyfile, certfile=certfile, 3002 recover_file=recover_file) 3003 3004 def _terminate_scheduler(self, task=None): 3005 yield self._cluster._sched_event.set() 3006 3007 # wait for scheduler to terminate 3008 self._cluster.terminate = True 3009 Task(_terminate_scheduler, self).value() 3010 
self._cluster._scheduler.value() 3011 self._cluster.job_uid = None 3012 3013 if not scheduler_port: 3014 scheduler_port = 51349 3015 3016 # wait until tcp server has started 3017 while not self._cluster.port: 3018 time.sleep(0.1) 3019 3020 sock = socket.socket(self.addrinfo.family, socket.SOCK_STREAM) 3021 sock = AsyncSocket(sock, blocking=True, keyfile=keyfile, certfile=certfile) 3022 sock.connect((self.scheduler_ip_addr, scheduler_port)) 3023 sock.sendall(self._cluster.auth) 3024 req = {'version': _dispy_version, 'ip_addr': ext_ip_addr, 3025 'scheduler_ip_addr': self.scheduler_ip_addr} 3026 sock.send_msg(b'CLIENT:' + serialize(req)) 3027 reply = sock.recv_msg() 3028 sock.close() 3029 reply = deserialize(reply) 3030 if reply['version'] != _dispy_version: 3031 raise Exception('dispyscheduler version "%s" is different from dispy version "%s"' % 3032 reply['version'], _dispy_version) 3033 ext_ip_addr = reply['ip_addr'] 3034 3035 self.scheduler_port = reply['port'] 3036 self._scheduler_auth = auth_code(secret, reply['sign']) 3037 self._compute.scheduler_ip_addr = ext_ip_addr 3038 self._compute.scheduler_port = self._cluster.port 3039 self._compute.job_result_port = self._cluster.port 3040 3041 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3042 keyfile=keyfile, certfile=certfile) 3043 sock.settimeout(MsgTimeout) 3044 try: 3045 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3046 sock.sendall(self._scheduler_auth) 3047 req = {'compute': self._compute, 'node_allocs': node_allocs, 3048 'exclusive': bool(exclusive)} 3049 sock.send_msg(b'COMPUTE:' + serialize(req)) 3050 reply = sock.recv_msg() 3051 reply = deserialize(reply) 3052 if isinstance(reply, dict): 3053 self._compute.id = reply['compute_id'] 3054 self._compute.auth = reply['auth'] 3055 else: 3056 raise Exception('Scheduler refused computation: %s' % reply) 3057 except Exception: 3058 raise 3059 finally: 3060 sock.close() 3061 3062 for xf in 
self._compute.xfer_files: 3063 xf.compute_id = self._compute.id 3064 logger.debug('Sending file "%s"', xf.name) 3065 sock = socket.socket(self.addrinfo.family, socket.SOCK_STREAM) 3066 sock = AsyncSocket(sock, blocking=True, keyfile=keyfile, certfile=certfile) 3067 sock.settimeout(MsgTimeout) 3068 try: 3069 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3070 sock.sendall(self._scheduler_auth) 3071 sock.send_msg(b'FILEXFER:' + serialize(xf)) 3072 recvd = sock.recv_msg() 3073 recvd = deserialize(recvd) 3074 sent = 0 3075 with open(xf.name, 'rb') as fd: 3076 while sent == recvd: 3077 data = fd.read(1024000) 3078 if not data: 3079 break 3080 sock.sendall(data) 3081 sent += len(data) 3082 recvd = sock.recv_msg() 3083 recvd = deserialize(recvd) 3084 assert recvd == xf.stat_buf.st_size 3085 except Exception: 3086 logger.error('Could not transfer %s to %s', xf.name, self.scheduler_ip_addr) 3087 # TODO: delete computation? 3088 sock.close() 3089 3090 Task(self._cluster.add_cluster, self).value() 3091 self._scheduled_event = threading.Event() 3092 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3093 keyfile=keyfile, certfile=certfile) 3094 sock.settimeout(MsgTimeout) 3095 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3096 sock.sendall(self._scheduler_auth) 3097 req = {'compute_id': self._compute.id, 'auth': self._compute.auth} 3098 sock.send_msg(b'SCHEDULE:' + serialize(req)) 3099 resp = sock.recv_msg() 3100 sock.close() 3101 if resp == b'ACK': 3102 self._scheduled_event.wait() 3103 logger.debug('Computation %s created with %s', self._compute.name, self._compute.id) 3104 else: 3105 self._cluster._clusters.pop(self._compute.id, None) 3106 raise Exception('Computation "%s" could not be sent to scheduler' % self._compute.name) 3107 3108 def submit(self, *args, **kwargs): 3109 """Submit a job for execution with the given arguments. 

        Arguments should be serializable and should correspond to
        arguments for computation used when cluster is created.
        """
        return self.submit_job_id_node(None, None, *args, **kwargs)

    def submit_job_id(self, job_id, *args, **kwargs):
        """Same as 'submit' but job's 'id' is initialized to 'job_id'.
        """
        return self.submit_job_id_node(job_id, None, *args, **kwargs)

    def submit_node(self, node, *args, **kwargs):
        """Submit a job for execution at 'node' with the given
        arguments. 'node' can be an instance of DispyNode (e.g., as
        received in cluster/job status callback) or IP address or host
        name.

        Arguments should be serializable and should correspond to
        arguments for computation used when cluster is created.
        """
        return self.submit_job_id_node(None, node, *args, **kwargs)

    def submit_job_id_node(self, job_id, node, *args, **kwargs):
        """Same as 'submit_node' but job's 'id' is initialized to 'job_id'.
        """
        # Normalize 'node' to an IP address string; None means "any node".
        if node:
            if isinstance(node, DispyNode):
                node = node.ip_addr
            elif isinstance(node, str):
                pass
            else:
                node = None
            if not node:
                return None

        # Program-type computations receive arguments on the command line.
        if self._compute.type == _Compute.prog_type:
            args = [str(arg) for arg in args]
        try:
            _job = _DispyJob_(self._compute.id, job_id, args, kwargs)
        except Exception:
            logger.warning('Creating job for "%s", "%s" failed with "%s"',
                           str(args), str(kwargs), traceback.format_exc())
            return None

        job = None
        try:
            # First push any files the job depends on to the scheduler; the
            # scheduler acknowledges progress with the byte count received so
            # far, so the loop stops if acknowledgement falls behind.
            for xf in _job.xfer_files:
                sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM),
                                   blocking=True,
                                   keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
                sock.settimeout(MsgTimeout)
                sock.connect((self.scheduler_ip_addr, self.scheduler_port))
                sock.sendall(self._scheduler_auth)
                sock.send_msg(b'FILEXFER:' + serialize(xf))
                recvd = sock.recv_msg()
                recvd = deserialize(recvd)
                sent = 0
                with open(xf.name, 'rb') as fd:
                    while sent == recvd:
                        data = fd.read(1024000)
                        if not data:
                            break
                        sock.sendall(data)
                        sent += len(data)
                        recvd = sock.recv_msg()
                        recvd = deserialize(recvd)
                assert recvd == xf.stat_buf.st_size
                sock.close()

            # Now submit the job itself; scheduler replies with a job uid
            # (a falsy uid means the job was refused).
            sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM),
                               blocking=True,
                               keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
            sock.settimeout(MsgTimeout)
            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
            sock.sendall(self._scheduler_auth)
            req = {'node': node, 'job': _job, 'auth': self._compute.auth}
            sock.send_msg(b'JOB:' + serialize(req))
            msg = sock.recv_msg()
            _job.uid = deserialize(msg)
            if _job.uid:
                job = _job.job
                self._cluster._sched_jobs[_job.uid] = _job
                self._pending_jobs += 1
                self._complete.clear()
                sock.send_msg(b'ACK')
                if self.status_callback:
                    self._cluster.worker_Q.put((self.status_callback,
                                                (DispyJob.Created, None, copy.copy(_job.job))))
            else:
                sock.send_msg('NAK'.encode())
                _job.job._dispy_job_ = None
                del _job.job
        except Exception:
            logger.warning('Creating job for "%s", "%s" failed with "%s"',
                           str(args), str(kwargs), traceback.format_exc())
            _job.job._dispy_job_ = None
            del _job.job
            job = None
        finally:
            sock.close()
        return job

    def cancel(self, job):
        """Similar to 'cancel' of JobCluster.
        """
        _job = job._dispy_job_
        # Job must belong to this cluster and still be in an active state.
        if _job is None or self._cluster._clusters.get(_job.compute_id, None) != self:
            logger.warning('Invalid job %s for cluster "%s"!', job.id, self._compute.name)
            return -1
        if job.status not in [DispyJob.Created, DispyJob.Running, DispyJob.ProvisionalResult]:
            logger.warning('Job %s is not valid for cancel (%s)', job.id, job.status)
            return -1

        job.status = DispyJob.Cancelled
        # assert self._pending_jobs >= 1
        sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True,
                           keyfile=self._cluster.keyfile, certfile=self._cluster.certfile)
        sock.settimeout(MsgTimeout)
        try:
            sock.connect((self.scheduler_ip_addr, self.scheduler_port))
            sock.sendall(self._scheduler_auth)
            req = {'uid': _job.uid, 'compute_id': self._compute.id, 'auth': self._compute.auth}
            sock.send_msg(b'TERMINATE_JOB:' + serialize(req))
        except Exception:
            logger.warning('Could not connect to scheduler to terminate job')
            return -1
        finally:
            sock.close()
        return 0

    def allocate_node(self, node):
        """Similar to 'allocate_node' of JobCluster.
3242 """ 3243 if not isinstance(node, list): 3244 node = [node] 3245 node_allocs = _parse_node_allocs(node) 3246 if not node_allocs: 3247 return(-1) 3248 if len(node_allocs) != 1: 3249 return -1 3250 3251 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3252 keyfile=self._cluster.keyfile, certfile=self._cluster.certfile) 3253 sock.settimeout(MsgTimeout) 3254 try: 3255 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3256 sock.sendall(self._scheduler_auth) 3257 req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 3258 'node_alloc': node_allocs} 3259 sock.send_msg(b'ALLOCATE_NODE:' + serialize(req)) 3260 reply = sock.recv_msg() 3261 reply = deserialize(reply) 3262 except Exception: 3263 logger.warning('Could not connect to scheduler to add node') 3264 reply = -1 3265 finally: 3266 sock.close() 3267 return reply 3268 3269 def deallocate_node(self, node): 3270 """Similar to 'allocate_node' of JobCluster. 3271 """ 3272 if isinstance(node, DispyNode): 3273 node = node.ip_addr 3274 elif isinstance(node, str): 3275 pass 3276 if not node: 3277 return -1 3278 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3279 keyfile=self._cluster.keyfile, certfile=self._cluster.certfile) 3280 sock.settimeout(MsgTimeout) 3281 try: 3282 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3283 sock.sendall(self._scheduler_auth) 3284 req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 'node': node} 3285 sock.send_msg(b'DEALLOCATE_NODE:' + serialize(req)) 3286 reply = sock.recv_msg() 3287 reply = deserialize(reply) 3288 except Exception: 3289 logger.warning('Could not connect to scheduler to add node') 3290 reply = -1 3291 finally: 3292 sock.close() 3293 return reply 3294 3295 def close_node(self, node, terminate_pending=False): 3296 """Similar to 'cloe_node' of JobCluster. 
3297 """ 3298 if isinstance(node, DispyNode): 3299 node = node.ip_addr 3300 elif isinstance(node, str): 3301 pass 3302 if not node: 3303 return -1 3304 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3305 keyfile=self._cluster.keyfile, certfile=self._cluster.certfile) 3306 sock.settimeout(MsgTimeout) 3307 try: 3308 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3309 sock.sendall(self._scheduler_auth) 3310 req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 'node': node, 3311 'terminate_pending': terminate_pending} 3312 sock.send_msg(b'CLOSE_NODE:' + serialize(req)) 3313 reply = sock.recv_msg() 3314 reply = deserialize(reply) 3315 except Exception: 3316 logger.warning('Could not connect to scheduler to add node') 3317 reply = -1 3318 finally: 3319 sock.close() 3320 return reply 3321 3322 def node_jobs(self, node, from_node=False): 3323 """Similar to 'node_jobs' of JobCluster. 3324 """ 3325 if isinstance(node, DispyNode): 3326 node = node.ip_addr 3327 elif isinstance(node, str): 3328 pass 3329 if not node: 3330 return [] 3331 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3332 keyfile=self._cluster.keyfile, certfile=self._cluster.certfile) 3333 sock.settimeout(MsgTimeout) 3334 try: 3335 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3336 sock.sendall(self._scheduler_auth) 3337 req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 3338 'node': node, 'get_uids': True, 'from_node': bool(from_node)} 3339 sock.send_msg(b'NODE_JOBS:' + serialize(req)) 3340 reply = sock.recv_msg() 3341 job_uids = deserialize(reply) 3342 _jobs = [self._cluster._sched_jobs.get(uid, None) for uid in job_uids] 3343 except Exception: 3344 logger.warning('Could not connect to scheduler to get running jobs at node') 3345 _jobs = [] 3346 finally: 3347 sock.close() 3348 jobs = [_job.job for _job in _jobs if _job] 3349 return jobs 3350 3351 def 
set_node_cpus(self, node, cpus): 3352 """Similar to 'set_node_cpus' of JobCluster. 3353 """ 3354 # setting a node's cpus may affect other clients, so (for now) disable it. 3355 return -1 3356 3357 if isinstance(node, DispyNode): 3358 node = node.ip_addr 3359 elif isinstance(node, str): 3360 pass 3361 if not node: 3362 return -1 3363 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3364 keyfile=self._cluster.keyfile, certfile=self._cluster.certfile) 3365 sock.settimeout(MsgTimeout) 3366 try: 3367 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3368 sock.sendall(self._scheduler_auth) 3369 req = {'compute_id': self._compute.id, 'auth': self._compute.auth, 'node': node, 3370 'cpus': cpus} 3371 sock.send_msg(b'SET_NODE_CPUS:' + serialize(req)) 3372 reply = sock.recv_msg() 3373 reply = deserialize(reply) 3374 if isinstance(reply, tuple) and reply[0]: 3375 node = self._dispy_nodes.get(reply[0], None) 3376 if node: 3377 node.cpus = reply[1] 3378 except Exception: 3379 logger.warning('Could not connect to scheduler to add node') 3380 return -1 3381 finally: 3382 sock.close() 3383 return reply 3384 3385 def send_file(self, path, node): 3386 """Send file with given 'path' to 'node'. 'node' can be an 3387 instance of DispyNode (e.g., as received in cluster status 3388 callback) or IP address or host name. 3389 """ 3390 if isinstance(node, DispyNode): 3391 node = node.ip_addr 3392 elif isinstance(node, str): 3393 pass 3394 if not node: 3395 return -1 3396 3397 cwd = self._cluster.dest_path 3398 path = os.path.abspath(path) 3399 if path.startswith(cwd): 3400 dst = os.path.dirname(path[len(cwd):].lstrip(os.sep)) 3401 else: 3402 dst = '.' 
3403 xf = _XferFile(path, dst, self._compute.id) 3404 sock = AsyncSocket(socket.socket(self.addrinfo.family, socket.SOCK_STREAM), blocking=True, 3405 keyfile=self._cluster.keyfile, certfile=self._cluster.certfile) 3406 sock.settimeout(MsgTimeout) 3407 try: 3408 sock.connect((self.scheduler_ip_addr, self.scheduler_port)) 3409 sock.sendall(self._scheduler_auth) 3410 sock.send_msg(b'SENDFILE:' + serialize({'node': node, 'xf': xf})) 3411 recvd = sock.recv_msg() 3412 recvd = deserialize(recvd) 3413 sent = 0 3414 with open(xf.name, 'rb') as fd: 3415 while sent == recvd: 3416 data = fd.read(1024000) 3417 if not data: 3418 break 3419 sock.sendall(data) 3420 sent += len(data) 3421 recvd = sock.recv_msg() 3422 recvd = deserialize(recvd) 3423 assert recvd == xf.stat_buf.st_size 3424 except Exception: 3425 return -1 3426 else: 3427 return 0 3428 3429 3430def recover_jobs(recover_file=None, timeout=None, terminate_pending=False): 3431 """ 3432 If dispy client crashes or loses connection to nodes, the nodes 3433 will continue to execute scheduled jobs. This 'recover_jobs' 3434 function can be used to retrieve the results of those jobs 3435 (DispyJob objects). 3436 3437 @recover_file is path to file in which dispy stored information 3438 about cluster (see 'recover_file' in JobCluster above). If 3439 incorrect 'recover_file' is used, this function issues a warning 3440 and will block. 3441 3442 @timeout is time limit in seconds for recovery. This function will 3443 return all jobs that finish before 'timeout'. Any jobs still 3444 running or couldn't be recovered before timeout will be ignored. 3445 3446 @terminate_pending indicates if any jobs currently running should 3447 be terminated (so that, for example, node can be used for 3448 computations again right away instead of having to wait until 3449 all jobs finish). 
3450 3451 Returns list of DispyJob instances that will have .result, 3452 .stdout, .stderr etc.; however, the nodes don't keep track of .id, 3453 .args, .kwargs so they will be None. 3454 3455 Once all the jobs that were scheduled at the time of crash are 3456 retrieved (if the jobs are still running, this function will block 3457 until all the jobs are finished and results obtained), nodes are 3458 closed (so they can serve new clients), 'recover_file' is removed 3459 and the jobs are returned. 3460 """ 3461 3462 if not recover_file: 3463 import glob 3464 recover_file = sorted(glob.glob('_dispy_*')) 3465 if recover_file: 3466 recover_file = recover_file[-1] 3467 else: 3468 print('Could not find recover file of the form "_dispy_*"') 3469 return [] 3470 3471 shelf_nodes = {} 3472 computes = {} 3473 cluster = None 3474 pycos_scheduler = pycos.Pycos.instance() 3475 3476 try: 3477 shelf = shelve.open(recover_file, flag='r') 3478 except Exception: 3479 print('Could not open recover file "%s"' % recover_file) 3480 return [] 3481 3482 for key, val in shelf.items(): 3483 if key.startswith('node_'): 3484 shelf_nodes[key[len('node_'):]] = val 3485 elif key.startswith('compute_'): 3486 computes[int(key[len('compute_'):])] = val 3487 elif key == '_cluster': 3488 cluster = val 3489 else: 3490 logger.warning('Invalid key "%s" ignored', key) 3491 shelf.close() 3492 if not cluster or not computes or not shelf_nodes: 3493 for ext in ('', '.db', '.bak', '.dat', '.dir'): 3494 if os.path.isfile(recover_file + ext): 3495 try: 3496 os.remove(recover_file + ext) 3497 except Exception: 3498 pass 3499 return [] 3500 3501 nodes = {} 3502 for ip_addr, info in shelf_nodes.items(): 3503 node = _Node(ip_addr, info['port'], 0, '', cluster['secret'], platform='', 3504 keyfile=cluster['keyfile'], certfile=cluster['certfile']) 3505 node.auth = info['auth'] 3506 if info.get('scheduler'): 3507 node.scheduler_ip_addr = ip_addr 3508 nodes[node.ip_addr] = node 3509 3510 def tcp_req(conn, addr, pending, 
task=None): 3511 # generator 3512 conn.settimeout(MsgTimeout) 3513 msg = yield conn.recv_msg() 3514 if msg.startswith(b'JOB_REPLY:'): 3515 try: 3516 reply = deserialize(msg[len(b'JOB_REPLY:'):]) 3517 except Exception: 3518 logger.warning('Invalid job reply from %s:%s ignored', addr[0], addr[1]) 3519 conn.close() 3520 return 3521 yield conn.send_msg(b'ACK') 3522 logger.debug('Received reply for job %s', reply.uid) 3523 job = DispyJob(None, (), {}) 3524 job.result = deserialize(reply.result) 3525 job.stdout = reply.stdout 3526 job.stderr = reply.stderr 3527 job.exception = reply.exception 3528 job.start_time = reply.start_time 3529 job.end_time = reply.end_time 3530 job.status = reply.status 3531 job.ip_addr = reply.ip_addr 3532 job.finish.set() 3533 pending['jobs'].append(job) 3534 pending['count'] -= 1 3535 if pending['count'] == 0 and pending['resend_req_done'] is True: 3536 pending['complete'].set() 3537 else: 3538 logger.debug('Invalid TCP message from %s ignored', addr[0]) 3539 conn.close() 3540 3541 def tcp_server(ip_addr, pending, task=None): 3542 task.set_daemon() 3543 addrinfo = host_addrinfo(host=ip_addr) 3544 sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM), 3545 keyfile=cluster['keyfile'], certfile=cluster['certfile']) 3546 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 3547 sock.bind((ip_addr, cluster['port'])) 3548 sock.listen(32) 3549 3550 while 1: 3551 if pending['timeout'] is not None: 3552 timeout = pending['timeout'] - (time.time() - pending['start_time']) 3553 if timeout <= 0: 3554 pending['complete'].set() 3555 timeout = 2 3556 sock.settimeout(timeout) 3557 3558 try: 3559 conn, addr = yield sock.accept() 3560 except ssl.SSLError as err: 3561 logger.debug('SSL connection failed: %s', str(err)) 3562 continue 3563 except GeneratorExit: 3564 sock.close() 3565 break 3566 except socket.timeout: 3567 continue 3568 except Exception: 3569 continue 3570 else: 3571 Task(tcp_req, conn, addr, pending) 3572 return 3573 3574 
def resend_requests(pending, task=None): 3575 for compute_id, compute in list(computes.items()): 3576 if pending['timeout'] is not None and \ 3577 ((time.time() - pending['start_time']) > pending['timeout']): 3578 break 3579 req = {'compute_id': compute_id, 'auth': compute['auth']} 3580 for ip_addr in compute['nodes']: 3581 node = nodes.get(ip_addr, None) 3582 if not node: 3583 continue 3584 try: 3585 reply = yield node.send(b'RESEND_JOB_RESULTS:' + serialize(req)) 3586 reply = deserialize(reply) 3587 assert isinstance(reply, int) 3588 except Exception: 3589 logger.warning('Invalid resend reply from %s', ip_addr) 3590 continue 3591 logger.debug('Pending jobs from %s for %s: %s', 3592 node.ip_addr, compute['name'], reply) 3593 if reply == 0: 3594 req['node_ip_addr'] = ip_addr 3595 try: 3596 yield node.send(b'CLOSE:' + serialize(req), reply=True, task=task) 3597 except Exception: 3598 pass 3599 else: 3600 pending['count'] += reply 3601 pending['resend_req_done'] = True 3602 if pending['count'] == 0: 3603 pending['complete'].set() 3604 3605 pending = {'count': 0, 'resend_req_done': False, 'jobs': [], 'complete': threading.Event(), 3606 'timeout': timeout, 'start_time': time.time()} 3607 for ip_addr in cluster['ip_addrs']: 3608 if not ip_addr: 3609 ip_addr = '' 3610 Task(tcp_server, ip_addr, pending) 3611 3612 Task(resend_requests, pending) 3613 3614 pending['complete'].wait() 3615 3616 for compute_id, compute in computes.items(): 3617 req = {'compute_id': compute_id, 'auth': compute['auth'], 3618 'terminate_pending': terminate_pending} 3619 for ip_addr in compute['nodes']: 3620 node = nodes.get(ip_addr, None) 3621 if not node: 3622 continue 3623 if node.scheduler_ip_addr: 3624 continue 3625 req['node_ip_addr'] = ip_addr 3626 Task(node.send, b'CLOSE:' + serialize(req), reply=True) 3627 3628 if terminate_pending: 3629 # wait a bit to get cancelled job results 3630 for x in range(10): 3631 if pending['count'] == 0 and pending['resend_req_done'] is True: 3632 break 3633 
time.sleep(0.2) 3634 3635 pycos_scheduler.finish() 3636 3637 if pending['count'] == 0 and pending['resend_req_done'] is True: 3638 for ext in ('', '.db', '.bak', '.dat', '.dir'): 3639 if os.path.isfile(recover_file + ext): 3640 try: 3641 os.remove(recover_file + ext) 3642 except Exception: 3643 pass 3644 return pending['jobs'] 3645 3646 3647if __name__ == '__main__': 3648 import argparse 3649 3650 logger.info('dispy version %s', _dispy_version) 3651 3652 parser = argparse.ArgumentParser() 3653 parser.add_argument('computation', help='program to distribute and parallelize') 3654 parser.add_argument('-c', action='store_false', dest='cleanup', default=True, 3655 help='if True, nodes will remove any files transferred when ' 3656 'this computation is over') 3657 parser.add_argument('-d', '--debug', action='store_true', dest='loglevel', default=False, 3658 help='if given, debug messages are printed') 3659 parser.add_argument('-a', action='append', dest='args', default=[], 3660 help='argument(s) to program; repeat for multiple instances') 3661 parser.add_argument('-f', action='append', dest='depends', default=[], 3662 help='dependencies (files) needed by program') 3663 parser.add_argument('-n', '--nodes', action='append', dest='nodes', default=[], 3664 help='list of nodes (names or IP address) acceptable for this computation') 3665 parser.add_argument('--ip_addr', dest='ip_addr', default=None, 3666 help='IP address of this client') 3667 parser.add_argument('--secret', dest='secret', default='', 3668 help='authentication secret for handshake with nodes') 3669 parser.add_argument('--certfile', dest='certfile', default='', 3670 help='file containing SSL certificate') 3671 parser.add_argument('--keyfile', dest='keyfile', default='', 3672 help='file containing SSL key') 3673 parser.add_argument('--scheduler_node', dest='scheduler_node', default=None, 3674 help='name or IP address where dispyscheduler is running to which ' 3675 'jobs are submitted') 3676 3677 config = 
vars(parser.parse_args(sys.argv[1:])) 3678 3679 if config['loglevel']: 3680 logger.setLevel(logger.DEBUG) 3681 pycos.logger.setLevel(logger.DEBUG) 3682 else: 3683 logger.setLevel(logger.INFO) 3684 del config['loglevel'] 3685 3686 if config['certfile']: 3687 config['certfile'] = os.path.abspath(config['certfile']) 3688 else: 3689 config['certfile'] = None 3690 if config['keyfile']: 3691 config['keyfile'] = os.path.abspath(config['keyfile']) 3692 else: 3693 config['keyfile'] = None 3694 3695 args = config.pop('args') 3696 3697 if config['scheduler_node']: 3698 cluster = SharedJobCluster(**config) 3699 else: 3700 del config['scheduler_node'] 3701 cluster = JobCluster(**config) 3702 3703 jobs = [] 3704 for n, arg in enumerate(args, start=1): 3705 job = cluster.submit(*(arg.split())) 3706 job.id = n 3707 jobs.append((job, arg)) 3708 3709 for job, args in jobs: 3710 job() 3711 sargs = ''.join(arg for arg in args) 3712 if job.exception: 3713 print('Job %s with arguments "%s" failed with "%s"' % 3714 (job.id, sargs, job.exception)) 3715 continue 3716 if job.result: 3717 print('Job %s with arguments "%s" exited with: "%s"' % 3718 (job.id, sargs, str(job.result))) 3719 if job.stdout: 3720 print('Job %s with arguments "%s" produced output: "%s"' % 3721 (job.id, sargs, job.stdout)) 3722 if job.stderr: 3723 print('Job %s with argumens "%s" produced error messages: "%s"' % 3724 (job.id, sargs, job.stderr)) 3725 3726 cluster.print_status() 3727 exit(0) 3728