1#!/usr/bin/python3 2 3""" 4This file is part of pycos project. See https://pycos.sourceforge.io for details. 5 6This module provides API for creating distributed communicating 7processes. 'Computation' class should be used to package computation components 8(Python generator functions, Python functions, files, classes, modules) and then 9schedule runs that create remote tasks at remote server processes running 10'dispycosnode.py'. 11 12See 'dispycos_client*.py' files in 'examples' directory for various use cases. 13""" 14 15import os 16import sys 17import inspect 18import hashlib 19import collections 20import time 21import shutil 22import operator 23import functools 24import re 25import copy 26import stat 27 28import pycos 29import pycos.netpycos 30from pycos import Task, SysTask, logger 31 32__author__ = "Giridhar Pemmasani (pgiri@yahoo.com)" 33__copyright__ = "Copyright (c) 2014-2015 Giridhar Pemmasani" 34__license__ = "Apache 2.0" 35__url__ = "https://pycos.sourceforge.io" 36 37__all__ = ['Scheduler', 'Computation', 'DispycosStatus', 'DispycosTaskInfo', 38 'DispycosNodeInfo', 'DispycosNodeAvailInfo', 'DispycosNodeAllocate'] 39 40MsgTimeout = pycos.MsgTimeout 41MinPulseInterval = 10 42MaxPulseInterval = 10 * MinPulseInterval 43 44# status about nodes / servers are sent with this structure 45DispycosStatus = collections.namedtuple('DispycosStatus', ['status', 'info']) 46DispycosTaskInfo = collections.namedtuple('DispycosTaskInfo', ['task', 'args', 'kwargs', 47 'start_time']) 48DispycosNodeInfo = collections.namedtuple('DispycosNodeInfo', ['name', 'addr', 'cpus', 'platform', 49 'avail_info']) 50logger.name = 'dispycos' 51# PyPI / pip packaging adjusts assertion below for Python 3.7+ 52assert sys.version_info.major == 3 and sys.version_info.minor >= 7, \ 53 ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' % 54 (__file__, sys.version_info.major, sys.version_info.minor)) 55 56 57class DispycosNodeAvailInfo(object): 58 """Node 
availability status is indicated with this class. 'cpu' is 59 available CPU in percent in the range 0 to 100. 0 indicates node is busy 60 executing tasks on all CPUs and 100 indicates node is not busy at all. 61 """ 62 63 def __init__(self, location, cpu, memory, disk, swap): 64 self.location = location 65 self.cpu = cpu 66 self.memory = memory 67 self.disk = disk 68 self.swap = swap 69 70 71class DispycosNodeAllocate(object): 72 """Allocation of nodes can be customized by specifying 'nodes' of Computation 73 with DispycosNodeAllocate instances. 74 75 'node' must be hostname or IP address (with possibly '*' to match rest of IP 76 address), 'port must be TCP port used by node (only if 'node' doesn't have 77 '*'), 'cpus', if given, must be number of servers running on that node if 78 positive, or number of CPUs not to use if negative, 'memory' is minimum 79 available memory in bytes, 'disk' is minimum available disk space (on the 80 partition where dispycosnode servers are running from), and 'platform' is 81 regular expression to match output of"platform.platform()" on that node, 82 e.g., "Linux.*x86_64" to accept only nodes that run 64-bit Linux. 83 """ 84 85 def __init__(self, node, port=None, platform='', cpus=None, memory=None, disk=None): 86 if node.find('*') < 0: 87 self.ip_rex = pycos.Pycos.host_ipaddr(node) 88 else: 89 self.ip_rex = node 90 91 if self.ip_rex: 92 self.ip_rex = self.ip_rex.replace('.', '\\.').replace('*', '.*') 93 else: 94 logger.warning('node "%s" is invalid', node) 95 self.ip_rex = '' 96 97 self.port = port 98 self.platform = platform.lower() 99 self.cpus = cpus 100 self.memory = memory 101 self.disk = disk 102 103 def allocate(self, ip_addr, name, platform, cpus, memory, disk): 104 """When a node is found, scheduler calls this method with IP address, name, 105 CPUs, memory and disk available on that node. This method should return 106 a number indicating number of CPUs to use. 
If return value is 0, the 107 node is not used; if the return value is < 0, this allocation is ignored 108 (next allocation in the 'nodes' list, if any, is applied). 109 """ 110 if (self.platform and not re.search(self.platform, platform)): 111 return -1 112 if ((self.memory and memory and self.memory > memory) or 113 (self.disk and disk and self.disk > disk)): 114 return 0 115 if not isinstance(self.cpus, int): 116 return cpus 117 if self.cpus == 0: 118 return 0 119 elif self.cpus > 0: 120 if self.cpus > cpus: 121 return 0 122 return self.cpus 123 else: 124 cpus += self.cpus 125 if cpus < 0: 126 return 0 127 return cpus 128 129 def __getstate__(self): 130 state = {} 131 for attr in ['ip_rex', 'port', 'platform', 'cpus', 'memory', 'disk']: 132 state[attr] = getattr(self, attr) 133 return state 134 135 136class Computation(object): 137 """Packages components to distribute to remote pycos schedulers to create 138 (remote) tasks. 139 """ 140 141 def __init__(self, components, pulse_interval=(5*MinPulseInterval), node_allocations=[], 142 nodes=[], status_task=None, node_setup=None, server_setup=None, 143 disable_nodes=False, disable_servers=False, peers_communicate=False, 144 zombie_period=None): 145 """'components' should be a list, each element of which is either a 146 module, a (generator or normal) function, path name of a file, a class 147 or an object (in which case the code for its class is sent). 148 149 'pulse_interval' is interval (number of seconds) used for heart beat 150 messages to check if client / scheduler / server is alive. If the other 151 side doesn't reply to 5 heart beat messages, it is treated as dead. 
152 """ 153 154 if pulse_interval < MinPulseInterval or pulse_interval > MaxPulseInterval: 155 raise Exception('"pulse_interval" must be at least %s and at most %s' % 156 (MinPulseInterval, MaxPulseInterval)) 157 if not isinstance(nodes, list): 158 raise Exception('"nodes" must be list of strings or DispycosNodeAllocate instances') 159 if node_allocations: 160 logger.warning(' WARNING: "node_allocations" is deprecated; use "nodes" instead') 161 if not isinstance(node_allocations, list): 162 raise Exception('"node_allocations" must be list of DispycosNodeAllocate instances') 163 nodes.extend(node_allocations) 164 if any(not isinstance(_, (DispycosNodeAllocate, str)) for _ in nodes): 165 raise Exception('"nodes" must be list of strings or DispycosNodeAllocate instances') 166 if status_task and not isinstance(status_task, Task): 167 raise Exception('status_task must be Task instance') 168 if node_setup and not inspect.isgeneratorfunction(node_setup): 169 raise Exception('"node_setup" must be a task (generator function)') 170 if server_setup and not inspect.isgeneratorfunction(server_setup): 171 raise Exception('"server_setup" must be a task (generator function)') 172 if (disable_nodes or disable_servers) and not status_task: 173 raise Exception('status_task must be given when nodes or servers are disabled') 174 if zombie_period: 175 if zombie_period < 5*pulse_interval: 176 raise Exception('"zombie_period" must be at least 5*pulse_interval') 177 elif zombie_period is None: 178 zombie_period = 10*pulse_interval 179 180 if not isinstance(components, list): 181 components = [components] 182 183 self._code = '' 184 self._xfer_funcs = set() 185 self._xfer_files = [] 186 self._auth = None 187 self.scheduler = None 188 self._pulse_task = None 189 if zombie_period: 190 self._pulse_interval = min(pulse_interval, zombie_period / 3) 191 else: 192 self._pulse_interval = pulse_interval 193 self._zombie_period = zombie_period 194 if nodes: 195 self._node_allocations = [node if 
isinstance(node, DispycosNodeAllocate) 196 else DispycosNodeAllocate(node) for node in nodes] 197 self._node_allocations.append(DispycosNodeAllocate('*', cpus=0)) 198 else: 199 self._node_allocations = [DispycosNodeAllocate('*')] 200 self.status_task = status_task 201 if node_setup: 202 components.append(node_setup) 203 self._node_setup = node_setup.__name__ 204 else: 205 self._node_setup = None 206 if server_setup: 207 components.append(server_setup) 208 self._server_setup = server_setup.__name__ 209 else: 210 self._server_setup = None 211 self._peers_communicate = bool(peers_communicate) 212 self._disable_nodes = bool(disable_nodes) 213 self._disable_servers = bool(disable_servers) 214 215 depends = set() 216 cwd = os.getcwd() 217 for dep in components: 218 if isinstance(dep, str) or inspect.ismodule(dep): 219 if inspect.ismodule(dep): 220 name = dep.__file__ 221 if name.endswith('.pyc'): 222 name = name[:-1] 223 if not name.endswith('.py'): 224 raise Exception('Invalid module "%s" - must be python source.' % dep) 225 if name.startswith(cwd): 226 dst = os.path.dirname(name[len(cwd):].lstrip(os.sep)) 227 elif dep.__package__: 228 dst = dep.__package__.replace('.', os.sep) 229 else: 230 dst = os.path.dirname(dep.__name__.replace('.', os.sep)) 231 else: 232 name = os.path.abspath(dep) 233 if name.startswith(cwd): 234 dst = os.path.dirname(name[len(cwd):].lstrip(os.sep)) 235 else: 236 dst = '.' 
237 if name in depends: 238 continue 239 try: 240 with open(name, 'rb') as fd: 241 pass 242 except Exception: 243 raise Exception('File "%s" is not valid' % name) 244 self._xfer_files.append((name, dst, os.sep)) 245 depends.add(name) 246 elif (inspect.isgeneratorfunction(dep) or inspect.isfunction(dep) or 247 inspect.isclass(dep) or hasattr(dep, '__class__')): 248 if inspect.isgeneratorfunction(dep) or inspect.isfunction(dep): 249 name = dep.__name__ 250 elif inspect.isclass(dep): 251 name = dep.__name__ 252 elif hasattr(dep, '__class__') and inspect.isclass(dep.__class__): 253 dep = dep.__class__ 254 name = dep.__name__ 255 if name in depends: 256 continue 257 depends.add(name) 258 self._xfer_funcs.add(name) 259 self._code += '\n' + inspect.getsource(dep).lstrip() 260 else: 261 raise Exception('Invalid computation: %s' % dep) 262 # check code can be compiled 263 compile(self._code, '<string>', 'exec') 264 # Under Windows dispycos server may send objects with '__mp_main__' 265 # scope, so make an alias to '__main__'. Do so even if scheduler is not 266 # running on Windows; it is possible the client is not Windows, but a 267 # node is. 268 if os.name == 'nt' and '__mp_main__' not in sys.modules: 269 sys.modules['__mp_main__'] = sys.modules['__main__'] 270 271 def schedule(self, location=None, timeout=None): 272 """Schedule computation for execution. Must be used with 'yield' as 273 'result = yield compute.schedule()'. If scheduler is executing other 274 computations, this will block until scheduler processes them 275 (computations are processed in the order submitted). 
276 """ 277 278 if self._auth is not None: 279 return(0) 280 self._auth = '' 281 if self.status_task is not None and not isinstance(self.status_task, Task): 282 return(-1) 283 284 self.scheduler = yield SysTask.locate('dispycos_scheduler', location=location, 285 timeout=MsgTimeout) 286 if not isinstance(self.scheduler, Task): 287 return(-1) 288 289 def _schedule(self, task=None): 290 self._pulse_task = SysTask(self._pulse_proc) 291 msg = {'req': 'schedule', 'computation': pycos.serialize(self), 'client': task} 292 self.scheduler.send(msg) 293 self._auth = yield task.receive(timeout=MsgTimeout) 294 if not isinstance(self._auth, str): 295 logger.debug('Could not send computation to scheduler %s: %s', 296 self.scheduler, self._auth) 297 return(-1) 298 SysTask.scheduler().atexit(10, lambda: SysTask(self.close)) 299 if task.location != self.scheduler.location: 300 for xf, dst, sep in self._xfer_files: 301 drive, xf = os.path.splitdrive(xf) 302 if xf.startswith(sep): 303 xf = os.path.join(os.sep, *(xf.split(sep))) 304 else: 305 xf = os.path.join(*(xf.split(sep))) 306 xf = drive + xf 307 dst = os.path.join(self._auth, os.path.join(*(dst.split(sep)))) 308 if (yield pycos.Pycos.instance().send_file( 309 self.scheduler.location, xf, dir=dst, timeout=MsgTimeout)) < 0: 310 logger.warning('Could not send file "%s" to scheduler', xf) 311 yield self.close() 312 return(-1) 313 msg = {'req': 'await', 'auth': self._auth, 'client': task} 314 self.scheduler.send(msg) 315 resp = yield task.receive(timeout=timeout) 316 if (isinstance(resp, dict) and resp.get('auth') == self._auth and 317 resp.get('resp') == 'scheduled'): 318 return(0) 319 else: 320 yield self.close() 321 return(-1) 322 323 yield Task(_schedule, self).finish() 324 325 def nodes(self): 326 """Get list of addresses of nodes initialized for this computation. Must 327 be used with 'yield' as 'yield compute.nodes()'. 
328 """ 329 330 def _nodes(self, task=None): 331 msg = {'req': 'nodes', 'auth': self._auth, 'client': task} 332 if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1: 333 yield task.receive(MsgTimeout) 334 else: 335 return([]) 336 337 yield Task(_nodes, self).finish() 338 339 def servers(self): 340 """Get list of Location instances of servers initialized for this 341 computation. Must be used with 'yield' as 'yield compute.servers()'. 342 """ 343 344 def _servers(self, task=None): 345 msg = {'req': 'servers', 'auth': self._auth, 'client': task} 346 if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1: 347 yield task.receive(MsgTimeout) 348 else: 349 return([]) 350 351 yield Task(_servers, self).finish() 352 353 def close(self, await_async=False, timeout=None): 354 """Close computation. Must be used with 'yield' as 'yield 355 compute.close()'. 356 """ 357 358 def _close(self, done, task=None): 359 msg = {'req': 'close_computation', 'auth': self._auth, 'client': task, 360 'await_async': bool(await_async)} 361 self.scheduler.send(msg) 362 msg = yield task.receive(timeout=timeout) 363 if msg != 'closed': 364 logger.warning('%s: closing computation failed?', self._auth) 365 self._auth = None 366 if self._pulse_task: 367 yield self._pulse_task.send('quit') 368 self._pulse_task = None 369 done.set() 370 371 if self._auth: 372 done = pycos.Event() 373 SysTask(_close, self, done) 374 yield done.wait() 375 376 def run_at(self, where, gen, *args, **kwargs): 377 """Must be used with 'yield' as 378 379 'rtask = yield computation.run_at(where, gen, ...)' 380 381 Run given generator function 'gen' with arguments 'args' and 'kwargs' at 382 remote server 'where'. If the request is successful, 'rtask' will be a 383 (remote) task; check result with 'isinstance(rtask, 384 pycos.Task)'. The generator is expected to be (mostly) CPU bound and 385 until this is finished, another CPU bound task will not be 386 submitted at same server. 
387 388 If 'where' is a string, it is assumed to be IP address of a node, in 389 which case the task is scheduled at that node on a server at that 390 node. If 'where' is a Location instance, it is assumed to be server 391 location in which case the task is scheduled at that server. 392 393 'gen' must be generator function, as it is used to run task at 394 remote location. 395 """ 396 yield self._run_request('run_async', where, 1, gen, *args, **kwargs) 397 398 def run(self, gen, *args, **kwargs): 399 """Run CPU bound task at any remote server; see 'run_at' 400 above. 401 """ 402 yield self._run_request('run_async', None, 1, gen, *args, **kwargs) 403 404 def run_result_at(self, where, gen, *args, **kwargs): 405 """Must be used with 'yield' as 406 407 'rtask = yield computation.run_result_at(where, gen, ...)' 408 409 Whereas 'run_at' and 'run' return remote task instance, 410 'run_result_at' and 'run_result' wait until remote task is 411 finished and return the result of that remote task (i.e., either 412 the value of 'StopIteration' or the last value 'yield'ed). 413 414 'where', 'gen', 'args', 'kwargs' are as explained in 'run_at'. 415 """ 416 yield self._run_request('run_result', where, 1, gen, *args, **kwargs) 417 418 def run_result(self, gen, *args, **kwargs): 419 """Run CPU bound task at any remote server and return result of 420 that task; see 'run_result_at' above. 421 """ 422 yield self._run_request('run_result', None, 1, gen, *args, **kwargs) 423 424 def run_async_at(self, where, gen, *args, **kwargs): 425 """Must be used with 'yield' as 426 427 'rtask = yield computation.run_async_at(where, gen, ...)' 428 429 Run given generator function 'gen' with arguments 'args' and 'kwargs' at 430 remote server 'where'. If the request is successful, 'rtask' will be a 431 (remote) task; check result with 'isinstance(rtask, 432 pycos.Task)'. The generator is supposed to be (mostly) I/O bound and 433 not consume CPU time. 
Unlike other 'run' variants, tasks created 434 with 'async' are not "tracked" by scheduler (see online documentation for 435 more details). 436 437 If 'where' is a string, it is assumed to be IP address of a node, in 438 which case the task is scheduled at that node on a server at that 439 node. If 'where' is a Location instance, it is assumed to be server 440 location in which case the task is scheduled at that server. 441 442 'gen' must be generator function, as it is used to run task at 443 remote location. 444 """ 445 yield self._run_request('run_async', where, 0, gen, *args, **kwargs) 446 447 def run_async(self, gen, *args, **kwargs): 448 """Run I/O bound task at any server; see 'run_async_at' 449 above. 450 """ 451 yield self._run_request('run_async', None, 0, gen, *args, **kwargs) 452 453 def run_results(self, gen, iter): 454 """Must be used with 'yield', as for example, 455 'results = yield scheduler.map_results(generator, list_of_tuples)'. 456 457 Execute generator 'gen' with arguments from given iterable. The return 458 value is list of results that correspond to executing 'gen' with 459 arguments in iterable in the same order. 460 """ 461 tasks = [] 462 append_task = tasks.append 463 for params in iter: 464 if not isinstance(params, tuple): 465 if hasattr(params, '__iter__'): 466 params = tuple(params) 467 else: 468 params = (params,) 469 append_task(Task(self.run_result, gen, *params)) 470 results = [None] * len(tasks) 471 for i, task in enumerate(tasks): 472 results[i] = yield task.finish() 473 return(results) 474 475 def enable_node(self, ip_addr, *setup_args): 476 """If computation disabled nodes (with 'disabled_nodes=True' when 477 Computation is constructed), nodes are not automatically used by the 478 scheduler until nodes are enabled with 'enable_node'. 479 480 'ip_addr' must be either IP address or host name of the node to be 481 enabled. 482 483 'setup_args' is arguments passed to 'node_setup' function specific to 484 that node. 
If 'node_setup' succeeds (i.e., finishes with value 0), the 485 node is used for computations. 486 """ 487 if self.scheduler: 488 if isinstance(ip_addr, pycos.Location): 489 ip_addr = ip_addr.addr 490 self.scheduler.send({'req': 'enable_node', 'auth': self._auth, 'addr': ip_addr, 491 'setup_args': setup_args}) 492 493 def enable_server(self, location, *setup_args): 494 """If computation disabled servers (with 'disabled_servers=True' when 495 Computation is constructed), servers are not automatically used by the 496 scheduler until they are enabled with 'enable_server'. 497 498 'location' must be Location instance of the server to be enabled. 499 500 'setup_args' is arguments passed to 'server_setup' function specific to 501 that server. If 'server_setup' succeeds (i.e., finishes with value 0), the 502 server is used for computations. 503 """ 504 if self.scheduler: 505 self.scheduler.send({'req': 'enable_server', 'auth': self._auth, 'server': location, 506 'setup_args': setup_args}) 507 508 def suspend_node(self, location): 509 """Suspend submitting jobs (tasks) at this node. Any currently running 510 tasks are left running. 511 """ 512 if self.scheduler: 513 self.scheduler.send({'req': 'suspend_node', 'auth': self._auth, 'server': location}) 514 515 def resume_node(self, location): 516 """Resume submitting jobs (tasks) at this node. 517 """ 518 if self.scheduler: 519 self.scheduler.send({'req': 'enable_node', 'auth': self._auth, 'server': location}) 520 521 def suspend_server(self, location): 522 """Suspend submitting jobs (tasks) at this server. Any currently running 523 tasks are left running. 524 """ 525 if self.scheduler: 526 self.scheduler.send({'req': 'suspend_server', 'auth': self._auth, 'server': location}) 527 528 def resume_server(self, location): 529 """Resume submitting jobs (tasks) at this server. 
530 """ 531 if self.scheduler: 532 self.scheduler.send({'req': 'enable_server', 'auth': self._auth, 'server': location}) 533 534 def node_allocate(self, node_allocate): 535 """Request scheduler to add 'node_allocate' to any previously sent 536 'node_allocations'. 537 """ 538 if not isinstance(node_allocate, DispycosNodeAllocate): 539 return -1 540 if not self._pulse_task: 541 return -1 542 if (node_allocate.__class__ != DispycosNodeAllocate and 543 self._pulse_task.location != self.scheduler.location): 544 node_allocate = copy.copy(node_allocate) 545 node_allocate.__class__ = DispycosNodeAllocate 546 return self.scheduler.send({'req': 'node_allocate', 'auth': self._auth, 547 'node': node_allocate}) 548 549 def _run_request(self, request, where, cpu, gen, *args, **kwargs): 550 """Internal use only. 551 """ 552 if isinstance(gen, str): 553 name = gen 554 else: 555 name = gen.__name__ 556 557 if name in self._xfer_funcs: 558 code = None 559 else: 560 # if not inspect.isgeneratorfunction(gen): 561 # logger.warning('"%s" is not a valid generator function', name) 562 # return([]) 563 code = inspect.getsource(gen).lstrip() 564 565 def _run_req(task=None): 566 msg = {'req': 'job', 'auth': self._auth, 567 'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)} 568 if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1: 569 reply = yield task.receive() 570 if isinstance(reply, Task): 571 if self.status_task: 572 msg = DispycosTaskInfo(reply, args, kwargs, time.time()) 573 self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg)) 574 if not request.endswith('async'): 575 reply = yield task.receive() 576 else: 577 reply = None 578 return(reply) 579 580 yield Task(_run_req).finish() 581 582 def _pulse_proc(self, task=None): 583 """For internal use only. 
584 """ 585 task.set_daemon() 586 last_pulse = time.time() 587 timeout = 2 * self._pulse_interval 588 while 1: 589 msg = yield task.receive(timeout=timeout) 590 591 if msg == 'pulse': 592 last_pulse = time.time() 593 594 elif isinstance(msg, dict): 595 if msg.get('auth', None) != self._auth: 596 continue 597 if msg.get('req', None) == 'allocate': 598 reply = msg.get('reply', None) 599 args = msg.get('args', ()) 600 if not isinstance(reply, Task) or not args: 601 logger.warning('Ignoring allocate request: %s', type(reply)) 602 continue 603 ip_addr = args[0] 604 try: 605 node_allocation = self._node_allocations[int(msg['alloc_id'])] 606 assert re.match(node_allocation.ip_rex, ip_addr) 607 cpus = node_allocation.allocate(*args) 608 except Exception: 609 cpus = 0 610 reply.send({'auth': self._auth, 'req': 'allocate', 611 'ip_addr': ip_addr, 'cpus': cpus}) 612 613 elif msg == 'quit': 614 break 615 616 elif msg is None: 617 logger.warning('scheduler may have gone away!') 618 if (time.time() - last_pulse) > (10 * self._pulse_interval): 619 # TODO: inform status and / or "close"? 
620 pass 621 622 else: 623 logger.debug('ignoring invalid pulse message') 624 625 self._pulse_task = None 626 627 def __getstate__(self): 628 state = {} 629 for attr in ['_code', '_xfer_funcs', '_xfer_files', '_auth', 'scheduler', 'status_task', 630 '_pulse_interval', '_pulse_task', '_node_setup', '_server_setup', 631 '_disable_nodes', '_disable_servers', '_peers_communicate', '_zombie_period']: 632 state[attr] = getattr(self, attr) 633 if self._pulse_task.location == self.scheduler.location: 634 node_allocations = self._node_allocations 635 else: 636 node_allocations = [] 637 for i in range(len(self._node_allocations)): 638 obj = self._node_allocations[i] 639 if obj.__class__ != DispycosNodeAllocate: 640 ip_rex = obj.ip_rex 641 obj = DispycosNodeAllocate('*', port=obj.port) 642 obj.ip_rex = ip_rex 643 obj.cpus = str(i) 644 node_allocations.append(obj) 645 state['_node_allocations'] = node_allocations 646 return state 647 648 def __setstate__(self, state): 649 for attr, value in state.items(): 650 setattr(self, attr, value) 651 652 653class _DispycosJob_(object): 654 """Internal use only. 
655 """ 656 __slots__ = ('request', 'client', 'name', 'where', 'cpu', 'code', 'args', 'kwargs', 'done') 657 658 def __init__(self, request, client, name, where, cpu, code, args=None, kwargs=None): 659 self.request = request 660 self.client = client 661 self.name = name 662 self.where = where 663 self.cpu = cpu 664 self.code = code 665 self.args = pycos.serialize(args) 666 self.kwargs = pycos.serialize(kwargs) 667 self.done = None 668 669 670class Scheduler(object, metaclass=pycos.Singleton): 671 672 # status indications ('status' attribute of DispycosStatus) 673 NodeDiscovered = 1 674 NodeInitialized = 2 675 NodeClosed = 3 676 NodeIgnore = 4 677 NodeDisconnected = 5 678 NodeAbandoned = 6 679 NodeSuspended = 7 680 NodeResumed = 8 681 682 ServerDiscovered = 11 683 ServerInitialized = 12 684 ServerClosed = 13 685 ServerIgnore = 14 686 ServerDisconnected = 15 687 ServerAbandoned = 16 688 ServerSuspended = 17 689 ServerResumed = 18 690 691 TaskCreated = 20 692 TaskAbandoned = 21 693 ComputationScheduled = 23 694 ComputationClosed = 25 695 696 """This class is for use by Computation class (see below) only. Other than 697 the status indications above, none of its attributes are to be accessed 698 directly. 
699 """ 700 701 class _Node(object): 702 703 def __init__(self, name, addr): 704 self.name = name 705 self.addr = addr 706 self.cpus_used = 0 707 self.cpus = 0 708 self.platform = None 709 self.avail_info = None 710 self.servers = {} 711 self.disabled_servers = {} 712 self.load = 0.0 713 self.status = Scheduler.NodeClosed 714 self.task = None 715 self.last_pulse = time.time() 716 self.lock = pycos.Lock() 717 self.cpu_avail = pycos.Event() 718 self.cpu_avail.clear() 719 720 class _Server(object): 721 722 def __init__(self, name, location, scheduler): 723 self.name = name 724 self.task = None 725 self.status = Scheduler.ServerClosed 726 self.rtasks = {} 727 self.xfer_files = [] 728 self.askew_results = {} 729 self.cpu_avail = pycos.Event() 730 self.cpu_avail.clear() 731 self.scheduler = scheduler 732 733 def run(self, job, computation, node): 734 def _run(self, task=None): 735 self.task.send({'req': 'run', 'auth': computation._auth, 'job': job, 'client': task}) 736 rtask = yield task.receive(timeout=MsgTimeout) 737 # currently fault-tolerancy is not supported, so clear job's 738 # args to save space 739 job.args = job.kwargs = None 740 if isinstance(rtask, Task): 741 # TODO: keep func too for fault-tolerance 742 job.done = pycos.Event() 743 self.rtasks[rtask] = (rtask, job) 744 if self.askew_results: 745 msg = self.askew_results.pop(rtask, None) 746 if msg: 747 self.scheduler.__status_task.send(msg) 748 else: 749 logger.debug('failed to create rtask: %s', rtask) 750 if job.cpu: 751 self.cpu_avail.set() 752 if self.status == Scheduler.ServerInitialized: 753 node.cpu_avail.set() 754 self.scheduler._cpu_nodes.add(node) 755 self.scheduler._cpus_avail.set() 756 node.cpus_used -= 1 757 node.load = float(node.cpus_used) / len(node.servers) 758 return(rtask) 759 760 rtask = yield SysTask(_run, self).finish() 761 job.client.send(rtask) 762 763 def __init__(self, **kwargs): 764 self._nodes = {} 765 self._disabled_nodes = {} 766 self._cpu_nodes = set() 767 self._cpus_avail = 
pycos.Event() 768 self._cpus_avail.clear() 769 self._remote = False 770 771 self._cur_computation = None 772 self.__cur_client_auth = None 773 self.__cur_node_allocations = [] 774 self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval) 775 self.__ping_interval = kwargs.pop('ping_interval', 0) 776 self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval) 777 self._node_port = kwargs.pop('dispycosnode_port', 9706) 778 self.__server_locations = set() 779 780 kwargs['name'] = 'dispycos_scheduler' 781 clean = kwargs.pop('clean', False) 782 nodes = kwargs.pop('nodes', []) 783 relay_nodes = kwargs.pop('relay_nodes', False) 784 self.pycos = pycos.Pycos.instance(**kwargs) 785 self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'scheduler') 786 if clean: 787 shutil.rmtree(self.__dest_path) 788 self.pycos.dest_path = self.__dest_path 789 os.chmod(self.__dest_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) 790 791 self.__computation_sched_event = pycos.Event() 792 self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc) 793 self.__status_task = SysTask(self.__status_proc) 794 self.__timer_task = SysTask(self.__timer_proc) 795 self.__client_task = SysTask(self.__client_proc) 796 self.__client_task.register('dispycos_scheduler') 797 for node in nodes: 798 if not isinstance(node, pycos.Location): 799 node = pycos.Location(node, self._node_port) 800 Task(self.pycos.peer, node, relay=relay_nodes) 801 802 def status(self): 803 pending_cpu = sum(node.cpus_used for node in self._nodes.values()) 804 pending = sum(len(server.rtasks) for node in self._nodes.values() 805 for server in node.servers.values()) 806 servers = functools.reduce(operator.add, [list(node.servers.keys()) 807 for node in self._nodes.values()], []) 808 return {'Client': self._cur_computation._pulse_task.location if self._cur_computation else '', 809 'Pending': pending, 'PendingCPU': pending_cpu, 810 'Nodes': list(self._nodes.keys()), 'Servers': 
servers 811 } 812 813 def print_status(self): 814 status = self.status() 815 print('') 816 print(' Client: %s' % status['Client']) 817 print(' Pending: %s' % status['Pending']) 818 print(' Pending CPU: %s' % status['PendingCPU']) 819 print(' nodes: %s' % len(status['Nodes'])) 820 print(' servers: %s' % len(status['Servers'])) 821 822 def __status_proc(self, task=None): 823 task.set_daemon() 824 task.register('dispycos_status') 825 self.pycos.peer_status(self.__status_task) 826 while 1: 827 msg = yield task.receive() 828 now = time.time() 829 if isinstance(msg, pycos.MonitorException): 830 rtask = msg.args[0] 831 if not isinstance(rtask, Task): 832 logger.warning('ignoring invalid rtask %s', type(rtask)) 833 continue 834 node = self._nodes.get(rtask.location.addr, None) 835 if not node: 836 node = self._disabled_nodes.get(rtask.location.addr, None) 837 if not node: 838 logger.warning('node %s is invalid', rtask.location.addr) 839 continue 840 server = node.servers.get(rtask.location, None) 841 if not server: 842 server = node.disabled_servers.get(rtask.location, None) 843 if not server: 844 logger.warning('server "%s" is invalid', rtask.location) 845 continue 846 node.last_pulse = now 847 info = server.rtasks.pop(rtask, None) 848 if not info: 849 # Due to 'yield' used to create rtask, scheduler may not 850 # have updated self._rtasks before the task's 851 # MonitorException is received, so put it in 852 # 'askew_results'. 
The scheduling task will resend it 853 # when it receives rtask 854 server.askew_results[rtask] = msg 855 continue 856 # assert isinstance(info[1], _DispycosJob_) 857 job = info[1] 858 if job.cpu: 859 server.cpu_avail.set() 860 if server.status == Scheduler.ServerInitialized: 861 node.cpu_avail.set() 862 self._cpu_nodes.add(node) 863 self._cpus_avail.set() 864 node.cpus_used -= 1 865 node.load = float(node.cpus_used) / len(node.servers) 866 if job.request.endswith('async'): 867 if job.done: 868 job.done.set() 869 else: 870 job.client.send(msg.args[1][1]) 871 if self._cur_computation and self._cur_computation.status_task: 872 if len(msg.args) > 2: 873 msg.args = (msg.args[0], msg.args[1]) 874 self._cur_computation.status_task.send(msg) 875 876 elif isinstance(msg, pycos.PeerStatus): 877 if msg.status == pycos.PeerStatus.Online: 878 if msg.name.endswith('_server-0'): 879 SysTask(self.__discover_node, msg) 880 else: 881 # msg.status == pycos.PeerStatus.Offline 882 node = server = None 883 node = self._nodes.get(msg.location.addr, None) 884 if not node: 885 node = self._disabled_nodes.get(msg.location.addr, None) 886 if node: 887 server = node.servers.pop(msg.location, None) 888 if server: 889 if node.servers: 890 node.disabled_servers[msg.location] = server 891 else: 892 server = node.disabled_servers.get(msg.location, None) 893 894 if server: 895 server.status = Scheduler.ServerDisconnected 896 SysTask(self.__close_server, server, node) 897 elif node.task and node.task.location == msg.location: 898 # TODO: inform scheduler / client 899 if not self._nodes.pop(node.addr, None): 900 self._disabled_nodes.get(node.addr, None) 901 node.status = Scheduler.NodeDisconnected 902 SysTask(self.__close_node, node) 903 904 if ((not server and not node) and self._remote and self._cur_computation and 905 self._cur_computation._pulse_task.location == msg.location): 906 logger.warning('Client %s terminated; closing computation %s', 907 msg.location, self.__cur_client_auth) 908 
SysTask(self.__close_computation) 909 910 elif isinstance(msg, dict): # message from a node's server 911 status = msg.get('status', None) 912 if status == 'pulse': 913 location = msg.get('location', None) 914 if not isinstance(location, pycos.Location): 915 continue 916 node = self._nodes.get(location.addr, None) 917 if node: 918 node.last_pulse = now 919 node_status = msg.get('node_status', None) 920 if (node_status and self._cur_computation and 921 self._cur_computation.status_task): 922 self._cur_computation.status_task.send(node_status) 923 924 elif status == Scheduler.ServerDiscovered: 925 rtask = msg.get('task', None) 926 if not isinstance(rtask, pycos.Task): 927 continue 928 if (not self._cur_computation or 929 self._cur_computation._auth != msg.get('auth', None)): 930 continue 931 node = self._nodes.get(rtask.location.addr, None) 932 if not node: 933 node = self._disabled_nodes.get(rtask.location.addr, None) 934 if not node or (node.status != Scheduler.NodeInitialized and 935 node.status != Scheduler.NodeDiscovered and 936 node.status != Scheduler.NodeSuspended): 937 if node: 938 logger.warning('Node at %s is not initialized for server %s: %s', 939 node.addr, rtask.location, node.status) 940 else: 941 logger.warning('Node is not valid for server %s', rtask.location) 942 continue 943 if (not self._cur_computation or 944 self._cur_computation._auth != msg.get('auth', None)): 945 logger.warning('Status %s for server %s is ignored', status, rtask.location) 946 continue 947 server = node.servers.get(rtask.location, None) 948 if server: 949 continue 950 server = Scheduler._Server(msg.get('name', None), rtask.location, self) 951 server.task = rtask 952 server.status = Scheduler.ServerDiscovered 953 node.disabled_servers[rtask.location] = server 954 if self._cur_computation and self._cur_computation.status_task: 955 info = DispycosStatus(server.status, server.task.location) 956 self._cur_computation.status_task.send(info) 957 958 elif status == 
Scheduler.ServerInitialized: 959 rtask = msg.get('task', None) 960 if not isinstance(rtask, pycos.Task): 961 continue 962 if (not self._cur_computation or 963 self._cur_computation._auth != msg.get('auth', None)): 964 continue 965 node = self._nodes.get(rtask.location.addr, None) 966 if not node: 967 node = self._disabled_nodes.get(rtask.location.addr, None) 968 if not node or (node.status != Scheduler.NodeInitialized and 969 node.status != Scheduler.NodeDiscovered and 970 node.status != Scheduler.NodeSuspended): 971 if node: 972 logger.warning('Node at %s is not initialized for server %s: %s', 973 node.addr, rtask.location, node.status) 974 else: 975 logger.warning('Node is not valid for server %s', rtask.location) 976 continue 977 if (not self._cur_computation or 978 self._cur_computation._auth != msg.get('auth', None)): 979 logger.warning('Status %s for server %s is ignored', status, rtask.location) 980 continue 981 server = node.disabled_servers.pop(rtask.location, None) 982 if server: 983 if (server.status != Scheduler.ServerDiscovered and 984 server.status != Scheduler.ServerSuspended): 985 continue 986 else: 987 server = Scheduler._Server(msg.get('name', None), rtask.location, self) 988 server.task = rtask 989 990 node.last_pulse = now 991 server.status = Scheduler.ServerInitialized 992 if node.status == Scheduler.NodeInitialized: 993 if not node.servers: 994 if self._cur_computation and self._cur_computation.status_task: 995 info = DispycosNodeInfo(node.name, node.addr, node.cpus, 996 node.platform, node.avail_info) 997 info = DispycosStatus(node.status, info) 998 self._cur_computation.status_task.send(info) 999 self._disabled_nodes.pop(rtask.location.addr, None) 1000 self._nodes[rtask.location.addr] = node 1001 node.servers[rtask.location] = server 1002 server.cpu_avail.set() 1003 self._cpu_nodes.add(node) 1004 self._cpus_avail.set() 1005 node.cpu_avail.set() 1006 node.load = float(node.cpus_used) / len(node.servers) 1007 if self._cur_computation and 
self._cur_computation.status_task: 1008 self._cur_computation.status_task.send( 1009 DispycosStatus(server.status, server.task.location)) 1010 else: 1011 node.disabled_servers[rtask.location] = server 1012 1013 if self._cur_computation._peers_communicate: 1014 server.task.send({'req': 'peers', 'auth': self._cur_computation._auth, 1015 'peers': list(self.__server_locations)}) 1016 self.__server_locations.add(server.task.location) 1017 1018 elif status in (Scheduler.ServerClosed, Scheduler.ServerDisconnected): 1019 location = msg.get('location', None) 1020 if not isinstance(location, pycos.Location): 1021 continue 1022 node = self._nodes.get(location.addr, None) 1023 if not node: 1024 node = self._disabled_nodes.get(location.addr, None) 1025 if not node: 1026 continue 1027 if (not self._cur_computation or 1028 self._cur_computation._auth != msg.get('auth', None)): 1029 logger.warning('Status %s for server %s is ignored', status, rtask.location) 1030 continue 1031 server = node.servers.pop(location, None) 1032 if not server: 1033 server = node.disabled_servers.get(location, None) 1034 if not server: 1035 continue 1036 if status == Scheduler.ServerDisconnected: 1037 server.status = status 1038 SysTask(self.__close_server, server, node) 1039 server.status = Scheduler.ServerClosed 1040 node.disabled_servers[location] = server 1041 if not node.servers: 1042 node.status = Scheduler.NodeClosed 1043 self._nodes.pop(node.addr, None) 1044 self._disabled_nodes[location.addr] = node 1045 self._cpu_nodes.discard(node) 1046 if not self._cpu_nodes: 1047 self._cpus_avail.clear() 1048 if self._cur_computation: 1049 if self._cur_computation._peers_communicate: 1050 self.__server_locations.discard(server.task.location) 1051 # TODO: inform other servers 1052 # if not node.servers: 1053 # info = DispycosNodeInfo(node.name, node.addr, node.cpus, 1054 # node.platform, node.avail_info) 1055 # self._cur_computation.status_task.send( 1056 # Scheduler.NodeClosed, DispycosStatus(node.status, 
info)) 1057 1058 elif status == Scheduler.NodeClosed: 1059 location = msg.get('location', None) 1060 if not isinstance(location, pycos.Location): 1061 continue 1062 node = self._nodes.pop(location.addr, None) 1063 if not node: 1064 node = self._disabled_nodes.get(location.addr, None) 1065 if not node: 1066 continue 1067 if (not self._cur_computation or 1068 self._cur_computation._auth != msg.get('auth', None)): 1069 logger.warning('Status %s for node %s is ignored', status, location) 1070 continue 1071 node.status = Scheduler.NodeDisconnected 1072 SysTask(self.__close_node, node) 1073 1074 else: 1075 logger.warning('Ignoring invalid status message: %s', status) 1076 else: 1077 logger.warning('invalid status message ignored') 1078 1079 def __node_allocate(self, node, task=None): 1080 if not task: 1081 task = pycos.Pycos.cur_task() 1082 for node_allocate in self.__cur_node_allocations: 1083 if not re.match(node_allocate.ip_rex, node.addr): 1084 continue 1085 if self._remote and isinstance(node_allocate.cpus, str): 1086 req = {'req': 'allocate', 'auth': self.__cur_client_auth, 1087 'alloc_id': node_allocate.cpus, 'reply': task, 1088 'args': (node.addr, node.name, node.platform, node.cpus, 1089 node.avail_info.memory, node.avail_info.disk)} 1090 self._cur_computation._pulse_task.send(req) 1091 reply = yield task.recv(timeout=MsgTimeout) 1092 if (isinstance(reply, dict) and reply.get('auth', None) == self.__cur_client_auth and 1093 reply.get('req', None) == 'allocate' and reply.get('ip_addr', '') == node.addr): 1094 cpus = reply.get('cpus', 0) 1095 else: 1096 cpus = 0 1097 else: 1098 cpus = node_allocate.allocate(node.addr, node.name, node.platform, node.cpus, 1099 node.avail_info.memory, node.avail_info.disk) 1100 if cpus < 0: 1101 continue 1102 return(min(cpus, node.cpus)) 1103 return(node.cpus) 1104 1105 def __get_node_info(self, node, task=None): 1106 assert node.addr in self._disabled_nodes 1107 node.task.send({'req': 'dispycos_node_info', 'client': task}) 1108 
node_info = yield task.receive(timeout=MsgTimeout) 1109 if not node_info: 1110 node.status = Scheduler.NodeIgnore 1111 return 1112 node.name = node_info.name 1113 node.cpus = node_info.cpus 1114 node.platform = node_info.platform.lower() 1115 node.avail_info = node_info.avail_info 1116 if self._cur_computation: 1117 yield self.__init_node(node, task=task) 1118 1119 def __init_node(self, node, setup_args=(), task=None): 1120 computation = self._cur_computation 1121 if not computation or not node.task: 1122 return(-1) 1123 # this task may be invoked in two different paths (when a node is 1124 # found right after computation is already scheduled, and when 1125 # computation is scheduled right after a node is found). To prevent 1126 # concurrent execution (that may reserve / initialize same node more 1127 # than once), lock is used 1128 yield node.lock.acquire() 1129 # assert node.addr in self._disabled_nodes 1130 if node.status not in (Scheduler.NodeDiscovered, Scheduler.NodeClosed): 1131 logger.warning('Ignoring node initialization for %s: %s', node.addr, node.status) 1132 node.lock.release() 1133 return(0) 1134 1135 if node.status == Scheduler.NodeClosed: 1136 cpus = yield self.__node_allocate(node, task=task) 1137 if not cpus: 1138 node.status = Scheduler.NodeIgnore 1139 node.lock.release() 1140 return(0) 1141 1142 node.task.send({'req': 'reserve', 'cpus': cpus, 'auth': computation._auth, 1143 'status_task': self.__status_task, 'client': task, 1144 'computation_location': computation._pulse_task.location}) 1145 cpus = yield task.receive(timeout=MsgTimeout) 1146 if not isinstance(cpus, int) or cpus <= 0: 1147 logger.debug('Reserving %s failed', node.addr) 1148 self._disabled_nodes.pop(node.addr, None) 1149 # node.status = Scheduler.NodeDiscoverd 1150 node.lock.release() 1151 yield pycos.Pycos.instance().close_peer(node.task.location) 1152 return(-1) 1153 if computation != self._cur_computation: 1154 node.status = Scheduler.NodeClosed 1155 node.task.send({'req': 
'release', 'auth': computation._auth, 'client': None}) 1156 node.lock.release() 1157 return(-1) 1158 1159 node.status = Scheduler.NodeDiscovered 1160 node.cpus = cpus 1161 if self._cur_computation and self._cur_computation.status_task: 1162 info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform, 1163 node.avail_info) 1164 self._cur_computation.status_task.send(DispycosStatus(node.status, info)) 1165 1166 if self._cur_computation._disable_nodes: 1167 node.lock.release() 1168 return(0) 1169 else: 1170 assert node.addr in self._disabled_nodes 1171 1172 for name, dst, sep in computation._xfer_files: 1173 reply = yield self.pycos.send_file(node.task.location, name, dir=dst, timeout=MsgTimeout, 1174 overwrite=True) 1175 if reply < 0 or computation != self._cur_computation: 1176 logger.debug('Failed to transfer file %s: %s', name, reply) 1177 node.status = Scheduler.NodeClosed 1178 node.task.send({'req': 'release', 'auth': computation._auth, 'client': None}) 1179 node.lock.release() 1180 return(-1) 1181 1182 node.task.send({'req': 'computation', 'computation': computation, 'auth': computation._auth, 1183 'setup_args': setup_args, 'client': task}) 1184 cpus = yield task.receive(timeout=MsgTimeout) 1185 if not cpus or computation != self._cur_computation: 1186 node.status = Scheduler.NodeClosed 1187 node.task.send({'req': 'release', 'auth': computation._auth, 'client': None}) 1188 node.lock.release() 1189 return(-1) 1190 1191 node.cpus = cpus 1192 node.status = Scheduler.NodeInitialized 1193 node.lock.release() 1194 servers = [server for server in node.disabled_servers.values() 1195 if server.status == Scheduler.ServerInitialized] 1196 if servers: 1197 if computation.status_task: 1198 info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform, 1199 node.avail_info) 1200 computation.status_task.send(DispycosStatus(node.status, info)) 1201 self._disabled_nodes.pop(node.addr, None) 1202 self._nodes[node.addr] = node 1203 node.cpu_avail.set() 1204 
self._cpu_nodes.add(node) 1205 self._cpus_avail.set() 1206 for server in servers: 1207 node.disabled_servers.pop(server.task.location) 1208 node.servers[server.task.location] = server 1209 server.cpu_avail.set() 1210 if computation.status_task: 1211 computation.status_task.send(DispycosStatus(server.status, 1212 server.task.location)) 1213 1214 def __discover_node(self, peer_status, task=None): 1215 for _ in range(10): 1216 node_task = yield Task.locate('dispycos_node', location=peer_status.location, 1217 timeout=MsgTimeout) 1218 if not isinstance(node_task, Task): 1219 yield task.sleep(0.1) 1220 continue 1221 node = self._nodes.pop(peer_status.location.addr, None) 1222 if not node: 1223 node = self._disabled_nodes.pop(peer_status.location.addr, None) 1224 if node: 1225 logger.warning('Rediscovered dispycosnode at %s; discarding previous incarnation!', 1226 peer_status.location.addr) 1227 node.status = Scheduler.NodeDisconnected 1228 yield SysTask(self.__close_node, node).finish() 1229 node = Scheduler._Node(peer_status.name, peer_status.location.addr) 1230 self._disabled_nodes[peer_status.location.addr] = node 1231 node.task = node_task 1232 yield self.__get_node_info(node, task=task) 1233 return 1234 1235 def __timer_proc(self, task=None): 1236 task.set_daemon() 1237 node_check = client_pulse = last_ping = time.time() 1238 while 1: 1239 try: 1240 yield task.sleep(self.__pulse_interval) 1241 except GeneratorExit: 1242 break 1243 now = time.time() 1244 if self.__cur_client_auth: 1245 if (yield self._cur_computation._pulse_task.deliver('pulse')) == 1: 1246 client_pulse = now 1247 if self.__zombie_period: 1248 if ((now - client_pulse) > self.__zombie_period): 1249 logger.warning('Closing zombie computation %s', self.__cur_client_auth) 1250 SysTask(self.__close_computation) 1251 1252 if (now - node_check) > self.__zombie_period: 1253 node_check = now 1254 for node in self._nodes.values(): 1255 if (node.status != Scheduler.NodeInitialized and 1256 node.status != 
Scheduler.NodeDiscovered and 1257 node.status != Scheduler.NodeSuspended): 1258 continue 1259 if (now - node.last_pulse) > self.__zombie_period: 1260 logger.warning('dispycos node %s is zombie!', node.addr) 1261 self._nodes.pop(node.addr, None) 1262 self._disabled_nodes[node.addr] = node 1263 # TODO: assuming servers are zombies as well 1264 node.status = Scheduler.NodeDisconnected 1265 SysTask(self.__close_node, node) 1266 1267 if not self._cur_computation._disable_nodes: 1268 for node in self._disabled_nodes.values(): 1269 if node.task and node.status == Scheduler.NodeDiscovered: 1270 SysTask(self.__init_node, node) 1271 1272 if self.__ping_interval and ((now - last_ping) > self.__ping_interval): 1273 last_ping = now 1274 if not self.pycos.ignore_peers: 1275 self.pycos.discover_peers(port=self._node_port) 1276 1277 def __computation_scheduler_proc(self, task=None): 1278 task.set_daemon() 1279 while 1: 1280 if self._cur_computation: 1281 self.__computation_sched_event.clear() 1282 yield self.__computation_sched_event.wait() 1283 continue 1284 1285 self._cur_computation, client = yield task.receive() 1286 self.__pulse_interval = self._cur_computation._pulse_interval 1287 if not self._remote: 1288 self.__zombie_period = self._cur_computation._zombie_period 1289 1290 self.__cur_client_auth = self._cur_computation._auth 1291 self._cur_computation._auth = hashlib.sha1(os.urandom(20)).hexdigest() 1292 self.__cur_node_allocations = self._cur_computation._node_allocations 1293 self._cur_computation._node_allocations = [] 1294 1295 self._disabled_nodes.update(self._nodes) 1296 self._nodes.clear() 1297 self._cpu_nodes.clear() 1298 self._cpus_avail.clear() 1299 for node in self._disabled_nodes.values(): 1300 node.status = Scheduler.NodeClosed 1301 node.disabled_servers.clear() 1302 node.servers.clear() 1303 node.cpu_avail.clear() 1304 logger.debug('Computation %s / %s scheduled', self.__cur_client_auth, 1305 self._cur_computation._auth) 1306 msg = {'resp': 'scheduled', 
'auth': self.__cur_client_auth} 1307 if (yield client.deliver(msg, timeout=MsgTimeout)) != 1: 1308 logger.warning('client not reachable?') 1309 self._cur_client_auth = None 1310 self._cur_computation = None 1311 continue 1312 for node in self.__cur_node_allocations: 1313 if node.ip_rex.find('*') >= 0: 1314 continue 1315 loc = pycos.Location(node.ip_rex.replace('\\.', '.'), 1316 node.port if node.port else self._node_port) 1317 SysTask(self.pycos.peer, loc) 1318 for node in self._disabled_nodes.values(): 1319 SysTask(self.__get_node_info, node) 1320 if not self.pycos.ignore_peers: 1321 self.pycos.discover_peers(port=self._node_port) 1322 self.__timer_task.resume() 1323 self.__computation_scheduler_task = None 1324 1325 def __submit_job(self, msg, task=None): 1326 task.set_daemon() 1327 job = msg['job'] 1328 auth = msg.get('auth', None) 1329 if (not isinstance(job, _DispycosJob_) or not isinstance(job.client, Task)): 1330 logger.warning('Ignoring invalid client job request: %s' % type(job)) 1331 return 1332 cpu = job.cpu 1333 where = job.where 1334 if not where: 1335 while 1: 1336 node = None 1337 load = None 1338 if cpu: 1339 for host in self._cpu_nodes: 1340 if host.cpu_avail.is_set() and (load is None or host.load < load): 1341 node = host 1342 load = host.load 1343 else: 1344 for host in self._nodes.values(): 1345 if load is None or host.load < load: 1346 node = host 1347 load = host.load 1348 if not node: 1349 self._cpus_avail.clear() 1350 yield self._cpus_avail.wait() 1351 if self.__cur_client_auth != auth: 1352 return 1353 continue 1354 server = None 1355 load = None 1356 for proc in node.servers.values(): 1357 if cpu: 1358 if proc.cpu_avail.is_set() and (load is None or len(proc.rtasks) < load): 1359 server = proc 1360 load = len(proc.rtasks) 1361 elif (load is None or len(proc.rtasks) < load): 1362 server = proc 1363 load = len(proc.rtasks) 1364 if server: 1365 break 1366 else: 1367 self._cpus_avail.clear() 1368 yield self._cpus_avail.wait() 1369 if 
self.__cur_client_auth != auth: 1370 return 1371 continue 1372 1373 if cpu: 1374 server.cpu_avail.clear() 1375 node.cpus_used += 1 1376 node.load = float(node.cpus_used) / len(node.servers) 1377 if node.cpus_used == len(node.servers): 1378 node.cpu_avail.clear() 1379 self._cpu_nodes.discard(node) 1380 if not self._cpu_nodes: 1381 self._cpus_avail.clear() 1382 yield server.run(job, self._cur_computation, node) 1383 1384 elif isinstance(where, str): 1385 node = self._nodes.get(where, None) 1386 if not node: 1387 job.client.send(None) 1388 return 1389 while 1: 1390 server = None 1391 load = None 1392 for proc in node.servers.values(): 1393 if cpu: 1394 if proc.cpu_avail.is_set() and (load is None or len(proc.rtasks) < load): 1395 server = proc 1396 load = len(proc.rtasks) 1397 elif (load is None or len(proc.rtasks) < load): 1398 server = proc 1399 load = len(proc.rtasks) 1400 1401 if server: 1402 break 1403 else: 1404 yield node.cpu_avail.wait() 1405 if self.__cur_client_auth != auth: 1406 return 1407 continue 1408 1409 if cpu: 1410 server.cpu_avail.clear() 1411 node.cpus_used += 1 1412 node.load = float(node.cpus_used) / len(node.servers) 1413 if node.cpus_used >= len(node.servers): 1414 node.cpu_avail.clear() 1415 self._cpu_nodes.discard(node) 1416 if not self._cpu_nodes: 1417 self._cpus_avail.clear() 1418 yield server.run(job, self._cur_computation, node) 1419 1420 elif isinstance(where, pycos.Location): 1421 node = self._nodes.get(where.addr) 1422 if not node: 1423 job.client.send(None) 1424 return 1425 server = node.servers.get(where) 1426 if not server: 1427 job.client.send(None) 1428 return 1429 if cpu: 1430 while (not node.cpu_avail.is_set() or not server.cpu_avail.is_set()): 1431 yield server.cpu_avail.wait() 1432 if self.__cur_client_auth != auth: 1433 return 1434 server.cpu_avail.clear() 1435 node.cpus_used += 1 1436 node.load = float(node.cpus_used) / len(node.servers) 1437 if node.cpus_used >= len(node.servers): 1438 node.cpu_avail.clear() 1439 
self._cpu_nodes.discard(node) 1440 if not self._cpu_nodes: 1441 self._cpus_avail.clear() 1442 yield server.run(job, self._cur_computation, node) 1443 1444 else: 1445 job.client.send(None) 1446 1447 def __client_proc(self, task=None): 1448 task.set_daemon() 1449 computations = {} 1450 1451 while 1: 1452 msg = yield task.receive() 1453 if not isinstance(msg, dict): 1454 continue 1455 req = msg.get('req', None) 1456 auth = msg.get('auth', None) 1457 if self.__cur_client_auth != auth: 1458 if req == 'schedule' or req == 'await': 1459 pass 1460 else: 1461 continue 1462 1463 if req == 'job': 1464 SysTask(self.__submit_job, msg) 1465 continue 1466 1467 client = msg.get('client', None) 1468 if not isinstance(client, pycos.Task): 1469 client = None 1470 1471 if req == 'enable_server': 1472 loc = msg.get('server', None) 1473 if not isinstance(loc, pycos.Location): 1474 continue 1475 node = self._nodes.get(loc.addr, None) 1476 if not node: 1477 node = self._disabled_nodes.get(loc.addr, None) 1478 if not node or node.status not in (Scheduler.NodeInitialized, 1479 Scheduler.NodeSuspended): 1480 continue 1481 server = node.disabled_servers.get(loc, None) 1482 if not server or server.status not in (Scheduler.ServerDiscovered, 1483 Scheduler.ServerSuspended): 1484 continue 1485 if server.status == Scheduler.ServerDiscovered: 1486 args = msg.get('setup_args', ()) 1487 server.task.send({'req': 'enable_server', 'setup_args': args, 1488 'auth': self._cur_computation._auth}) 1489 elif server.status == Scheduler.ServerSuspended: 1490 node.disabled_servers.pop(loc) 1491 node.servers[loc] = server 1492 server.status = Scheduler.ServerInitialized 1493 server.cpu_avail.set() 1494 if len(node.servers) == 1: 1495 node.cpu_avail.set() 1496 self._cpu_nodes.add(node) 1497 self._cpus_avail.set() 1498 if self._cur_computation.status_task: 1499 info = DispycosStatus(Scheduler.ServerResumed, loc) 1500 self._cur_computation.status_task.send(info) 1501 if node.status == Scheduler.NodeSuspended: 1502 
self._disabled_nodes.pop(node.addr, None) 1503 self._nodes[node.addr] = node 1504 node.status = Scheduler.NodeInitialized 1505 if self._cur_computation.status_task: 1506 info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform, 1507 node.avail_info) 1508 info = DispycosStatus(Scheduler.NodeResumed, info) 1509 self._cur_computation.status_task.send(info) 1510 1511 elif req == 'enable_node': 1512 addr = msg.get('addr', None) 1513 if not addr: 1514 continue 1515 node = self._disabled_nodes.get(addr, None) 1516 if not node or node.status not in (Scheduler.NodeDiscovered, 1517 Scheduler.NodeSuspended): 1518 continue 1519 if node.status == Scheduler.NodeDiscovered: 1520 setup_args = msg.get('setup_args', ()) 1521 SysTask(self.__init_node, node, setup_args=setup_args) 1522 elif node.status == Scheduler.NodeSuspended: 1523 if node.servers: 1524 node.status = Scheduler.NodeInitialized 1525 self._disabled_nodes.pop(addr, None) 1526 self._nodes[node.addr] = node 1527 node.cpu_avail.set() 1528 self._cpu_nodes.add(node) 1529 self._cpus_avail.set() 1530 if self._cur_computation.status_task: 1531 info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform, 1532 node.avail_info) 1533 info = DispycosStatus(Scheduler.NodeResumed, info) 1534 self._cur_computation.status_task.send(info) 1535 1536 elif req == 'suspend_server': 1537 loc = msg.get('server', None) 1538 if not isinstance(loc, pycos.Location): 1539 continue 1540 node = self._nodes.get(loc.addr, None) 1541 if not node: 1542 continue 1543 server = node.servers.pop(loc, None) 1544 if not server: 1545 continue 1546 if server.status not in (Scheduler.ServerInitialized, Scheduler.ServerDiscovered): 1547 node.servers[loc] = server 1548 continue 1549 node.disabled_servers[loc] = server 1550 if server.status == Scheduler.ServerInitialized: 1551 server.status = Scheduler.ServerSuspended 1552 server.cpu_avail.clear() 1553 if self._cur_computation.status_task: 1554 info = DispycosStatus(server.status, loc) 1555 
self._cur_computation.status_task.send(info) 1556 if not node.servers: 1557 self._nodes.pop(node.addr) 1558 self._disabled_nodes[node.addr] = node 1559 node.status = Scheduler.NodeSuspended 1560 node.cpu_avail.clear() 1561 self._cpu_nodes.discard(node) 1562 if not self._cpu_nodes: 1563 self._cpus_avail.clear() 1564 if self._cur_computation.status_task: 1565 info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform, 1566 node.avail_info) 1567 info = DispycosStatus(node.status, info) 1568 self._cur_computation.status_task.send(info) 1569 1570 elif req == 'suspend_node': 1571 addr = msg.get('addr', None) 1572 if not addr: 1573 continue 1574 node = self._nodes.pop(addr, None) 1575 if not node: 1576 continue 1577 if node.status not in (Scheduler.NodeInitialized, Scheduler.NodeDiscovered): 1578 self._nodes[addr] = node 1579 continue 1580 self._disabled_nodes[node.addr] = node 1581 if node.status == Scheduler.NodeInitialized: 1582 node.status = Scheduler.NodeSuspended 1583 node.cpu_avail.clear() 1584 self._cpu_nodes.discard(node) 1585 if not self._cpu_nodes: 1586 self._cpus_avail.clear() 1587 if self._cur_computation.status_task: 1588 info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform, 1589 node.avail_info) 1590 info = DispycosStatus(node.status, info) 1591 self._cur_computation.status_task.send(info) 1592 1593 elif req == 'node_allocate': 1594 node = req.get('node', None) 1595 if not isinstance(node, DispycosNodeAllocate): 1596 continue 1597 self.__cur_node_allocations = [node] + [na for na in self.__cur_node_allocations 1598 if na.ip_rex != node.ip_rex] 1599 if node.ip_rex.find('*') >= 0: 1600 continue 1601 loc = pycos.Location(node.ip_rex.replace('\\.', '.'), 1602 node.port if node.port else self._node_port) 1603 SysTask(self.pycos.peer, loc) 1604 1605 elif req == 'nodes': 1606 if isinstance(client, Task): 1607 nodes = [node.addr for node in self._nodes.values() 1608 if node.status == Scheduler.NodeInitialized] 1609 client.send(nodes) 
1610 1611 elif req == 'servers': 1612 if isinstance(client, Task): 1613 servers = [server.task.location for node in self._nodes.values() 1614 if node.status == Scheduler.NodeInitialized 1615 for server in node.servers.values() 1616 # if server.status == Scheduler.ServerInitialized 1617 ] 1618 client.send(servers) 1619 1620 elif req == 'schedule': 1621 if not isinstance(client, Task): 1622 logger.warning('Ignoring invalid client request "%s"', req) 1623 continue 1624 try: 1625 computation = pycos.deserialize(msg['computation']) 1626 assert isinstance(computation, Computation) or \ 1627 computation.__class__.__name__ == 'Computation' 1628 assert isinstance(computation._pulse_task, Task) 1629 if computation._pulse_task.location == self.pycos.location: 1630 computation._pulse_task._id = int(computation._pulse_task._id) 1631 if computation.status_task: 1632 computation.status_task._id = int(computation.status_task._id) 1633 assert isinstance(computation._pulse_interval, (float, int)) 1634 assert (MinPulseInterval <= computation._pulse_interval <= MaxPulseInterval) 1635 except Exception: 1636 logger.warning('ignoring invalid computation request') 1637 client.send(None) 1638 continue 1639 while 1: 1640 computation._auth = hashlib.sha1(os.urandom(20)).hexdigest() 1641 if not os.path.exists(os.path.join(self.__dest_path, computation._auth)): 1642 break 1643 try: 1644 os.mkdir(os.path.join(self.__dest_path, computation._auth)) 1645 except Exception: 1646 logger.debug('Could not create "%s"', 1647 os.path.join(self.__dest_path, computation._auth)) 1648 client.send(None) 1649 continue 1650 # TODO: save it on disk instead 1651 computations[computation._auth] = computation 1652 client.send(computation._auth) 1653 1654 elif req == 'await': 1655 if not isinstance(client, Task): 1656 logger.warning('Ignoring invalid client request "%s"', req) 1657 continue 1658 computation = computations.pop(auth, None) 1659 if not computation: 1660 client.send(None) 1661 continue 1662 if 
computation._pulse_task.location.addr != self.pycos.location.addr: 1663 computation._xfer_files = [(os.path.join(self.__dest_path, computation._auth, 1664 os.path.join(*(dst.split(sep))), 1665 xf.split(sep)[-1]), 1666 os.path.join(*(dst.split(sep))), os.sep) 1667 for xf, dst, sep in computation._xfer_files] 1668 for xf, dst, sep in computation._xfer_files: 1669 if not os.path.isfile(xf): 1670 logger.warning('File "%s" for computation %s is not valid', 1671 xf, computation._auth) 1672 computation = None 1673 break 1674 if computation is None: 1675 client.send(None) 1676 else: 1677 self.__computation_scheduler_task.send((computation, client)) 1678 self.__computation_sched_event.set() 1679 1680 elif req == 'close_computation': 1681 if not isinstance(client, Task): 1682 logger.warning('Ignoring invalid client request "%s"', req) 1683 continue 1684 SysTask(self.__close_computation, client=client, 1685 await_async=msg.get('await_async', False)) 1686 1687 else: 1688 logger.warning('Ignoring invalid client request "%s"', req) 1689 1690 def __close_node(self, node, await_async=False, task=None): 1691 if not node.task: 1692 logger.debug('Closing node %s ignored: %s', node.addr, node.status) 1693 return(-1) 1694 1695 node.cpu_avail.clear() 1696 self._cpu_nodes.discard(node) 1697 if not self._cpu_nodes: 1698 self._cpus_avail.clear() 1699 self._nodes.pop(node.addr, None) 1700 self._disabled_nodes[node.addr] = node 1701 node.disabled_servers.update(node.servers) 1702 node.servers.clear() 1703 computation = self._cur_computation 1704 status_info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform, 1705 node.avail_info) 1706 if node.status == Scheduler.NodeDisconnected: 1707 # TODO: safe to assume servers are disconnected as well? 
1708 for server in node.disabled_servers.values(): 1709 if server.task: 1710 server.status = Scheduler.ServerDisconnected 1711 if (computation and computation.status_task): 1712 computation.status_task.send(DispycosStatus(Scheduler.NodeAbandoned, status_info)) 1713 1714 close_tasks = [SysTask(self.__close_server, server, node, await_async=await_async) 1715 for server in node.disabled_servers.values() if server.task] 1716 for close_task in close_tasks: 1717 yield close_task.finish() 1718 node_task = node.task 1719 if not node_task: 1720 return(0) 1721 if node.status == Scheduler.NodeDisconnected: 1722 self._disabled_nodes.pop(node.addr, None) 1723 if node_task and node.status != Scheduler.NodeDisconnected: 1724 node_task.send({'req': 'release', 'auth': computation._auth}) 1725 1726 def __close_server(self, server, node, await_async=False, task=None): 1727 if not server.task: 1728 return(-1) 1729 computation = self._cur_computation 1730 if computation: 1731 status_task = computation.status_task 1732 else: 1733 status_task = None 1734 1735 if node.servers.pop(server.task.location, None): 1736 node.disabled_servers[server.task.location] = server 1737 1738 if server.status == Scheduler.ServerDisconnected: 1739 if server.rtasks: 1740 for _ in range(10): 1741 yield task.sleep(0.1) 1742 if not server.rtasks: 1743 break 1744 if server.rtasks: 1745 logger.warning('%s tasks abandoned at %s', len(server.rtasks), server.task.location) 1746 if status_task: 1747 status_task.send(DispycosStatus(Scheduler.ServerAbandoned, 1748 server.task.location)) 1749 for rtask, job in server.rtasks.values(): 1750 status = pycos.MonitorException(rtask, (Scheduler.TaskAbandoned, None)) 1751 status_task.send(status) 1752 if job.request.endswith('async'): 1753 if job.done: 1754 job.done.set() 1755 else: 1756 job.client.send(None) 1757 else: 1758 if (server.status == Scheduler.ServerInitialized or 1759 server.status == Scheduler.ServerSuspended): 1760 server.status = Scheduler.ServerClosed 1761 if 
not server.cpu_avail.is_set():
            # NOTE(review): this is the tail of a method whose header is above
            # this view (it closes one remote server of a node); presumably the
            # condition above waits until all CPU-bound remote tasks finish.
            logger.debug('Waiting for remote tasks at %s to finish', server.task.location)
            yield server.cpu_avail.wait()
        if await_async:
            # Also wait for each remaining remote task's 'done' event so async
            # results are delivered before the server is told to close.
            while server.rtasks:
                rtask, job = server.rtasks[next(iter(server.rtasks))]
                logger.debug('Remote task %s has not finished yet', rtask)
                if job.done:
                    yield job.done.wait()
        if server.task:
            # Ask the remote server process to close; wait (bounded by
            # MsgTimeout) for its acknowledgement.
            server.task.send({'req': 'close', 'auth': computation._auth, 'client': task})
            yield task.receive(timeout=MsgTimeout)
        if server.rtasks:  # wait a bit for server to terminate tasks
            for _ in range(10):
                yield task.sleep(0.1)
                if not server.rtasks:
                    break
        if server.rtasks:
            # Server did not terminate its tasks in time; release waiters
            # (set 'done' or send None to the job's client) and notify the
            # status task so clients don't block on results that never come.
            logger.warning('%s tasks running at %s', len(server.rtasks), server.task.location)
            for rtask, job in server.rtasks.values():
                if job.request.endswith('async'):
                    if job.done:
                        job.done.set()
                else:
                    job.client.send(None)
                if status_task:
                    status_task.send(pycos.MonitorException(rtask, (Scheduler.ServerClosed, None)))
            server.rtasks.clear()

        # Detach the server's task reference; if it was already None, another
        # closer got here first and there is nothing more to do.
        server_task, server.task = server.task, None
        if not server_task:
            return(0)
        node.disabled_servers.pop(server_task.location, None)
        server.xfer_files = []
        server.askew_results.clear()
        server.status = Scheduler.ServerClosed
        if status_task:
            status_task.send(DispycosStatus(server.status, server_task.location))

        # Update node-level CPU accounting and the scheduler-wide
        # availability sets / load figure.
        if not server.cpu_avail.is_set():
            server.cpu_avail.set()
            node.cpus_used -= 1
        if node.cpus_used == len(node.servers):
            self._cpu_nodes.discard(node)
            if not self._cpu_nodes:
                self._cpus_avail.clear()
            node.cpu_avail.clear()
        if node.servers:
            node.load = float(node.cpus_used) / len(node.servers)
        else:
            node.load = 0.0

        if self.__server_locations:
            self.__server_locations.discard(server_task.location)
            # TODO: inform other servers

        return(0)

    def __close_computation(self, client=None, await_async=False, task=None):
        # Close the current computation: close all nodes (enabled and
        # disabled), remove files transferred for the computation, inform its
        # status task and wake the scheduler so a pending computation can be
        # scheduled.  Runs as a task (generator); yields while nodes close.
        self.__server_locations.clear()
        if self._cur_computation:
            # Enabled nodes honor 'await_async'; disabled nodes are closed
            # without waiting for their async tasks.
            close_tasks = [SysTask(self.__close_node, node, await_async=await_async)
                           for node in self._nodes.values()]
            close_tasks.extend([SysTask(self.__close_node, node)
                                for node in self._disabled_nodes.values()])
            for close_task in close_tasks:
                yield close_task.finish()
        if self.__cur_client_auth:
            # Remove files copied from / generated for this computation.
            computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
            if os.path.isdir(computation_path):
                shutil.rmtree(computation_path, ignore_errors=True)
        if self._cur_computation and self._cur_computation.status_task:
            self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
                                                                  id(self._cur_computation)))
        self.__cur_client_auth = self._cur_computation = None
        self.__computation_sched_event.set()
        if client:
            # Acknowledge the client that requested the close.
            client.send('closed')
        return(0)

    def close(self, task=None):
        """Close current computation and quit scheduler.

        Must be called with 'yield' as 'yield scheduler.close()' or as
        task.
        """
        yield self.__close_computation(task=task)
        return(0)


if __name__ == '__main__':
    """The scheduler can be started either within a client program (if no other
    client programs use the nodes simultaneously), or can be run on a node with
    the options described below (usually no options are necessary, so the
    scheduler can be started with just 'dispycos.py')
    """

    import logging
    import argparse
    import signal
    try:
        # 'readline' provides line editing / history for the interactive
        # prompt below; it is optional (not available on all platforms).
        import readline
    except ImportError:
        pass

    import pycos.dispycos
    # Publish _DispycosJob_ (defined earlier in this file) under the installed
    # 'pycos.dispycos' module so peers deserializing jobs resolve the class.
    setattr(sys.modules['pycos.dispycos'], '_DispycosJob_', _DispycosJob_)

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--ip_addr', dest='node', action='append', default=[],
                        help='IP address or host name of this node')
    parser.add_argument('--ext_ip_addr', dest='ext_ip_addr', action='append', default=[],
                        help='External IP address to use (needed in case of NAT firewall/gateway)')
    parser.add_argument('-u', '--udp_port', dest='udp_port', type=int, default=9705,
                        help='UDP port number to use')
    parser.add_argument('-t', '--tcp_port', dest='tcp_port', type=int, default=9705,
                        help='TCP port number to use')
    parser.add_argument('--ipv4_udp_multicast', dest='ipv4_udp_multicast', action='store_true',
                        default=False, help='use multicast for IPv4 UDP instead of broadcast')
    # TODO(review): 'schduler' typo in user-visible help text below (runtime
    # string left unchanged here).
    parser.add_argument('-n', '--name', dest='name', default=None,
                        help='(symbolic) name given to schduler')
    parser.add_argument('--dest_path', dest='dest_path', default=None,
                        help='path prefix to where files sent by peers are stored')
    parser.add_argument('--max_file_size', dest='max_file_size', default=None, type=int,
                        help='maximum file size of any file transferred')
    parser.add_argument('-s', '--secret', dest='secret', default='',
                        help='authentication secret for handshake with peers')
    parser.add_argument('--certfile', dest='certfile', default='',
                        help='file containing SSL certificate')
    parser.add_argument('--keyfile', dest='keyfile', default='',
                        help='file containing SSL key')
    parser.add_argument('--node', action='append', dest='nodes', default=[],
                        help='additional remote nodes (names or IP address) to use')
    parser.add_argument('--relay_nodes', action='store_true', dest='relay_nodes', default=False,
                        help='request each node to relay scheduler info on its network')
    parser.add_argument('--pulse_interval', dest='pulse_interval', type=float,
                        default=MaxPulseInterval,
                        help='interval in seconds to send "pulse" messages to check nodes '
                        'and client are connected')
    parser.add_argument('--ping_interval', dest='ping_interval', type=float, default=0,
                        help='interval in seconds to broadcast "ping" message to discover nodes')
    parser.add_argument('--zombie_period', dest='zombie_period', type=int,
                        default=(100 * MaxPulseInterval),
                        help='maximum time in seconds computation is idle')
    parser.add_argument('-d', '--debug', action='store_true', dest='loglevel', default=False,
                        help='if given, debug messages are printed')
    parser.add_argument('--clean', action='store_true', dest='clean', default=False,
                        help='if given, files copied from or generated by clients will be removed')
    parser.add_argument('--dispycosnode_port', dest='dispycosnode_port', type=int, default=9706,
                        help='UDP port number used by dispycosnode')
    parser.add_argument('--daemon', action='store_true', dest='daemon', default=False,
                        help='if given, input is not read from terminal')
    config = vars(parser.parse_args(sys.argv[1:]))
    del parser

    # Guard against zombie_period shorter than the maximum pulse interval;
    # such a value would mark live computations as zombies.
    if config['zombie_period'] and config['zombie_period'] < MaxPulseInterval:
        raise Exception('zombie_period must be >= %s' % MaxPulseInterval)

    if not config['name']:
        config['name'] = 'dispycos_scheduler'

    if config['loglevel']:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    del config['loglevel']

    # Normalize SSL file paths to absolute; empty string means "no SSL".
    if config['certfile']:
        config['certfile'] = os.path.abspath(config['certfile'])
    else:
        config['certfile'] = None
    if config['keyfile']:
        config['keyfile'] = os.path.abspath(config['keyfile'])
    else:
        config['keyfile'] = None

    # 'daemon' is consumed here; remaining keys become Scheduler kwargs.
    daemon = config.pop('daemon', False)

    _dispycos_scheduler = Scheduler(**config)
    _dispycos_scheduler._remote = True
    del config

    def sighandler(signum, frame):
        # Convert termination signals into KeyboardInterrupt so the loops
        # below unwind and the scheduler is closed cleanly.
        # Task(_dispycos_scheduler.close).value()
        raise KeyboardInterrupt

    try:
        # SIGHUP / SIGQUIT do not exist on all platforms (e.g., Windows).
        signal.signal(signal.SIGHUP, sighandler)
        signal.signal(signal.SIGQUIT, sighandler)
    except Exception:
        pass
    signal.signal(signal.SIGINT, sighandler)
    signal.signal(signal.SIGABRT, sighandler)
    signal.signal(signal.SIGTERM, sighandler)
    del sighandler

    if not daemon:
        try:
            # If this process is not the terminal's foreground process group
            # (e.g., started in background), run as daemon instead of reading
            # commands from stdin.
            if os.getpgrp() != os.tcgetpgrp(sys.stdin.fileno()):
                daemon = True
        except Exception:
            pass

    if daemon:
        del daemon
        # No terminal: just sleep until interrupted by a signal.
        while 1:
            try:
                time.sleep(3600)
            except (Exception, KeyboardInterrupt):
                break
    else:
        del daemon
        # Interactive command loop: 'quit'/'exit' terminates, 'status' prints
        # current scheduler status.
        while 1:
            try:
                _dispycos_cmd = input(
                    '\n\nEnter "quit" or "exit" to terminate dispycos scheduler\n'
                    '      "status" to show status of scheduler: '
                )
            except KeyboardInterrupt:
                break
            _dispycos_cmd = _dispycos_cmd.strip().lower()
            if _dispycos_cmd in ('quit', 'exit'):
                break
            if _dispycos_cmd == 'status':
                _dispycos_scheduler.print_status()

    logger.info('terminating dispycos scheduler')
    try:
        # Run the scheduler's close() as a task and wait for it to finish.
        Task(_dispycos_scheduler.close).value()
    except KeyboardInterrupt:
        pass