1#!/usr/bin/env python3 2 3 4""" 5dispyscheduler: Schedule jobs to nodes running 'dispynode'; needed 6when multiple processes may use same nodes simultaneously with 7SharedJobCluster; see accompanying 'dispy' for more details. 8""" 9 10import os 11import sys 12import time 13import socket 14import stat 15import re 16import ssl 17import atexit 18import traceback 19import tempfile 20import shutil 21import glob 22import pickle 23import hashlib 24import struct 25try: 26 import netifaces 27except ImportError: 28 netifaces = None 29 30# 'httpd' module may not be available at sys.path[0] as 'dispy.py' is 31# installed in same directory as this script is; prepend directory 32# where httpd.py module is installed to sys.path. 33for path in sys.path: 34 if os.path.isfile(os.path.join(path, 'dispy', 'httpd.py')): 35 sys.path.insert(0, path) 36 break 37del path 38 39import pycos 40from pycos import Task, Pycos, AsyncSocket, Singleton, serialize, deserialize 41import dispy 42import dispy.httpd 43from dispy import _Compute, DispyJob, _DispyJob_, _Function, _Node, DispyNode, NodeAllocate, \ 44 _JobReply, auth_code, num_min, _parse_node_allocs, _XferFile, _dispy_version, \ 45 _same_file, MsgTimeout, _node_ipaddr 46 47__author__ = "Giridhar Pemmasani (pgiri@yahoo.com)" 48__email__ = "pgiri@yahoo.com" 49__copyright__ = "Copyright 2011, Giridhar Pemmasani" 50__contributors__ = [] 51__maintainer__ = "Giridhar Pemmasani (pgiri@yahoo.com)" 52__license__ = "Apache 2.0" 53__url__ = "http://dispy.sourceforge.net" 54__status__ = "Production" 55__version__ = _dispy_version 56__all__ = [] 57 58MaxFileSize = 0 59# PyPI / pip packaging adjusts assertion below for Python 3.7+ 60assert sys.version_info.major == 3 and sys.version_info.minor < 7, \ 61 ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' % 62 (__file__, sys.version_info.major, sys.version_info.minor)) 63 64 65class _Cluster(object): 66 """Internal use only. 67 """ 68 def __init__(self, compute, node_allocs, scheduler): 69 self._compute = compute 70 # self.name = compute.name 71 self.name = '%s @ %s' % (compute.name, compute.scheduler_ip_addr) 72 self._node_allocs = _parse_node_allocs(node_allocs) 73 self._node_allocs = sorted(self._node_allocs, 74 key=lambda node_alloc: node_alloc.ip_rex, reverse=True) 75 self.scheduler = scheduler 76 self.status_callback = None 77 self.pending_jobs = 0 78 self.pending_results = 0 79 self._jobs = [] 80 self._dispy_nodes = {} 81 self.cpu_time = 0 82 self.start_time = time.time() 83 self.end_time = None 84 self.job_sched_time = 0 85 self.zombie = False 86 self.exclusive = False 87 self.last_pulse = time.time() 88 self.client_ip_addr = None 89 self.client_port = None 90 self.client_sock_family = None 91 self.client_job_result_port = None 92 self.client_auth = None 93 self.ip_addr = None 94 self.dest_path = None 95 self.file_uses = {} 96 97 def __getstate__(self): 98 state = dict(self.__dict__) 99 for var in ('_node_allocs', 'scheduler', 'status_callback', '_jobs', '_dispy_nodes'): 100 state.pop(var, None) 101 return state 102 103 def node_jobs(self, node, from_node=False, task=None): 104 jobs = Task(self.scheduler.node_jobs, self, node, from_node, get_uids=False).value() 105 return jobs 106 107 def cancel(self, job): 108 return self.scheduler.cancel_job(self, job.id) 109 110 def allocate_node(self, node_alloc): 111 if not isinstance(node_alloc, list): 112 node_alloc = [node_alloc] 113 node_allocs = _parse_node_allocs(node_alloc) 114 Task(self.scheduler.allocate_node, self, node_allocs) 115 116 def set_node_cpus(self, node, cpus): 117 return Task(self.scheduler.set_node_cpus, node, cpus).value() 118 119 120class _Scheduler(object, metaclass=Singleton): 121 """Internal use only. 122 123 See dispy's JobCluster and SharedJobCluster for documentation. 124 """ 125 126 def __init__(self, nodes=[], ip_addrs=[], ext_ip_addrs=[], port=None, node_port=None, 127 scheduler_port=None, ipv4_udp_multicast=False, scheduler_alg=None, 128 pulse_interval=None, ping_interval=None, cooperative=False, cleanup_nodes=False, 129 node_secret='', cluster_secret='', node_keyfile=None, node_certfile=None, 130 cluster_keyfile=None, cluster_certfile=None, dest_path_prefix=None, clean=False, 131 zombie_interval=60, http_server=False): 132 self.ipv4_udp_multicast = bool(ipv4_udp_multicast) 133 self.addrinfos = {} 134 if not ip_addrs: 135 ip_addrs = [None] 136 for i in range(len(ip_addrs)): 137 ip_addr = ip_addrs[i] 138 if i < len(ext_ip_addrs): 139 ext_ip_addr = ext_ip_addrs[i] 140 else: 141 ext_ip_addr = None 142 addrinfo = dispy.host_addrinfo(host=ip_addr, ipv4_multicast=self.ipv4_udp_multicast) 143 if not addrinfo: 144 logger.warning('Ignoring invalid ip_addr %s', ip_addr) 145 continue 146 if ext_ip_addr: 147 ext_ip_addr = dispy._node_ipaddr(ext_ip_addr) 148 if not ext_ip_addr: 149 logger.warning('Ignoring invalid ext_ip_addr %s', ext_ip_addrs[i]) 150 if not ext_ip_addr: 151 ext_ip_addr = addrinfo.ip 152 addrinfo.ext_ip_addr = ext_ip_addr 153 self.addrinfos[addrinfo.ext_ip_addr] = addrinfo 154 if not self.addrinfos: 155 raise Exception('No valid IP address found') 156 157 if not port: 158 port = 51347 159 if not node_port: 160 node_port = 51348 161 if not scheduler_port: 162 scheduler_port = 51349 163 if not nodes: 164 nodes = ['*'] 165 166 self.port = port 167 self.node_port = node_port 168 self.scheduler_port = scheduler_port 169 self._node_allocs = _parse_node_allocs(nodes) 170 self._nodes = {} 171 self.node_secret = node_secret 172 self.node_keyfile = node_keyfile 173 self.node_certfile = node_certfile 174 self.cluster_secret = cluster_secret 175 self.cluster_keyfile = cluster_keyfile 176 self.cluster_certfile = cluster_certfile 177 if not dest_path_prefix: 178 dest_path_prefix = os.path.join(tempfile.gettempdir(), 'dispy', 'scheduler') 179 self.dest_path_prefix = os.path.abspath(dest_path_prefix.strip()).rstrip(os.sep) 180 if clean: 181 shutil.rmtree(self.dest_path_prefix, ignore_errors=True) 182 if not os.path.isdir(self.dest_path_prefix): 183 os.makedirs(self.dest_path_prefix) 184 os.chmod(self.dest_path_prefix, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) 185 186 self.cooperative = bool(cooperative) 187 self.cleanup_nodes = bool(cleanup_nodes) 188 if pulse_interval: 189 try: 190 self.pulse_interval = float(pulse_interval) 191 assert 1.0 <= self.pulse_interval <= 1000 192 except Exception: 193 raise Exception('Invalid pulse_interval; must be between 1 and 1000') 194 else: 195 self.pulse_interval = None 196 197 if ping_interval: 198 try: 199 self.ping_interval = float(ping_interval) 200 assert 1.0 <= self.ping_interval <= 1000 201 except Exception: 202 raise Exception('Invalid ping_interval; must be between 1 and 1000') 203 else: 204 self.ping_interval = None 205 206 if zombie_interval: 207 self.zombie_interval = 60 * zombie_interval 208 if self.pulse_interval: 209 self.pulse_interval = min(self.pulse_interval, self.zombie_interval / 5.0) 210 else: 211 self.pulse_interval = self.zombie_interval / 5.0 212 else: 213 self.zombie_interval = None 214 215 self.pycos = Pycos() 216 atexit.register(self.shutdown) 217 218 self._clusters = {} 219 self.unsched_clusters = [] 220 self.pending_clusters = {} 221 self._sched_jobs = {} 222 self._sched_event = pycos.Event() 223 # once a _job is done (i.e., final result for it is 224 # received from node), it is added to done_jobs, so same 225 # object is not reused by Python (when a new job is 226 # submitted) until the result is sent back to client 227 # (otherwise, 'id' may be duplicate) 228 self.done_jobs = {} 229 self.terminate = False 230 self.sign = hashlib.sha1(os.urandom(20)) 231 for ext_ip_addr in self.addrinfos: 232 self.sign.update(ext_ip_addr.encode()) 233 self.sign = self.sign.hexdigest() 234 self.cluster_auth = auth_code(self.cluster_secret, self.sign) 235 self.node_auth = auth_code(self.node_secret, self.sign) 236 237 with open(os.path.join(self.dest_path_prefix, 'config'), 'wb') as fd: 238 config = { 239 'ip_addrs': [ai.ip for ai in self.addrinfos.values()], 240 'ext_ip_addrs': [ai.ext_ip_addr for ai in self.addrinfos.values()], 241 'port': self.port, 'sign': self.sign, 242 'cluster_secret': self.cluster_secret, 'cluster_auth': self.cluster_auth, 243 'node_secret': self.node_secret, 'node_auth': self.node_auth 244 } 245 pickle.dump(config, fd) 246 247 if scheduler_alg == 'fair_cluster': 248 self.select_job_node_cluster = self.fair_cluster_schedule 249 elif scheduler_alg == 'fcfs_cluster': 250 self.select_job_node_cluster = self.fcfs_cluster_schedule 251 else: 252 self.select_job_node_cluster = self.fsfs_job_schedule 253 254 self.start_time = time.time() 255 if http_server: 256 self.httpd = dispy.httpd.DispyHTTPServer(None) 257 else: 258 self.httpd = None 259 260 self.timer_task = Task(self.timer_proc) 261 self.job_scheduler_task = Task(self._schedule_jobs) 262 263 self.tcp_tasks = [] 264 self.udp_tasks = [] 265 self.scheduler_tasks = [] 266 udp_addrinfos = {} 267 for addrinfo in self.addrinfos.values(): 268 self.tcp_tasks.append(Task(self.tcp_server, addrinfo)) 269 self.scheduler_tasks.append(Task(self.scheduler_server, addrinfo)) 270 if os.name == 'nt': 271 bind_addr = addrinfo.ip 272 elif sys.platform == 'darwin': 273 if addrinfo.family == socket.AF_INET and (not self.ipv4_udp_multicast): 274 bind_addr = '' 275 else: 276 bind_addr = addrinfo.broadcast 277 else: 278 bind_addr = addrinfo.broadcast 279 udp_addrinfos[bind_addr] = addrinfo 280 281 for bind_addr, addrinfo in udp_addrinfos.items(): 282 self.udp_tasks.append(Task(self.udp_server, bind_addr, addrinfo)) 283 284 def udp_server(self, bind_addr, addrinfo, task=None): 285 task.set_daemon() 286 287 udp_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM)) 288 udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 289 if hasattr(socket, 'SO_REUSEPORT'): 290 try: 291 udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 292 except Exception: 293 pass 294 295 while not self.port: 296 yield task.sleep(0.2) 297 udp_sock.bind((bind_addr, self.port)) 298 if addrinfo.family == socket.AF_INET: 299 if self.ipv4_udp_multicast: 300 mreq = socket.inet_aton(addrinfo.broadcast) + socket.inet_aton(addrinfo.ip) 301 udp_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 302 else: # addrinfo.family == socket.AF_INET6: 303 mreq = socket.inet_pton(addrinfo.family, addrinfo.broadcast) 304 mreq += struct.pack('@I', addrinfo.ifn) 305 udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) 306 try: 307 udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) 308 except Exception: 309 pass 310 311 Task(self.broadcast_ping, addrinfos=[addrinfo]) 312 self.send_ping_cluster(self._node_allocs, set()) 313 314 while 1: 315 msg, addr = yield udp_sock.recvfrom(1000) 316 if msg.startswith(b'PING:'): 317 try: 318 info = deserialize(msg[len(b'PING:'):]) 319 if info['version'] != _dispy_version: 320 logger.warning('Ignoring %s due to version mismatch', addr[0]) 321 continue 322 assert info['port'] > 0 323 assert info['ip_addr'] 324 # socket.inet_aton(status['ip_addr']) 325 except Exception: 326 logger.debug('Ignoring node %s', addr[0]) 327 logger.debug(traceback.format_exc()) 328 continue 329 if info['port'] == self.port: 330 continue 331 auth = auth_code(self.node_secret, info['sign']) 332 node = self._nodes.get(info['ip_addr'], None) 333 if node: 334 if node.auth == auth: 335 continue 336 sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_STREAM), 337 keyfile=self.node_keyfile, certfile=self.node_certfile) 338 sock.settimeout(MsgTimeout) 339 msg = {'port': self.port, 'sign': self.sign, 'version': _dispy_version} 340 msg['ip_addrs'] = [ai.ext_ip_addr for ai in self.addrinfos.values()] 341 try: 342 yield sock.connect((info['ip_addr'], info['port'])) 343 yield sock.sendall(auth) 344 yield sock.send_msg(b'PING:' + serialize(msg)) 345 except Exception: 346 logger.debug(traceback.format_exc()) 347 finally: 348 sock.close() 349 else: 350 pass 351 352 def tcp_server(self, addrinfo, task=None): 353 # generator 354 task.set_daemon() 355 sock = socket.socket(addrinfo.family, socket.SOCK_STREAM) 356 sock = AsyncSocket(sock, keyfile=self.node_keyfile, certfile=self.node_certfile) 357 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 358 try: 359 sock.bind((addrinfo.ip, self.port)) 360 except Exception: 361 logger.debug('Could not bind TCP to %s:%s', addrinfo.ip, self.port) 362 raise StopIteration 363 logger.debug('TCP server at %s:%s', addrinfo.ip, self.port) 364 sock.listen(32) 365 366 while 1: 367 try: 368 conn, addr = yield sock.accept() 369 except ssl.SSLError as err: 370 logger.debug('SSL connection failed: %s', str(err)) 371 continue 372 except GeneratorExit: 373 break 374 except Exception: 375 logger.debug(traceback.format_exc()) 376 continue 377 Task(self.tcp_req, conn, addr) 378 379 def tcp_req(self, conn, addr, task=None): 380 # generator 381 conn.settimeout(MsgTimeout) 382 msg = yield conn.recv_msg() 383 if msg.startswith(b'JOB_REPLY:'): 384 try: 385 info = deserialize(msg[len(b'JOB_REPLY:'):]) 386 except Exception: 387 logger.warning('Invalid job reply from %s:%s ignored', addr[0], addr[1]) 388 else: 389 yield self.job_reply_process(info, len(msg), conn, addr) 390 conn.close() 391 392 elif msg.startswith(b'PULSE:'): 393 msg = msg[len(b'PULSE:'):] 394 try: 395 info = deserialize(msg) 396 except Exception: 397 logger.warning('Ignoring pulse message from %s', addr[0]) 398 conn.close() 399 raise StopIteration 400 node = self._nodes.get(info['ip_addr'], None) 401 if node: 402 # assert 0 <= info['cpus'] <= node.cpus 403 node.last_pulse = time.time() 404 yield conn.send_msg(b'PULSE') 405 if info['avail_info']: 406 node.avail_info = info['avail_info'] 407 for cluster in node.clusters: 408 dispy_node = cluster._dispy_nodes.get(node.ip_addr, None) 409 if dispy_node: 410 dispy_node.avail_info = info['avail_info'] 411 dispy_node.update_time = node.last_pulse 412 Task(self.send_node_status, cluster, dispy_node, DispyNode.AvailInfo) 413 if cluster.status_callback: 414 cluster.status_callback(DispyNode.AvailInfo, dispy_node, None) 415 conn.close() 416 417 elif msg.startswith(b'PONG:'): 418 conn.close() 419 try: 420 info = deserialize(msg[len(b'PONG:'):]) 421 assert info['auth'] == self.node_auth 422 except Exception: 423 logger.warning('Ignoring node %s due to "secret" mismatch', addr[0]) 424 else: 425 self.add_node(info) 426 427 elif msg.startswith(b'PING:'): 428 sock_family = conn.family 429 conn.close() 430 try: 431 info = deserialize(msg[len(b'PING:'):]) 432 if info['version'] != _dispy_version: 433 logger.warning('Ignoring node %s due to version mismatch', addr[0]) 434 raise Exception('') 435 assert info['port'] > 0 436 assert info['ip_addr'] 437 except Exception: 438 logger.debug('Ignoring node %s', addr[0]) 439 logger.debug(traceback.format_exc()) 440 raise StopIteration 441 if info['port'] != self.port: 442 auth = auth_code(self.node_secret, info['sign']) 443 node = self._nodes.get(info['ip_addr'], None) 444 if node: 445 if node.auth == auth: 446 raise StopIteration 447 sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM), 448 keyfile=self.node_keyfile, certfile=self.node_certfile) 449 sock.settimeout(MsgTimeout) 450 msg = {'port': self.port, 'sign': self.sign, 'version': _dispy_version} 451 msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()] 452 try: 453 yield sock.connect((info['ip_addr'], info['port'])) 454 yield sock.sendall(auth) 455 yield sock.send_msg(b'PING:' + serialize(msg)) 456 except Exception: 457 logger.debug(traceback.format_exc()) 458 finally: 459 sock.close() 460 461 elif msg.startswith(b'FILEXFER:'): 462 try: 463 xf = deserialize(msg[len(b'FILEXFER:'):]) 464 msg = yield conn.recv_msg() 465 job_reply = deserialize(msg) 466 yield self.xfer_to_client(job_reply, xf, conn, addr) 467 except Exception: 468 logger.debug(traceback.format_exc()) 469 conn.close() 470 471 elif msg.startswith(b'TERMINATED:'): 472 conn.close() 473 try: 474 info = deserialize(msg[len(b'TERMINATED:'):]) 475 node = self._nodes.get(info['ip_addr'], None) 476 if not node: 477 raise StopIteration 478 auth = auth_code(self.node_secret, info['sign']) 479 if auth != node.auth: 480 logger.warning('Invalid signature from %s', node.ip_addr) 481 raise StopIteration 482 logger.debug('Removing node %s', node.ip_addr) 483 del self._nodes[node.ip_addr] 484 if node.clusters: 485 dead_jobs = [_job for _job in self._sched_jobs.values() 486 if _job.node is not None and _job.node.ip_addr == node.ip_addr] 487 clusters = list(node.clusters) 488 node.clusters.clear() 489 for cluster in clusters: 490 dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None) 491 if not dispy_node: 492 continue 493 Task(self.send_node_status, cluster, dispy_node, DispyNode.Closed) 494 self.reschedule_jobs(dead_jobs) 495 except Exception: 496 # logger.debug(traceback.format_exc()) 497 pass 498 499 elif msg.startswith(b'NODE_CPUS:'): 500 conn.close() 501 try: 502 info = deserialize(msg[len(b'NODE_CPUS:'):]) 503 node = self._nodes.get(info['ip_addr'], None) 504 if not node: 505 raise StopIteration 506 auth = auth_code(self.node_secret, info['sign']) 507 if auth != node.auth: 508 logger.warning('Invalid signature from %s', node.ip_addr) 509 raise StopIteration 510 cpus = info['cpus'] 511 except Exception: 512 logger.debug(traceback.format_exc()) 513 raise StopIteration 514 if cpus < 0: 515 logger.warning('Node requested using %s CPUs, disabling it', node.ip_addr, cpus) 516 cpus = 0 517 logger.debug('Setting cpus for %s to %s', node.ip_addr, cpus) 518 # TODO: set node.cpus to min(cpus, node.cpus)? 519 node.cpus = cpus 520 if cpus > node.avail_cpus: 521 node.avail_cpus = cpus 522 node_computations = [] 523 for cluster in self._clusters.values(): 524 if cluster in node.clusters: 525 continue 526 compute = cluster._compute 527 for node_alloc in cluster._node_allocs: 528 cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, 529 node.avail_cpus) 530 if cpus <= 0: 531 continue 532 node.cpus = min(node.avail_cpus, cpus) 533 node_computations.append(compute) 534 break 535 if node_computations: 536 Task(self.setup_node, node, node_computations) 537 yield self._sched_event.set() 538 else: 539 node.avail_cpus = cpus 540 for cluster in node.clusters: 541 dispy_node = cluster._dispy_nodes.get(node.ip_addr, None) 542 if dispy_node: 543 dispy_node.cpus = cpus 544 545 elif msg.startswith(b'RELAY_INFO:'): 546 try: 547 info = deserialize(msg[len(b'RELAY_INFO:'):]) 548 assert info['version'] == _dispy_version 549 msg = {'sign': self.sign, 'ip_addrs': [info['scheduler_ip_addr']], 550 'port': self.port} 551 if 'auth' in info and info['auth'] != self.node_auth: 552 msg = None 553 except Exception: 554 msg = None 555 yield conn.send_msg(serialize(msg)) 556 conn.close() 557 558 else: 559 logger.warning('Invalid message from %s:%s ignored', addr[0], addr[1]) 560 conn.close() 561 562 def schedule_cluster(self, task=None): 563 while self.unsched_clusters: 564 cluster = self.unsched_clusters[0] 565 if self._clusters: 566 if cluster.exclusive: 567 raise StopIteration 568 for cur_cluster in self._clusters.values(): 569 if cur_cluster.exclusive: 570 raise StopIteration 571 break 572 self.unsched_clusters.pop(0) 573 reply_sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM) 574 reply_sock = AsyncSocket(reply_sock, keyfile=self.cluster_keyfile, 575 certfile=self.cluster_certfile) 576 reply_sock.settimeout(MsgTimeout) 577 reply = {'compute_id': cluster._compute.id, 'pulse_interval': self.pulse_interval} 578 self._clusters[cluster._compute.id] = cluster 579 try: 580 yield reply_sock.connect((cluster.client_ip_addr, cluster.client_job_result_port)) 581 yield reply_sock.send_msg('SCHEDULED:'.encode() + serialize(reply)) 582 msg = yield reply_sock.recv_msg() 583 assert msg == 'ACK'.encode() 584 self.add_cluster(cluster) 585 except Exception: 586 self._clusters.pop(cluster._compute.id, None) 587 logger.debug('Ignoring computation %s / %s from %s:%s', 588 cluster._compute.name, cluster._compute.id, 589 cluster.client_ip_addr, cluster.client_job_result_port) 590 continue 591 finally: 592 reply_sock.close() 593 594 def scheduler_server(self, addrinfo, task=None): 595 task.set_daemon() 596 sock = socket.socket(addrinfo.family, socket.SOCK_STREAM) 597 sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile) 598 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 599 try: 600 sock.bind((addrinfo.ip, self.scheduler_port)) 601 except Exception: 602 logger.warning('Could not bind scheduler server to %s:%s', 603 addrinfo.ip, self.scheduler_port) 604 raise StopIteration 605 logger.debug('Scheduler at %s:%s', addrinfo.ip, self.scheduler_port) 606 sock.listen(32) 607 while 1: 608 conn, addr = yield sock.accept() 609 Task(self.scheduler_req, conn, addr) 610 611 def scheduler_req(self, conn, addr, task=None): 612 # generator 613 def _job_request(self, cluster, node, _job): 614 # generator 615 _job.uid = id(_job) 616 for xf in _job.xfer_files: 617 xf.name = os.path.join(cluster.dest_path, xf.dest_path.replace(xf.sep, os.sep), 618 xf.name.split(xf.sep)[-1]) 619 xf.sep = os.sep 620 621 job = DispyJob(None, (), {}) 622 job.id = _job.uid 623 _job.job = job 624 yield conn.send_msg(serialize(_job.uid)) 625 ack = yield conn.recv_msg() 626 if ack != b'ACK': 627 raise StopIteration 628 if node: 629 _job.pinned = node 630 node.pending_jobs.append(_job) 631 else: 632 _job.pinned = None 633 cluster._jobs.append(_job) 634 logger.debug('Submitted job %s / %s', _job.uid, job.submit_time) 635 cluster.pending_jobs += 1 636 cluster.last_pulse = job.submit_time 637 self._sched_event.set() 638 if cluster.status_callback: 639 cluster.status_callback(DispyJob.Created, None, job) 640 641 def _compute_req(self, msg): 642 # function 643 try: 644 req = deserialize(msg) 645 compute = req['compute'] 646 node_allocs = req['node_allocs'] 647 exclusive = req['exclusive'] 648 except Exception: 649 return serialize(('Invalid computation').encode()) 650 for xf in compute.xfer_files: 651 if MaxFileSize and xf.stat_buf.st_size > MaxFileSize: 652 return serialize(('File "%s" is too big; limit is %s' % 653 (xf.name, MaxFileSize)).encode()) 654 if self.terminate: 655 return serialize(('Scheduler is closing').encode()) 656 if self.cleanup_nodes and not compute.cleanup: 657 compute.cleanup = True 658 cluster = _Cluster(compute, node_allocs, self) 659 cluster.ip_addr = conn.getsockname()[0] 660 cluster.exclusive = exclusive 661 dest = compute.scheduler_ip_addr 662 if os.name == 'nt': 663 dest = dest.replace(':', '_') 664 dest = os.path.join(self.dest_path_prefix, dest) 665 if not os.path.isdir(dest): 666 try: 667 os.mkdir(dest) 668 except Exception: 669 return serialize(('Could not create destination directory').encode()) 670 if compute.dest_path and isinstance(compute.dest_path, str): 671 # TODO: get os.sep from client and convert (in case of mixed environments)? 672 if compute.dest_path.startswith(os.sep): 673 cluster.dest_path = compute.dest_path 674 else: 675 cluster.dest_path = os.path.join(dest, compute.dest_path) 676 if not os.path.isdir(cluster.dest_path): 677 try: 678 os.makedirs(cluster.dest_path) 679 except Exception: 680 return serialize(('Could not create destination directory').encode()) 681 else: 682 cluster.dest_path = tempfile.mkdtemp(prefix=compute.name + '_', dir=dest) 683 684 compute.id = id(compute) 685 cluster.client_job_result_port = compute.job_result_port 686 cluster.client_ip_addr = compute.scheduler_ip_addr 687 cluster.client_port = compute.scheduler_port 688 cluster.client_sock_family = conn.family 689 cluster.client_auth = compute.auth 690 compute.job_result_port = self.port 691 compute.scheduler_port = self.port 692 compute.auth = hashlib.sha1(os.urandom(10)).hexdigest() 693 cluster.last_pulse = time.time() 694 for xf in compute.xfer_files: 695 xf.compute_id = compute.id 696 xf.name = os.path.join(cluster.dest_path, xf.dest_path.replace(xf.sep, os.sep), 697 xf.name.split(xf.sep)[-1]) 698 xf.sep = os.sep 699 700 with open(os.path.join(self.dest_path_prefix, 701 '%s_%s' % (compute.id, cluster.client_auth)), 'wb') as fd: 702 pickle.dump(cluster, fd) 703 self.pending_clusters[cluster._compute.id] = cluster 704 logger.debug('New computation %s: %s, %s', 705 compute.id, compute.name, cluster.dest_path) 706 return serialize({'compute_id': cluster._compute.id, 'auth': cluster.client_auth}) 707 708 def xfer_from_client(self, msg): 709 # generator 710 try: 711 xf = deserialize(msg) 712 except Exception: 713 logger.debug('Ignoring file trasnfer request from %s', addr[0]) 714 raise StopIteration(serialize(-1)) 715 cluster = self.pending_clusters.get(xf.compute_id, None) 716 if not cluster: 717 # if file is transfered for 'dispy_job_depends', cluster would be active 718 cluster = self._clusters.get(xf.compute_id, None) 719 if not cluster: 720 logger.error('Computation "%s" is invalid', xf.compute_id) 721 raise StopIteration(serialize(-1)) 722 tgt = os.path.join(cluster.dest_path, xf.dest_path.replace(xf.sep, os.sep), 723 xf.name.split(xf.sep)[-1]) 724 if os.path.isfile(tgt) and _same_file(tgt, xf): 725 if tgt in cluster.file_uses: 726 cluster.file_uses[tgt] += 1 727 else: 728 cluster.file_uses[tgt] = 2 729 raise StopIteration(serialize(xf.stat_buf.st_size)) 730 logger.debug('Copying file %s to %s (%s)', xf.name, tgt, xf.stat_buf.st_size) 731 try: 732 if not os.path.isdir(os.path.dirname(tgt)): 733 os.makedirs(os.path.dirname(tgt)) 734 with open(tgt, 'wb') as fd: 735 recvd = 0 736 while recvd < xf.stat_buf.st_size: 737 yield conn.send_msg(serialize(recvd)) 738 data = yield conn.recvall(min(xf.stat_buf.st_size-recvd, 1024000)) 739 if not data: 740 break 741 fd.write(data) 742 recvd += len(data) 743 assert recvd == xf.stat_buf.st_size 744 os.utime(tgt, (xf.stat_buf.st_atime, xf.stat_buf.st_mtime)) 745 os.chmod(tgt, stat.S_IMODE(xf.stat_buf.st_mode)) 746 if tgt in cluster.file_uses: 747 cluster.file_uses[tgt] += 1 748 else: 749 cluster.file_uses[tgt] = 1 750 logger.debug('Copied file %s', tgt) 751 except Exception: 752 logger.warning('Copying file "%s" failed with "%s"', 753 xf.name, traceback.format_exc()) 754 recvd = -1 755 try: 756 os.remove(tgt) 757 if len(os.listdir(cluster.dest_path)) == 0: 758 os.rmdir(cluster.dest_path) 759 except Exception: 760 pass 761 raise StopIteration(serialize(recvd)) 762 763 def send_file(self, msg): 764 # generator 765 try: 766 msg = deserialize(msg) 767 node = self._nodes.get(msg['node'], None) 768 xf = msg['xf'] 769 except Exception: 770 logger.debug('Ignoring file trasnfer request from %s', addr[0]) 771 raise StopIteration(serialize(-1)) 772 cluster = self._clusters.get(xf.compute_id, None) 773 if not cluster or not node: 774 logger.error('send_file "%s" is invalid', xf.name) 775 raise StopIteration(serialize(-1)) 776 dispy_node = cluster._dispy_nodes.get(node.ip_addr) 777 if not dispy_node: 778 logger.error('send_file "%s" is invalid', xf.name) 779 raise StopIteration(serialize(-1)) 780 if _same_file(xf.name, xf): 781 resp = yield node.xfer_file(xf) 782 if resp < 0: 783 raise StopIteration(serialize(-1)) 784 else: 785 node.tx += resp 786 dispy_node.tx += resp 787 raise StopIteration(serialize(xf.stat_buf.st_size)) 788 789 node_sock = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM), 790 keyfile=self.node_keyfile, certfile=self.node_certfile) 791 node_sock.settimeout(MsgTimeout) 792 try: 793 yield node_sock.connect((node.ip_addr, node.port)) 794 yield node_sock.sendall(node.auth) 795 yield node_sock.send_msg('FILEXFER:'.encode() + serialize(xf)) 796 recvd = yield node_sock.recv_msg() 797 recvd = deserialize(recvd) 798 while recvd < xf.stat_buf.st_size: 799 yield conn.send_msg(serialize(recvd)) 800 data = yield conn.recvall(min(xf.stat_buf.st_size-recvd, 1024000)) 801 if not data: 802 break 803 yield node_sock.sendall(data) 804 recvd = yield node_sock.recv_msg() 805 recvd = deserialize(recvd) 806 node.tx += recvd 807 dispy_node.tx += recvd 808 except Exception: 809 logger.error('Could not transfer %s to %s: %s', xf.name, node.ip_addr, recvd) 810 logger.debug(traceback.format_exc()) 811 # TODO: mark this node down, reschedule on different node? 812 recvd = -1 813 finally: 814 node_sock.close() 815 raise StopIteration(serialize(recvd)) 816 817 # scheduler_req begins here 818 conn.settimeout(MsgTimeout) 819 resp = None 820 try: 821 req = yield conn.recvall(len(self.cluster_auth)) 822 except Exception: 823 logger.warning('Failed to read message from %s: %s', str(addr), traceback.format_exc()) 824 conn.close() 825 raise StopIteration 826 827 if req != self.cluster_auth: 828 msg = yield conn.recv_msg() 829 if msg.startswith(b'CLIENT:'): 830 try: 831 req = deserialize(msg[len(b'CLIENT:'):]) 832 if req['version'] != _dispy_version: 833 logger.warning('Ignoring %s due to version mismatch', addr[0]) 834 raise Exception('') 835 if not req['ip_addr']: 836 req['ip_addr'] = addr[0] 837 reply = {'ip_addr': req['ip_addr'], 'port': self.scheduler_port, 838 'sign': self.sign, 'version': _dispy_version} 839 yield conn.send_msg(serialize(reply)) 840 except Exception: 841 pass 842 else: 843 logger.warning('Invalid/unauthorized request ignored') 844 conn.close() 845 raise StopIteration 846 msg = yield conn.recv_msg() 847 if not msg: 848 logger.info('Closing connection') 849 conn.close() 850 raise StopIteration 851 852 if msg.startswith(b'JOB:'): 853 msg = msg[len(b'JOB:'):] 854 try: 855 req = deserialize(msg) 856 _job = req['job'] 857 cluster = self._clusters[_job.compute_id] 858 assert cluster.client_auth == req['auth'] 859 node = req['node'] 860 if node: 861 node = self._nodes[node] 862 except Exception: 863 pass 864 else: 865 yield _job_request(self, cluster, node, _job) 866 resp = None 867 868 elif msg.startswith(b'PULSE:'): 869 msg = msg[len(b'PULSE:'):] 870 try: 871 info = deserialize(msg) 872 except Exception: 873 logger.warning('Ignoring pulse message from %s', addr[0]) 874 conn.close() 875 raise StopIteration 876 if 'client_port' in info: 877 for cluster in self._clusters.values(): 878 if (cluster.client_ip_addr == addr[0] and 879 cluster.client_port == info['client_port']): 880 cluster.last_pulse = time.time() 881 conn.close() 882 883 elif msg.startswith(b'COMPUTE:'): 884 msg = msg[len(b'COMPUTE:'):] 885 resp = _compute_req(self, msg) 886 887 elif msg.startswith(b'SCHEDULE:'): 888 msg = msg[len(b'SCHEDULE:'):] 889 try: 890 req = deserialize(msg) 891 cluster = self.pending_clusters[req['compute_id']] 892 assert cluster.client_auth == req['auth'] 893 for xf in cluster._compute.xfer_files: 894 assert os.path.isfile(xf.name) 895 self.unsched_clusters.append(cluster) 896 self.pending_clusters.pop(cluster._compute.id) 897 except Exception: 898 logger.debug('Ignoring schedule request from %s', addr[0]) 899 resp = 'NAK'.encode() 900 else: 901 resp = 'ACK'.encode() 902 Task(self.schedule_cluster) 903 904 elif msg.startswith(b'CLOSE:'): 905 msg = msg[len(b'CLOSE:'):] 906 try: 907 req = deserialize(msg) 908 auth = req['auth'] 909 except Exception: 910 logger.warning('Invalid compuation for deleting') 911 conn.close() 912 raise StopIteration 913 cluster = self._clusters.get(req['compute_id'], None) 914 if cluster is None or cluster.client_auth != auth: 915 # this cluster is closed 916 conn.close() 917 raise StopIteration 918 cluster.zombie = True 919 terminate_pending = req.get('terminate_pending', False) 920 Task(self.cleanup_computation, cluster, terminate_pending=bool(terminate_pending)) 921 922 elif msg.startswith(b'FILEXFER:'): 923 msg = msg[len(b'FILEXFER:'):] 924 resp = yield xfer_from_client(self, msg) 925 926 elif msg.startswith(b'SENDFILE:'): 927 msg = msg[len(b'SENDFILE:'):] 928 resp = yield send_file(self, msg) 929 930 elif msg.startswith(b'NODE_JOBS:'): 931 msg = msg[len(b'NODE_JOBS:'):] 932 try: 933 req = deserialize(msg) 934 cluster = self._clusters.get(req['compute_id'], None) 935 if cluster is None or cluster.client_auth != req['auth']: 936 job_uids = [] 937 else: 938 node = req['node'] 939 from_node = req['from_node'] 940 # assert req['get_uids'] == True 941 job_uids = yield self.node_jobs(cluster, node, from_node, 942 get_uids=True, task=task) 943 except Exception: 944 job_uids = [] 945 resp = serialize(job_uids) 946 947 elif msg.startswith(b'TERMINATE_JOB:'): 948 msg = msg[len(b'TERMINATE_JOB:'):] 949 try: 950 req = deserialize(msg) 951 uid = req['uid'] 952 cluster = self._clusters[req['compute_id']] 953 assert cluster.client_auth == req['auth'] 954 except Exception: 955 logger.warning('Invalid job cancel message') 956 conn.close() 957 raise StopIteration 958 self.cancel_job(cluster, uid) 959 960 elif msg.startswith(b'RESEND_JOB_RESULTS:'): 961 msg = msg[len(b'RESEND_JOB_RESULTS:'):] 962 try: 963 info = deserialize(msg) 964 compute_id = info['compute_id'] 965 auth = info['auth'] 966 except Exception: 967 resp = serialize(0) 968 else: 969 cluster = self._clusters.get(compute_id, None) 970 if cluster is None or cluster.client_auth != auth: 971 try: 972 with open(os.path.join(self.dest_path_prefix, 973 '%s_%s' % (compute_id, auth)), 'rb') as fd: 974 cluster = pickle.load(fd) 975 except Exception: 976 pass 977 if cluster is None: 978 resp = 0 979 else: 980 resp = cluster.pending_results + cluster.pending_jobs 981 yield conn.send_msg(serialize(resp)) 982 conn.close() 983 if resp > 0: 984 yield self.resend_job_results(cluster, task=task) 985 raise StopIteration 986 987 elif msg.startswith(b'PENDING_JOBS:'): 988 msg = msg[len(b'PENDING_JOBS:'):] 989 reply = {'done': [], 'pending': 0} 990 try: 991 info = deserialize(msg) 992 compute_id = info['compute_id'] 993 auth = info['auth'] 994 except Exception: 995 pass 996 else: 997 cluster = self._clusters.get(compute_id, None) 998 if cluster is None or cluster.client_auth != auth: 999 with open(os.path.join(self.dest_path_prefix, 1000 '%s_%s' % (compute_id, auth)), 'rb') as fd: 1001 cluster = pickle.load(fd) 1002 if cluster is not None and cluster.client_auth == auth: 1003 done = [] 1004 if cluster.pending_results: 1005 for result_file in glob.glob(os.path.join(cluster.dest_path, 1006 '_dispy_job_reply_*')): 1007 result_file = os.path.basename(result_file) 1008 try: 1009 uid = int(result_file[len('_dispy_job_reply_'):]) 1010 except Exception: 1011 pass 1012 else: 1013 done.append(uid) 1014 # limit so as not to take up too much time 1015 if len(done) > 50: 1016 break 1017 reply['done'] = done 1018 reply['pending'] = cluster.pending_jobs 1019 resp = serialize(reply) 1020 1021 elif msg.startswith(b'RETRIEVE_JOB:'): 1022 msg = msg[len(b'RETRIEVE_JOB:'):] 1023 yield self.retrieve_job_req(conn, msg) 1024 1025 elif msg.startswith(b'ALLOCATE_NODE:'): 1026 req = msg[len(b'ALLOCATE_NODE:'):] 1027 try: 1028 req = deserialize(req) 1029 cluster = self._clusters[req['compute_id']] 1030 assert cluster.client_auth == req['auth'] 1031 resp = yield self.allocate_node(cluster, req['node_alloc'], task=task) 1032 resp = serialize(resp) 1033 except Exception: 1034 resp = serialize(-1) 1035 1036 elif msg.startswith(b'DEALLOCATE_NODE:'): 1037 req = msg[len(b'DEALLOCATE_NODE:'):] 1038 try: 1039 req = deserialize(req) 1040 cluster = self._clusters[req['compute_id']] 1041 assert cluster.client_auth == req['auth'] 1042 resp = yield self.deallocate_node(cluster, req['node'], task=task) 1043 resp = serialize(resp) 1044 except Exception: 1045 resp = serialize(-1) 1046 1047 elif msg.startswith(b'CLOSE_NODE:'): 1048 req = msg[len(b'CLOSE_NODE:'):] 1049 try: 1050 req = deserialize(req) 1051 cluster = self._clusters[req['compute_id']] 1052 assert cluster.client_auth == req['auth'] 1053 resp = yield self.close_node(cluster, req['node'], 1054 terminate_pending=req['terminate_pending'], task=task) 1055 resp = serialize(resp) 1056 except Exception: 1057 resp = serialize(-1) 1058 1059 elif msg.startswith(b'SET_NODE_CPUS:'): 1060 req = msg[len(b'SET_NODE_CPUS:'):] 1061 try: 1062 req = deserialize(req) 1063 cluster = self._clusters[req['compute_id']] 1064 assert cluster.client_auth == req['auth'] 1065 # for shared cluster, changing cpus may not be valid, as we 1066 # don't maintain cpus per cluster 1067 resp = yield self.set_node_cpus(req['node'], req['cpus']) 1068 except Exception: 1069 logger.debug(traceback.format_exc()) 1070 resp = (None, -1) 1071 resp = serialize(resp) 1072 1073 else: 1074 logger.debug('Ignoring invalid command') 1075 1076 if resp is not None: 1077 try: 1078 yield conn.send_msg(resp) 1079 except Exception: 1080 logger.warning('Failed to send response to %s: %s', 1081 str(addr), traceback.format_exc()) 1082 conn.close() 1083 # end of scheduler_req 1084 1085 def resend_job_results(self, cluster, task=None): 1086 # TODO: limit number queued so as not to take up too much space/time 1087 result_files = [f for f in os.listdir(cluster.dest_path) 1088 if f.startswith('_dispy_job_reply_')] 1089 result_files = result_files[:min(len(result_files), 64)] 1090 for result_file in result_files: 1091 result_file = os.path.join(cluster.dest_path, result_file) 1092 try: 1093 with open(result_file, 'rb') as fd: 1094 result = pickle.load(fd) 1095 except Exception: 1096 logger.debug('Could not load "%s"', result_file) 1097 else: 1098 status = yield self.send_job_result( 1099 result.uid, cluster, result, resending=True, task=task) 1100 if status: 1101 break 1102 1103 def timer_proc(self, task=None): 1104 task.set_daemon() 1105 reset = True 1106 last_ping_time = last_pulse_time = last_zombie_time = time.time() 1107 while 1: 1108 if reset: 1109 timeout = num_min(self.pulse_interval, self.ping_interval, self.zombie_interval) 1110 1111 reset = yield task.suspend(timeout) 1112 if reset: 1113 continue 1114 1115 now = time.time() 1116 if self.pulse_interval and (now - last_pulse_time) >= self.pulse_interval: 1117 last_pulse_time = now 1118 dead_nodes = {} 1119 for node in self._nodes.values(): 1120 if node.busy and (node.last_pulse + (5 * self.pulse_interval)) < now: 1121 logger.warning('Node %s is not responding; removing it (%s, %s, %s)', 1122 node.ip_addr, node.busy, node.last_pulse, now) 1123 dead_nodes[node.ip_addr] = node 1124 for ip_addr in dead_nodes: 1125 node = self._nodes.pop(ip_addr, None) 1126 clusters = list(node.clusters) 1127 node.clusters.clear() 1128 for cluster in clusters: 1129 dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None) 1130 if not dispy_node: 1131 continue 1132 Task(self.send_node_status, cluster, dispy_node, DispyNode.Closed) 1133 1134 dead_jobs = [_job for _job in self._sched_jobs.values() 1135 if _job.node is not None and _job.node.ip_addr in dead_nodes] 1136 self.reschedule_jobs(dead_jobs) 1137 resend = [resend_cluster for resend_cluster in self._clusters.values() 1138 if resend_cluster.pending_results and not resend_cluster.zombie] 1139 for cluster in resend: 1140 Task(self.resend_job_results, cluster) 1141 1142 if self.ping_interval and (now - last_ping_time) >= self.ping_interval: 1143 last_ping_time = now 1144 for cluster in self._clusters.values(): 1145 self.send_ping_cluster(cluster._node_allocs, 1146 set(cluster._dispy_nodes.keys())) 1147 self.send_ping_cluster(self._node_allocs, set()) 1148 1149 if self.zombie_interval and (now - last_zombie_time) >= self.zombie_interval: 1150 last_zombie_time = now 1151 for cluster in self._clusters.values(): 1152 if (now - cluster.last_pulse) > self.zombie_interval: 1153 cluster.zombie = True 1154 zombies = [cluster for cluster in self._clusters.values() 1155 if cluster.zombie and cluster.pending_jobs == 0] 1156 for cluster in zombies: 1157 logger.debug('Deleting zombie computation "%s" / %s', 1158 cluster._compute.name, cluster._compute.id) 1159 Task(self.cleanup_computation, cluster) 1160 zombies = [cluster for cluster in self.pending_clusters.values() 1161 if (now - cluster.last_pulse) > self.zombie_interval] 1162 for cluster in zombies: 1163 logger.debug('Deleting zombie computation "%s" / %s', 1164 cluster._compute.name, cluster._compute.id) 1165 path = os.path.join(self.dest_path_prefix, 1166 '%s_%s' % (cluster._compute.id, cluster.client_auth)) 1167 if os.path.isfile(path): 1168 os.remove(path) 1169 try: 1170 shutil.rmtree(cluster.dest_path) 1171 except Exception: 1172 logger.debug(traceback.format_exc()) 1173 self.pending_clusters.pop(cluster._compute.id, None) 1174 1175 def xfer_to_client(self, job_reply, xf, conn, addr): 1176 _job = self._sched_jobs.get(job_reply.uid, None) 1177 if _job is None or _job.hash != job_reply.hash: 1178 logger.warning('Ignoring invalid file transfer from job %s at %s', 1179 job_reply.uid, addr[0]) 1180 yield conn.send_msg(serialize(-1)) 1181 raise StopIteration 1182 node = self._nodes.get(job_reply.ip_addr, None) 1183 cluster = self._clusters.get(_job.compute_id, None) 1184 if not node or not cluster: 1185 logger.warning('Ignoring invalid file transfer from job %s at %s', 1186 job_reply.uid, addr[0]) 1187 yield conn.send_msg(serialize(-1)) 1188 raise StopIteration 1189 node.last_pulse = time.time() 1190 client_sock = AsyncSocket(socket.socket(node.sock_family, socket.SOCK_STREAM), 1191 keyfile=self.cluster_keyfile, certfile=self.cluster_certfile) 1192 client_sock.settimeout(MsgTimeout) 1193 try: 1194 yield client_sock.connect((cluster.client_ip_addr, cluster.client_job_result_port)) 1195 yield client_sock.send_msg('FILEXFER:'.encode() + serialize(xf)) 1196 yield client_sock.send_msg(serialize(job_reply)) 1197 1198 recvd = yield client_sock.recv_msg() 1199 recvd = deserialize(recvd) 1200 while recvd < xf.stat_buf.st_size: 1201 yield conn.send_msg(serialize(recvd)) 1202 data = yield conn.recvall(min(xf.stat_buf.st_size-recvd, 1024000)) 1203 if not data: 1204 break 1205 yield client_sock.sendall(data) 1206 recvd = yield client_sock.recv_msg() 1207 recvd = deserialize(recvd) 1208 yield conn.send_msg(serialize(recvd)) 1209 except Exception: 1210 yield conn.send_msg(serialize(-1)) 1211 finally: 1212 client_sock.close() 1213 conn.close() 1214 1215 def send_ping_node(self, ip_addr, port=None, task=None): 1216 ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port, 1217 'node_ip_addr': ip_addr} 1218 ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()] 1219 if not port: 1220 port = self.node_port 1221 if re.match('\d+\.', ip_addr): 1222 sock_family = socket.AF_INET 1223 else: 1224 sock_family = socket.AF_INET6 1225 tcp_sock = AsyncSocket(socket.socket(sock_family, socket.SOCK_STREAM), 1226 keyfile=self.node_keyfile, certfile=self.node_certfile) 1227 tcp_sock.settimeout(MsgTimeout) 1228 try: 1229 yield tcp_sock.connect((ip_addr, port)) 1230 yield tcp_sock.sendall(b'x' * len(self.node_auth)) 1231 yield tcp_sock.send_msg(b'PING:' + serialize(ping_msg)) 1232 except Exception: 1233 pass 1234 tcp_sock.close() 1235 1236 def broadcast_ping(self, addrinfos=[], port=None, task=None): 1237 # generator 1238 if not port: 1239 port = self.node_port 1240 ping_msg = {'version': _dispy_version, 'sign': self.sign, 'port': self.port} 1241 ping_msg['ip_addrs'] = [addrinfo.ext_ip_addr for addrinfo in self.addrinfos.values()] 1242 if not addrinfos: 1243 addrinfos = list(self.addrinfos.values()) 1244 for addrinfo in addrinfos: 1245 bc_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM)) 1246 bc_sock.settimeout(MsgTimeout) 1247 ttl_bin = struct.pack('@i', 1) 1248 if addrinfo.family == socket.AF_INET: 1249 if self.ipv4_udp_multicast: 1250 bc_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin) 1251 else: 1252 bc_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1253 else: # addrinfo.family == socket.AF_INET6 1254 bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin) 1255 bc_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn) 1256 bc_sock.bind((addrinfo.ip, 0)) 1257 try: 1258 yield bc_sock.sendto(b'PING:' + serialize(ping_msg), (addrinfo.broadcast, port)) 1259 except Exception: 1260 pass 1261 bc_sock.close() 1262 1263 def send_ping_cluster(self, node_allocs, present_ip_addrs, task=None): 1264 for node_alloc in node_allocs: 1265 # TODO: we assume subnets are indicated by '*', instead of 1266 # subnet mask; this is a limitation, but specifying with 1267 # subnet mask a bit cumbersome. 1268 if node_alloc.ip_rex.find('*') >= 0: 1269 Task(self.broadcast_ping, addrinfos=[], port=node_alloc.port) 1270 else: 1271 ip_addr = node_alloc.ip_addr 1272 if ip_addr in present_ip_addrs: 1273 continue 1274 port = node_alloc.port 1275 Task(self.send_ping_node, ip_addr, port) 1276 1277 def add_cluster(self, cluster): 1278 compute = cluster._compute 1279 compute.pulse_interval = self.pulse_interval 1280 if self.httpd and cluster.status_callback is None: 1281 self.httpd.add_cluster(cluster) 1282 # TODO: should we allow clients to add new nodes, or use only 1283 # the nodes initially created with command-line? 1284 self.send_ping_cluster(cluster._node_allocs, set(cluster._dispy_nodes.keys())) 1285 compute_nodes = [] 1286 for ip_addr, node in self._nodes.items(): 1287 if cluster in node.clusters: 1288 continue 1289 for node_alloc in cluster._node_allocs: 1290 cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus) 1291 if cpus <= 0: 1292 continue 1293 if cluster.exclusive or self.cooperative: 1294 node.cpus = min(node.avail_cpus, cpus) 1295 compute_nodes.append(node) 1296 for node in compute_nodes: 1297 Task(self.setup_node, node, [compute]) 1298 1299 def cleanup_computation(self, cluster, terminate_pending=False, task=None): 1300 # generator 1301 if not cluster.zombie: 1302 raise StopIteration 1303 1304 compute = cluster._compute 1305 cid = compute.id 1306 pkl_path = os.path.join(self.dest_path_prefix, 1307 '%s_%s' % (cid, cluster.client_auth)) 1308 if self._clusters.pop(cid, None) is None: 1309 if not cluster.pending_results: 1310 try: 1311 os.remove(pkl_path) 1312 except Exception: 1313 logger.debug(traceback.format_exc()) 1314 pass 1315 raise StopIteration 1316 cluster._jobs = [] 1317 cluster.pending_jobs = 0 1318 1319 if cluster.pending_results == 0: 1320 try: 1321 os.remove(pkl_path) 1322 except Exception: 1323 logger.warning('Could not remove "%s"', pkl_path) 1324 else: 1325 with open(pkl_path, 'wb') as fd: 1326 pickle.dump(cluster, fd) 1327 1328 for path, use_count in cluster.file_uses.items(): 1329 if use_count == 1: 1330 try: 1331 os.remove(path) 1332 except Exception: 1333 logger.warning('Could not remove "%s"', path) 1334 cluster.file_uses.clear() 1335 1336 if os.path.isdir(cluster.dest_path): 1337 for dirpath, dirnames, filenames in os.walk(cluster.dest_path, topdown=False): 1338 if not filenames or dirpath.endswith('__pycache__'): 1339 try: 1340 shutil.rmtree(dirpath) 1341 except Exception: 1342 logger.warning('Could not remove "%s"', dirpath) 1343 break 1344 1345 # remove cluster from all nodes before closing (which uses 1346 # yield); otherwise, scheduler may access removed cluster 1347 # through node.clusters 1348 close_nodes = [] 1349 for dispy_node in cluster._dispy_nodes.values(): 1350 node = self._nodes.get(dispy_node.ip_addr, None) 1351 if not node: 1352 continue 1353 drop_jobs = [i for i, _job in enumerate(node.pending_jobs) 1354 if _job.compute_id == cid] 1355 for i in reversed(drop_jobs): 1356 node.pending_jobs.remove(i) 1357 node.clusters.discard(cluster) 1358 if cluster.exclusive: 1359 node.cpus = node.avail_cpus 1360 close_nodes.append((Task(node.close, compute, terminate_pending=terminate_pending), 1361 dispy_node)) 1362 cluster._dispy_nodes.clear() 1363 for close_task, dispy_node in close_nodes: 1364 yield close_task.finish() 1365 yield self.send_node_status(cluster, dispy_node, DispyNode.Closed) 1366 if self.httpd: 1367 self.httpd.del_cluster(cluster) 1368 Task(self.schedule_cluster) 1369 1370 def setup_node(self, node, computes, task=None): 1371 # generator 1372 task.set_daemon() 1373 for compute in computes: 1374 # NB: to avoid computation being sent multiple times, we 1375 # add to cluster's _dispy_nodes before sending computation 1376 # to node 1377 cluster = self._clusters[compute.id] 1378 if node.ip_addr in cluster._dispy_nodes: 1379 continue 1380 dispy_node = cluster._dispy_nodes.get(node.ip_addr, None) 1381 if not dispy_node: 1382 dispy_node = DispyNode(node.ip_addr, node.name, node.cpus) 1383 cluster._dispy_nodes[node.ip_addr] = dispy_node 1384 dispy_node.tx = node.tx 1385 dispy_node.rx = node.rx 1386 dispy_node.avail_cpus = node.avail_cpus 1387 dispy_node.avail_info = node.avail_info 1388 r = yield node.setup(compute, exclusive=cluster.exclusive, task=task) 1389 if r or compute.id not in self._clusters: 1390 cluster._dispy_nodes.pop(node.ip_addr, None) 1391 logger.warning('Failed to setup %s for computation "%s": %s', 1392 node.ip_addr, compute.name, r) 1393 Task(node.close, compute) 1394 else: 1395 dispy_node.update_time = time.time() 1396 node.clusters.add(cluster) 1397 self._sched_event.set() 1398 Task(self.send_node_status, cluster, dispy_node, DispyNode.Initialized) 1399 1400 def add_node(self, info): 1401 try: 1402 # assert info['version'] == _dispy_version 1403 assert info['port'] > 0 and info['cpus'] > 0 1404 # TODO: check if it is one of ext_ip_addr? 1405 except Exception: 1406 # logger.debug(traceback.format_exc()) 1407 return 1408 node = self._nodes.get(info['ip_addr'], None) 1409 if node is None: 1410 logger.debug('Discovered %s:%s (%s) with %s cpus', 1411 info['ip_addr'], info['port'], info['name'], info['cpus']) 1412 node = _Node(info['ip_addr'], info['port'], info['cpus'], info['sign'], 1413 self.node_secret, platform=info['platform'], 1414 keyfile=self.node_keyfile, certfile=self.node_certfile) 1415 node.name = info['name'] 1416 node.avail_info = info['avail_info'] 1417 self._nodes[node.ip_addr] = node 1418 else: 1419 node.last_pulse = time.time() 1420 auth = auth_code(self.node_secret, info['sign']) 1421 if info['cpus'] > 0: 1422 node.avail_cpus = info['cpus'] 1423 node.cpus = min(node.cpus, node.avail_cpus) 1424 else: 1425 logger.warning('Invalid "cpus" %s from %s ignored', info['cpus'], info['ip_addr']) 1426 if node.port == info['port'] and node.auth == auth: 1427 return 1428 logger.debug('Node %s rediscovered', info['ip_addr']) 1429 node.port = info['port'] 1430 if node.auth is not None: 1431 dead_jobs = [_job for _job in self._sched_jobs.values() 1432 if _job.node is not None and _job.node.ip_addr == node.ip_addr] 1433 node.busy = 0 1434 node.auth = auth 1435 clusters = list(node.clusters) 1436 node.clusters.clear() 1437 for cluster in clusters: 1438 dispy_node = cluster._dispy_nodes.pop(node.ip_addr, None) 1439 if not dispy_node: 1440 continue 1441 Task(self.send_node_status, cluster, dispy_node, DispyNode.Closed) 1442 self.reschedule_jobs(dead_jobs) 1443 node.auth = auth 1444 node_computations = [] 1445 node.name = info['name'] 1446 node.scheduler_ip_addr = info['scheduler_ip_addr'] 1447 for cluster in self._clusters.values(): 1448 if cluster in node.clusters: 1449 continue 1450 compute = cluster._compute 1451 for node_alloc in cluster._node_allocs: 1452 cpus = node_alloc.allocate(cluster, node.ip_addr, node.name, node.avail_cpus) 1453 if cpus > 0: 1454 node_computations.append(compute) 1455 break 1456 if node_computations: 1457 Task(self.setup_node, node, node_computations) 1458 1459 def send_job_result(self, uid, cluster, result, resending=False, task=None): 1460 # generator 1461 sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM) 1462 sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile) 1463 sock.settimeout(MsgTimeout) 1464 try: 1465 yield sock.connect((cluster.client_ip_addr, cluster.client_job_result_port)) 1466 yield sock.send_msg(b'JOB_REPLY:' + serialize(result)) 1467 ack = yield sock.recv_msg() 1468 assert ack == b'ACK' 1469 except Exception: 1470 status = -1 1471 if not resending: 1472 # store job result even if computation has not enabled 1473 # fault recovery; user may be able to access node and 1474 # retrieve result manually 1475 f = os.path.join(cluster.dest_path, '_dispy_job_reply_%s' % uid) 1476 logger.error('Could not send reply for job %s to %s:%s; saving it in "%s"', 1477 uid, cluster.client_ip_addr, cluster.client_job_result_port, f) 1478 try: 1479 with open(f, 'wb') as fd: 1480 pickle.dump(result, fd) 1481 except Exception: 1482 logger.debug('Could not save reply for job %s', uid) 1483 else: 1484 cluster.pending_results += 1 1485 cluster.file_uses[f] = 2 1486 else: 1487 status = 0 1488 cluster.last_pulse = time.time() 1489 if result.status != DispyJob.ProvisionalResult: 1490 if resending: 1491 cluster.pending_results -= 1 1492 f = os.path.join(cluster.dest_path, '_dispy_job_reply_%s' % uid) 1493 if os.path.isfile(f): 1494 cluster.file_uses.pop(f, None) 1495 try: 1496 os.remove(f) 1497 except Exception: 1498 logger.warning('Could not remove "%s"', f) 1499 else: 1500 self.done_jobs.pop(uid, None) 1501 if cluster.pending_results: 1502 Task(self.resend_job_results, cluster) 1503 if cluster.pending_jobs == 0 and cluster.pending_results == 0 and cluster.zombie: 1504 Task(self.cleanup_computation, cluster) 1505 dispy_node = cluster._dispy_nodes.get(result.ip_addr, None) 1506 if dispy_node: 1507 Task(self.send_node_status, cluster, dispy_node, DispyNode.AvailInfo) 1508 finally: 1509 sock.close() 1510 1511 raise StopIteration(status) 1512 1513 def send_job_status(self, cluster, _job, task=None): 1514 if cluster.status_callback: 1515 dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None) 1516 # assert _job.job.status == DispyJob.Running 1517 if dispy_node: 1518 dispy_node.busy += 1 1519 dispy_node.update_time = time.time() 1520 cluster.status_callback(_job.job.status, dispy_node, _job.job) 1521 sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM) 1522 sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile) 1523 sock.settimeout(MsgTimeout) 1524 try: 1525 yield sock.connect((cluster.client_ip_addr, cluster.client_job_result_port)) 1526 status = {'uid': _job.uid, 'status': _job.job.status, 'node': _job.node.ip_addr, 1527 'hash': _job.hash} 1528 status['start_time'] = _job.job.start_time 1529 yield sock.send_msg(b'JOB_STATUS:' + serialize(status)) 1530 except Exception: 1531 logger.warning('Could not send job status to %s:%s', 1532 cluster.client_ip_addr, cluster.client_job_result_port) 1533 sock.close() 1534 1535 def send_node_status(self, cluster, dispy_node, status, task=None): 1536 if cluster.status_callback: 1537 dispy_node.update_time = time.time() 1538 cluster.status_callback(status, dispy_node, None) 1539 sock = socket.socket(cluster.client_sock_family, socket.SOCK_STREAM) 1540 sock = AsyncSocket(sock, keyfile=self.cluster_keyfile, certfile=self.cluster_certfile) 1541 sock.settimeout(MsgTimeout) 1542 status_info = {'compute_id': cluster._compute.id, 1543 'status': status, 'auth': cluster.client_auth} 1544 if status == DispyNode.Initialized: 1545 status_info['dispy_node'] = dispy_node 1546 else: 1547 status_info['ip_addr'] = dispy_node.ip_addr 1548 if status == DispyNode.AvailInfo: 1549 status_info['avail_info'] = dispy_node.avail_info 1550 status_info['tx'] = dispy_node.tx 1551 status_info['rx'] = dispy_node.rx 1552 try: 1553 yield sock.connect((cluster.client_ip_addr, cluster.client_job_result_port)) 1554 yield sock.send_msg(b'NODE_STATUS:' + serialize(status_info)) 1555 except Exception: 1556 logger.debug('Could not send node status to %s:%s', 1557 cluster.client_ip_addr, cluster.client_job_result_port) 1558 sock.close() 1559 1560 def job_reply_process(self, reply, msg_len, sock, addr): 1561 _job = self._sched_jobs.get(reply.uid, None) 1562 if not _job or reply.hash != _job.hash: 1563 logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0]) 1564 yield sock.send_msg(b'ACK') 1565 raise StopIteration 1566 job = _job.job 1567 _job._args = _job._kwargs = None 1568 node = self._nodes.get(reply.ip_addr, None) 1569 cluster = self._clusters.get(_job.compute_id, None) 1570 if not cluster: 1571 # job cancelled while/after closing computation 1572 if node: 1573 node.rx += msg_len 1574 if node.busy > 0: 1575 node.busy -= 1 1576 node.cpu_time += reply.end_time - reply.start_time 1577 node.last_pulse = time.time() 1578 self._sched_event.set() 1579 yield sock.send_msg(b'ACK') 1580 raise StopIteration 1581 if not node: 1582 logger.warning('Ignoring invalid reply for job %s from %s', reply.uid, addr[0]) 1583 yield sock.send_msg(b'ACK') 1584 raise StopIteration 1585 # assert reply.ip_addr == node.ip_addr 1586 node.last_pulse = time.time() 1587 logger.debug('Received reply for job %s from %s', _job.uid, addr[0]) 1588 # assert _job.job.status not in [DispyJob.Created, DispyJob.Finished] 1589 setattr(reply, 'cpus', node.cpus) 1590 dispy_node = cluster._dispy_nodes.get(_job.node.ip_addr, None) 1591 if dispy_node: 1592 dispy_node.rx += msg_len 1593 1594 yield sock.send_msg(b'ACK') 1595 job.start_time = reply.start_time 1596 job.end_time = reply.end_time 1597 if reply.status != DispyJob.ProvisionalResult: 1598 self.done_jobs[_job.uid] = _job 1599 del self._sched_jobs[_job.uid] 1600 node.busy -= 1 1601 node.cpu_time += reply.end_time - reply.start_time 1602 if cluster.status_callback: 1603 if dispy_node: 1604 dispy_node.busy -= 1 1605 dispy_node.jobs_done += 1 1606 dispy_node.cpu_time += reply.end_time - reply.start_time 1607 dispy_node.update_time = time.time() 1608 cluster.status_callback(reply.status, dispy_node, job) 1609 1610 cluster.pending_jobs -= 1 1611 if cluster.pending_jobs == 0: 1612 cluster.end_time = time.time() 1613 if cluster.zombie: 1614 Task(self.cleanup_computation, cluster) 1615 self._sched_event.set() 1616 for xf in _job.xfer_files: 1617 try: 1618 cluster.file_uses[xf.name] -= 1 1619 if cluster.file_uses[xf.name] == 0: 1620 cluster.file_uses.pop(xf.name) 1621 os.remove(xf.name) 1622 except Exception: 1623 logger.warning('Could not remove "%s"', xf.name) 1624 Task(self.send_job_result, _job.uid, cluster, reply, resending=False) 1625 1626 def reschedule_jobs(self, dead_jobs): 1627 if not dead_jobs: 1628 return 1629 for _job in dead_jobs: 1630 cluster = self._clusters[_job.compute_id] 1631 del self._sched_jobs[_job.uid] 1632 if cluster._compute.reentrant and not _job.pinned: 1633 logger.debug('Rescheduling job %s from %s', _job.uid, _job.node.ip_addr) 1634 _job.job.status = DispyJob.Created 1635 _job.hash = ''.join(hex(x)[2:] for x in os.urandom(10)) 1636 cluster._jobs.append(_job) 1637 else: 1638 logger.debug('Terminating job %s scheduled on %s', _job.uid, _job.node.ip_addr) 1639 reply = _JobReply(_job, _job.node.ip_addr, status=DispyJob.Abandoned) 1640 reply.result = serialize(None) 1641 cluster.pending_jobs -= 1 1642 if cluster.pending_jobs == 0: 1643 cluster.end_time = time.time() 1644 self.done_jobs[_job.uid] = _job 1645 Task(self.send_job_result, _job.uid, cluster, reply, resending=False) 1646 self._sched_event.set() 1647 1648 def load_balance_node(self): 1649 """Return node with least load 1650 """ 1651 # TODO: maintain "available" sequence of nodes for better performance 1652 node = None 1653 load = 1.0 1654 for host in self._nodes.values(): 1655 if host.busy >= host.cpus: 1656 continue 1657 if host.pending_jobs: 1658 return host 1659 if not any(cluster._jobs for cluster in host.clusters): 1660 continue 1661 if (host.busy / host.cpus) < load: 1662 node = host 1663 load = host.busy / host.cpus 1664 return node 1665 1666 def fsfs_job_schedule(self): 1667 """Return tuple (_job, node, cluster) such that _job is earliest 1668 submitted in all clusters. 1669 """ 1670 node = self.load_balance_node() 1671 if not node: 1672 return (None, None, None) 1673 _job = cluster = lrs = None 1674 for cluster in node.clusters: 1675 if cluster._jobs and (not lrs or 1676 cluster._jobs[0].job.submit_time < lrs._jobs[0].job.submit_time): 1677 lrs = cluster 1678 if lrs: 1679 if node.pending_jobs: 1680 if node.pending_jobs[0].job.submit_time < lrs._jobs[0].job.submit_time: 1681 _job = node.pending_jobs.pop(0) 1682 cluster = self._clusters[_job.compute_id] 1683 if not _job: 1684 cluster = lrs 1685 _job = cluster._jobs.pop(0) 1686 elif node.pending_jobs: 1687 _job = node.pending_jobs.pop(0) 1688 cluster = self._clusters[_job.compute_id] 1689 return (_job, node, cluster) 1690 1691 def fair_cluster_schedule(self): 1692 """Return tuple (_job, node, cluster) such that cluster is earliest 1693 scheduled last time. 1694 """ 1695 node = self.load_balance_node() 1696 if not node: 1697 return (None, None, None) 1698 _job = cluster = lrs = None 1699 for cluster in node.clusters: 1700 if cluster._jobs and (not lrs or cluster.job_sched_time < lrs.job_sched_time): 1701 lrs = cluster 1702 if lrs: 1703 if node.pending_jobs: 1704 _job = node.pending_jobs[0] 1705 cluster = self._clusters[_job.compute_id] 1706 if cluster.job_sched_time < lrs.job_sched_time: 1707 node.pending_jobs.pop(0) 1708 else: 1709 cluster = lrs 1710 _job = cluster._jobs.pop(0) 1711 if not _job: 1712 cluster = lrs 1713 _job = cluster._jobs.pop(0) 1714 elif node.pending_jobs: 1715 _job = node.pending_jobs.pop(0) 1716 cluster = self._clusters[_job.compute_id] 1717 if _job: 1718 cluster.job_sched_time = time.time() 1719 return (_job, node, cluster) 1720 1721 def fcfs_cluster_schedule(self): 1722 """Return tuple (_job, node, cluster) such that cluster is created 1723 earliest. 1724 """ 1725 node = self.load_balance_node() 1726 if not node: 1727 return (None, None, None) 1728 _job = cluster = lrs = None 1729 for cluster in node.clusters: 1730 if cluster._jobs and (not lrs or cluster.start_time < lrs.start_time): 1731 lrs = cluster 1732 if lrs: 1733 if node.pending_jobs: 1734 _job = node.pending_jobs[0] 1735 cluster = self._clusters[_job.compute_id] 1736 if cluster.start_time < lrs.start_time: 1737 node.pending_jobs.pop(0) 1738 else: 1739 cluster = lrs 1740 _job = cluster._jobs.pop(0) 1741 if not _job: 1742 cluster = lrs 1743 _job = cluster._jobs.pop(0) 1744 elif node.pending_jobs: 1745 _job = node.pending_jobs.pop(0) 1746 cluster = self._clusters[_job.compute_id] 1747 return (_job, node, cluster) 1748 1749 def run_job(self, _job, cluster, task=None): 1750 # generator 1751 # assert task is not None 1752 node = _job.node 1753 dispy_node = cluster._dispy_nodes.get(node.ip_addr, None) 1754 try: 1755 tx = yield _job.run(task=task) 1756 if dispy_node: 1757 dispy_node.tx += tx 1758 except (EnvironmentError, OSError): 1759 logger.warning('Failed to run job %s on %s for computation %s; removing this node', 1760 _job.uid, node.ip_addr, cluster._compute.name) 1761 if node.pending_jobs: 1762 # TODO: instead of discarding pending jobs, maintain them 1763 # elsewhere, while cluster is alive? 1764 for njob in node.pending_jobs: 1765 self.done_jobs[_njob.uid] = _njob 1766 cl = self._clusters[njob.compute_id] 1767 cl.pending_jobs -= 1 1768 reply = _JobReply(_njob, cl.ip_addr, status=DispyJob.Cancelled) 1769 reply.result = serialize(None) 1770 Task(self.send_job_result, _njob.uid, cl, reply, resending=False) 1771 node.pending_jobs = [] 1772 # TODO: need to close computations on this node? 1773 for cl in node.clusters: 1774 dn = cl._dispy_nodes.pop(node.ip_addr, None) 1775 if dn and cl.status_callback: 1776 Task(self.send_node_status, cl, dn, DispyNode.Closed) 1777 node.clusters.clear() 1778 self._nodes.pop(node.ip_addr, None) 1779 if self._sched_jobs.pop(_job.uid, None) == _job: 1780 if not _job.pinned: 1781 cluster._jobs.insert(0, _job) 1782 node.busy -= 1 1783 self._sched_event.set() 1784 except Exception: 1785 logger.warning('Failed to run job %s on %s for computation %s', 1786 _job.uid, node.ip_addr, cluster._compute.name) 1787 # logger.debug(traceback.format_exc()) 1788 # TODO: delay executing again for some time? 1789 # this job might have been deleted already due to timeout 1790 if self._sched_jobs.pop(_job.uid, None) == _job: 1791 if cluster.status_callback: 1792 if dispy_node: 1793 dispy_node.update_time = time.time() 1794 cluster.status_callback(DispyJob.Cancelled, dispy_node, _job.job) 1795 node.busy -= 1 1796 self._sched_event.set() 1797 else: 1798 # job may have already finished (in which case _job.job would be None) 1799 if _job.job: 1800 logger.debug('Running job %s on %s (busy: %d / %d)', 1801 _job.uid, node.ip_addr, node.busy, node.cpus) 1802 _job.job.status = DispyJob.Running 1803 _job.job.start_time = time.time() 1804 cluster = self._clusters[_job.compute_id] 1805 # TODO/Note: It is likely that this job status may arrive at 1806 # the client before the job is done and the node's status 1807 # arrives. Either use queing for messages (ideally with 1808 # pycos's message passing) or tag messages with timestamps 1809 # so recipient can use temporal ordering to ignore prior 1810 # messages 1811 Task(self.send_job_status, cluster, _job) 1812 if not cluster._compute.reentrant: 1813 _job._args = _job._kwargs = None 1814 1815 def _schedule_jobs(self, task=None): 1816 # generator 1817 assert task is not None 1818 while not self.terminate: 1819 # n = sum(len(cluster._jobs) for cluster in self._clusters.values()) 1820 _job, node, cluster = self.select_job_node_cluster() 1821 if not _job: 1822 self._sched_event.clear() 1823 yield self._sched_event.wait() 1824 continue 1825 _job.node = node 1826 # assert node.busy < node.cpus 1827 self._sched_jobs[_job.uid] = _job 1828 node.busy += 1 1829 Task(self.run_job, _job, cluster) 1830 1831 logger.debug('Scheduler quitting: %s', len(self._sched_jobs)) 1832 for uid, _job in self._sched_jobs.items(): 1833 cluster = self._clusters.get(_job.compute_id, None) 1834 if cluster: 1835 reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Terminated) 1836 reply.result = serialize(None) 1837 Task(self.send_job_result, _job.uid, cluster, reply, resending=False) 1838 for cluster in self._clusters.values(): 1839 for _job in cluster._jobs: 1840 reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Terminated) 1841 reply.result = serialize(None) 1842 Task(self.send_job_result, _job.uid, cluster, reply, resending=False) 1843 cluster._jobs = [] 1844 1845 for cluster in list(self._clusters.values()): 1846 cluster.pending_jobs = 0 1847 cluster.zombie = True 1848 yield self.cleanup_computation(cluster) 1849 self._clusters = {} 1850 self._sched_jobs = {} 1851 self.done_jobs = {} 1852 logger.debug('Scheduler quit') 1853 1854 def retrieve_job_req(self, conn, msg): 1855 # generator 1856 1857 def send_reply(reply): 1858 try: 1859 yield conn.send_msg(serialize(reply)) 1860 except Exception: 1861 raise StopIteration(-1) 1862 raise StopIteration(0) 1863 1864 try: 1865 req = deserialize(msg) 1866 uid = req['uid'] 1867 compute_id = req['compute_id'] 1868 auth = req['auth'] 1869 job_hash = req['hash'] 1870 except Exception: 1871 yield send_reply(None) 1872 raise StopIteration 1873 1874 pkl_path = os.path.join(self.dest_path_prefix, '%s_%s' % (compute_id, auth)) 1875 cluster = self._clusters.get(compute_id, None) 1876 if not cluster or cluster.client_auth != auth: 1877 with open(pkl_path, 'rb') as fd: 1878 cluster = pickle.load(fd) 1879 if not cluster or cluster.client_auth != auth: 1880 yield send_reply(None) 1881 raise StopIteration 1882 1883 info_file = os.path.join(cluster.dest_path, '_dispy_job_reply_%s' % uid) 1884 if not os.path.isfile(info_file): 1885 yield send_reply(None) 1886 raise StopIteration 1887 1888 try: 1889 with open(info_file, 'rb') as fd: 1890 job_reply = pickle.load(fd) 1891 assert job_reply.hash == job_hash 1892 except Exception: 1893 yield send_reply(None) 1894 raise StopIteration 1895 1896 try: 1897 yield conn.send_msg(serialize(job_reply)) 1898 ack = yield conn.recv_msg() 1899 assert ack == 'ACK'.encode() 1900 cluster.pending_results -= 1 1901 with open(pkl_path, 'wb') as fd: 1902 pickle.dump(cluster, fd) 1903 except Exception: 1904 pass 1905 else: 1906 cluster.file_uses.pop(info_file, None) 1907 try: 1908 os.remove(info_file) 1909 except Exception: 1910 pass 1911 1912 def cancel_job(self, cluster, uid): 1913 # function 1914 cluster.last_pulse = time.time() 1915 _job = self._sched_jobs.get(uid, None) 1916 if _job: 1917 _job.job.status = DispyJob.Cancelled 1918 Task(_job.node.send, b'TERMINATE_JOB:' + serialize(_job), reply=False) 1919 return 0 1920 else: 1921 for i, _job in enumerate(cluster._jobs): 1922 if _job.uid == uid: 1923 del cluster._jobs[i] 1924 self.done_jobs[_job.uid] = _job 1925 cluster.pending_jobs -= 1 1926 reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Cancelled) 1927 reply.result = serialize(None) 1928 Task(self.send_job_result, _job.uid, cluster, reply, resending=False) 1929 return 0 1930 1931 for ip_addr in cluster._dispy_nodes: 1932 node = self._nodes.get(ip_addr, None) 1933 if not node: 1934 continue 1935 for i, _job in enumerate(node.pending_jobs): 1936 if _job.uid == uid: 1937 del node.pending_jobs[i] 1938 self.done_jobs[_job.uid] = _job 1939 cluster.pending_jobs -= 1 1940 reply = _JobReply(_job, cluster.ip_addr, status=DispyJob.Cancelled) 1941 reply.result = serialize(None) 1942 Task(self.send_job_result, _job.uid, cluster, reply, resending=False) 1943 return 0 1944 logger.debug('Invalid job %s!', uid) 1945 return -1 1946 1947 def allocate_node(self, cluster, node_alloc, task=None): 1948 # generator 1949 if not isinstance(node_alloc, list): 1950 node_alloc = [node_alloc] 1951 for i in range(len(node_alloc)-1, -1, -1): 1952 node = self._nodes.get(node_alloc[i].ip_addr, None) 1953 if node: 1954 dispy_node = cluster._dispy_nodes.get(node.ip_addr, None) 1955 if dispy_node: 1956 node.clusters.add(cluster) 1957 self._sched_event.set() 1958 del node_alloc[i] 1959 continue 1960 if not node_alloc: 1961 raise StopIteration(0) 1962 cluster._node_allocs.extend(node_alloc) 1963 cluster._node_allocs = sorted(cluster._node_allocs, 1964 key=lambda node_alloc: node_alloc.ip_rex, reverse=True) 1965 present = set() 1966 cluster._node_allocs = [na for na in cluster._node_allocs 1967 if na.ip_rex not in present and not present.add(na.ip_rex)] 1968 del present 1969 self.add_cluster(cluster) 1970 yield 0 1971 1972 def deallocate_node(self, cluster, node, task=None): 1973 # generator 1974 node = self._nodes.get(_node_ipaddr(node), None) 1975 if node is None: 1976 raise StopIteration(-1) 1977 node.clusters.discard(cluster) 1978 yield 0 1979 1980 def close_node(self, cluster, node, terminate_pending, task=None): 1981 # generator 1982 node = self._nodes.get(_node_ipaddr(node), None) 1983 if node is None: 1984 raise StopIteration(-1) 1985 node.clusters.discard(cluster) 1986 yield node.close(cluster._compute, terminate_pending=terminate_pending) 1987 1988 def node_jobs(self, cluster, node, from_node=False, get_uids=True, task=None): 1989 # generator 1990 node = self._nodes.get(_node_ipaddr(node), None) 1991 if not node or cluster not in node.clusters: 1992 raise StopIteration([]) 1993 if from_node: 1994 sock = socket.socket(node.sock_family, socket.SOCK_STREAM) 1995 sock = AsyncSocket(sock, keyfile=self.node_keyfile, certfile=self.node_certfile) 1996 sock.settimeout(MsgTimeout) 1997 try: 1998 yield sock.connect((node.ip_addr, node.port)) 1999 yield sock.sendall(node.auth) 2000 req = {'compute_id': cluster._compute.id, 'auth': cluster._compute.auth} 2001 yield sock.send_msg(b'JOBS:' + serialize(req)) 2002 msg = yield sock.recv_msg() 2003 uids = [info['uid'] for info in deserialize(msg)] 2004 except Exception: 2005 logger.debug(traceback.format_exc()) 2006 uids = [] 2007 sock.close() 2008 if get_uids: 2009 jobs = uids 2010 else: 2011 _jobs = [self._sched_jobs.get(uid, None) for uid in uids] 2012 jobs = [_job.job for _job in _jobs if _job is not None 2013 and _job.compute_id == cluster._compute.id 2014 ] 2015 else: 2016 if get_uids: 2017 jobs = [_job.uid for _job in self._sched_jobs.values() if _job.node == node 2018 and _job.compute_id == cluster._compute.id] 2019 else: 2020 jobs = [_job.job for _job in self._sched_jobs.values() if _job.node == node 2021 and _job.compute_id == cluster._compute.id] 2022 2023 raise StopIteration(jobs) 2024 2025 def set_node_cpus(self, node, cpus): 2026 # generator 2027 try: 2028 cpus = int(cpus) 2029 except ValueError: 2030 raise StopIteration(-1) 2031 node = self._nodes.get(_node_ipaddr(node), None) 2032 if node is None: 2033 reply = (None, -1) 2034 else: 2035 if cpus >= 0: 2036 node.cpus = min(node.avail_cpus, cpus) 2037 elif (node.avail_cpus + cpus) >= 0: 2038 node.cpus = node.avail_cpus + cpus 2039 cpus = node.cpus 2040 for cluster in node.clusters: 2041 dispy_node = cluster._dispy_nodes.get(node.ip_addr, None) 2042 if dispy_node: 2043 dispy_node.cpus = cpus 2044 yield self._sched_event.set() 2045 reply = (node.ip_addr, node.cpus) 2046 raise StopIteration(reply) 2047 2048 def shutdown(self): 2049 def _shutdown(self, task=None): 2050 logger.debug('Shutting down scheduler ...') 2051 for cluster in list(self.pending_clusters.values()) + self.unsched_clusters: 2052 path = os.path.join(self.dest_path_prefix, 2053 '%s_%s' % (cluster._compute.id, cluster.client_auth)) 2054 if os.path.isfile(path): 2055 os.remove(path) 2056 try: 2057 shutil.rmtree(cluster.dest_path) 2058 except Exception: 2059 logger.debug(traceback.format_exc()) 2060 # TODO: inform cluster 2061 self.pending_clusters.clear() 2062 self.unsched_clusters = [] 2063 while (any(cluster.pending_jobs for cluster in self._clusters.values())): 2064 logger.warning('Waiting for %s clusters to finish', len(self._clusters)) 2065 yield task.sleep(5) 2066 2067 self._sched_event.set() 2068 yield self.job_scheduler_task.finish() 2069 yield self.print_status() 2070 2071 if self.terminate: 2072 return 2073 self.terminate = True 2074 Task(_shutdown, self).value() 2075 if self.pycos: 2076 self.pycos.finish() 2077 self.pycos = None 2078 Singleton.empty(self.__class__) 2079 2080 def print_status(self): 2081 print('') 2082 heading = ' %30s | %5s | %13s' % ('Node', 'CPUs', 'Node Time Sec') 2083 print(heading) 2084 print('-' * len(heading)) 2085 tot_cpu_time = 0 2086 for ip_addr in sorted(self._nodes, key=lambda addr: self._nodes[addr].cpu_time, 2087 reverse=True): 2088 node = self._nodes[ip_addr] 2089 tot_cpu_time += node.cpu_time 2090 if node.name: 2091 name = ip_addr + ' (' + node.name + ')' 2092 else: 2093 name = ip_addr 2094 print(' %-30.30s | %5s | %13.3f' % (name, node.cpus, node.cpu_time)) 2095 print('') 2096 print('Total job time: %.3f sec\n' % (tot_cpu_time)) 2097 if self._clusters: 2098 print('Current clients: %s (%s)' % (len(self._clusters), 2099 ', '.join(cluster.ip_addr for cluster in 2100 self._clusters.values()))) 2101 if self.unsched_clusters: 2102 print('Pending clients: %s' % (len(self.unsched_clusters))) 2103 print('') 2104 yield 0 2105 2106 2107if __name__ == '__main__': 2108 import argparse 2109 2110 logger = pycos.Logger('dispyscheduler') 2111 2112 parser = argparse.ArgumentParser() 2113 parser.add_argument('--config', dest='config', default='', 2114 help='use configuration in given file') 2115 parser.add_argument('--save_config', dest='save_config', default='', 2116 help='save configuration in given file and exit') 2117 parser.add_argument('-d', '--debug', action='store_true', dest='loglevel', default=False, 2118 help='if given, debug messages are printed') 2119 parser.add_argument('-n', '--nodes', action='append', dest='nodes', default=[], 2120 help='name or IP address used for all computations; ' 2121 'repeat for multiple nodes') 2122 parser.add_argument('-i', '--ip_addr', action='append', dest='ip_addrs', default=[], 2123 help='IP address to use; repeat for multiple interfaces') 2124 parser.add_argument('--ext_ip_addr', action='append', dest='ext_ip_addrs', default=[], 2125 help='External IP address to use (needed in case of NAT firewall/gateway);' 2126 ' repeat for multiple interfaces') 2127 parser.add_argument('-p', '--port', dest='port', type=int, default=51347, 2128 help='port number for UDP data and job results') 2129 parser.add_argument('--node_port', dest='node_port', type=int, default=51348, 2130 help='port number used by nodes') 2131 parser.add_argument('--scheduler_port', dest='scheduler_port', type=int, default=51349, 2132 help='port number for scheduler') 2133 parser.add_argument('--ipv4_udp_multicast', dest='ipv4_udp_multicast', action='store_true', 2134 default=False, help='use multicast for IPv4 UDP instead of broadcast') 2135 parser.add_argument('--node_secret', dest='node_secret', default='', 2136 help='authentication secret for handshake with dispy clients') 2137 parser.add_argument('--node_keyfile', dest='node_keyfile', default='', 2138 help='file containing SSL key to be used with nodes') 2139 parser.add_argument('--node_certfile', dest='node_certfile', default='', 2140 help='file containing SSL certificate to be used with nodes') 2141 parser.add_argument('--cluster_secret', dest='cluster_secret', default='', 2142 help='file containing SSL certificate to be used with dispy clients') 2143 parser.add_argument('--cluster_certfile', dest='cluster_certfile', default='', 2144 help='file containing SSL certificate to be used with dispy clients') 2145 parser.add_argument('--cluster_keyfile', dest='cluster_keyfile', default='', 2146 help='file containing SSL key to be used with dispy clients') 2147 parser.add_argument('--pulse_interval', dest='pulse_interval', type=float, default=0, 2148 help='number of seconds between pulse messages to indicate ' 2149 'whether node is alive') 2150 parser.add_argument('--ping_interval', dest='ping_interval', type=float, default=0, 2151 help='number of seconds between ping messages to discover nodes') 2152 parser.add_argument('--zombie_interval', dest='zombie_interval', default=60, type=float, 2153 help='interval in minutes to presume unresponsive scheduler is zombie') 2154 parser.add_argument('--msg_timeout', dest='msg_timeout', default=MsgTimeout, type=float, 2155 help='timeout used for messages to/from client/nodes in seconds') 2156 parser.add_argument('--dest_path_prefix', dest='dest_path_prefix', default=None, 2157 help='path prefix where files sent by dispy are stored') 2158 parser.add_argument('--max_file_size', dest='max_file_size', default=str(MaxFileSize), type=str, 2159 help='maximum file size of any file transferred') 2160 parser.add_argument('--clean', action='store_true', dest='clean', default=False, 2161 help='if given, files copied from or generated by clients ' 2162 'will be removed before scheduler starts') 2163 parser.add_argument('--httpd', action='store_true', dest='http_server', default=False, 2164 help='if given, HTTP server is created so clusters can be ' 2165 'monitored and managed') 2166 parser.add_argument('--fair_cluster_scheduler', dest='scheduler_alg', action='store_const', 2167 const='fair_cluster', 2168 help='Choose job from cluster that was least recently scheduled') 2169 parser.add_argument('--early_cluster_scheduler', dest='scheduler_alg', action='store_const', 2170 const='fcfs_cluster', 2171 help='Choose job from cluster created earliest') 2172 parser.add_argument('--cooperative', action='store_true', dest='cooperative', default=False, 2173 help='if given, clients (clusters) can update CPUs') 2174 parser.add_argument('--cleanup_nodes', action='store_true', dest='cleanup_nodes', default=False, 2175 help='if given, nodes always remove files even if ' 2176 '"cleanup=False" is used by clients') 2177 parser.add_argument('--daemon', action='store_true', dest='daemon', default=False, 2178 help='if given, input is not read from terminal') 2179 2180 config = vars(parser.parse_args(sys.argv[1:])) 2181 2182 if config['config']: 2183 import configparser 2184 cfg = configparser.ConfigParser() 2185 cfg.read(config['config']) 2186 cfg = dict(cfg.items('DEFAULT')) 2187 cfg['nodes'] = [] if cfg['nodes'] == '[]' else \ 2188 [_.strip() for _ in cfg['nodes'][1:-1].split(',')] 2189 cfg['ip_addrs'] = [] if cfg['ip_addrs'] == '[]' else \ 2190 [_.strip() for _ in cfg['ip_addrs'][1:-1].split(',')] 2191 cfg['ext_ip_addrs'] = [] if cfg['ext_ip_addrs'] == '[]' else \ 2192 [_.strip() for _ in cfg['ext_ip_addrs'][1:-1].split(',')] 2193 cfg['port'] = int(cfg['port']) 2194 cfg['node_port'] = int(cfg['node_port']) 2195 cfg['scheduler_port'] = int(cfg['scheduler_port']) 2196 cfg['ipv4_udp_multicast'] = cfg['ipv4_udp_multicast'] == 'True' 2197 cfg['pulse_interval'] = float(cfg['pulse_interval']) 2198 cfg['ping_interval'] = float(cfg['ping_interval']) 2199 cfg['zombie_interval'] = float(cfg['zombie_interval']) 2200 cfg['msg_timeout'] = float(cfg['msg_timeout']) 2201 cfg['loglevel'] = cfg['loglevel'] == 'True' 2202 cfg['clean'] = cfg['clean'] == 'True' 2203 cfg['http_server'] = cfg['http_server'] == 'True' 2204 cfg['cooperative'] = cfg['cooperative'] == 'True' 2205 cfg['cleanup_nodes'] = cfg['cleanup_nodes'] == 'True' 2206 cfg['daemon'] = cfg['daemon'] == 'True' 2207 if cfg['dest_path_prefix'] == 'None': 2208 cfg['dest_path_prefix'] = None 2209 if cfg['scheduler_alg'] == 'None': 2210 cfg['scheduler_alg'] = None 2211 config = cfg 2212 config.pop('config', None) 2213 2214 cfg = config.pop('save_config', None) 2215 if cfg: 2216 import configparser 2217 config = configparser.ConfigParser(config) 2218 cfg = open(cfg, 'w') 2219 config.write(cfg) 2220 cfg.close() 2221 exit(0) 2222 del parser, cfg 2223 2224 if config['loglevel']: 2225 logger.setLevel(logger.DEBUG) 2226 pycos.logger.setLevel(pycos.logger.DEBUG) 2227 else: 2228 logger.setLevel(logger.INFO) 2229 del config['loglevel'] 2230 2231 if config['zombie_interval']: 2232 config['zombie_interval'] = float(config['zombie_interval']) 2233 if config['zombie_interval'] < 1: 2234 raise Exception('zombie_interval must be at least 1') 2235 2236 MsgTimeout = config['msg_timeout'] 2237 del config['msg_timeout'] 2238 2239 m = re.match(r'(\d+)([kKmMgGtT]?)', config['max_file_size']) 2240 if m: 2241 MaxFileSize = int(m.group(1)) 2242 if m.group(2): 2243 m = m.group(2).lower() 2244 if m == 'k': 2245 MaxFileSize *= 1024 2246 elif m == 'm': 2247 MaxFileSize *= 1024**2 2248 elif m == 'g': 2249 MaxFileSize *= 1024**3 2250 elif m == 't': 2251 MaxFileSize *= 1024**4 2252 else: 2253 raise Exception('invalid max_file_size option') 2254 else: 2255 raise Exception('max_file_size must be >= 0') 2256 del config['max_file_size'] 2257 2258 if config['node_certfile']: 2259 config['node_certfile'] = os.path.abspath(config['node_certfile']) 2260 else: 2261 config['node_certfile'] = None 2262 if config['node_keyfile']: 2263 config['node_keyfile'] = os.path.abspath(config['node_keyfile']) 2264 else: 2265 config['node_keyfile'] = None 2266 2267 if config['cluster_certfile']: 2268 config['cluster_certfile'] = os.path.abspath(config['cluster_certfile']) 2269 else: 2270 config['cluster_certfile'] = None 2271 if config['cluster_keyfile']: 2272 config['cluster_keyfile'] = os.path.abspath(config['cluster_keyfile']) 2273 else: 2274 config['cluster_keyfile'] = None 2275 2276 daemon = config.pop('daemon', False) 2277 if not daemon: 2278 try: 2279 if os.getpgrp() != os.tcgetpgrp(sys.stdin.fileno()): 2280 daemon = True 2281 except (Exception, KeyboardInterrupt): 2282 pass 2283 2284 logger.info('dispyscheduler version %s', _dispy_version) 2285 scheduler = _Scheduler(**config) 2286 if daemon: 2287 scheduler.job_scheduler_task.value() 2288 else: 2289 while 1: 2290 try: 2291 cmd = input('Enter "quit" or "exit" to terminate scheduler, ' 2292 'anything else to get status: ') 2293 cmd = cmd.strip().lower() 2294 if cmd == 'quit' or cmd == 'exit': 2295 break 2296 else: 2297 Task(scheduler.print_status) 2298 time.sleep(0.2) 2299 except KeyboardInterrupt: 2300 # TODO: terminate even if jobs are scheduled? 2301 logger.info('Interrupted; terminating') 2302 break 2303 except (Exception, KeyboardInterrupt): 2304 logger.debug(traceback.format_exc()) 2305 scheduler.shutdown() 2306 exit(0) 2307