1# Copyright (C) 2015-2021 Regents of the University of California 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import collections 15import copy 16import importlib 17import inspect 18import itertools 19import logging 20import os 21import pickle 22import shutil 23import tempfile 24import time 25import uuid 26from abc import ABCMeta, abstractmethod 27from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser 28from contextlib import contextmanager 29from io import BytesIO 30from typing import Dict, Optional, Union, Set 31 32import dill 33 34from toil.common import Config, Toil, addOptions, safeUnpickleFromStream 35from toil.deferred import DeferredFunction 36from toil.fileStores import FileID 37from toil.lib.expando import Expando 38from toil.lib.conversions import human2bytes 39from toil.lib.resources import (get_total_cpu_time, 40 get_total_cpu_time_and_memory_usage) 41from toil.resource import ModuleDescriptor 42from toil.statsAndLogging import set_logging_from_options 43 44logger = logging.getLogger(__name__) 45 46 47class JobPromiseConstraintError(RuntimeError): 48 """ 49 Represents a problem where a job is being asked to promise its return 50 value, but it has not yet been hit in the topological order of the job 51 graph. 52 """ 53 def __init__(self, promisingJob, recipientJob=None): 54 """ 55 :param toil.job.Job promisingJob: The job being asked for its return value. 56 :param toil.job.Job recipientJob: The job receiving the return value, if any. 57 """ 58 59 self.promisingJob = promisingJob 60 self.recipientJob = recipientJob 61 62 if recipientJob is None: 63 # Come up with a vaguer error message 64 super().__init__(f"Job {promisingJob.description} cannot promise its return value to a job that is not its successor") 65 else: 66 # Write a full error message 67 super().__init__(f"Job {promisingJob.description} cannot promise its return value to non-successor {recipientJob.description}") 68 69 70class ConflictingPredecessorError(Exception): 71 def __init__(self, predecessor: 'Job', successor: 'Job'): 72 super().__init__(f'The given job: "{predecessor.description}" is already a predecessor of job: "{successor.description}".') 73 74 75class TemporaryID: 76 """ 77 Placeholder for a job ID used by a JobDescription that has not yet been 78 registered with any JobStore. 79 80 Needs to be held: 81 * By JobDescription objects to record normal relationships. 82 * By Jobs to key their connected-component registries and to record 83 predecessor relationships to facilitate EncapsulatedJob adding 84 itself as a child. 85 * By Services to tie back to their hosting jobs, so the service 86 tree can be built up from Service objects. 87 """ 88 def __init__(self): 89 """ 90 Assign a unique temporary ID that won't collide with anything. 
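
        A quick usage sketch (illustrative only; it relies on the ``__hash__``
        and ``__eq__`` defined below, and on two freshly drawn UUIDs essentially
        never colliding):

        >>> a, b = TemporaryID(), TemporaryID()
        >>> a == a, a == b
        (True, False)
        >>> registry = {a: 'some job'}
        >>> a in registry, b in registry
        (True, False)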
91 """ 92 self._value = uuid.uuid4() 93 94 def __str__(self): 95 return self.__repr__() 96 97 def __repr__(self): 98 return f'TemporaryID({self._value})' 99 100 def __hash__(self): 101 return hash(self._value) 102 103 def __eq__(self, other): 104 return isinstance(other, TemporaryID) and self._value == other._value 105 106 def __ne__(self, other): 107 return not isinstance(other, TemporaryID) or self._value != other._value 108 109 110class Requirer: 111 """ 112 Base class implementing the storage and presentation of requirements for 113 cores, memory, disk, and preemptability as properties. 114 """ 115 116 def __init__(self, requirements): 117 """ 118 Parse and save the given requirements. 119 120 :param dict requirements: Dict from string to number, string, or bool 121 describing a set of resource requirments. 'cores', 'memory', 122 'disk', and 'preemptable' fields, if set, are parsed and broken out 123 into properties. If unset, the relevant property will be 124 unspecified, and will be pulled from the assigned Config object if 125 queried (see :meth:`toil.job.Requirer.assignConfig`). If 126 unspecified and no Config object is assigned, an AttributeError 127 will be raised at query time. 128 """ 129 130 super().__init__() 131 132 # We can have a toil.common.Config assigned to fill in default values 133 # for e.g. job requirements not explicitly specified. 134 self._config = None 135 136 # Save requirements, parsing and validating anything that needs parsing or validating. 137 # Don't save Nones. 138 self._requirementOverrides = {k: self._parseResource(k, v) for (k, v) in requirements.items() if v is not None} 139 140 def assignConfig(self, config): 141 """ 142 Assign the given config object to be used to provide default values. 143 144 Must be called exactly once on a loaded JobDescription before any 145 requirements are queried. 146 147 :param toil.common.Config config: Config object to query 148 """ 149 if self._config is not None: 150 raise RuntimeError(f"Config assigned multiple times to {self}") 151 self._config = config 152 153 def __getstate__(self): 154 """ 155 Return the dict to use as the instance's __dict__ when pickling. 156 """ 157 158 # We want to exclude the config from pickling. 159 state = self.__dict__.copy() 160 state['_config'] = None 161 return state 162 163 def __copy__(self): 164 """ 165 Return a semantically-shallow copy of the object, for :meth:`copy.copy`. 166 """ 167 168 # See https://stackoverflow.com/a/40484215 for how to do an override 169 # that uses the base implementation 170 171 # Hide this override 172 implementation = self.__copy__ 173 self.__copy__ = None 174 175 # Do the copy which omits the config via __getstate__ override 176 clone = copy.copy(self) 177 178 # Put back the override on us and the copy 179 self.__copy__ = implementation 180 clone.__copy__ = implementation 181 182 if self._config is not None: 183 # Share a config reference 184 clone.assignConfig(self._config) 185 186 return clone 187 188 def __deepcopy__(self, memo): 189 """ 190 Return a semantically-deep copy of the object, for :meth:`copy.deepcopy`. 
        """

        # See https://stackoverflow.com/a/40484215 for how to do an override
        # that uses the base implementation

        # Hide this override
        implementation = self.__deepcopy__
        self.__deepcopy__ = None

        # Do the deepcopy which omits the config via __getstate__ override
        clone = copy.deepcopy(self, memo)

        # Put back the override on us and the copy
        self.__deepcopy__ = implementation
        clone.__deepcopy__ = implementation

        if self._config is not None:
            # Share a config reference
            clone.assignConfig(self._config)

        return clone

    @staticmethod
    def _parseResource(name, value):
        """
        Parse a Toil resource requirement value and apply resource-specific type checks. If the
        value is a string, a binary or metric unit prefix in it will be evaluated and the
        corresponding integral value will be returned.

        :param str name: The name of the resource
        :param str|int|float|bool|None value: The resource value
        :rtype: int|float|bool|None

        >>> Requirer._parseResource('cores', None)
        >>> Requirer._parseResource('cores', 1), Requirer._parseResource('disk', 1), \
            Requirer._parseResource('memory', 1)
        (1, 1, 1)
        >>> Requirer._parseResource('cores', '1G'), Requirer._parseResource('disk', '1G'), \
            Requirer._parseResource('memory', '1G')
        (1073741824, 1073741824, 1073741824)
        >>> Requirer._parseResource('cores', 1.1)
        1.1
        >>> Requirer._parseResource('disk', 1.1) # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ...
        TypeError: The 'disk' requirement does not accept values that are of <type 'float'>
        >>> Requirer._parseResource('memory', object()) # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ...
        TypeError: The 'memory' requirement does not accept values that are of ...
        """

        if value is None:
            # Anything can be None.
            return value

        if name in ('memory', 'disk', 'cores'):
            # These should be numbers that accept things like "5G".
            if isinstance(value, (str, bytes)):
                value = human2bytes(value)
            if isinstance(value, int):
                return value
            elif isinstance(value, float) and name == 'cores':
                # But only cores can be fractional.
                return value
            else:
                raise TypeError(f"The '{name}' requirement does not accept values that are of type {type(value)}")
        elif name == 'preemptable':
            if isinstance(value, str):
                if value.lower() == 'true':
                    return True
                elif value.lower() == 'false':
                    return False
                else:
                    raise ValueError(f"The '{name}' requirement, as a string, must be 'true' or 'false' but is {value}")
            elif isinstance(value, int):
                if value == 1:
                    return True
                elif value == 0:
                    return False
                else:
                    raise ValueError(f"The '{name}' requirement, as an int, must be 1 or 0 but is {value}")
            elif isinstance(value, bool):
                return value
            else:
                raise TypeError(f"The '{name}' requirement does not accept values that are of type {type(value)}")
        else:
            # Anything else we just pass along without opinions
            return value

    def _fetchRequirement(self, requirement):
        """
        Get the value of the specified requirement ('blah') by looking it up in
        our requirement storage and querying 'defaultBlah' on the config if it
        isn't set. If the config would be queried but isn't associated, raises
        AttributeError.
287 288 :param str requirement: The name of the resource 289 :rtype: int|float|bool|None 290 """ 291 if requirement in self._requirementOverrides: 292 value = self._requirementOverrides[requirement] 293 if value is None: 294 raise AttributeError(f"Encountered explicit None for '{requirement}' requirement of {self}") 295 return value 296 elif self._config is not None: 297 value = getattr(self._config, 'default' + requirement.capitalize()) 298 if value is None: 299 raise AttributeError(f"Encountered None for default '{requirement}' requirement in config: {self._config}") 300 return value 301 else: 302 raise AttributeError(f"Default value for '{requirement}' requirement of {self} cannot be determined") 303 304 @property 305 def requirements(self): 306 """ 307 Dict containing all non-None, non-defaulted requirements. 308 309 :rtype: dict 310 """ 311 return dict(self._requirementOverrides) 312 313 @property 314 def disk(self) -> int: 315 """ 316 The maximum number of bytes of disk required. 317 318 :rtype: int 319 """ 320 return self._fetchRequirement('disk') 321 @disk.setter 322 def disk(self, val): 323 self._requirementOverrides['disk'] = self._parseResource('disk', val) 324 325 @property 326 def memory(self): 327 """ 328 The maximum number of bytes of memory required. 329 330 :rtype: int 331 """ 332 return self._fetchRequirement('memory') 333 @memory.setter 334 def memory(self, val): 335 self._requirementOverrides['memory'] = self._parseResource('memory', val) 336 337 @property 338 def cores(self): 339 """ 340 The number of CPU cores required. 341 342 :rtype: int|float 343 """ 344 return self._fetchRequirement('cores') 345 @cores.setter 346 def cores(self, val): 347 self._requirementOverrides['cores'] = self._parseResource('cores', val) 348 349 @property 350 def preemptable(self): 351 """ 352 Whether a preemptable node is permitted, or a nonpreemptable one is required. 353 354 :rtype: bool 355 """ 356 return self._fetchRequirement('preemptable') 357 @preemptable.setter 358 def preemptable(self, val): 359 self._requirementOverrides['preemptable'] = self._parseResource('preemptable', val) 360 361class JobDescription(Requirer): 362 """ 363 Stores all the information that the Toil Leader ever needs to know about a 364 Job: requirements information, dependency information, commands to issue, 365 etc. 366 367 Can be obtained from an actual (i.e. executable) Job object, and can be 368 used to obtain the Job object from the JobStore. 369 370 Never contains other Jobs or JobDescriptions: all reference is by ID. 371 372 Subclassed into variants for checkpoint jobs and service jobs that have 373 their specific parameters. 374 """ 375 376 def __init__(self, requirements: Dict[str, Union[int, str, bool]], jobName: str, unitName: str='', displayName: str='', command: Optional[str]=None) -> None: 377 """ 378 Create a new JobDescription. 379 380 :param dict requirements: Dict from string to number, string, or bool 381 describing the resource requirments of the job. 'cores', 'memory', 382 'disk', and 'preemptable' fields, if set, are parsed and broken out 383 into properties. If unset, the relevant property will be 384 unspecified, and will be pulled from the assigned Config object if 385 queried (see :meth:`toil.job.Requirer.assignConfig`). 386 :param str jobName: Name of the kind of job this is. May be used in job 387 store IDs and logging. Also used to let the cluster scaler learn a 388 model for how long the job will take. Ought to be the job class's 389 name if no real user-defined name is available. 
        :param str unitName: Name of this instance of this kind of job. May
               appear with jobName in logging.
        :param str displayName: A human-readable name to identify this
               particular job instance. Ought to be the job class's name
               if no real user-defined name is available.
        """

        # Set requirements
        super().__init__(requirements)

        # Save names, making sure they are strings and not e.g. bytes.
        def makeString(x):
            return x if not isinstance(x, bytes) else x.decode('utf-8', errors='replace')
        self.jobName = makeString(jobName)
        self.unitName = makeString(unitName)
        self.displayName = makeString(displayName)

        # Set properties that are not fully filled in on creation.

        # ID of this job description in the JobStore.
        self.jobStoreID = TemporaryID()

        # Mostly fake, not-really-executable command string that encodes how to
        # find the Job body data that this JobDescription describes, and the
        # module(s) needed to unpickle it.
        #
        # Gets replaced with/rewritten into the real, executable command when
        # the leader passes the description off to the batch system to be
        # executed.
        self.command: Optional[str] = command

        # Set scheduling properties that the leader reads to think about scheduling.

        # The number of times the job should be attempted. Includes the initial
        # try, plus the number of times to retry if the job fails. This number
        # is reduced each time the job is run, until it is zero, and then no
        # further attempts to run the job are made. If None, taken as the
        # default value for this workflow execution.
        self._remainingTryCount = None

        # Holds FileStore FileIDs of the files that this job has deleted. Used
        # to journal deletions of files and recover from a worker crash between
        # committing a JobDescription update and actually executing the
        # requested deletions.
        self.filesToDelete = []

        # Holds JobStore Job IDs of the jobs that have been chained into this
        # job, and which should be deleted when this job finally is deleted.
        self.jobsToDelete = []

        # The number of direct predecessors of the job. Needs to be stored at
        # the JobDescription to support dynamically-created jobs with multiple
        # predecessors. Otherwise, we could reach a job by one path down from
        # the root and decide to schedule it without knowing that it is also
        # reachable from other paths down from the root.
        self.predecessorNumber = 0

        # The IDs of predecessor jobs that have finished. Managed by the Leader
        # and ToilState, and just stored in the JobDescription. Never changed
        # after the job is scheduled, so we don't have to worry about
        # conflicting updates from workers.
        # TODO: Move into ToilState itself so leader stops mutating us so much?
        self.predecessorsFinished = set()

        # Note that we don't hold IDs of our predecessors. Predecessors know
        # about us, and not the other way around. Otherwise we wouldn't be able
        # to save ourselves to the job store until our predecessors were saved,
        # but they'd also be waiting on us.

        # The IDs of all child jobs of the described job.
        # Children which are done must be removed with filterSuccessors.
        self.childIDs = set()

        # The IDs of all follow-on jobs of the described job.
        # Follow-ons which are done must be removed with filterSuccessors.
465 self.followOnIDs = set() 466 467 # Dict from ServiceHostJob ID to list of child ServiceHostJobs that start after it. 468 # All services must have an entry, if only to an empty list. 469 self.serviceTree = {} 470 471 # A jobStoreFileID of the log file for a job. This will be None unless the job failed and 472 # the logging has been captured to be reported on the leader. 473 self.logJobStoreFileID = None 474 475 def serviceHostIDsInBatches(self): 476 """ 477 Get an iterator over all batches of service host job IDs that can be 478 started at the same time, in the order they need to start in. 479 """ 480 481 # First start all the jobs with no parent 482 roots = set(self.serviceTree.keys()) 483 for parent, children in self.serviceTree.items(): 484 for child in children: 485 roots.remove(child) 486 batch = list(roots) 487 if len(batch) > 0: 488 # If there's a first batch, yield it 489 yield batch 490 491 while len(batch) > 0: 492 nextBatch = [] 493 for started in batch: 494 # Go find all the children that can start now that we have started. 495 for child in self.serviceTree[started]: 496 nextBatch.append(child) 497 498 batch = nextBatch 499 if len(batch) > 0: 500 # Emit the batch if nonempty 501 yield batch 502 503 def successorsAndServiceHosts(self): 504 """ 505 Get an iterator over all child, follow-on, and service job IDs 506 """ 507 return itertools.chain(self.childIDs, self.followOnIDs, self.serviceTree.keys()) 508 509 def allSuccessors(self): 510 """ 511 Get an iterator over all child and follow-on job IDs 512 """ 513 return itertools.chain(self.childIDs, self.followOnIDs) 514 515 @property 516 def services(self): 517 """ 518 Get a collection of the IDs of service host jobs for this job, in arbitrary order. 519 520 Will be empty if the job has no unfinished services. 521 """ 522 523 return list(self.serviceTree.keys()) 524 525 def nextSuccessors(self): 526 """ 527 Return the collection of job IDs for the successors of this job that, 528 according to this job, are ready to run. 529 530 If those jobs have multiple predecessor relationships, they may still 531 be blocked on other jobs. 532 533 Returns None when at the final phase (all successors done), and an 534 empty collection if there are more phases but they can't be entered yet 535 (e.g. because we are waiting for the job itself to run). 536 """ 537 538 if self.command is not None: 539 # We ourselves need to run. So there's not nothing to do but no successors are ready. 540 return [] 541 elif len(self.childIDs) != 0: 542 # Our children need to run 543 return self.childIDs 544 elif len(self.followOnIDs) != 0: 545 # Our follow-ons need to run 546 return self.followOnIDs 547 else: 548 # Everything is done. 549 return None 550 551 @property 552 def stack(self): 553 """ 554 Get an immutable collection of immutable collections of IDs of successors that need to run still. 555 556 Batches of successors are in reverse order of the order they need to run in. 557 558 Some successors in each batch may have already been finished. Batches may be empty. 559 560 Exists so that code that used the old stack list immutably can work 561 still. New development should use nextSuccessors(), and all mutations 562 should use filterSuccessors() (which automatically removes completed 563 phases). 564 565 :return: Batches of successors that still need to run, in reverse 566 order. An empty batch may exist under a non-empty batch, or at the top 567 when the job itself is not done. 
568 :rtype: tuple(tuple(str)) 569 """ 570 571 result = [] 572 if self.command is not None or len(self.childIDs) != 0 or len(self.followOnIDs) != 0: 573 # Follow-ons haven't all finished yet 574 result.append(tuple(self.followOnIDs)) 575 if self.command is not None or len(self.childIDs) != 0: 576 # Children haven't all finished yet 577 result.append(tuple(self.childIDs)) 578 return tuple(result) 579 580 def filterSuccessors(self, predicate): 581 """ 582 Keep only successor jobs for which the given predicate function returns True when called with the job's ID. 583 584 Treats all other successors as complete and forgets them. 585 """ 586 587 self.childIDs = {x for x in self.childIDs if predicate(x)} 588 self.followOnIDs = {x for x in self.followOnIDs if predicate(x)} 589 590 def filterServiceHosts(self, predicate): 591 """ 592 Keep only services for which the given predicate function returns True when called with the service host job's ID. 593 594 Treats all other services as complete and forgets them. 595 """ 596 597 # Get all the services we shouldn't have anymore 598 toRemove = set() 599 for serviceID in self.services: 600 if not predicate(serviceID): 601 toRemove.add(serviceID) 602 603 # Drop everything from that set as a value and a key 604 self.serviceTree = {k: [x for x in v if x not in toRemove] for k, v in self.serviceTree.items() if k not in toRemove} 605 606 def clearSuccessorsAndServiceHosts(self): 607 """ 608 Remove all references to child, follow-on, and service jobs associated with the described job. 609 """ 610 self.childIDs = set() 611 self.followOnIDs = set() 612 self.serviceTree = {} 613 614 615 def replace(self, other): 616 """ 617 Take on the ID of another JobDescription, while retaining our own state and type. 618 619 When updated in the JobStore, we will save over the other JobDescription. 620 621 Useful for chaining jobs: the chained-to job can replace the parent job. 622 623 Merges cleanup state from the job being replaced into this one. 624 625 :param toil.job.JobDescription other: Job description to replace. 626 """ 627 628 # TODO: also be able to take on the successors of the other job, under 629 # ours on the stack, somehow. 630 631 self.jobStoreID = other.jobStoreID 632 633 # Save files and jobs to delete from the job we replaced, so we can 634 # roll up a whole chain of jobs and delete them when they're all done. 635 self.filesToDelete += other.filesToDelete 636 self.jobsToDelete += other.jobsToDelete 637 638 def addChild(self, childID): 639 """ 640 Make the job with the given ID a child of the described job. 641 """ 642 643 self.childIDs.add(childID) 644 645 def addFollowOn(self, followOnID): 646 """ 647 Make the job with the given ID a follow-on of the described job. 648 """ 649 650 self.followOnIDs.add(followOnID) 651 652 def addServiceHostJob(self, serviceID, parentServiceID=None): 653 """ 654 Make the ServiceHostJob with the given ID a service of the described job. 655 656 If a parent ServiceHostJob ID is given, that parent service will be started 657 first, and must have already been added. 658 """ 659 660 # Make sure we aren't clobbering something 661 assert serviceID not in self.serviceTree 662 self.serviceTree[serviceID] = [] 663 if parentServiceID is not None: 664 self.serviceTree[parentServiceID].append(serviceID) 665 666 def hasChild(self, childID): 667 """ 668 Return True if the job with the given ID is a child of the described job. 
669 """ 670 671 return childID in self.childIDs 672 673 def hasFollowOn(self, followOnID): 674 """ 675 Return True if the job with the given ID is a follow-on of the described job. 676 """ 677 678 return followOnID in self.followOnIDs 679 680 def hasServiceHostJob(self, serviceID): 681 """ 682 Return True if the ServiceHostJob with the given ID is a service of the described job. 683 """ 684 685 return serviceID in self.serviceTree 686 687 def renameReferences(self, renames): 688 """ 689 Apply the given dict of ID renames to all references to jobs. Does not 690 modify our own ID or those of finished predecessors. 691 692 IDs not present in the renames dict are left as-is. 693 694 :param dict(TemporaryID, str) renames: Rename operations to apply. 695 """ 696 697 self.childIDs = {renames.get(old, old) for old in self.childIDs} 698 self.followOnIDs = {renames.get(old, old) for old in self.followOnIDs} 699 self.serviceTree = {renames.get(parent, parent): [renames.get(child, child) for child in children] 700 for parent, children in self.serviceTree.items()} 701 702 def addPredecessor(self): 703 """ 704 Notify the JobDescription that a predecessor has been added to its Job. 705 """ 706 self.predecessorNumber += 1 707 708 def onRegistration(self, jobStore): 709 """ 710 Called by the Job saving logic when this JobDescription meets the JobStore and has its ID assigned. 711 712 Overridden to perform setup work (like hooking up flag files for service jobs) that requires the JobStore. 713 714 :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store we are being placed into 715 """ 716 717 def setupJobAfterFailure(self, exitReason=None): 718 """ 719 Reduce the remainingTryCount if greater than zero and set the memory 720 to be at least as big as the default memory (in case of exhaustion of memory, 721 which is common). 722 723 Requires a configuration to have been assigned (see :meth:`toil.job.Requirer.assignConfig`). 724 725 :param toil.batchSystems.abstractBatchSystem.BatchJobExitReason exitReason: The configuration for the current workflow run. 726 727 """ 728 729 # Avoid potential circular imports 730 from toil.batchSystems.abstractBatchSystem import BatchJobExitReason 731 732 # Old version of this function used to take a config. Make sure that isn't happening. 733 assert not isinstance(exitReason, Config), "Passing a Config as an exit reason" 734 # Make sure we have an assigned config. 
735 assert self._config is not None 736 737 if self._config.enableUnlimitedPreemptableRetries and exitReason == BatchJobExitReason.LOST: 738 logger.info("*Not* reducing try count (%s) of job %s with ID %s", 739 self.remainingTryCount, self, self.jobStoreID) 740 else: 741 self.remainingTryCount = max(0, self.remainingTryCount - 1) 742 logger.warning("Due to failure we are reducing the remaining try count of job %s with ID %s to %s", 743 self, self.jobStoreID, self.remainingTryCount) 744 # Set the default memory to be at least as large as the default, in 745 # case this was a malloc failure (we do this because of the combined 746 # batch system) 747 if exitReason == BatchJobExitReason.MEMLIMIT and self._config.doubleMem: 748 self.memory = self.memory * 2 749 logger.warning("We have doubled the memory of the failed job %s to %s bytes due to doubleMem flag", 750 self, self.memory) 751 if self.memory < self._config.defaultMemory: 752 self.memory = self._config.defaultMemory 753 logger.warning("We have increased the default memory of the failed job %s to %s bytes", 754 self, self.memory) 755 756 if self.disk < self._config.defaultDisk: 757 self.disk = self._config.defaultDisk 758 logger.warning("We have increased the disk of the failed job %s to the default of %s bytes", 759 self, self.disk) 760 761 762 def getLogFileHandle(self, jobStore): 763 """ 764 Returns a context manager that yields a file handle to the log file. 765 766 Assumes logJobStoreFileID is set. 767 """ 768 return jobStore.readFileStream(self.logJobStoreFileID) 769 770 @property 771 def remainingTryCount(self): 772 """ 773 The try count set on the JobDescription, or the default based on the 774 retry count from the config if none is set. 775 """ 776 if self._remainingTryCount is not None: 777 return self._remainingTryCount 778 elif self._config is not None: 779 # Our try count should be the number of retries in the config, plus 780 # 1 for the initial try 781 return self._config.retryCount + 1 782 else: 783 raise AttributeError(f"Try count for {self} cannot be determined") 784 @remainingTryCount.setter 785 def remainingTryCount(self, val): 786 self._remainingTryCount = val 787 788 def clearRemainingTryCount(self): 789 """ 790 Clear remainingTryCount and set it back to its default value. 791 792 :returns: True if a modification to the JobDescription was made, and 793 False otherwise. 794 :rtype: bool 795 """ 796 if self._remainingTryCount is not None: 797 # We had a value stored 798 self._remainingTryCount = None 799 return True 800 else: 801 # No change needed 802 return False 803 804 805 def __str__(self): 806 """ 807 Produce a useful logging string identifying this job. 808 """ 809 810 printedName = "'" + self.jobName + "'" 811 if self.unitName: 812 printedName += ' ' + self.unitName 813 814 if self.jobStoreID is not None: 815 printedName += ' ' + str(self.jobStoreID) 816 817 return printedName 818 819 # Not usable as a key (not hashable) and doesn't have any value-equality. 820 # There really should only ever be one true version of a JobDescription at 821 # a time, keyed by jobStoreID. 822 823 def __repr__(self): 824 return '%s( **%r )' % (self.__class__.__name__, self.__dict__) 825 826 827class ServiceJobDescription(JobDescription): 828 """ 829 A description of a job that hosts a service. 830 """ 831 832 def __init__(self, *args, **kwargs): 833 """ 834 Create a ServiceJobDescription to describe a ServiceHostJob. 
835 """ 836 837 # Make the base JobDescription 838 super().__init__(*args, **kwargs) 839 840 # Set service-specific properties 841 842 # An empty file in the jobStore which when deleted is used to signal that the service 843 # should cease. 844 self.terminateJobStoreID = None 845 846 # Similarly a empty file which when deleted is used to signal that the service is 847 # established 848 self.startJobStoreID = None 849 850 # An empty file in the jobStore which when deleted is used to signal that the service 851 # should terminate signaling an error. 852 self.errorJobStoreID = None 853 854 def onRegistration(self, jobStore): 855 """ 856 When a ServiceJobDescription first meets the JobStore, it needs to set up its flag files. 857 """ 858 super().onRegistration(jobStore) 859 860 self.startJobStoreID = jobStore.getEmptyFileStoreID() 861 self.terminateJobStoreID = jobStore.getEmptyFileStoreID() 862 self.errorJobStoreID = jobStore.getEmptyFileStoreID() 863 864class CheckpointJobDescription(JobDescription): 865 """ 866 A description of a job that is a checkpoint. 867 """ 868 869 def __init__(self, *args, **kwargs): 870 """ 871 Create a CheckpointJobDescription to describe a checkpoint job. 872 """ 873 874 # Make the base JobDescription 875 super().__init__(*args, **kwargs) 876 877 # Set checkpoint-specific properties 878 879 # None, or a copy of the original command string used to reestablish the job after failure. 880 self.checkpoint = None 881 882 # Files that can not be deleted until the job and its successors have completed 883 self.checkpointFilesToDelete = [] 884 885 # Human-readable names of jobs that were run as part of this job's 886 # invocation, starting with this job 887 self.chainedJobs = [] 888 889 def restartCheckpoint(self, jobStore): 890 """ 891 Restart a checkpoint after the total failure of jobs in its subtree. 892 893 Writes the changes to the jobStore immediately. All the 894 checkpoint's successors will be deleted, but its try count 895 will *not* be decreased. 896 897 Returns a list with the IDs of any successors deleted. 898 """ 899 assert self.checkpoint is not None 900 successorsDeleted = [] 901 if self.childIDs or self.followOnIDs or self.serviceTree or self.command is not None: 902 if self.command is not None: 903 assert self.command == self.checkpoint 904 logger.debug("Checkpoint job already has command set to run") 905 else: 906 self.command = self.checkpoint 907 908 jobStore.update(self) # Update immediately to ensure that checkpoint 909 # is made before deleting any remaining successors 910 911 if self.childIDs or self.followOnIDs or self.serviceTree: 912 # If the subtree of successors is not complete restart everything 913 logger.debug("Checkpoint job has unfinished successor jobs, deleting children: %s, followOns: %s, services: %s " % 914 (self.childIDs, self.followOnIDs, self.serviceTree.keys())) 915 # Delete everything on the stack, as these represent successors to clean 916 # up as we restart the queue 917 def recursiveDelete(jobDesc): 918 # Recursive walk the stack to delete all remaining jobs 919 for otherJobID in jobDesc.successorsAndServiceHosts(): 920 if jobStore.exists(otherJobID): 921 recursiveDelete(jobStore.load(otherJobID)) 922 else: 923 logger.debug("Job %s has already been deleted", otherJobID) 924 if jobDesc.jobStoreID != self.jobStoreID: 925 # Delete everything under us except us. 
926 logger.debug("Checkpoint is deleting old successor job: %s", jobDesc.jobStoreID) 927 jobStore.delete(jobDesc.jobStoreID) 928 successorsDeleted.append(jobDesc.jobStoreID) 929 recursiveDelete(self) 930 931 # Cut links to the jobs we deleted. 932 self.clearSuccessorsAndServiceHosts() 933 934 # Update again to commit the removal of successors. 935 jobStore.update(self) 936 return successorsDeleted 937 938class Job: 939 """ 940 Class represents a unit of work in toil. 941 """ 942 def __init__(self, memory=None, cores=None, disk=None, preemptable=None, 943 unitName='', checkpoint=False, displayName='', 944 descriptionClass=None): 945 """ 946 This method must be called by any overriding constructor. 947 948 :param memory: the maximum number of bytes of memory the job will require to run. 949 :param cores: the number of CPU cores required. 950 :param disk: the amount of local disk space required by the job, expressed in bytes. 951 :param preemptable: if the job can be run on a preemptable node. 952 :param unitName: Human-readable name for this instance of the job. 953 :param checkpoint: if any of this job's successor jobs completely fails, 954 exhausting all their retries, remove any successor jobs and rerun this job to restart the 955 subtree. Job must be a leaf vertex in the job graph when initially defined, see 956 :func:`toil.job.Job.checkNewCheckpointsAreCutVertices`. 957 :param displayName: Human-readable job type display name. 958 :param descriptionClass: Override for the JobDescription class used to describe the job. 959 960 :type memory: int or string convertible by toil.lib.conversions.human2bytes to an int 961 :type cores: float, int, or string convertible by toil.lib.conversions.human2bytes to an int 962 :type disk: int or string convertible by toil.lib.conversions.human2bytes to an int 963 :type preemptable: bool, int in {0, 1}, or string in {'false', 'true'} in any case 964 :type unitName: str 965 :type checkpoint: bool 966 :type displayName: str 967 :type descriptionClass: class 968 """ 969 970 # Fill in our various names 971 jobName = self.__class__.__name__ 972 displayName = displayName if displayName else jobName 973 974 975 # Build a requirements dict for the description 976 requirements = {'memory': memory, 'cores': cores, 'disk': disk, 977 'preemptable': preemptable} 978 if descriptionClass is None: 979 if checkpoint: 980 # Actually describe as a checkpoint job 981 descriptionClass = CheckpointJobDescription 982 else: 983 # Use the normal default 984 descriptionClass = JobDescription 985 # Create the JobDescription that owns all the scheduling information. 986 # Make it with a temporary ID until we can be assigned a real one by 987 # the JobStore. 988 self._description = descriptionClass(requirements, jobName, unitName=unitName, displayName=displayName) 989 990 # Private class variables needed to actually execute a job, in the worker. 991 # Also needed for setting up job graph structures before saving to the JobStore. 992 993 # This dict holds a mapping from TemporaryIDs to the job objects they represent. 994 # Will be shared among all jobs in a disconnected piece of the job 995 # graph that hasn't been registered with a JobStore yet. 996 # Make sure to initially register ourselves. 997 self._registry = {self._description.jobStoreID: self} 998 999 # Job relationships are all stored exactly once in the JobDescription. 
1000 # Except for predecessor relationships which are stored here, just 1001 # while the user is creating the job graphs, to check for duplicate 1002 # relationships and to let EncapsulatedJob magically add itself as a 1003 # child. Note that this stores actual Job objects, to call addChild on. 1004 self._directPredecessors = set() 1005 1006 # Note that self.__module__ is not necessarily this module, i.e. job.py. It is the module 1007 # defining the class self is an instance of, which may be a subclass of Job that may be 1008 # defined in a different module. 1009 self.userModule = ModuleDescriptor.forModule(self.__module__).globalize() 1010 # Maps index paths into composite return values to lists of IDs of files containing 1011 # promised values for those return value items. An index path is a tuple of indices that 1012 # traverses a nested data structure of lists, dicts, tuples or any other type supporting 1013 # the __getitem__() protocol.. The special key `()` (the empty tuple) represents the 1014 # entire return value. 1015 self._rvs = collections.defaultdict(list) 1016 self._promiseJobStore = None 1017 self._fileStore = None 1018 self._defer = None 1019 self._tempDir = None 1020 1021 def __str__(self): 1022 """ 1023 Produce a useful logging string to identify this Job and distinguish it 1024 from its JobDescription. 1025 """ 1026 if self.description is None: 1027 return repr(self) 1028 else: 1029 return 'Job(' + str(self.description) + ')' 1030 1031 @property 1032 def jobStoreID(self): 1033 """ 1034 Get the ID of this Job. 1035 1036 :rtype: str|toil.job.TemporaryID 1037 """ 1038 # This is managed by the JobDescription. 1039 return self._description.jobStoreID 1040 1041 @property 1042 def description(self): 1043 """ 1044 Expose the JobDescription that describes this job. 1045 1046 :rtype: toil.job.JobDescription 1047 """ 1048 return self._description 1049 1050 # Instead of being a Requirer ourselves, we pass anything about 1051 # requirements through to the JobDescription. 1052 1053 @property 1054 def disk(self) -> int: 1055 """ 1056 The maximum number of bytes of disk the job will require to run. 1057 1058 :rtype: int 1059 """ 1060 return self.description.disk 1061 @disk.setter 1062 def disk(self, val): 1063 self.description.disk = val 1064 1065 @property 1066 def memory(self): 1067 """ 1068 The maximum number of bytes of memory the job will require to run. 1069 1070 :rtype: int 1071 """ 1072 return self.description.memory 1073 @memory.setter 1074 def memory(self, val): 1075 self.description.memory = val 1076 1077 @property 1078 def cores(self): 1079 """ 1080 The number of CPU cores required. 1081 1082 :rtype: int|float 1083 """ 1084 return self.description.cores 1085 @cores.setter 1086 def cores(self, val): 1087 self.description.cores = val 1088 1089 @property 1090 def preemptable(self): 1091 """ 1092 Whether the job can be run on a preemptable node. 1093 1094 :rtype: bool 1095 """ 1096 return self.description.preemptable 1097 @preemptable.setter 1098 def preemptable(self, val): 1099 self.description.preemptable = val 1100 1101 @property 1102 def checkpoint(self): 1103 """ 1104 Determine if the job is a checkpoint job or not. 1105 1106 :rtype: bool 1107 """ 1108 1109 return isinstance(self._description, CheckpointJobDescription) 1110 1111 def assignConfig(self, config): 1112 """ 1113 Assign the given config object to be used by various actions 1114 implemented inside the Job class. 
1115 1116 :param toil.common.Config config: Config object to query 1117 """ 1118 self.description.assignConfig(config) 1119 1120 1121 def run(self, fileStore): 1122 """ 1123 Override this function to perform work and dynamically create successor jobs. 1124 1125 :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: Used to create local and 1126 globally sharable temporary files and to send log messages to the leader 1127 process. 1128 1129 :return: The return value of the function can be passed to other jobs by means of 1130 :func:`toil.job.Job.rv`. 1131 """ 1132 1133 def _jobGraphsJoined(self, other): 1134 """ 1135 Called whenever the job graphs of this job and the other job may have been merged into one connected component. 1136 1137 Ought to be called on the bigger registry first. 1138 1139 Merges TemporaryID registries if needed. 1140 1141 :param toil.job.Job other: A job possibly from the other connected component 1142 """ 1143 1144 # Maintain the invariant that a whole connected component has a config 1145 # assigned if any job in it does. 1146 if self.description._config is None and other.description._config is not None: 1147 # The other component has a config assigned but this component doesn't. 1148 for job in self._registry.values(): 1149 job.assignConfig(other.description._config) 1150 elif other.description._config is None and self.description._config is not None: 1151 # We have a config assigned but the other component doesn't. 1152 for job in other._registry.values(): 1153 job.assignConfig(self.description._config) 1154 1155 if len(self._registry) < len(other._registry): 1156 # Merge into the other component instead 1157 other._jobGraphsJoined(self) 1158 else: 1159 if self._registry != other._registry: 1160 # We are in fact joining connected components. 1161 1162 # Steal everything from the other connected component's registry 1163 self._registry.update(other._registry) 1164 1165 for job in other._registry.values(): 1166 # Point all their jobs at the new combined registry 1167 job._registry = self._registry 1168 1169 def addChild(self, childJob): 1170 """ 1171 Adds childJob to be run as child of this job. Child jobs will be run \ 1172 directly after this job's :func:`toil.job.Job.run` method has completed. 1173 1174 :param toil.job.Job childJob: 1175 :return: childJob 1176 :rtype: toil.job.Job 1177 """ 1178 1179 assert isinstance(childJob, Job) 1180 1181 # Join the job graphs 1182 self._jobGraphsJoined(childJob) 1183 # Remember the child relationship 1184 self._description.addChild(childJob.jobStoreID) 1185 # Record the temporary back-reference 1186 childJob._addPredecessor(self) 1187 1188 return childJob 1189 1190 def hasChild(self, childJob): 1191 """ 1192 Check if childJob is already a child of this job. 1193 1194 :param toil.job.Job childJob: 1195 :return: True if childJob is a child of the job, else False. 1196 :rtype: bool 1197 """ 1198 return self._description.hasChild(childJob.jobStoreID) 1199 1200 def addFollowOn(self, followOnJob): 1201 """ 1202 Adds a follow-on job, follow-on jobs will be run after the child jobs and \ 1203 their successors have been run. 

        :param toil.job.Job followOnJob:
        :return: followOnJob
        :rtype: toil.job.Job
        """

        assert isinstance(followOnJob, Job)

        # Join the job graphs
        self._jobGraphsJoined(followOnJob)
        # Remember the follow-on relationship
        self._description.addFollowOn(followOnJob.jobStoreID)
        # Record the temporary back-reference
        followOnJob._addPredecessor(self)

        return followOnJob

    def hasPredecessor(self, job: 'Job') -> bool:
        """Check if a given job is already a predecessor of this job."""
        return job in self._directPredecessors

    def hasFollowOn(self, followOnJob):
        """
        Check if the given job is already a follow-on of this job.

        :param toil.job.Job followOnJob:
        :return: True if the followOnJob is a follow-on of this job, else False.
        :rtype: bool
        """
        return self._description.hasFollowOn(followOnJob.jobStoreID)

    def addService(self, service, parentService=None):
        """
        Add a service.

        The :func:`toil.job.Job.Service.start` method of the service will be called
        after the run method has completed but before any successors are run.
        The service's :func:`toil.job.Job.Service.stop` method will be called once
        the successors of the job have been run.

        Services allow things like databases and servers to be started and accessed
        by jobs in a workflow.

        :raises toil.job.JobException: If service has already been made the child of a job or another service.
        :param toil.job.Job.Service service: Service to add.
        :param toil.job.Job.Service parentService: Service that will be started before 'service' is
            started. Allows trees of services to be established. parentService must be a service
            of this job.
        :return: a promise that will be replaced with the return value from
            :func:`toil.job.Job.Service.start` of service in any successor of the job.
        :rtype: toil.job.Promise
        """

        if parentService is not None:
            if not self.hasService(parentService):
                raise JobException("Parent service is not a service of the given job")

        if service.hostID is not None:
            raise JobException("Service has already been added to a job")

        # Create a host job for the service, and get it an ID
        hostingJob = ServiceHostJob(service)
        self._jobGraphsJoined(hostingJob)

        # Record the relationship to the hosting job, with its parent if any.
        self._description.addServiceHostJob(hostingJob.jobStoreID, parentService.hostID if parentService is not None else None)

        # For compatibility with old Cactus versions that tinker around with
        # our internals, we need to make the hosting job available as
        # self._services[-1]. TODO: Remove this when Cactus has updated.
        self._services = [hostingJob]

        # Return the promise for the service's startup result
        return hostingJob.rv()

    def hasService(self, service):
        """
        Returns True if the given Service is a service of this job, and False otherwise.
        """

        return service.hostID is None or self._description.hasServiceHostJob(service.hostID)

    ##Convenience functions for creating jobs

    def addChildFn(self, fn, *args, **kwargs):
        """
        Adds a function as a child job.

        :param fn: Function to be run as a child job with ``*args`` and ``**kwargs`` as \
               arguments to this function.
See toil.job.FunctionWrappingJob for reserved \ 1294 keyword arguments used to specify resource requirements. 1295 :return: The new child job that wraps fn. 1296 :rtype: toil.job.FunctionWrappingJob 1297 """ 1298 if PromisedRequirement.convertPromises(kwargs): 1299 return self.addChild(PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs)) 1300 else: 1301 return self.addChild(FunctionWrappingJob(fn, *args, **kwargs)) 1302 1303 def addFollowOnFn(self, fn, *args, **kwargs): 1304 """ 1305 Adds a function as a follow-on job. 1306 1307 :param fn: Function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \ 1308 arguments to this function. See toil.job.FunctionWrappingJob for reserved \ 1309 keyword arguments used to specify resource requirements. 1310 :return: The new follow-on job that wraps fn. 1311 :rtype: toil.job.FunctionWrappingJob 1312 """ 1313 if PromisedRequirement.convertPromises(kwargs): 1314 return self.addFollowOn(PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs)) 1315 else: 1316 return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs)) 1317 1318 def addChildJobFn(self, fn, *args, **kwargs): 1319 """ 1320 Adds a job function as a child job. See :class:`toil.job.JobFunctionWrappingJob` 1321 for a definition of a job function. 1322 1323 :param fn: Job function to be run as a child job with ``*args`` and ``**kwargs`` as \ 1324 arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \ 1325 keyword arguments used to specify resource requirements. 1326 :return: The new child job that wraps fn. 1327 :rtype: toil.job.JobFunctionWrappingJob 1328 """ 1329 if PromisedRequirement.convertPromises(kwargs): 1330 return self.addChild(PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs)) 1331 else: 1332 return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs)) 1333 1334 def addFollowOnJobFn(self, fn, *args, **kwargs): 1335 """ 1336 Add a follow-on job function. See :class:`toil.job.JobFunctionWrappingJob` 1337 for a definition of a job function. 1338 1339 :param fn: Job function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \ 1340 arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \ 1341 keyword arguments used to specify resource requirements. 1342 :return: The new follow-on job that wraps fn. 1343 :rtype: toil.job.JobFunctionWrappingJob 1344 """ 1345 if PromisedRequirement.convertPromises(kwargs): 1346 return self.addFollowOn(PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs)) 1347 else: 1348 return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs)) 1349 1350 @property 1351 def tempDir(self): 1352 """ 1353 Shortcut to calling :func:`job.fileStore.getLocalTempDir`. Temp dir is created on first call 1354 and will be returned for first and future calls 1355 :return: Path to tempDir. See `job.fileStore.getLocalTempDir` 1356 :rtype: str 1357 """ 1358 if self._tempDir is None: 1359 self._tempDir = self._fileStore.getLocalTempDir() 1360 return self._tempDir 1361 1362 def log(self, text, level=logging.INFO): 1363 """ 1364 convenience wrapper for :func:`fileStore.logToMaster` 1365 """ 1366 self._fileStore.logToMaster(text, level) 1367 1368 @staticmethod 1369 def wrapFn(fn, *args, **kwargs): 1370 """ 1371 Makes a Job out of a function. \ 1372 Convenience function for constructor of :class:`toil.job.FunctionWrappingJob`. 1373 1374 :param fn: Function to be run with ``*args`` and ``**kwargs`` as arguments. 
               See toil.job.FunctionWrappingJob for reserved keyword arguments used
               to specify resource requirements.
        :return: The new job that wraps fn.
        :rtype: toil.job.FunctionWrappingJob
        """
        if PromisedRequirement.convertPromises(kwargs):
            return PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs)
        else:
            return FunctionWrappingJob(fn, *args, **kwargs)

    @staticmethod
    def wrapJobFn(fn, *args, **kwargs):
        """
        Makes a Job out of a job function. \
        Convenience function for constructor of :class:`toil.job.JobFunctionWrappingJob`.

        :param fn: Job function to be run with ``*args`` and ``**kwargs`` as arguments. \
               See toil.job.JobFunctionWrappingJob for reserved keyword arguments used \
               to specify resource requirements.
        :return: The new job that wraps fn.
        :rtype: toil.job.JobFunctionWrappingJob
        """
        if PromisedRequirement.convertPromises(kwargs):
            return PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs)
        else:
            return JobFunctionWrappingJob(fn, *args, **kwargs)

    def encapsulate(self, name=None):
        """
        Encapsulates the job, see :class:`toil.job.EncapsulatedJob`.
        Convenience function for constructor of :class:`toil.job.EncapsulatedJob`.

        :param str name: Human-readable name for the encapsulated job.

        :return: an encapsulated version of this job.
        :rtype: toil.job.EncapsulatedJob
        """
        return EncapsulatedJob(self, unitName=name)

    ####################################################
    #The following function is used for passing return values between
    #job run functions
    ####################################################

    def rv(self, *path):
        """
        Creates a *promise* (:class:`toil.job.Promise`) representing a return value of the job's
        run method, or, in case of a function-wrapping job, the wrapped function's return value.

        :param (Any) path: Optional path for selecting a component of the promised return value.
               If absent or empty, the entire return value will be used. Otherwise, the first
               element of the path is used to select an individual item of the return value. For
               that to work, the return value must be a list, dictionary or of any other type
               implementing the `__getitem__()` magic method. If the selected item is yet another
               composite value, the second element of the path can be used to select an item from
               it, and so on. For example, if the return value is `[6,{'a':42}]`, `.rv(0)` would
               select `6`, `.rv(1)` would select `{'a':42}` while `.rv(1,'a')` would select `42`. To
               select a slice from a return value that is sliceable, e.g. a tuple or list, the path
               element should be a `slice` object. For example, assuming that the return value is
               `[6, 7, 8, 9]` then `.rv(slice(1, 3))` would select `[7, 8]`. Note that slicing
               really only makes sense at the end of the path.

        :return: A promise representing the return value of this job's :meth:`toil.job.Job.run`
                 method.

        :rtype: toil.job.Promise
        """
        return Promise(self, path)

    def registerPromise(self, path):
        if self._promiseJobStore is None:
            # We haven't had a job store set to put our return value into, so
            # we must not have been hit yet in job topological order.
            raise JobPromiseConstraintError(self)
        # TODO: can we guarantee self.jobStoreID is populated and so pass that here?
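        # Write a placeholder UnfulfilledPromiseSentinel into the promising
        # job's job store and record the resulting file ID under this
        # return-value path; the real value replaces the placeholder once this
        # job actually runs and its return value is known.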
1450 with self._promiseJobStore.writeFileStream() as (fileHandle, jobStoreFileID): 1451 promise = UnfulfilledPromiseSentinel(str(self.description), False) 1452 pickle.dump(promise, fileHandle, pickle.HIGHEST_PROTOCOL) 1453 self._rvs[path].append(jobStoreFileID) 1454 return self._promiseJobStore.config.jobStore, jobStoreFileID 1455 1456 def prepareForPromiseRegistration(self, jobStore): 1457 """ 1458 Ensure that a promise by this job (the promissor) can register with the promissor when 1459 another job referring to the promise (the promissee) is being serialized. The promissee 1460 holds the reference to the promise (usually as part of the the job arguments) and when it 1461 is being pickled, so will the promises it refers to. Pickling a promise triggers it to be 1462 registered with the promissor. 1463 1464 :return: 1465 """ 1466 self._promiseJobStore = jobStore 1467 1468 def _disablePromiseRegistration(self): 1469 """ 1470 Called when the job data is about to be saved in the JobStore. 1471 No promises should attempt to register with the job after this has been 1472 called, because that registration would not be persisted. 1473 """ 1474 1475 self._promiseJobStore = None 1476 1477 #################################################### 1478 #Cycle/connectivity checking 1479 #################################################### 1480 1481 def checkJobGraphForDeadlocks(self): 1482 """ 1483 Ensures that a graph of Jobs (that hasn't yet been saved to the 1484 JobStore) doesn't contain any pathological relationships between jobs 1485 that would result in deadlocks if we tried to run the jobs. 1486 1487 See :func:`toil.job.Job.checkJobGraphConnected`, 1488 :func:`toil.job.Job.checkJobGraphAcyclic` and 1489 :func:`toil.job.Job.checkNewCheckpointsAreLeafVertices` for more info. 1490 1491 :raises toil.job.JobGraphDeadlockException: if the job graph 1492 is cyclic, contains multiple roots or contains checkpoint jobs that are 1493 not leaf vertices when defined (see :func:`toil.job.Job.checkNewCheckpointsAreLeaves`). 1494 """ 1495 self.checkJobGraphConnected() 1496 self.checkJobGraphAcylic() 1497 self.checkNewCheckpointsAreLeafVertices() 1498 1499 def getRootJobs(self) -> Set['Job']: 1500 """ 1501 Returns the set of root job objects that contain this job. 1502 A root job is a job with no predecessors (i.e. which are not children, follow-ons, or services). 1503 1504 Only deals with jobs created here, rather than loaded from the job store. 1505 """ 1506 1507 # Start assuming all jobs are roots 1508 roots = set(self._registry.keys()) 1509 1510 for job in self._registry.values(): 1511 for otherID in job.description.successorsAndServiceHosts(): 1512 # If anything is a successor or service of anything else, it isn't a root. 1513 if otherID in roots: 1514 # Remove it if we still think it is 1515 roots.remove(otherID) 1516 1517 return {self._registry[jid] for jid in roots} 1518 1519 def checkJobGraphConnected(self): 1520 """ 1521 :raises toil.job.JobGraphDeadlockException: if :func:`toil.job.Job.getRootJobs` does \ 1522 not contain exactly one root job. 1523 1524 As execution always starts from one root job, having multiple root jobs will \ 1525 cause a deadlock to occur. 1526 1527 Only deals with jobs created here, rather than loaded from the job store. 
1528 """ 1529 rootJobs = self.getRootJobs() 1530 if len(rootJobs) != 1: 1531 raise JobGraphDeadlockException("Graph does not contain exactly one" 1532 " root job: %s" % rootJobs) 1533 1534 def checkJobGraphAcylic(self): 1535 """ 1536 :raises toil.job.JobGraphDeadlockException: if the connected component \ 1537 of jobs containing this job contains any cycles of child/followOn dependencies \ 1538 in the *augmented job graph* (see below). Such cycles are not allowed \ 1539 in valid job graphs. 1540 1541 A follow-on edge (A, B) between two jobs A and B is equivalent \ 1542 to adding a child edge to B from (1) A, (2) from each child of A, \ 1543 and (3) from the successors of each child of A. We call each such edge \ 1544 an edge an "implied" edge. The augmented job graph is a job graph including \ 1545 all the implied edges. 1546 1547 For a job graph G = (V, E) the algorithm is ``O(|V|^2)``. It is ``O(|V| + |E|)`` for \ 1548 a graph with no follow-ons. The former follow-on case could be improved! 1549 1550 Only deals with jobs created here, rather than loaded from the job store. 1551 """ 1552 #Get the root jobs 1553 roots = self.getRootJobs() 1554 if len(roots) == 0: 1555 raise JobGraphDeadlockException("Graph contains no root jobs due to cycles") 1556 1557 #Get implied edges 1558 extraEdges = self._getImpliedEdges(roots) 1559 1560 #Check for directed cycles in the augmented graph 1561 visited = set() 1562 for root in roots: 1563 root._checkJobGraphAcylicDFS([], visited, extraEdges) 1564 1565 def _checkJobGraphAcylicDFS(self, stack, visited, extraEdges): 1566 """ 1567 DFS traversal to detect cycles in augmented job graph. 1568 """ 1569 if self not in visited: 1570 visited.add(self) 1571 stack.append(self) 1572 for successor in [self._registry[jID] for jID in self.description.allSuccessors() if jID in self._registry] + extraEdges[self]: 1573 # Grab all the successors in the current registry (i.e. added form this node) and look at them. 1574 successor._checkJobGraphAcylicDFS(stack, visited, extraEdges) 1575 assert stack.pop() == self 1576 if self in stack: 1577 stack.append(self) 1578 raise JobGraphDeadlockException("A cycle of job dependencies has been detected '%s'" % stack) 1579 1580 @staticmethod 1581 def _getImpliedEdges(roots): 1582 """ 1583 Gets the set of implied edges (between children and follow-ons of a common job). Used in Job.checkJobGraphAcylic. 1584 1585 Only deals with jobs created here, rather than loaded from the job store. 1586 1587 :returns: dict from Job object to list of Job objects that must be done before it can start. 1588 """ 1589 #Get nodes (Job objects) in job graph 1590 nodes = set() 1591 for root in roots: 1592 root._collectAllSuccessors(nodes) 1593 1594 ##For each follow-on edge calculate the extra implied edges 1595 #Adjacency list of implied edges, i.e. 
        # Adjacency list of implied edges, i.e. a map of jobs to lists of jobs
        # connected by an implied edge
        extraEdges = {n: [] for n in nodes}
        for job in nodes:
            for depth in range(1, len(job.description.stack)):
                # Add edges from all jobs in the earlier/upper subtrees to all
                # the roots of the later/lower subtrees

                upper = job.description.stack[depth]
                lower = job.description.stack[depth - 1]

                # Find everything in the upper subtree
                reachable = set()
                for upperID in upper:
                    if upperID in job._registry:
                        # This is a locally added job, not an already-saved job
                        upperJob = job._registry[upperID]
                        upperJob._collectAllSuccessors(reachable)

                for inUpper in reachable:
                    # Add extra edges to the roots of all the lower subtrees
                    # But skip anything in the lower subtree not in the current _registry (i.e. not created here)
                    extraEdges[inUpper] += [job._registry[lowerID] for lowerID in lower if lowerID in job._registry]

        return extraEdges

    def checkNewCheckpointsAreLeafVertices(self):
        """
        A checkpoint job is a job that is restarted if either it fails, or if any of \
        its successors completely fails, exhausting its retries.

        A job is a leaf if it has no successors.

        A checkpoint job must be a leaf when initially added to the job graph. When its \
        run method is invoked it can then create direct successors. This restriction is made
        to simplify implementation.

        Only works on connected components of jobs not yet added to the JobStore.

        :raises toil.job.JobGraphDeadlockException: if there exists a job being added to the graph for which \
        checkpoint=True and which is not a leaf.
        """
        roots = self.getRootJobs()  # Root jobs of the component; these are the preexisting jobs in the graph

        # All jobs in the component of the job graph containing self
        jobs = set()
        for root in roots:
            root._collectAllSuccessors(jobs)

        # Check that each job for which checkpoint is true is a leaf
        for y in [x for x in jobs if x.checkpoint]:
            if y not in roots:  # The roots are the preexisting jobs
                if not Job._isLeafVertex(y):
                    raise JobGraphDeadlockException("New checkpoint job %s is not a leaf in the job graph" % y)

    ####################################################
    # Deferred function system
    ####################################################

    def defer(self, function, *args, **kwargs):
        """
        Register a deferred function, i.e. a callable that will be invoked after the current
        attempt at running this job concludes. A job attempt is said to conclude when the job
        function (or the :meth:`toil.job.Job.run` method for class-based jobs) returns, raises an
        exception or after the process running it terminates abnormally. A deferred function will
        be called on the node that attempted to run the job, even if a subsequent attempt is made
        on another node. A deferred function should be idempotent because it may be called
        multiple times on the same node or even in the same process. More than one deferred
        function may be registered per job attempt by calling this method repeatedly with
        different arguments. If the same function is registered twice with the same or different
        arguments, it will be called twice per job attempt.

        Examples for deferred functions are ones that handle cleanup of resources external to
        Toil, like Docker containers, files outside the work directory, etc.
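        A minimal sketch of a deferred cleanup inside a job's run method
        (``remove_scratch`` and ``ScratchJob`` are hypothetical names, not part of Toil)::

            import shutil
            import tempfile

            def remove_scratch(path):
                # Idempotent cleanup: removing an already-removed directory is a no-op.
                shutil.rmtree(path, ignore_errors=True)

            class ScratchJob(Job):
                def run(self, fileStore):
                    scratch_dir = tempfile.mkdtemp()
                    self.defer(remove_scratch, scratch_dir)
                    ...  # use scratch_dir; it is removed even if this attempt fails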
        :param callable function: The function to be called after this job concludes.

        :param list args: The arguments to the function.

        :param dict kwargs: The keyword arguments to the function.
        """
        if self._defer is None:
            raise Exception('A deferred function may only be registered with a job while that job is running.')
        self._defer(DeferredFunction.create(function, *args, **kwargs))

    ####################################################
    # The following nested classes are used for
    # creating job trees (Job.Runner),
    # and defining a service (Job.Service)
    ####################################################

    class Runner:
        """
        Used to set up and run a Toil workflow.
        """
        @staticmethod
        def getDefaultArgumentParser():
            """
            Get an argument parser with added Toil workflow options.

            :returns: The argument parser used by a toil workflow with added Toil options.
            :rtype: :class:`argparse.ArgumentParser`
            """
            parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
            Job.Runner.addToilOptions(parser)
            return parser

        @staticmethod
        def getDefaultOptions(jobStore):
            """
            Get default options for a toil workflow.

            :param string jobStore: A string describing the jobStore \
            for the workflow.
            :returns: The options used by a toil workflow.
            :rtype: argparse.ArgumentParser values object
            """
            parser = Job.Runner.getDefaultArgumentParser()
            return parser.parse_args(args=[jobStore])

        @staticmethod
        def addToilOptions(parser):
            """
            Adds the default toil options to an :mod:`optparse` or :mod:`argparse`
            parser object.

            :param parser: Options object to add toil options to.
            :type parser: optparse.OptionParser or argparse.ArgumentParser
            """
            addOptions(parser)

        @staticmethod
        def startToil(job, options):
            """
            Deprecated by toil.common.Toil.start. Runs the toil workflow using the given options
            (see Job.Runner.getDefaultOptions and Job.Runner.addToilOptions) starting with this
            job.

            :param toil.job.Job job: root job of the workflow
            :raises: toil.leader.FailedJobsException if at the end of the function \
            there remain failed jobs.
            :return: The return value of the root job's run function.
            :rtype: Any
            """
            set_logging_from_options(options)
            with Toil(options) as toil:
                if not options.restart:
                    return toil.start(job)
                else:
                    return toil.restart()

    class Service(Requirer, metaclass=ABCMeta):
        """
        Abstract class used to define the interface to a service.

        Should be subclassed by the user to define services.

        Is not executed as a job; runs within a ServiceHostJob.
        """
        def __init__(self, memory=None, cores=None, disk=None, preemptable=None, unitName=None):
            """
            Memory, core and disk requirements are specified identically to those in \
            :func:`toil.job.Job.__init__`.
            """

            # Save the requirements in ourselves so they are visible on `self` to user code.
            super().__init__({'memory': memory, 'cores': cores, 'disk': disk, 'preemptable': preemptable})

            # And the unit name
            self.unitName = unitName

            # And the name for the hosting job
            self.jobName = self.__class__.__name__

            # Record that we do not yet have a ServiceHostJob
            self.hostID = None

        @abstractmethod
        def start(self, job):
            """
            Start the service.

            :param toil.job.Job job: The underlying host job that the service is being run in.
                                     Can be used to register deferred functions, or to access
                                     the fileStore for creating temporary files.

            :returns: An object describing how to access the service. The object must be pickleable
                      and will be used by jobs to access the service (see :func:`toil.job.Job.addService`).
            """

        @abstractmethod
        def stop(self, job):
            """
            Stops the service. Function can block until complete.

            :param toil.job.Job job: The underlying host job that the service is being run in.
                                     Can be used to register deferred functions, or to access
                                     the fileStore for creating temporary files.
            """

        def check(self):
            """
            Checks that the service is still running.

            :raise RuntimeError: If the service failed, this will cause the service job to be labeled failed.
            :returns: True if the service is still running, else False. If False then the service job will be terminated,
                and considered a success. Important point: if the service job exits due to a failure, it should raise a
                RuntimeError, not return False!
            """

    def _addPredecessor(self, predecessorJob):
        """Adds a predecessor job to the set of predecessor jobs."""
        if predecessorJob in self._directPredecessors:
            raise ConflictingPredecessorError(predecessorJob, self)
        self._directPredecessors.add(predecessorJob)

        # Record the need for the predecessor to finish
        self._description.addPredecessor()

    @staticmethod
    def _isLeafVertex(job):
        return next(job.description.successorsAndServiceHosts(), None) is None

    @classmethod
    def _loadUserModule(cls, userModule):
        """
        Imports and returns the module object represented by the given module descriptor.

        :type userModule: ModuleDescriptor
        """
        return userModule.load()

    @classmethod
    def _unpickle(cls, userModule, fileHandle, requireInstanceOf=None):
        """
        Unpickles an object graph from the given file handle while loading symbols \
        referencing the __main__ module from the given userModule instead.

        :param userModule: The module to use to resolve symbols that reference ``__main__``.
        :param fileHandle: An open, binary-mode file handle.
        :param requireInstanceOf: If set, require the result to be an instance of this class.
        :returns: The unpickled object.
        """

        def filter_main(module_name, class_name):
            try:
                if module_name == '__main__':
                    return getattr(userModule, class_name)
                else:
                    return getattr(importlib.import_module(module_name), class_name)
            except:
                if module_name == '__main__':
                    logger.debug('Failed getting %s from module %s.', class_name, userModule)
                else:
                    logger.debug('Failed getting %s from module %s.', class_name, module_name)
                raise

        class FilteredUnpickler(pickle.Unpickler):
            def find_class(self, module, name):
                return filter_main(module, name)

        unpickler = FilteredUnpickler(fileHandle)

        runnable = unpickler.load()
        if requireInstanceOf is not None:
            assert isinstance(runnable, requireInstanceOf), "Did not find a {} when expected".format(requireInstanceOf)

        return runnable

    def getUserScript(self):
        return self.userModule

    def _fulfillPromises(self, returnValues, jobStore):
        """
        Sets the values for promises using the return values from this job's run() function.
        """
        for path, promiseFileStoreIDs in self._rvs.items():
            if not path:
                # Note that it's possible for returnValues to be a promise, not an actual return
                # value. This is the case if the job returns a promise from another job. In
                # either case, we just pass it on.
                promisedValue = returnValues
            else:
                # If there is a path ...
                if isinstance(returnValues, Promise):
                    # ... and the value itself is a Promise, we need to create a new, narrower
                    # promise and pass it on.
                    promisedValue = Promise(returnValues.job, path)
                else:
                    # Otherwise, we just select the desired component of the return value.
                    promisedValue = returnValues
                    for index in path:
                        promisedValue = promisedValue[index]
            for promiseFileStoreID in promiseFileStoreIDs:
                # File may be gone if the job is a service being re-run and the accessing job is
                # already complete.
                if jobStore.fileExists(promiseFileStoreID):
                    with jobStore.updateFileStream(promiseFileStoreID) as fileHandle:
                        pickle.dump(promisedValue, fileHandle, pickle.HIGHEST_PROTOCOL)

    # Functions associated with Job.checkJobGraphAcylic to establish that the job graph does not
    # contain any cycles of dependencies:

    def _collectAllSuccessors(self, visited):
        """
        Adds this job, and all jobs reachable from it on a directed path, to the given set.

        Only considers jobs in this job's subgraph that are newly added, not loaded from the job store.
        """

        # Keep our own stack, since a long chain of jobs in the graph could
        # otherwise exhaust the real call stack.
        todo = [self]

        while todo:
            job = todo.pop()
            if job not in visited:
                visited.add(job)
                for successorID in job.description.allSuccessors():
                    if successorID in self._registry:
                        # We added this successor locally
                        todo.append(self._registry[successorID])

    def getTopologicalOrderingOfJobs(self):
        """
        :returns: a list of jobs such that for all pairs of indices i, j for which i < j, \
        the job at index i can be run before the job at index j.

        Only considers jobs in this job's subgraph that are newly added, not loaded from the job store.

        Ignores service jobs.

        :rtype: list[Job]
        """

        # List of Job objects in order.
        ordering = []
        # Set of IDs of visited jobs.
        visited = set()

        # We need to recurse and traverse the graph without exhausting Python's
        # stack, so we keep our own stack of Job objects
        todo = [self]

        while todo:
            job = todo.pop()

            # Do not add the job to the ordering until all its predecessors have been
            # added to the ordering
            outstandingPredecessor = False
            for predJob in job._directPredecessors:
                if predJob.jobStoreID not in visited:
                    outstandingPredecessor = True
                    break
            if outstandingPredecessor:
                continue

            if job.jobStoreID not in visited:
                visited.add(job.jobStoreID)
                ordering.append(job)

                for otherID in itertools.chain(job.description.followOnIDs, job.description.childIDs):
                    if otherID in self._registry:
                        # Stack up descendants so we process children and then follow-ons.
                        # So stack up follow-ons deeper
                        todo.append(self._registry[otherID])

        return ordering

    ####################################################
    # Storing Jobs into the JobStore
    ####################################################

    def _register(self, jobStore):
        """
        If this job lacks a JobStore-assigned ID, assign this job an ID.
        Must be called for each job before it is saved to the JobStore for the first time.

        :returns: A list containing either a single (old ID, new ID) pair, or nothing if no new
            ID was assigned.
        :rtype: list
        """

        # TODO: This doesn't really have much to do with the registry. Rename
        # the registry.

        if isinstance(self.jobStoreID, TemporaryID):
            # We need to get an ID.

            # Save our fake ID
            fake = self.jobStoreID

            # Replace it with a real ID
            jobStore.assignID(self.description)

            # Make sure the JobDescription can do its JobStore-related setup.
            self.description.onRegistration(jobStore)

            # Return the fake to real mapping
            return [(fake, self.description.jobStoreID)]
        else:
            # We already have an ID. No assignment or reference rewrite necessary.
            return []

    def _renameReferences(self, renames):
        """
        Apply the given dict of ID renames to all references to other jobs.

        Ignores the registry, which is shared and assumed to already be updated.

        IDs not present in the renames dict are left as-is.

        :param dict(TemporaryID, str) renames: Rename operations to apply.
        """

        # Do renames in the description
        self._description.renameReferences(renames)

    def saveBody(self, jobStore):
        """
        Save the execution data for just this job to the JobStore, and fill in
        the JobDescription with the information needed to retrieve it.

        The Job's JobDescription must have already had a real jobStoreID assigned to it.

        Does not save the JobDescription.

        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store
            to save the job body into.
        """

        # We can't save the job in the right place for cleanup unless the
        # description has a real ID.
        assert not isinstance(self.jobStoreID, TemporaryID), "Tried to save job {} without ID assigned!".format(self)

        # Note that we can't accept any more requests for our return value
        self._disablePromiseRegistration()

        # Clear out old Cactus compatibility fields that don't need to be
        # preserved and shouldn't be serialized.
        if hasattr(self, '_services'):
            delattr(self, '_services')

        # Remember fields we will overwrite
        description = self._description
        registry = self._registry
        directPredecessors = self._directPredecessors

        try:
            try:
                # Drop out the description, which the job store will manage separately
                self._description = None
                # Fix up the registry and direct predecessors for when the job is
                # loaded to be run: the registry should contain just the job itself and
                # there should be no predecessors available when the job actually runs.
                self._registry = {description.jobStoreID: self}
                self._directPredecessors = set()

                # Save the body of the job
                with jobStore.writeFileStream(description.jobStoreID, cleanup=True) as (fileHandle, fileStoreID):
                    pickle.dump(self, fileHandle, pickle.HIGHEST_PROTOCOL)
            finally:
                # Restore important fields (before handling errors)
                self._directPredecessors = directPredecessors
                self._registry = registry
                self._description = description
        except JobPromiseConstraintError as e:
            # The user is passing promises without regard to predecessor constraints.
            if e.recipientJob is None:
                # Add ourselves as the recipient job that wanted the promise.
                e = JobPromiseConstraintError(e.promisingJob, self)
            raise e

        # Find the user script.
        # Note that getUserScript() may have been overridden. This is intended. If we used
        # self.userModule directly, we'd be getting a reference to job.py if the job was
        # specified as a function (as opposed to a class) since that is where FunctionWrappingJob
        # is defined. What we really want is the module that was loaded as __main__,
        # and FunctionWrappingJob overrides getUserScript() to give us just that. Only then can
        # filter_main() in _unpickle() do its job of resolving any user-defined type or function.
        userScript = self.getUserScript().globalize()

        # The command connects the body of the job to the JobDescription
        self._description.command = ' '.join(('_toil', fileStoreID) + userScript.toCommand())

    def _saveJobGraph(self, jobStore, saveSelf=False, returnValues=None):
        """
        Save job data and new JobDescriptions to the given job store for this
        job and all descending jobs, including services.

        Used to save the initial job graph containing the root job of the workflow.

        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store
            to save the jobs into.
        :param bool saveSelf: Set to True to save this job along with its children,
            follow-ons, and services, or False to just save the children, follow-ons,
            and services and to populate the return value.
        :param returnValues: The collection of values returned when executing
            the job (or starting the service the job is hosting). If saveSelf
            is not set, will be used to fulfill the job's return value promises.
        """

        # Prohibit cycles and multiple roots
        self.checkJobGraphForDeadlocks()

        # Make sure everybody in the registry is registered with the job store
        # and has an ID. Also rewrite ID references.
        allJobs = list(self._registry.values())
        # We use one big dict from fake ID to corresponding real ID to rewrite references.
        fakeToReal = {}
        for job in allJobs:
            # Register the job, get the old ID to new ID pair if any, and save that in the fake to real mapping
            fakeToReal.update(job._register(jobStore))
        if len(fakeToReal) > 0:
            # Some jobs changed ID. We need to rebuild the registry and apply the reference rewrites.

            # Remake the registry in place
            self._registry.clear()
            self._registry.update({job.jobStoreID: job for job in allJobs})

            for job in allJobs:
                # Tell all the jobs (and thus their descriptions and services)
                # about the renames.
                job._renameReferences(fakeToReal)

        # Make sure the whole component is ready for promise registration
        for job in allJobs:
            job.prepareForPromiseRegistration(jobStore)

        # Get an ordering on the non-service jobs which we use for pickling the
        # jobs in the correct order to ensure the promises are properly
        # established
        ordering = self.getTopologicalOrderingOfJobs()

        # Set up to save the last job first, so promises flow the right way
        ordering.reverse()

        logger.info("Saving graph of %d jobs, %d new", len(allJobs), len(fakeToReal))

        # Make sure we're the root
        assert ordering[-1] == self

        # Don't verify the ordering length: it excludes service host jobs.

        if not saveSelf:
            # Fulfill promises for return values (even if the value is None)
            self._fulfillPromises(returnValues, jobStore)

        for job in ordering:
            logger.info("Processing job %s", job.description)
            for serviceBatch in reversed(list(job.description.serviceHostIDsInBatches())):
                # For each batch of service host jobs, in reverse of the order in which they start
                for serviceID in serviceBatch:
                    logger.info("Processing service %s", serviceID)
                    if serviceID in self._registry:
                        # It's a new service

                        # Find the actual job
                        serviceJob = self._registry[serviceID]
                        logger.info("Saving service %s", serviceJob.description)
                        # Pickle the service body, which triggers all the promise stuff
                        serviceJob.saveBody(jobStore)
            if job != self or saveSelf:
                # Now pickle the job itself
                job.saveBody(jobStore)

        # Now that the job data is on disk, commit the JobDescriptions in
        # reverse execution order, in a batch if supported.
        with jobStore.batch():
            for job in ordering:
                for serviceBatch in job.description.serviceHostIDsInBatches():
                    for serviceID in serviceBatch:
                        if serviceID in self._registry:
                            jobStore.create(self._registry[serviceID].description)
                if job != self or saveSelf:
                    jobStore.create(job.description)

    def saveAsRootJob(self, jobStore):
        """
        Save this job to the given jobStore as the root job of the workflow.

        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store to save the workflow into.
        :return: the JobDescription describing this job.
        """

        # Check if the workflow root is a checkpoint but not a leaf vertex.
        # All other job vertices in the graph are checked by checkNewCheckpointsAreLeafVertices
        if self.checkpoint and not Job._isLeafVertex(self):
            raise JobGraphDeadlockException(
                'New checkpoint job %s is not a leaf in the job graph' % self)

        # Save the root job and all descendants and services
        self._saveJobGraph(jobStore, saveSelf=True)

        # Store the name of the first job in a file in case of restart. Up to this point the
        # root job is not recoverable. FIXME: "root job" or "first job", which one is it?
        jobStore.setRootJob(self.jobStoreID)

        # Assign the config from the JobStore as if we were loaded.
        # TODO: Find a better way to make this the JobStore's responsibility
        self.description.assignConfig(jobStore.config)

        return self.description

    @classmethod
    def loadJob(cls, jobStore, jobDescription):
        """
        Retrieves a :class:`toil.job.Job` instance from a JobStore.

        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store.
        :param toil.job.JobDescription jobDescription: the JobDescription of the job to retrieve.
        :returns: The job referenced by the JobDescription.
        :rtype: toil.job.Job
        """

        # Grab the command that connects the description to the job body
        command = jobDescription.command

        commandTokens = command.split()
        assert "_toil" == commandTokens[0]
        userModule = ModuleDescriptor.fromCommand(commandTokens[2:])
        logger.debug('Loading user module %s.', userModule)
        userModule = cls._loadUserModule(userModule)
        pickleFile = commandTokens[1]

        # Get a directory to download the job in
        directory = tempfile.mkdtemp()
        # Initialize a blank filename so the finally below can't fail due to a
        # missing variable
        filename = ''

        try:
            # Get a filename to download the job to.
            # Don't use mkstemp because we would need to delete and replace the
            # file.
            # Don't use a NamedTemporaryFile context manager because its
            # context manager exit will crash if we deleted it.
            filename = os.path.join(directory, 'job')

            # Download the job
            if pickleFile == "firstJob":
                jobStore.readSharedFile(pickleFile, filename)
            else:
                jobStore.readFile(pickleFile, filename)

            # Open and unpickle
            with open(filename, 'rb') as fileHandle:
                job = cls._unpickle(userModule, fileHandle, requireInstanceOf=Job)
                # Fill in the current description
                job._description = jobDescription

                # Set up the registry again, so children and follow-ons can be added on the worker
                job._registry = {job.jobStoreID: job}

                return job

            # TODO: We ought to just unpickle straight from a streaming read
        finally:
            # Clean up the file
            if os.path.exists(filename):
                os.unlink(filename)
            # Clean up the directory we put it in
            shutil.rmtree(directory)

    def _run(self, jobGraph=None, fileStore=None, **kwargs):
        """
        Function which the worker calls to ultimately invoke
        a job's Job.run method, and then handle created
        children/followOn jobs.

        May be (but currently is not) overridden by specialized Toil-internal jobs.

        Should not be overridden by non-Toil code!

        Despite this, it has been overridden by non-Toil code, so we keep it
        around and use a hardened kwargs-based interface to try and tolerate
        bad behavior by workflows (e.g. Cactus).

        When everyone has upgraded to a sufficiently new Cactus, we can remove
        this!

        :param NoneType jobGraph: Ignored. Here for compatibility with old
            Cactus versions that pass two positional arguments.
        :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: the
            FileStore to use to access files when running the job. Required.
        """
        return self.run(fileStore)

    @contextmanager
    def _executor(self, stats, fileStore):
        """
        This is the core wrapping method for running the job within a worker.
        It sets up the stats and logging before yielding. After completion of the body,
        it finishes up the stats and logging and starts the asynchronous update process for the job.

        Will modify the job's description with changes that need to be committed back to the JobStore.
        """
        if stats is not None:
            startTime = time.time()
            startClock = get_total_cpu_time()
        baseDir = os.getcwd()

        yield

        # If the job is not a checkpoint job, add the promise files to delete
        # to the list of jobStoreFileIDs to delete
        # TODO: why is Promise holding a global list here???
        if not self.checkpoint:
            for jobStoreFileID in Promise.filesToDelete:
                # Make sure to wrap the job store ID in a FileID object so the file store will accept it
                # TODO: talk directly to the job store here instead.
                fileStore.deleteGlobalFile(FileID(jobStoreFileID, 0))
        else:
            # Else copy them to the job description to delete later
            self.description.checkpointFilesToDelete = list(Promise.filesToDelete)
        Promise.filesToDelete.clear()
        # Now indicate the asynchronous update of the job can happen
        fileStore.startCommit(jobState=True)
        # Change back to the original working directory if the job changed it (this is a safety issue)
        if os.getcwd() != baseDir:
            os.chdir(baseDir)
        # Finish up the stats
        if stats is not None:
            totalCpuTime, totalMemoryUsage = get_total_cpu_time_and_memory_usage()
            stats.jobs.append(
                Expando(
                    time=str(time.time() - startTime),
                    clock=str(totalCpuTime - startClock),
                    class_name=self._jobName(),
                    memory=str(totalMemoryUsage)
                )
            )

    def _runner(self, jobStore=None, fileStore=None, defer=None, **kwargs):
        """
        This method actually runs the job, and serialises the next jobs.

        It marks the job as completed (by clearing its command) and creates the
        successor relationships to new successors, but it doesn't actually
        commit those updates to the current job into the JobStore.

        We take all arguments as keyword arguments, and accept and ignore
        additional keyword arguments, for compatibility with workflows (*cough*
        Cactus *cough*) which are reaching in and overriding _runner (which
        they aren't supposed to do). If everything is passed as name=value it
        won't break as soon as we add or remove a parameter.

        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: Instance of the job store
        :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: Instance
            of a cached or uncached filestore
        :param defer: Function yielded by the open() context
            manager of :class:`toil.DeferredFunctionManager`, which is called to
            register deferred functions.
        :param kwargs: Catch-all to accept superfluous arguments passed by old
            versions of Cactus. Cactus shouldn't override this method, but it does.
        """

        # Make deferred function registration available during run().
        self._defer = defer
        # Make fileStore available as an attribute during run() ...
        self._fileStore = fileStore
        # ... but also pass it to _run() as an argument for backwards
        # compatibility with workflows that tinker around with our internals,
        # and send a fake jobGraph in case they still think jobGraph exists.
        returnValues = self._run(jobGraph=None, fileStore=fileStore)

        # Clean up state changes made for run()
        self._defer = None
        self._fileStore = None

        # Serialize the new Jobs defined by the run method to the jobStore
        self._saveJobGraph(jobStore, saveSelf=False, returnValues=returnValues)

        # Clear out the command, because the job is done.
        self.description.command = None

        # That and the new child/follow-on relationships will need to be
        # recorded later by an update() of the JobDescription.

    def _jobName(self):
        """
        :returns: The name used as the identifier of the job class in the stats report.
        :rtype: str
        """
        return self._description.displayName


class JobException(Exception):
    """
    General job exception.
    """
    def __init__(self, message):
        super().__init__(message)


class JobGraphDeadlockException(JobException):
    """
    An exception raised in the event that a workflow contains an unresolvable \
    dependency, such as a cycle. See :func:`toil.job.Job.checkJobGraphForDeadlocks`.
    """
    def __init__(self, string):
        super().__init__(string)


class FunctionWrappingJob(Job):
    """
    Job used to wrap a function. In its `run` method the wrapped function is called.
    """
    def __init__(self, userFunction, *args, **kwargs):
        """
        :param callable userFunction: The function to wrap. It will be called with ``*args`` and
            ``**kwargs`` as arguments.

        The keywords ``memory``, ``cores``, ``disk``, ``preemptable`` and ``checkpoint`` are
        reserved keyword arguments that if specified will be used to determine the resources
        required for the job, as in :func:`toil.job.Job.__init__`. If they are keyword arguments to
        the function they will be extracted from the function definition, but may be overridden
        by the user (as you would expect).
        """
        # Use the user-specified requirements, if specified, else grab the default argument
        # from the function, if specified, else default to None
        argSpec = inspect.getfullargspec(userFunction)

        if argSpec.defaults is None:
            argDict = {}
        else:
            argDict = dict(zip(argSpec.args[-len(argSpec.defaults):], argSpec.defaults))

        def resolve(key, default=None, dehumanize=False):
            try:
                # First, try constructor arguments, ...
                value = kwargs.pop(key)
            except KeyError:
                try:
                    # ..., then try default value for function keyword arguments, ...
                    value = argDict[key]
                except KeyError:
                    # ... and finally fall back to a default value.
                    value = default
            # Optionally, convert strings with metric or binary prefixes.
            if dehumanize and isinstance(value, str):
                value = human2bytes(value)
            return value

        super().__init__(memory=resolve('memory', dehumanize=True),
                         cores=resolve('cores', dehumanize=True),
                         disk=resolve('disk', dehumanize=True),
                         preemptable=resolve('preemptable'),
                         checkpoint=resolve('checkpoint', default=False),
                         unitName=resolve('name', default=None))

        self.userFunctionModule = ModuleDescriptor.forModule(userFunction.__module__).globalize()
        self.userFunctionName = str(userFunction.__name__)
        self.jobName = self.userFunctionName
        self._args = args
        self._kwargs = kwargs

    def _getUserFunction(self):
        logger.debug('Loading user function %s from module %s.',
                     self.userFunctionName,
                     self.userFunctionModule)
        userFunctionModule = self._loadUserModule(self.userFunctionModule)
        return getattr(userFunctionModule, self.userFunctionName)

    def run(self, fileStore):
        userFunction = self._getUserFunction()
        return userFunction(*self._args, **self._kwargs)

    def getUserScript(self):
        return self.userFunctionModule

    def _jobName(self):
        return ".".join((self.__class__.__name__, self.userFunctionModule.name, self.userFunctionName))


class JobFunctionWrappingJob(FunctionWrappingJob):
    """
    A job function is a function whose first argument is a :class:`.Job`
    instance that is the wrapping job for the function. This can be used to
    add successor jobs for the function and perform all the functions the
    :class:`.Job` class provides.

    To enable the job function to get access to the
    :class:`toil.fileStores.abstractFileStore.AbstractFileStore` instance (see
    :func:`toil.job.Job.run`), it is made a variable of the wrapping job called
    fileStore.

    To specify a job's resource requirements the following default keyword arguments
    can be specified:

    - memory
    - disk
    - cores

    For example, to wrap a function into a job we would call::

        Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)

    """

    @property
    def fileStore(self):
        return self._fileStore

    def run(self, fileStore):
        userFunction = self._getUserFunction()
        return userFunction(self, *self._args, **self._kwargs)


class PromisedRequirementFunctionWrappingJob(FunctionWrappingJob):
    """
    Handles dynamic resource allocation using :class:`toil.job.Promise` instances.
    Spawns a child function using the parent function's parameters and the fulfilled
    promised resource requirements.
    """
    def __init__(self, userFunction, *args, **kwargs):
        self._promisedKwargs = kwargs.copy()
        # Replace resource requirements in the intermediate job with small values.
        kwargs.update(dict(disk='1M', memory='32M', cores=0.1))
        super().__init__(userFunction, *args, **kwargs)

    @classmethod
    def create(cls, userFunction, *args, **kwargs):
        """
        Creates an encapsulated Toil job function with unfulfilled promised resource
        requirements. After the promises are fulfilled, a child job function is created
        using the updated resource values. The subgraph is encapsulated to ensure that this
        child job function is run before other children in the workflow. Otherwise, a
        different child may try to use an unresolved promise return value from the parent.
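        This is typically reached indirectly, when a child function is added with a
        :class:`.PromisedRequirement` resource (a sketch; ``f`` and ``g`` are hypothetical
        user functions)::

            A = Job.wrapFn(f)
            B = A.addChildFn(g, memory=PromisedRequirement(A.rv()))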
        """
        return EncapsulatedJob(cls(userFunction, *args, **kwargs))

    def run(self, fileStore):
        # Assumes promises are fulfilled when the parent job is run
        self.evaluatePromisedRequirements()
        userFunction = self._getUserFunction()
        return self.addChildFn(userFunction, *self._args, **self._promisedKwargs).rv()

    def evaluatePromisedRequirements(self):
        requirements = ["disk", "memory", "cores"]
        # Fulfill resource requirement promises
        for requirement in requirements:
            try:
                if isinstance(self._promisedKwargs[requirement], PromisedRequirement):
                    self._promisedKwargs[requirement] = self._promisedKwargs[requirement].getValue()
            except KeyError:
                pass


class PromisedRequirementJobFunctionWrappingJob(PromisedRequirementFunctionWrappingJob):
    """
    Handles dynamic resource allocation for job functions.
    See :class:`toil.job.JobFunctionWrappingJob`.
    """

    def run(self, fileStore):
        self.evaluatePromisedRequirements()
        userFunction = self._getUserFunction()
        return self.addChildJobFn(userFunction, *self._args, **self._promisedKwargs).rv()


class EncapsulatedJob(Job):
    """
    A convenience Job class used to make a job subgraph appear to be a single job.

    Let A be the root job of a job subgraph and B be another job we'd like to run after A
    and all its successors have completed; for this, use encapsulate::

        # Job A and subgraph, Job B
        A, B = A(), B()
        Aprime = A.encapsulate()
        Aprime.addChild(B)
        # B will run after A and all its successors have completed, A and its subgraph of
        # successors in effect appear to be just one job.

    If the job being encapsulated has predecessors (e.g. is not the root job), then the
    encapsulating job will inherit these predecessors. If predecessors are added to the job
    being encapsulated after the encapsulating job is created, then the encapsulating job will
    NOT inherit these predecessors automatically. Care should be exercised to ensure the
    encapsulating job has the proper set of predecessors.

    The return value of an encapsulated job (as accessed by the :func:`toil.job.Job.rv` function)
    is the return value of the root job, e.g. A().encapsulate().rv() and A().rv() will resolve to
    the same value after A or A.encapsulate() has been run.
    """
    def __init__(self, job, unitName=None):
        """
        :param toil.job.Job job: the job to encapsulate.
        :param str unitName: human-readable name to identify this job instance.
        """

        if job is not None:
            # Initial construction, when encapsulating a job

            # Give the root of the subgraph the same resources as the first job in the subgraph.
            super().__init__(**job.description.requirements, unitName=unitName)
            # Ensure that the encapsulating job has the same direct predecessors as the job
            # being encapsulated.
            for predJob in job._directPredecessors:
                predJob.addChild(self)
            self.encapsulatedJob = job
            Job.addChild(self, job)
            # Use small resource requirements for the dummy Job instance.
            # But not too small, or the job won't have enough resources to safely start up Toil.
            self.encapsulatedFollowOn = Job(disk='100M', memory='512M', cores=0.1, unitName=None if unitName is None else unitName + '-followOn')
            Job.addFollowOn(self, self.encapsulatedFollowOn)
        else:
            # Unpickling on the worker, to be run as a no-op.
            # No need to try and hook things up, but nobody can add children or
            # follow-ons to us now either.
            super().__init__()
            self.encapsulatedJob = None
            self.encapsulatedFollowOn = None

    def addChild(self, childJob):
        assert self.encapsulatedFollowOn is not None, \
            "Children cannot be added to EncapsulatedJob while it is running"
        return Job.addChild(self.encapsulatedFollowOn, childJob)

    def addService(self, service, parentService=None):
        assert self.encapsulatedFollowOn is not None, \
            "Services cannot be added to EncapsulatedJob while it is running"
        return Job.addService(self.encapsulatedFollowOn, service, parentService=parentService)

    def addFollowOn(self, followOnJob):
        assert self.encapsulatedFollowOn is not None, \
            "Follow-ons cannot be added to EncapsulatedJob while it is running"
        return Job.addFollowOn(self.encapsulatedFollowOn, followOnJob)

    def rv(self, *path):
        assert self.encapsulatedJob is not None
        return self.encapsulatedJob.rv(*path)

    def prepareForPromiseRegistration(self, jobStore):
        # This one will be called after execution when re-serializing the
        # (unchanged) graph of jobs rooted here.
        super().prepareForPromiseRegistration(jobStore)
        if self.encapsulatedJob is not None:
            # Running where the job was created.
            self.encapsulatedJob.prepareForPromiseRegistration(jobStore)

    def _disablePromiseRegistration(self):
        assert self.encapsulatedJob is not None
        super()._disablePromiseRegistration()
        self.encapsulatedJob._disablePromiseRegistration()

    def __reduce__(self):
        """
        Called during pickling to define the pickled representation of the job.

        We don't want to pickle our internal references to the job we
        encapsulate, so we elide them here. When actually run, we're just a
        no-op job that can maybe chain.
        """

        return self.__class__, (None,)

    def getUserScript(self):
        assert self.encapsulatedJob is not None
        return self.encapsulatedJob.getUserScript()


class ServiceHostJob(Job):
    """
    Job that runs a service. Used internally by Toil. Users should subclass Service instead of using this.
    """
    def __init__(self, service):
        """
        This constructor should not be called by a user.

        :param service: The service to wrap in a job.
        :type service: toil.job.Job.Service
        """

        # Make sure the service hasn't been given a host already.
        assert service.hostID is None

        # Make ourselves with name info from the Service and a
        # ServiceJobDescription that has the service control flags.
        super().__init__(**service.requirements,
                         unitName=service.unitName, descriptionClass=ServiceJobDescription)

        # Make sure the service knows it has a host now
        service.hostID = self.jobStoreID

        # service.__module__ is the module defining the class that service is an instance of.
        # Will need to be loaded before unpickling the Service.
        self.serviceModule = ModuleDescriptor.forModule(service.__module__).globalize()

        # The service to run, or None if it is still pickled.
        # We can't just pickle it as part of ourselves because we may need to load
        # an additional module first.
        self.service = service
        # The pickled service, or None if it isn't currently pickled.
        # We can't just pickle right away because we may owe promises from it.
        self.pickledService = None

        # Pick up our name from the service.
        self.jobName = service.jobName
        # This references the parent job wrapper. It is initialised just before
        # the job is run. It is used to access the start and terminate flags.
        self.jobGraph = None

    @property
    def fileStore(self):
        """
        Return the file store, which the Service may need.
        """
        return self._fileStore

    def _renameReferences(self, renames):
        # When the job store finally hands out IDs we have to fix up the
        # back-reference from our Service to us.
        super()._renameReferences(renames)
        if self.service is not None:
            self.service.hostID = renames[self.service.hostID]

    # Since the running service has us, make sure they don't try to tack more
    # stuff onto us.

    def addChild(self, child):
        raise RuntimeError("Service host jobs cannot have children, follow-ons, or services")

    def addFollowOn(self, followOn):
        raise RuntimeError("Service host jobs cannot have children, follow-ons, or services")

    def addService(self, service, parentService=None):
        raise RuntimeError("Service host jobs cannot have children, follow-ons, or services")

    def saveBody(self, jobStore):
        """
        Serialize the service itself before saving the host job's body.
        """
        # Save the unpickled service
        service = self.service
        # Serialize the service
        self.pickledService = pickle.dumps(self.service, protocol=pickle.HIGHEST_PROTOCOL)
        # Clear the real service until we have the module to load it back
        self.service = None
        # Save the body as normal
        super().saveBody(jobStore)
        # Restore the unpickled service
        self.service = service
        self.pickledService = None

    def run(self, fileStore):
        # Unpickle the service
        logger.debug('Loading service module %s.', self.serviceModule)
        userModule = self._loadUserModule(self.serviceModule)
        service = self._unpickle(userModule, BytesIO(self.pickledService), requireInstanceOf=Job.Service)
        self.pickledService = None
        # Make sure it has the config, since it wasn't load()-ed via the JobStore
        service.assignConfig(fileStore.jobStore.config)
        # Start the service
        startCredentials = service.start(self)
        try:
            # The start credentials must be communicated to processes connecting to
            # the service; to do this while the run method is running, we
            # cheat and set the return value promise within the run method
            self._fulfillPromises(startCredentials, fileStore.jobStore)
            # Set this to avoid the return values being updated after the
            # run method has completed!
            self._rvs = {}

            # Now flag that the service is running, so jobs can connect to it
            logger.debug("Removing the start jobStoreID to indicate that the service is established")
            assert self.description.startJobStoreID is not None
            if fileStore.jobStore.fileExists(self.description.startJobStoreID):
                fileStore.jobStore.deleteFile(self.description.startJobStoreID)
            assert not fileStore.jobStore.fileExists(self.description.startJobStoreID)

            # Now block until we are told to stop, which is indicated by the removal
            # of a file
            assert self.description.terminateJobStoreID is not None
            while True:
                # Check for the terminate signal
                if not fileStore.jobStore.fileExists(self.description.terminateJobStoreID):
                    logger.debug("Detected that the terminate jobStoreID has been removed so exiting")
                    if not fileStore.jobStore.fileExists(self.description.errorJobStoreID):
                        raise RuntimeError("Detected that the error jobStoreID has been removed, so exiting with an error")
                    break

                # Check the service's status and exit if failed or complete
                try:
                    if not service.check():
                        logger.debug("The service has finished okay, exiting")
                        break
                except RuntimeError:
                    logger.debug("Detected termination of the service")
                    raise

                time.sleep(fileStore.jobStore.config.servicePollingInterval)  # Avoid excessive polling

            logger.debug("Service is done")
        finally:
            # The stop function is always called
            service.stop(self)

    def getUserScript(self):
        return self.serviceModule


class Promise:
    """
    References a return value from a :meth:`toil.job.Job.run` or
    :meth:`toil.job.Job.Service.start` method as a *promise* before the method itself is run.

    Let T be a job. Instances of :class:`.Promise` (termed a *promise*) are returned by T.rv(),
    which is used to reference the return value of T's run function. When the promise is passed
    to the constructor (or as an argument to a wrapped function) of a different, successor job,
    the promise will be replaced by the actual referenced return value. This mechanism allows a
    return value from one job's run method to be used as an input argument to another job before
    the former job's run function has been executed.
    """
    _jobstore = None
    """
    Caches the job store instance used during unpickling to prevent it from being instantiated
    for each promise.

    :type: toil.jobStores.abstractJobStore.AbstractJobStore
    """

    filesToDelete = set()
    """
    A set of IDs of files containing promised values, to be deleted when we know we won't need them anymore.
    """
    def __init__(self, job, path):
        """
        :param Job job: the job whose return value this promise references
        :param path: see :meth:`.Job.rv`
        """
        self.job = job
        self.path = path

    def __reduce__(self):
        """
        Called during pickling when a promise (an instance of this class) is about to be
        pickled. Returns the Promise class and construction arguments that will be evaluated
        during unpickling, namely the job store coordinates of a file that will hold the promised
        return value. By the time the promise is about to be unpickled, that file should be
        populated.
        """
        # The allocation of the file in the job store is intentionally lazy: we only allocate an
        # empty file in the job store if the promise is actually being pickled.
        # This is done so that we do not allocate files for promises that are never used.
        jobStoreLocator, jobStoreFileID = self.job.registerPromise(self.path)
        # Returning a class object here causes the pickling machinery to attempt to instantiate
        # the class. We will catch that with __new__ and return the actual return value instead.
        return self.__class__, (jobStoreLocator, jobStoreFileID)

    @staticmethod
    def __new__(cls, *args):
        assert len(args) == 2
        if isinstance(args[0], Job):
            # Regular instantiation when the promise is created, before it is being pickled
            return super().__new__(cls)
        else:
            # Attempted instantiation during unpickling; return the promised value instead
            return cls._resolve(*args)

    @classmethod
    def _resolve(cls, jobStoreLocator, jobStoreFileID):
        # Initialize the cached job store if it was never initialized in the current process or
        # if it belongs to a different workflow that was run earlier in the current process.
        if cls._jobstore is None or cls._jobstore.config.jobStore != jobStoreLocator:
            cls._jobstore = Toil.resumeJobStore(jobStoreLocator)
        cls.filesToDelete.add(jobStoreFileID)
        with cls._jobstore.readFileStream(jobStoreFileID) as fileHandle:
            # If this doesn't work then the file containing the promise may not exist or be
            # corrupted
            value = safeUnpickleFromStream(fileHandle)
            return value


class PromisedRequirement:
    def __init__(self, valueOrCallable, *args):
        """
        Class for dynamically allocating job function resource requirements involving
        :class:`toil.job.Promise` instances.

        Use when resource requirements depend on the return value of a parent function.
        PromisedRequirements can be modified by passing a function that takes the
        :class:`.Promise` as input.

        For example, let f, g, and h be functions. Then a Toil workflow can be
        defined as follows::

            A = Job.wrapFn(f)
            B = A.addChildFn(g, cores=PromisedRequirement(A.rv()))
            C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2*x, B.rv()))

        :param valueOrCallable: A single Promise instance or a function that
            takes args as input parameters.
        :param args: variable length argument list
        :type args: int or .Promise
        """
        if hasattr(valueOrCallable, '__call__'):
            assert len(args) != 0, 'Need parameters for PromisedRequirement function.'
            func = valueOrCallable
        else:
            assert len(args) == 0, 'Define a PromisedRequirement function to handle multiple arguments.'
            func = lambda x: x
            args = [valueOrCallable]

        self._func = dill.dumps(func)
        self._args = list(args)

    def getValue(self):
        """
        Returns the fully evaluated PromisedRequirement value.
        """
        func = dill.loads(self._func)
        return func(*self._args)

    @staticmethod
    def convertPromises(kwargs):
        """
        Returns True if any of the reserved resource keywords ('disk', 'memory' or 'cores')
        is a Promise or PromisedRequirement instance. Promise instances are converted in
        place to PromisedRequirement instances.

        :param kwargs: function keyword arguments
        :return: bool
        """
        for r in ["disk", "memory", "cores"]:
            if isinstance(kwargs.get(r), Promise):
                kwargs[r] = PromisedRequirement(kwargs[r])
                return True
            elif isinstance(kwargs.get(r), PromisedRequirement):
                return True
        return False


class UnfulfilledPromiseSentinel:
    """This should be overwritten by a proper promised value.
    Throws an exception when unpickled."""
    def __init__(self, fulfillingJobName, unpickled):
        self.fulfillingJobName = fulfillingJobName

    @staticmethod
    def __setstate__(stateDict):
        """Only called when unpickling. This won't be unpickled unless the
        promise wasn't resolved, so we throw an exception."""
        jobName = stateDict['fulfillingJobName']
        raise RuntimeError("This job was passed a promise that wasn't yet resolved when it "
                           "ran. The job {jobName} that fulfills this promise hasn't yet "
                           "finished. This means that there aren't enough constraints to "
                           "ensure the current job always runs after {jobName}. Consider adding a "
                           "follow-on indirection between this job and its parent, or adding "
                           "this job as a child/follow-on of {jobName}.".format(jobName=jobName))