1# Copyright (C) 2015-2021 Regents of the University of California
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import collections
15import copy
16import importlib
17import inspect
18import itertools
19import logging
20import os
21import pickle
22import shutil
23import tempfile
24import time
25import uuid
26from abc import ABCMeta, abstractmethod
27from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
28from contextlib import contextmanager
29from io import BytesIO
30from typing import Dict, Optional, Union, Set
31
32import dill
33
34from toil.common import Config, Toil, addOptions, safeUnpickleFromStream
35from toil.deferred import DeferredFunction
36from toil.fileStores import FileID
37from toil.lib.expando import Expando
38from toil.lib.conversions import human2bytes
39from toil.lib.resources import (get_total_cpu_time,
40                                get_total_cpu_time_and_memory_usage)
41from toil.resource import ModuleDescriptor
42from toil.statsAndLogging import set_logging_from_options
43
44logger = logging.getLogger(__name__)
45
46
47class JobPromiseConstraintError(RuntimeError):
48    """
49    Represents a problem where a job is being asked to promise its return
50    value, but it has not yet been hit in the topological order of the job
51    graph.
52    """
53    def __init__(self, promisingJob, recipientJob=None):
54        """
55        :param toil.job.Job promisingJob: The job being asked for its return value.
56        :param toil.job.Job recipientJob: The job receiving the return value, if any.
57        """
58
59        self.promisingJob = promisingJob
60        self.recipientJob = recipientJob
61
62        if recipientJob is None:
63            # Come up with a vaguer error message
64            super().__init__(f"Job {promisingJob.description} cannot promise its return value to a job that is not its successor")
65        else:
66            # Write a full error message
67            super().__init__(f"Job {promisingJob.description} cannot promise its return value to non-successor {recipientJob.description}")
68
69
70class ConflictingPredecessorError(Exception):
71    def __init__(self, predecessor: 'Job', successor: 'Job'):
72        super().__init__(f'The given job: "{predecessor.description}" is already a predecessor of job: "{successor.description}".')
73
74
75class TemporaryID:
76    """
77    Placeholder for a job ID used by a JobDescription that has not yet been
78    registered with any JobStore.
79
80    Needs to be held:
81        * By JobDescription objects to record normal relationships.
82        * By Jobs to key their connected-component registries and to record
83          predecessor relationships to facilitate EncapsulatedJob adding
84          itself as a child.
85        * By Services to tie back to their hosting jobs, so the service
86          tree can be built up from Service objects.
87    """
88    def __init__(self):
89        """
90        Assign a unique temporary ID that won't collide with anything.
91        """
92        self._value = uuid.uuid4()
93
94    def __str__(self):
95        return self.__repr__()
96
97    def __repr__(self):
98        return f'TemporaryID({self._value})'
99
100    def __hash__(self):
101        return hash(self._value)
102
103    def __eq__(self, other):
104        return isinstance(other, TemporaryID) and self._value == other._value
105
106    def __ne__(self, other):
107        return not isinstance(other, TemporaryID) or self._value != other._value
108
109
110class Requirer:
111    """
112    Base class implementing the storage and presentation of requirements for
113    cores, memory, disk, and preemptability as properties.
114    """
115
116    def __init__(self, requirements):
117        """
118        Parse and save the given requirements.
119
120        :param dict requirements: Dict from string to number, string, or bool
121            describing a set of resource requirments. 'cores', 'memory',
122            'disk', and 'preemptable' fields, if set, are parsed and broken out
123            into properties. If unset, the relevant property will be
124            unspecified, and will be pulled from the assigned Config object if
125            queried (see :meth:`toil.job.Requirer.assignConfig`). If
126            unspecified and no Config object is assigned, an AttributeError
127            will be raised at query time.
128        """
129
130        super().__init__()
131
132        # We can have a toil.common.Config assigned to fill in default values
133        # for e.g. job requirements not explicitly specified.
134        self._config = None
135
136        # Save requirements, parsing and validating anything that needs parsing or validating.
137        # Don't save Nones.
138        self._requirementOverrides = {k: self._parseResource(k, v) for (k, v) in requirements.items() if v is not None}
139
140    def assignConfig(self, config):
141        """
142        Assign the given config object to be used to provide default values.
143
144        Must be called exactly once on a loaded JobDescription before any
145        requirements are queried.
146
147        :param toil.common.Config config: Config object to query
148        """
149        if self._config is not None:
150            raise RuntimeError(f"Config assigned multiple times to {self}")
151        self._config = config
152
153    def __getstate__(self):
154        """
155        Return the dict to use as the instance's __dict__ when pickling.
156        """
157
158        # We want to exclude the config from pickling.
159        state = self.__dict__.copy()
160        state['_config'] = None
161        return state
162
163    def __copy__(self):
164        """
165        Return a semantically-shallow copy of the object, for :meth:`copy.copy`.
166        """
167
168        # See https://stackoverflow.com/a/40484215 for how to do an override
169        # that uses the base implementation
170
171        # Hide this override
172        implementation = self.__copy__
173        self.__copy__ = None
174
175        # Do the copy which omits the config via __getstate__ override
176        clone = copy.copy(self)
177
178        # Put back the override on us and the copy
179        self.__copy__ = implementation
180        clone.__copy__ = implementation
181
182        if self._config is not None:
183            # Share a config reference
184            clone.assignConfig(self._config)
185
186        return clone
187
188    def __deepcopy__(self, memo):
189        """
190        Return a semantically-deep copy of the object, for :meth:`copy.deepcopy`.
191        """
192
193        # See https://stackoverflow.com/a/40484215 for how to do an override
194        # that uses the base implementation
195
196        # Hide this override
197        implementation = self.__deepcopy__
198        self.__deepcopy__ = None
199
200        # Do the deepcopy which omits the config via __getstate__ override
201        clone = copy.deepcopy(self, memo)
202
203        # Put back the override on us and the copy
204        self.__deepcopy__ = implementation
205        clone.__deepcopy__ = implementation
206
207        if self._config is not None:
208            # Share a config reference
209            clone.assignConfig(self._config)
210
211        return clone
212
213    @staticmethod
214    def _parseResource(name, value):
215        """
216        Parse a Toil resource requirement value and apply resource-specific type checks. If the
217        value is a string, a binary or metric unit prefix in it will be evaluated and the
218        corresponding integral value will be returned.
219
220        :param str name: The name of the resource
221        :param str|int|float|bool|None value: The resource value
222        :rtype: int|float|bool|None
223
224        >>> Requirer._parseResource('cores', None)
225        >>> Requirer._parseResource('cores', 1), Requirer._parseResource('disk', 1), \
226        Requirer._parseResource('memory', 1)
227        (1, 1, 1)
228        >>> Requirer._parseResource('cores', '1G'), Requirer._parseResource('disk', '1G'), \
229        Requirer._parseResource('memory', '1G')
230        (1073741824, 1073741824, 1073741824)
231        >>> Requirer._parseResource('cores', 1.1)
232        1.1
233        >>> Requirer._parseResource('disk', 1.1) # doctest: +IGNORE_EXCEPTION_DETAIL
234        Traceback (most recent call last):
235        ...
236        TypeError: The 'disk' requirement does not accept values that are of <type 'float'>
237        >>> Requirer._parseResource('memory', object()) # doctest: +IGNORE_EXCEPTION_DETAIL
238        Traceback (most recent call last):
239        ...
240        TypeError: The 'memory' requirement does not accept values that are of ...
241        """
242
243        if value is None:
244            # Anything can be None.
245            return value
246
247        if name in ('memory', 'disk', 'cores'):
248            # These should be numbers that accept things like "5G".
249            if isinstance(value, (str, bytes)):
250                value = human2bytes(value)
251            if isinstance(value, int):
252                return value
253            elif isinstance(value, float) and name == 'cores':
254                # But only cores can be fractional.
255                return value
256            else:
257                raise TypeError(f"The '{name}' requirement does not accept values that are of type {type(value)}")
258        elif name == 'preemptable':
259            if isinstance(value, str):
260                if value.tolower() == 'true':
261                    return True
262                elif value.tolower() == 'false':
263                    return False
264                else:
265                    raise ValueError(f"The '{name}' requirement, as a string, must be 'true' or 'false' but is {value}")
266            elif isinstance(value, int):
267                if value == 1:
268                    return True
269                if value == 0:
270                    return False
271                else:
272                    raise ValueError(f"The '{name}' requirement, asn an int, must be 1 or 0 but is {value}")
273            elif isinstance(value, bool):
274                return value
275            else:
276                raise TypeError(f"The '{name}' requirement does not accept values that are of type {type(value)}")
277        else:
278            # Anything else we just pass along without opinons
279            return value
280
281    def _fetchRequirement(self, requirement):
282        """
283        Get the value of the specified requirement ('blah') by looking it up in
284        our requirement storage and querying 'defaultBlah' on the config if it
285        isn't set. If the config would be queried but isn't associated, raises
286        AttributeError.
287
288        :param str requirement: The name of the resource
289        :rtype: int|float|bool|None
290        """
291        if requirement in self._requirementOverrides:
292            value = self._requirementOverrides[requirement]
293            if value is None:
294             raise AttributeError(f"Encountered explicit None for '{requirement}' requirement of {self}")
295            return value
296        elif self._config is not None:
297            value = getattr(self._config, 'default' + requirement.capitalize())
298            if value is None:
299                raise AttributeError(f"Encountered None for default '{requirement}' requirement in config: {self._config}")
300            return value
301        else:
302            raise AttributeError(f"Default value for '{requirement}' requirement of {self} cannot be determined")
303
304    @property
305    def requirements(self):
306        """
307        Dict containing all non-None, non-defaulted requirements.
308
309        :rtype: dict
310        """
311        return dict(self._requirementOverrides)
312
313    @property
314    def disk(self) -> int:
315        """
316        The maximum number of bytes of disk required.
317
318        :rtype: int
319        """
320        return self._fetchRequirement('disk')
321    @disk.setter
322    def disk(self, val):
323         self._requirementOverrides['disk'] = self._parseResource('disk', val)
324
325    @property
326    def memory(self):
327        """
328        The maximum number of bytes of memory required.
329
330        :rtype: int
331        """
332        return self._fetchRequirement('memory')
333    @memory.setter
334    def memory(self, val):
335         self._requirementOverrides['memory'] = self._parseResource('memory', val)
336
337    @property
338    def cores(self):
339        """
340        The number of CPU cores required.
341
342        :rtype: int|float
343        """
344        return self._fetchRequirement('cores')
345    @cores.setter
346    def cores(self, val):
347         self._requirementOverrides['cores'] = self._parseResource('cores', val)
348
349    @property
350    def preemptable(self):
351        """
352        Whether a preemptable node is permitted, or a nonpreemptable one is required.
353
354        :rtype: bool
355        """
356        return self._fetchRequirement('preemptable')
357    @preemptable.setter
358    def preemptable(self, val):
359         self._requirementOverrides['preemptable'] = self._parseResource('preemptable', val)
360
361class JobDescription(Requirer):
362    """
363    Stores all the information that the Toil Leader ever needs to know about a
364    Job: requirements information, dependency information, commands to issue,
365    etc.
366
367    Can be obtained from an actual (i.e. executable) Job object, and can be
368    used to obtain the Job object from the JobStore.
369
370    Never contains other Jobs or JobDescriptions: all reference is by ID.
371
372    Subclassed into variants for checkpoint jobs and service jobs that have
373    their specific parameters.
374    """
375
376    def __init__(self, requirements: Dict[str, Union[int, str, bool]], jobName: str, unitName: str='', displayName: str='', command: Optional[str]=None) -> None:
377        """
378        Create a new JobDescription.
379
380        :param dict requirements: Dict from string to number, string, or bool
381            describing the resource requirments of the job. 'cores', 'memory',
382            'disk', and 'preemptable' fields, if set, are parsed and broken out
383            into properties. If unset, the relevant property will be
384            unspecified, and will be pulled from the assigned Config object if
385            queried (see :meth:`toil.job.Requirer.assignConfig`).
386        :param str jobName: Name of the kind of job this is. May be used in job
387            store IDs and logging. Also used to let the cluster scaler learn a
388            model for how long the job will take. Ought to be the job class's
389            name if no real user-defined name is available.
390        :param str unitName: Name of this instance of this kind of job. May
391            appear with jobName in logging.
392        :param str displayName: A human-readable name to identify this
393            particular job instance. Ought to be the job class's name
394            if no real user-defined name is available.
395        """
396
397        # Set requirements
398        super().__init__(requirements)
399
400        # Save names, making sure they are strings and not e.g. bytes.
401        def makeString(x):
402            return x if not isinstance(x, bytes) else x.decode('utf-8', errors='replace')
403        self.jobName = makeString(jobName)
404        self.unitName = makeString(unitName)
405        self.displayName = makeString(displayName)
406
407        # Set properties that are not fully filled in on creation.
408
409        # ID of this job description in the JobStore.
410        self.jobStoreID = TemporaryID()
411
412        # Mostly fake, not-really-executable command string that encodes how to
413        # find the Job body data that this JobDescription describes, and the
414        # module(s) needed to unpickle it.
415        #
416        # Gets replaced with/rewritten into the real, executable command when
417        # the leader passes the description off to the batch system to be
418        # executed.
419        self.command: Optional[str] = command
420
421        # Set scheduling properties that the leader read to think about scheduling.
422
423        # The number of times the job should be attempted. Includes the initial
424        # try, plus the nu,ber of times to retry if the job fails. This number
425        # is reduced each time the job is run, until it is zero, and then no
426        # further attempts to run the job are made. If None, taken as the
427        # default value for this workflow execution.
428        self._remainingTryCount = None
429
430        # Holds FileStore FileIDs of the files that this job has deleted. Used
431        # to journal deletions of files and recover from a worker crash between
432        # committing a JobDescription update and actually executing the
433        # requested deletions.
434        self.filesToDelete = []
435
436        # Holds JobStore Job IDs of the jobs that have been chained into this
437        # job, and which should be deleted when this job finally is deleted.
438        self.jobsToDelete = []
439
440        # The number of direct predecessors of the job. Needs to be stored at
441        # the JobDescription to support dynamically-created jobs with multiple
442        # predecessors. Otherwise, we could reach a job by one path down from
443        # the root and decide to schedule it without knowing that it is also
444        # reachable from other paths down from the root.
445        self.predecessorNumber = 0
446
447        # The IDs of predecessor jobs that have finished. Managed by the Leader
448        # and ToilState, and just stored in the JobDescription. Never changed
449        # after the job is scheduled, so we don't ahve to worry about
450        # conflicting updates from workers.
451        # TODO: Move into ToilState itself so leader stops mutating us so much?
452        self.predecessorsFinished = set()
453
454        # Note that we don't hold IDs of our predecessors. Predecessors know
455        # about us, and not the other way around. Otherwise we wouldn't be able
456        # to save ourselves to the job store until our predecessors were saved,
457        # but they'd also be waiting on us.
458
459        # The IDs of all child jobs of the described job.
460        # Children which are done must be removed with filterSuccessors.
461        self.childIDs = set()
462
463        # The IDs of all follow-on jobs of the described job.
464        # Follow-ons which are done must be removed with filterSuccessors.
465        self.followOnIDs = set()
466
467        # Dict from ServiceHostJob ID to list of child ServiceHostJobs that start after it.
468        # All services must have an entry, if only to an empty list.
469        self.serviceTree = {}
470
471        # A jobStoreFileID of the log file for a job. This will be None unless the job failed and
472        # the logging has been captured to be reported on the leader.
473        self.logJobStoreFileID = None
474
475    def serviceHostIDsInBatches(self):
476        """
477        Get an iterator over all batches of service host job IDs that can be
478        started at the same time, in the order they need to start in.
479        """
480
481        # First start all the jobs with no parent
482        roots = set(self.serviceTree.keys())
483        for parent, children in self.serviceTree.items():
484            for child in children:
485                roots.remove(child)
486        batch = list(roots)
487        if len(batch) > 0:
488            # If there's a first batch, yield it
489            yield batch
490
491        while len(batch) > 0:
492            nextBatch = []
493            for started in batch:
494                # Go find all the children that can start now that we have started.
495                for child in self.serviceTree[started]:
496                    nextBatch.append(child)
497
498            batch = nextBatch
499            if len(batch) > 0:
500                # Emit the batch if nonempty
501                yield batch
502
503    def successorsAndServiceHosts(self):
504        """
505        Get an iterator over all child, follow-on, and service job IDs
506        """
507        return itertools.chain(self.childIDs, self.followOnIDs, self.serviceTree.keys())
508
509    def allSuccessors(self):
510        """
511        Get an iterator over all child and follow-on job IDs
512        """
513        return itertools.chain(self.childIDs, self.followOnIDs)
514
515    @property
516    def services(self):
517        """
518        Get a collection of the IDs of service host jobs for this job, in arbitrary order.
519
520        Will be empty if the job has no unfinished services.
521        """
522
523        return list(self.serviceTree.keys())
524
525    def nextSuccessors(self):
526        """
527        Return the collection of job IDs for the successors of this job that,
528        according to this job, are ready to run.
529
530        If those jobs have multiple predecessor relationships, they may still
531        be blocked on other jobs.
532
533        Returns None when at the final phase (all successors done), and an
534        empty collection if there are more phases but they can't be entered yet
535        (e.g. because we are waiting for the job itself to run).
536        """
537
538        if self.command is not None:
539            # We ourselves need to run. So there's not nothing to do but no successors are ready.
540            return []
541        elif len(self.childIDs) != 0:
542            # Our children need to run
543            return self.childIDs
544        elif len(self.followOnIDs) != 0:
545            # Our follow-ons need to run
546            return self.followOnIDs
547        else:
548            # Everything is done.
549            return None
550
551    @property
552    def stack(self):
553        """
554        Get an immutable collection of immutable collections of IDs of successors that need to run still.
555
556        Batches of successors are in reverse order of the order they need to run in.
557
558        Some successors in each batch may have already been finished. Batches may be empty.
559
560        Exists so that code that used the old stack list immutably can work
561        still. New development should use nextSuccessors(), and all mutations
562        should use filterSuccessors() (which automatically removes completed
563        phases).
564
565        :return: Batches of successors that still need to run, in reverse
566                 order. An empty batch may exist under a non-empty batch, or at the top
567                 when the job itself is not done.
568        :rtype: tuple(tuple(str))
569        """
570
571        result = []
572        if self.command is not None or len(self.childIDs) != 0 or len(self.followOnIDs) != 0:
573            # Follow-ons haven't all finished yet
574            result.append(tuple(self.followOnIDs))
575        if self.command is not None or len(self.childIDs) != 0:
576            # Children haven't all finished yet
577            result.append(tuple(self.childIDs))
578        return tuple(result)
579
580    def filterSuccessors(self, predicate):
581       """
582       Keep only successor jobs for which the given predicate function returns True when called with the job's ID.
583
584       Treats all other successors as complete and forgets them.
585       """
586
587       self.childIDs = {x for x in self.childIDs if predicate(x)}
588       self.followOnIDs = {x for x in self.followOnIDs if predicate(x)}
589
590    def filterServiceHosts(self, predicate):
591        """
592        Keep only services for which the given predicate function returns True when called with the service host job's ID.
593
594        Treats all other services as complete and forgets them.
595        """
596
597        # Get all the services we shouldn't have anymore
598        toRemove = set()
599        for serviceID in self.services:
600            if not predicate(serviceID):
601                toRemove.add(serviceID)
602
603        # Drop everything from that set as a value and a key
604        self.serviceTree = {k: [x for x in v if x not in toRemove] for k, v in self.serviceTree.items() if k not in toRemove}
605
606    def clearSuccessorsAndServiceHosts(self):
607        """
608        Remove all references to child, follow-on, and service jobs associated with the described job.
609        """
610        self.childIDs = set()
611        self.followOnIDs = set()
612        self.serviceTree = {}
613
614
615    def replace(self, other):
616        """
617        Take on the ID of another JobDescription, while retaining our own state and type.
618
619        When updated in the JobStore, we will save over the other JobDescription.
620
621        Useful for chaining jobs: the chained-to job can replace the parent job.
622
623        Merges cleanup state from the job being replaced into this one.
624
625        :param toil.job.JobDescription other: Job description to replace.
626        """
627
628        # TODO: also be able to take on the successors of the other job, under
629        # ours on the stack, somehow.
630
631        self.jobStoreID = other.jobStoreID
632
633        # Save files and jobs to delete from the job we replaced, so we can
634        # roll up a whole chain of jobs and delete them when they're all done.
635        self.filesToDelete += other.filesToDelete
636        self.jobsToDelete += other.jobsToDelete
637
638    def addChild(self, childID):
639        """
640        Make the job with the given ID a child of the described job.
641        """
642
643        self.childIDs.add(childID)
644
645    def addFollowOn(self, followOnID):
646        """
647        Make the job with the given ID a follow-on of the described job.
648        """
649
650        self.followOnIDs.add(followOnID)
651
652    def addServiceHostJob(self, serviceID, parentServiceID=None):
653        """
654        Make the ServiceHostJob with the given ID a service of the described job.
655
656        If a parent ServiceHostJob ID is given, that parent service will be started
657        first, and must have already been added.
658        """
659
660        # Make sure we aren't clobbering something
661        assert serviceID not in self.serviceTree
662        self.serviceTree[serviceID] = []
663        if parentServiceID is not None:
664            self.serviceTree[parentServiceID].append(serviceID)
665
666    def hasChild(self, childID):
667        """
668        Return True if the job with the given ID is a child of the described job.
669        """
670
671        return childID in self.childIDs
672
673    def hasFollowOn(self, followOnID):
674        """
675        Return True if the job with the given ID is a follow-on of the described job.
676        """
677
678        return followOnID in self.followOnIDs
679
680    def hasServiceHostJob(self, serviceID):
681        """
682        Return True if the ServiceHostJob with the given ID is a service of the described job.
683        """
684
685        return serviceID in self.serviceTree
686
687    def renameReferences(self, renames):
688        """
689        Apply the given dict of ID renames to all references to jobs. Does not
690        modify our own ID or those of finished predecessors.
691
692        IDs not present in the renames dict are left as-is.
693
694        :param dict(TemporaryID, str) renames: Rename operations to apply.
695        """
696
697        self.childIDs = {renames.get(old, old) for old in self.childIDs}
698        self.followOnIDs = {renames.get(old, old) for old in self.followOnIDs}
699        self.serviceTree = {renames.get(parent, parent): [renames.get(child, child) for child in children]
700                            for parent, children in self.serviceTree.items()}
701
702    def addPredecessor(self):
703        """
704        Notify the JobDescription that a predecessor has been added to its Job.
705        """
706        self.predecessorNumber += 1
707
708    def onRegistration(self, jobStore):
709        """
710        Called by the Job saving logic when this JobDescription meets the JobStore and has its ID assigned.
711
712        Overridden to perform setup work (like hooking up flag files for service jobs) that requires the JobStore.
713
714        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store we are being placed into
715        """
716
717    def setupJobAfterFailure(self, exitReason=None):
718        """
719        Reduce the remainingTryCount if greater than zero and set the memory
720        to be at least as big as the default memory (in case of exhaustion of memory,
721        which is common).
722
723        Requires a configuration to have been assigned (see :meth:`toil.job.Requirer.assignConfig`).
724
725        :param toil.batchSystems.abstractBatchSystem.BatchJobExitReason exitReason: The configuration for the current workflow run.
726
727        """
728
729        # Avoid potential circular imports
730        from toil.batchSystems.abstractBatchSystem import BatchJobExitReason
731
732        # Old version of this function used to take a config. Make sure that isn't happening.
733        assert not isinstance(exitReason, Config), "Passing a Config as an exit reason"
734        # Make sure we have an assigned config.
735        assert self._config is not None
736
737        if self._config.enableUnlimitedPreemptableRetries and exitReason == BatchJobExitReason.LOST:
738            logger.info("*Not* reducing try count (%s) of job %s with ID %s",
739                        self.remainingTryCount, self, self.jobStoreID)
740        else:
741            self.remainingTryCount = max(0, self.remainingTryCount - 1)
742            logger.warning("Due to failure we are reducing the remaining try count of job %s with ID %s to %s",
743                           self, self.jobStoreID, self.remainingTryCount)
744        # Set the default memory to be at least as large as the default, in
745        # case this was a malloc failure (we do this because of the combined
746        # batch system)
747        if exitReason == BatchJobExitReason.MEMLIMIT and self._config.doubleMem:
748            self.memory = self.memory * 2
749            logger.warning("We have doubled the memory of the failed job %s to %s bytes due to doubleMem flag",
750                           self, self.memory)
751        if self.memory < self._config.defaultMemory:
752            self.memory = self._config.defaultMemory
753            logger.warning("We have increased the default memory of the failed job %s to %s bytes",
754                           self, self.memory)
755
756        if self.disk < self._config.defaultDisk:
757            self.disk = self._config.defaultDisk
758            logger.warning("We have increased the disk of the failed job %s to the default of %s bytes",
759                           self, self.disk)
760
761
762    def getLogFileHandle(self, jobStore):
763        """
764        Returns a context manager that yields a file handle to the log file.
765
766        Assumes logJobStoreFileID is set.
767        """
768        return jobStore.readFileStream(self.logJobStoreFileID)
769
770    @property
771    def remainingTryCount(self):
772        """
773        The try count set on the JobDescription, or the default based on the
774        retry count from the config if none is set.
775        """
776        if self._remainingTryCount is not None:
777            return self._remainingTryCount
778        elif self._config is not None:
779            # Our try count should be the number of retries in the config, plus
780            # 1 for the initial try
781            return self._config.retryCount + 1
782        else:
783            raise AttributeError(f"Try count for {self} cannot be determined")
784    @remainingTryCount.setter
785    def remainingTryCount(self, val):
786        self._remainingTryCount = val
787
788    def clearRemainingTryCount(self):
789        """
790        Clear remainingTryCount and set it back to its default value.
791
792        :returns: True if a modification to the JobDescription was made, and
793                  False otherwise.
794        :rtype: bool
795        """
796        if self._remainingTryCount is not None:
797            # We had a value stored
798            self._remainingTryCount = None
799            return True
800        else:
801            # No change needed
802            return False
803
804
805    def __str__(self):
806        """
807        Produce a useful logging string identifying this job.
808        """
809
810        printedName = "'" + self.jobName + "'"
811        if self.unitName:
812            printedName += ' ' + self.unitName
813
814        if self.jobStoreID is not None:
815            printedName += ' ' + str(self.jobStoreID)
816
817        return printedName
818
819    # Not usable as a key (not hashable) and doesn't have any value-equality.
820    # There really should only ever be one true version of a JobDescription at
821    # a time, keyed by jobStoreID.
822
823    def __repr__(self):
824        return '%s( **%r )' % (self.__class__.__name__, self.__dict__)
825
826
827class ServiceJobDescription(JobDescription):
828    """
829    A description of a job that hosts a service.
830    """
831
832    def __init__(self, *args, **kwargs):
833        """
834        Create a ServiceJobDescription to describe a ServiceHostJob.
835        """
836
837        # Make the base JobDescription
838        super().__init__(*args, **kwargs)
839
840        # Set service-specific properties
841
842        # An empty file in the jobStore which when deleted is used to signal that the service
843        # should cease.
844        self.terminateJobStoreID = None
845
846        # Similarly a empty file which when deleted is used to signal that the service is
847        # established
848        self.startJobStoreID = None
849
850        # An empty file in the jobStore which when deleted is used to signal that the service
851        # should terminate signaling an error.
852        self.errorJobStoreID = None
853
854    def onRegistration(self, jobStore):
855        """
856        When a ServiceJobDescription first meets the JobStore, it needs to set up its flag files.
857        """
858        super().onRegistration(jobStore)
859
860        self.startJobStoreID = jobStore.getEmptyFileStoreID()
861        self.terminateJobStoreID = jobStore.getEmptyFileStoreID()
862        self.errorJobStoreID = jobStore.getEmptyFileStoreID()
863
864class CheckpointJobDescription(JobDescription):
865    """
866    A description of a job that is a checkpoint.
867    """
868
869    def __init__(self, *args, **kwargs):
870        """
871        Create a CheckpointJobDescription to describe a checkpoint job.
872        """
873
874        # Make the base JobDescription
875        super().__init__(*args, **kwargs)
876
877        # Set checkpoint-specific properties
878
879        # None, or a copy of the original command string used to reestablish the job after failure.
880        self.checkpoint = None
881
882        # Files that can not be deleted until the job and its successors have completed
883        self.checkpointFilesToDelete = []
884
885        # Human-readable names of jobs that were run as part of this job's
886        # invocation, starting with this job
887        self.chainedJobs = []
888
889    def restartCheckpoint(self, jobStore):
890        """
891        Restart a checkpoint after the total failure of jobs in its subtree.
892
893        Writes the changes to the jobStore immediately. All the
894        checkpoint's successors will be deleted, but its try count
895        will *not* be decreased.
896
897        Returns a list with the IDs of any successors deleted.
898        """
899        assert self.checkpoint is not None
900        successorsDeleted = []
901        if self.childIDs or self.followOnIDs or self.serviceTree or self.command is not None:
902            if self.command is not None:
903                assert self.command == self.checkpoint
904                logger.debug("Checkpoint job already has command set to run")
905            else:
906                self.command = self.checkpoint
907
908            jobStore.update(self) # Update immediately to ensure that checkpoint
909            # is made before deleting any remaining successors
910
911            if self.childIDs or self.followOnIDs or self.serviceTree:
912                # If the subtree of successors is not complete restart everything
913                logger.debug("Checkpoint job has unfinished successor jobs, deleting children: %s, followOns: %s, services: %s " %
914                             (self.childIDs, self.followOnIDs, self.serviceTree.keys()))
915                # Delete everything on the stack, as these represent successors to clean
916                # up as we restart the queue
917                def recursiveDelete(jobDesc):
918                    # Recursive walk the stack to delete all remaining jobs
919                    for otherJobID in jobDesc.successorsAndServiceHosts():
920                        if jobStore.exists(otherJobID):
921                            recursiveDelete(jobStore.load(otherJobID))
922                        else:
923                            logger.debug("Job %s has already been deleted", otherJobID)
924                    if jobDesc.jobStoreID != self.jobStoreID:
925                        # Delete everything under us except us.
926                        logger.debug("Checkpoint is deleting old successor job: %s", jobDesc.jobStoreID)
927                        jobStore.delete(jobDesc.jobStoreID)
928                        successorsDeleted.append(jobDesc.jobStoreID)
929                recursiveDelete(self)
930
931                # Cut links to the jobs we deleted.
932                self.clearSuccessorsAndServiceHosts()
933
934                # Update again to commit the removal of successors.
935                jobStore.update(self)
936        return successorsDeleted
937
938class Job:
939    """
940    Class represents a unit of work in toil.
941    """
942    def __init__(self, memory=None, cores=None, disk=None, preemptable=None,
943                       unitName='', checkpoint=False, displayName='',
944                       descriptionClass=None):
945        """
946        This method must be called by any overriding constructor.
947
948        :param memory: the maximum number of bytes of memory the job will require to run.
949        :param cores: the number of CPU cores required.
950        :param disk: the amount of local disk space required by the job, expressed in bytes.
951        :param preemptable: if the job can be run on a preemptable node.
952        :param unitName: Human-readable name for this instance of the job.
953        :param checkpoint: if any of this job's successor jobs completely fails,
954            exhausting all their retries, remove any successor jobs and rerun this job to restart the
955            subtree. Job must be a leaf vertex in the job graph when initially defined, see
956            :func:`toil.job.Job.checkNewCheckpointsAreCutVertices`.
957        :param displayName: Human-readable job type display name.
958        :param descriptionClass: Override for the JobDescription class used to describe the job.
959
960        :type memory: int or string convertible by toil.lib.conversions.human2bytes to an int
961        :type cores: float, int, or string convertible by toil.lib.conversions.human2bytes to an int
962        :type disk: int or string convertible by toil.lib.conversions.human2bytes to an int
963        :type preemptable: bool, int in {0, 1}, or string in {'false', 'true'} in any case
964        :type unitName: str
965        :type checkpoint: bool
966        :type displayName: str
967        :type descriptionClass: class
968        """
969
970        # Fill in our various names
971        jobName = self.__class__.__name__
972        displayName = displayName if displayName else jobName
973
974
975        # Build a requirements dict for the description
976        requirements = {'memory': memory, 'cores': cores, 'disk': disk,
977                        'preemptable': preemptable}
978        if descriptionClass is None:
979            if checkpoint:
980                # Actually describe as a checkpoint job
981                descriptionClass = CheckpointJobDescription
982            else:
983                # Use the normal default
984                descriptionClass = JobDescription
985        # Create the JobDescription that owns all the scheduling information.
986        # Make it with a temporary ID until we can be assigned a real one by
987        # the JobStore.
988        self._description = descriptionClass(requirements, jobName, unitName=unitName, displayName=displayName)
989
990        # Private class variables needed to actually execute a job, in the worker.
991        # Also needed for setting up job graph structures before saving to the JobStore.
992
993        # This dict holds a mapping from TemporaryIDs to the job objects they represent.
994        # Will be shared among all jobs in a disconnected piece of the job
995        # graph that hasn't been registered with a JobStore yet.
996        # Make sure to initially register ourselves.
997        self._registry = {self._description.jobStoreID: self}
998
999        # Job relationships are all stored exactly once in the JobDescription.
1000        # Except for predecessor relationships which are stored here, just
1001        # while the user is creating the job graphs, to check for duplicate
1002        # relationships and to let EncapsulatedJob magically add itself as a
1003        # child. Note that this stores actual Job objects, to call addChild on.
1004        self._directPredecessors = set()
1005
1006        # Note that self.__module__ is not necessarily this module, i.e. job.py. It is the module
1007        # defining the class self is an instance of, which may be a subclass of Job that may be
1008        # defined in a different module.
1009        self.userModule = ModuleDescriptor.forModule(self.__module__).globalize()
1010        # Maps index paths into composite return values to lists of IDs of files containing
1011        # promised values for those return value items. An index path is a tuple of indices that
1012        # traverses a nested data structure of lists, dicts, tuples or any other type supporting
1013        # the __getitem__() protocol.. The special key `()` (the empty tuple) represents the
1014        # entire return value.
1015        self._rvs = collections.defaultdict(list)
1016        self._promiseJobStore = None
1017        self._fileStore = None
1018        self._defer = None
1019        self._tempDir = None
1020
1021    def __str__(self):
1022        """
1023        Produce a useful logging string to identify this Job and distinguish it
1024        from its JobDescription.
1025        """
1026        if self.description is None:
1027            return repr(self)
1028        else:
1029            return 'Job(' + str(self.description) + ')'
1030
1031    @property
1032    def jobStoreID(self):
1033        """
1034        Get the ID of this Job.
1035
1036        :rtype: str|toil.job.TemporaryID
1037        """
1038        # This is managed by the JobDescription.
1039        return self._description.jobStoreID
1040
1041    @property
1042    def description(self):
1043        """
1044        Expose the JobDescription that describes this job.
1045
1046        :rtype: toil.job.JobDescription
1047        """
1048        return self._description
1049
1050    # Instead of being a Requirer ourselves, we pass anything about
1051    # requirements through to the JobDescription.
1052
1053    @property
1054    def disk(self) -> int:
1055        """
1056        The maximum number of bytes of disk the job will require to run.
1057
1058        :rtype: int
1059        """
1060        return self.description.disk
1061    @disk.setter
1062    def disk(self, val):
1063         self.description.disk = val
1064
1065    @property
1066    def memory(self):
1067        """
1068        The maximum number of bytes of memory the job will require to run.
1069
1070        :rtype: int
1071        """
1072        return self.description.memory
1073    @memory.setter
1074    def memory(self, val):
1075         self.description.memory = val
1076
1077    @property
1078    def cores(self):
1079        """
1080        The number of CPU cores required.
1081
1082       :rtype: int|float
1083        """
1084        return self.description.cores
1085    @cores.setter
1086    def cores(self, val):
1087         self.description.cores = val
1088
1089    @property
1090    def preemptable(self):
1091        """
1092        Whether the job can be run on a preemptable node.
1093
1094        :rtype: bool
1095        """
1096        return self.description.preemptable
1097    @preemptable.setter
1098    def preemptable(self, val):
1099         self.description.preemptable = val
1100
1101    @property
1102    def checkpoint(self):
1103        """
1104        Determine if the job is a checkpoint job or not.
1105
1106        :rtype: bool
1107        """
1108
1109        return isinstance(self._description, CheckpointJobDescription)
1110
1111    def assignConfig(self, config):
1112        """
1113        Assign the given config object to be used by various actions
1114        implemented inside the Job class.
1115
1116        :param toil.common.Config config: Config object to query
1117        """
1118        self.description.assignConfig(config)
1119
1120
1121    def run(self, fileStore):
1122        """
1123        Override this function to perform work and dynamically create successor jobs.
1124
1125        :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: Used to create local and
1126               globally sharable temporary files and to send log messages to the leader
1127               process.
1128
1129        :return: The return value of the function can be passed to other jobs by means of
1130                 :func:`toil.job.Job.rv`.
1131        """
1132
1133    def _jobGraphsJoined(self, other):
1134        """
1135        Called whenever the job graphs of this job and the other job may have been merged into one connected component.
1136
1137        Ought to be called on the bigger registry first.
1138
1139        Merges TemporaryID registries if needed.
1140
1141        :param toil.job.Job other: A job possibly from the other connected component
1142        """
1143
1144        # Maintain the invariant that a whole connected component has a config
1145        # assigned if any job in it does.
1146        if self.description._config is None and other.description._config is not None:
1147            # The other component has a config assigned but this component doesn't.
1148            for job in self._registry.values():
1149                job.assignConfig(other.description._config)
1150        elif other.description._config is None and self.description._config is not None:
1151            # We have a config assigned but the other component doesn't.
1152            for job in other._registry.values():
1153                job.assignConfig(self.description._config)
1154
1155        if len(self._registry) < len(other._registry):
1156            # Merge into the other component instead
1157            other._jobGraphsJoined(self)
1158        else:
1159            if self._registry != other._registry:
1160                # We are in fact joining connected components.
1161
1162                # Steal everything from the other connected component's registry
1163                self._registry.update(other._registry)
1164
1165                for job in other._registry.values():
1166                    # Point all their jobs at the new combined registry
1167                    job._registry = self._registry
1168
1169    def addChild(self, childJob):
1170        """
1171        Adds childJob to be run as child of this job. Child jobs will be run \
1172        directly after this job's :func:`toil.job.Job.run` method has completed.
1173
1174        :param toil.job.Job childJob:
1175        :return: childJob
1176        :rtype: toil.job.Job
1177        """
1178
1179        assert isinstance(childJob, Job)
1180
1181        # Join the job graphs
1182        self._jobGraphsJoined(childJob)
1183        # Remember the child relationship
1184        self._description.addChild(childJob.jobStoreID)
1185        # Record the temporary back-reference
1186        childJob._addPredecessor(self)
1187
1188        return childJob
1189
1190    def hasChild(self, childJob):
1191        """
1192        Check if childJob is already a child of this job.
1193
1194        :param toil.job.Job childJob:
1195        :return: True if childJob is a child of the job, else False.
1196        :rtype: bool
1197        """
1198        return self._description.hasChild(childJob.jobStoreID)
1199
1200    def addFollowOn(self, followOnJob):
1201        """
1202        Adds a follow-on job, follow-on jobs will be run after the child jobs and \
1203        their successors have been run.
1204
1205        :param toil.job.Job followOnJob:
1206        :return: followOnJob
1207        :rtype: toil.job.Job
1208        """
1209
1210        assert isinstance(followOnJob, Job)
1211
1212        # Join the job graphs
1213        self._jobGraphsJoined(followOnJob)
1214        # Remember the follow-on relationship
1215        self._description.addFollowOn(followOnJob.jobStoreID)
1216        # Record the temporary back-reference
1217        followOnJob._addPredecessor(self)
1218
1219        return followOnJob
1220
1221    def hasPredecessor(self, job: 'Job') -> bool:
1222        """Check if a given job is already a predecessor of this job."""
1223        return job in self._directPredecessors
1224
1225    def hasFollowOn(self, followOnJob):
1226        """
1227        Check if given job is already a follow-on of this job.
1228
1229        :param toil.job.Job followOnJob:
1230        :return: True if the followOnJob is a follow-on of this job, else False.
1231        :rtype: bool
1232        """
1233        return self._description.hasChild(followOnJob.jobStoreID)
1234
1235    def addService(self, service, parentService=None):
1236        """
1237        Add a service.
1238
1239        The :func:`toil.job.Job.Service.start` method of the service will be called
1240        after the run method has completed but before any successors are run.
1241        The service's :func:`toil.job.Job.Service.stop` method will be called once
1242        the successors of the job have been run.
1243
1244        Services allow things like databases and servers to be started and accessed
1245        by jobs in a workflow.
1246
1247        :raises toil.job.JobException: If service has already been made the child of a job or another service.
1248        :param toil.job.Job.Service service: Service to add.
1249        :param toil.job.Job.Service parentService: Service that will be started before 'service' is
1250            started. Allows trees of services to be established. parentService must be a service
1251            of this job.
1252        :return: a promise that will be replaced with the return value from
1253            :func:`toil.job.Job.Service.start` of service in any successor of the job.
1254        :rtype: toil.job.Promise
1255        """
1256
1257        if parentService is not None:
1258            if not self.hasService(parentService):
1259                raise JobException("Parent service is not a service of the given job")
1260
1261        if service.hostID is not None:
1262            raise JobException("Service has already been added to a job")
1263
1264        # Create a host job for the service, ad get it an ID
1265        hostingJob = ServiceHostJob(service)
1266        self._jobGraphsJoined(hostingJob)
1267
1268        # Record the relationship to the hosting job, with its parent if any.
1269        self._description.addServiceHostJob(hostingJob.jobStoreID, parentService.hostID if parentService is not None else None)
1270
1271        # For compatibility with old Cactus versions that tinker around with
1272        # our internals, we need to make the hosting job available as
1273        # self._services[-1]. TODO: Remove this when Cactus has updated.
1274        self._services = [hostingJob]
1275
1276        # Return the promise for the service's startup result
1277        return hostingJob.rv()
1278
1279    def hasService(self, service):
1280        """
1281        Returns True if the given Service is a service of this job, and False otherwise.
1282        """
1283
1284        return service.hostID is None or self._description.hasServiceHostJob(service.hostID)
1285
1286    ##Convenience functions for creating jobs
1287
1288    def addChildFn(self, fn, *args, **kwargs):
1289        """
1290        Adds a function as a child job.
1291
1292        :param fn: Function to be run as a child job with ``*args`` and ``**kwargs`` as \
1293        arguments to this function. See toil.job.FunctionWrappingJob for reserved \
1294        keyword arguments used to specify resource requirements.
1295        :return: The new child job that wraps fn.
1296        :rtype: toil.job.FunctionWrappingJob
1297        """
1298        if PromisedRequirement.convertPromises(kwargs):
1299            return self.addChild(PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs))
1300        else:
1301            return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))
1302
1303    def addFollowOnFn(self, fn, *args, **kwargs):
1304        """
1305        Adds a function as a follow-on job.
1306
1307        :param fn: Function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \
1308        arguments to this function. See toil.job.FunctionWrappingJob for reserved \
1309        keyword arguments used to specify resource requirements.
1310        :return: The new follow-on job that wraps fn.
1311        :rtype: toil.job.FunctionWrappingJob
1312        """
1313        if PromisedRequirement.convertPromises(kwargs):
1314            return self.addFollowOn(PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs))
1315        else:
1316            return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))
1317
1318    def addChildJobFn(self, fn, *args, **kwargs):
1319        """
1320        Adds a job function as a child job. See :class:`toil.job.JobFunctionWrappingJob`
1321        for a definition of a job function.
1322
1323        :param fn: Job function to be run as a child job with ``*args`` and ``**kwargs`` as \
1324        arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \
1325        keyword arguments used to specify resource requirements.
1326        :return: The new child job that wraps fn.
1327        :rtype: toil.job.JobFunctionWrappingJob
1328        """
1329        if PromisedRequirement.convertPromises(kwargs):
1330            return self.addChild(PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs))
1331        else:
1332            return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))
1333
1334    def addFollowOnJobFn(self, fn, *args, **kwargs):
1335        """
1336        Add a follow-on job function. See :class:`toil.job.JobFunctionWrappingJob`
1337        for a definition of a job function.
1338
1339        :param fn: Job function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \
1340        arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \
1341        keyword arguments used to specify resource requirements.
1342        :return: The new follow-on job that wraps fn.
1343        :rtype: toil.job.JobFunctionWrappingJob
1344        """
1345        if PromisedRequirement.convertPromises(kwargs):
1346            return self.addFollowOn(PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs))
1347        else:
1348            return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))
1349
1350    @property
1351    def tempDir(self):
1352        """
1353        Shortcut to calling :func:`job.fileStore.getLocalTempDir`. Temp dir is created on first call
1354        and will be returned for first and future calls
1355        :return: Path to tempDir. See `job.fileStore.getLocalTempDir`
1356        :rtype: str
1357        """
1358        if self._tempDir is None:
1359            self._tempDir = self._fileStore.getLocalTempDir()
1360        return self._tempDir
1361
1362    def log(self, text, level=logging.INFO):
1363        """
1364        convenience wrapper for :func:`fileStore.logToMaster`
1365        """
1366        self._fileStore.logToMaster(text, level)
1367
1368    @staticmethod
1369    def wrapFn(fn, *args, **kwargs):
1370        """
1371        Makes a Job out of a function. \
1372        Convenience function for constructor of :class:`toil.job.FunctionWrappingJob`.
1373
1374        :param fn: Function to be run with ``*args`` and ``**kwargs`` as arguments. \
1375        See toil.job.JobFunctionWrappingJob for reserved keyword arguments used \
1376        to specify resource requirements.
1377        :return: The new function that wraps fn.
1378        :rtype: toil.job.FunctionWrappingJob
1379        """
1380        if PromisedRequirement.convertPromises(kwargs):
1381            return PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs)
1382        else:
1383            return FunctionWrappingJob(fn, *args, **kwargs)
1384
1385    @staticmethod
1386    def wrapJobFn(fn, *args, **kwargs):
1387        """
1388        Makes a Job out of a job function. \
1389        Convenience function for constructor of :class:`toil.job.JobFunctionWrappingJob`.
1390
1391        :param fn: Job function to be run with ``*args`` and ``**kwargs`` as arguments. \
1392        See toil.job.JobFunctionWrappingJob for reserved keyword arguments used \
1393        to specify resource requirements.
1394        :return: The new job function that wraps fn.
1395        :rtype: toil.job.JobFunctionWrappingJob
1396        """
1397        if PromisedRequirement.convertPromises(kwargs):
1398            return PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs)
1399        else:
1400            return JobFunctionWrappingJob(fn, *args, **kwargs)
1401
1402    def encapsulate(self, name=None):
1403        """
1404        Encapsulates the job, see :class:`toil.job.EncapsulatedJob`.
1405        Convenience function for constructor of :class:`toil.job.EncapsulatedJob`.
1406
1407        :param str name: Human-readable name for the encapsulated job.
1408
1409        :return: an encapsulated version of this job.
1410        :rtype: toil.job.EncapsulatedJob
1411        """
1412        return EncapsulatedJob(self, unitName=name)
1413
1414    ####################################################
1415    #The following function is used for passing return values between
1416    #job run functions
1417    ####################################################
1418
1419    def rv(self, *path):
1420        """
1421        Creates a *promise* (:class:`toil.job.Promise`) representing a return value of the job's
1422        run method, or, in case of a function-wrapping job, the wrapped function's return value.
1423
1424        :param (Any) path: Optional path for selecting a component of the promised return value.
1425               If absent or empty, the entire return value will be used. Otherwise, the first
1426               element of the path is used to select an individual item of the return value. For
1427               that to work, the return value must be a list, dictionary or of any other type
1428               implementing the `__getitem__()` magic method. If the selected item is yet another
1429               composite value, the second element of the path can be used to select an item from
1430               it, and so on. For example, if the return value is `[6,{'a':42}]`, `.rv(0)` would
1431               select `6` , `rv(1)` would select `{'a':3}` while `rv(1,'a')` would select `3`. To
1432               select a slice from a return value that is slicable, e.g. tuple or list, the path
1433               element should be a `slice` object. For example, assuming that the return value is
1434               `[6, 7, 8, 9]` then `.rv(slice(1, 3))` would select `[7, 8]`. Note that slicing
1435               really only makes sense at the end of path.
1436
1437        :return: A promise representing the return value of this jobs :meth:`toil.job.Job.run`
1438                 method.
1439
1440        :rtype: toil.job.Promise
1441        """
1442        return Promise(self, path)
1443
1444    def registerPromise(self, path):
1445        if self._promiseJobStore is None:
1446            # We haven't had a job store set to put our return value into, so
1447            # we must not have been hit yet in job topological order.
1448            raise JobPromiseConstraintError(self)
1449        # TODO: can we guarantee self.jobStoreID is populated and so pass that here?
1450        with self._promiseJobStore.writeFileStream() as (fileHandle, jobStoreFileID):
1451            promise = UnfulfilledPromiseSentinel(str(self.description), False)
1452            pickle.dump(promise, fileHandle, pickle.HIGHEST_PROTOCOL)
1453        self._rvs[path].append(jobStoreFileID)
1454        return self._promiseJobStore.config.jobStore, jobStoreFileID
1455
1456    def prepareForPromiseRegistration(self, jobStore):
1457        """
1458        Ensure that a promise by this job (the promissor) can register with the promissor when
1459        another job referring to the promise (the promissee) is being serialized. The promissee
1460        holds the reference to the promise (usually as part of the the job arguments) and when it
1461        is being pickled, so will the promises it refers to. Pickling a promise triggers it to be
1462        registered with the promissor.
1463
1464        :return:
1465        """
1466        self._promiseJobStore = jobStore
1467
1468    def _disablePromiseRegistration(self):
1469        """
1470        Called when the job data is about to be saved in the JobStore.
1471        No promises should attempt to register with the job after this has been
1472        called, because that registration would not be persisted.
1473        """
1474
1475        self._promiseJobStore = None
1476
1477    ####################################################
1478    #Cycle/connectivity checking
1479    ####################################################
1480
1481    def checkJobGraphForDeadlocks(self):
1482        """
1483        Ensures that a graph of Jobs (that hasn't yet been saved to the
1484        JobStore) doesn't contain any pathological relationships between jobs
1485        that would result in deadlocks if we tried to run the jobs.
1486
1487        See :func:`toil.job.Job.checkJobGraphConnected`,
1488        :func:`toil.job.Job.checkJobGraphAcyclic` and
1489        :func:`toil.job.Job.checkNewCheckpointsAreLeafVertices` for more info.
1490
1491        :raises toil.job.JobGraphDeadlockException: if the job graph
1492            is cyclic, contains multiple roots or contains checkpoint jobs that are
1493            not leaf vertices when defined (see :func:`toil.job.Job.checkNewCheckpointsAreLeaves`).
1494        """
1495        self.checkJobGraphConnected()
1496        self.checkJobGraphAcylic()
1497        self.checkNewCheckpointsAreLeafVertices()
1498
1499    def getRootJobs(self) -> Set['Job']:
1500        """
1501        Returns the set of root job objects that contain this job.
1502        A root job is a job with no predecessors (i.e. which are not children, follow-ons, or services).
1503
1504        Only deals with jobs created here, rather than loaded from the job store.
1505        """
1506
1507        # Start assuming all jobs are roots
1508        roots = set(self._registry.keys())
1509
1510        for job in self._registry.values():
1511            for otherID in job.description.successorsAndServiceHosts():
1512                # If anything is a successor or service of anything else, it isn't a root.
1513                if otherID in roots:
1514                    # Remove it if we still think it is
1515                    roots.remove(otherID)
1516
1517        return {self._registry[jid] for jid in roots}
1518
1519    def checkJobGraphConnected(self):
1520        """
1521        :raises toil.job.JobGraphDeadlockException: if :func:`toil.job.Job.getRootJobs` does \
1522        not contain exactly one root job.
1523
1524        As execution always starts from one root job, having multiple root jobs will \
1525        cause a deadlock to occur.
1526
1527        Only deals with jobs created here, rather than loaded from the job store.
1528        """
1529        rootJobs = self.getRootJobs()
1530        if len(rootJobs) != 1:
1531            raise JobGraphDeadlockException("Graph does not contain exactly one"
1532                                            " root job: %s" % rootJobs)
1533
1534    def checkJobGraphAcylic(self):
1535        """
1536        :raises toil.job.JobGraphDeadlockException: if the connected component \
1537        of jobs containing this job contains any cycles of child/followOn dependencies \
1538        in the *augmented job graph* (see below). Such cycles are not allowed \
1539        in valid job graphs.
1540
1541        A follow-on edge (A, B) between two jobs A and B is equivalent \
1542        to adding a child edge to B from (1) A, (2) from each child of A, \
1543        and (3) from the successors of each child of A. We call each such edge \
1544        an edge an "implied" edge. The augmented job graph is a job graph including \
1545        all the implied edges.
1546
1547        For a job graph G = (V, E) the algorithm is ``O(|V|^2)``. It is ``O(|V| + |E|)`` for \
1548        a graph with no follow-ons. The former follow-on case could be improved!
1549
1550        Only deals with jobs created here, rather than loaded from the job store.
1551        """
1552        #Get the root jobs
1553        roots = self.getRootJobs()
1554        if len(roots) == 0:
1555            raise JobGraphDeadlockException("Graph contains no root jobs due to cycles")
1556
1557        #Get implied edges
1558        extraEdges = self._getImpliedEdges(roots)
1559
1560        #Check for directed cycles in the augmented graph
1561        visited = set()
1562        for root in roots:
1563            root._checkJobGraphAcylicDFS([], visited, extraEdges)
1564
1565    def _checkJobGraphAcylicDFS(self, stack, visited, extraEdges):
1566        """
1567        DFS traversal to detect cycles in augmented job graph.
1568        """
1569        if self not in visited:
1570            visited.add(self)
1571            stack.append(self)
1572            for successor in [self._registry[jID] for jID in self.description.allSuccessors() if jID in self._registry] + extraEdges[self]:
1573                # Grab all the successors in the current registry (i.e. added form this node) and look at them.
1574                successor._checkJobGraphAcylicDFS(stack, visited, extraEdges)
1575            assert stack.pop() == self
1576        if self in stack:
1577            stack.append(self)
1578            raise JobGraphDeadlockException("A cycle of job dependencies has been detected '%s'" % stack)
1579
1580    @staticmethod
1581    def _getImpliedEdges(roots):
1582        """
1583        Gets the set of implied edges (between children and follow-ons of a common job). Used in Job.checkJobGraphAcylic.
1584
1585        Only deals with jobs created here, rather than loaded from the job store.
1586
1587        :returns: dict from Job object to list of Job objects that must be done before it can start.
1588        """
1589        #Get nodes (Job objects) in job graph
1590        nodes = set()
1591        for root in roots:
1592            root._collectAllSuccessors(nodes)
1593
1594        ##For each follow-on edge calculate the extra implied edges
1595        #Adjacency list of implied edges, i.e. map of jobs to lists of jobs
1596        #connected by an implied edge
1597        extraEdges = dict([(n, []) for n in nodes])
1598        for job in nodes:
1599            for depth in range(1, len(job.description.stack)):
1600                # Add edges from all jobs in the earlier/upper subtrees to all
1601                # the roots of the later/lower subtrees
1602
1603                upper = job.description.stack[depth]
1604                lower = job.description.stack[depth - 1]
1605
1606                # Find everything in the upper subtree
1607                reacheable = set()
1608                for upperID in upper:
1609                    if upperID in job._registry:
1610                        # This is a locally added job, not an already-saved job
1611                        upperJob = job._registry[upperID]
1612                        upperJob._collectAllSuccessors(reacheable)
1613
1614                for inUpper in reacheable:
1615                    # Add extra edges to the roots of all the lower subtrees
1616                    # But skip anything in the lower subtree not in the current _registry (i.e. not created hear)
1617                    extraEdges[inUpper] += [job._registry[lowerID] for lowerID in lower if lowerID in job._registry]
1618
1619        return extraEdges
1620
1621    def checkNewCheckpointsAreLeafVertices(self):
1622        """
1623        A checkpoint job is a job that is restarted if either it fails, or if any of \
1624        its successors completely fails, exhausting their retries.
1625
1626        A job is a leaf it is has no successors.
1627
1628        A checkpoint job must be a leaf when initially added to the job graph. When its \
1629        run method is invoked it can then create direct successors. This restriction is made
1630        to simplify implementation.
1631
1632        Only works on connected components of jobs not yet added to the JobStore.
1633
1634        :raises toil.job.JobGraphDeadlockException: if there exists a job being added to the graph for which \
1635        checkpoint=True and which is not a leaf.
1636        """
1637        roots = self.getRootJobs() # Roots jobs of component, these are preexisting jobs in the graph
1638
1639        # All jobs in the component of the job graph containing self
1640        jobs = set()
1641        list(map(lambda x : x._collectAllSuccessors(jobs), roots))
1642
1643        # Check for each job for which checkpoint is true that it is a cut vertex or leaf
1644        for y in [x for x in jobs if x.checkpoint]:
1645            if y not in roots: # The roots are the prexisting jobs
1646                if not Job._isLeafVertex(y):
1647                    raise JobGraphDeadlockException("New checkpoint job %s is not a leaf in the job graph" % y)
1648
1649    ####################################################
1650    #Deferred function system
1651    ####################################################
1652
1653    def defer(self, function, *args, **kwargs):
1654        """
1655        Register a deferred function, i.e. a callable that will be invoked after the current
1656        attempt at running this job concludes. A job attempt is said to conclude when the job
1657        function (or the :meth:`toil.job.Job.run` method for class-based jobs) returns, raises an
1658        exception or after the process running it terminates abnormally. A deferred function will
1659        be called on the node that attempted to run the job, even if a subsequent attempt is made
1660        on another node. A deferred function should be idempotent because it may be called
1661        multiple times on the same node or even in the same process. More than one deferred
1662        function may be registered per job attempt by calling this method repeatedly with
1663        different arguments. If the same function is registered twice with the same or different
1664        arguments, it will be called twice per job attempt.
1665
1666        Examples for deferred functions are ones that handle cleanup of resources external to
1667        Toil, like Docker containers, files outside the work directory, etc.
1668
1669        :param callable function: The function to be called after this job concludes.
1670
1671        :param list args: The arguments to the function
1672
1673        :param dict kwargs: The keyword arguments to the function
1674        """
1675        if self._defer is None:
1676            raise Exception('A deferred function may only be registered with a job while that job is running.')
1677        self._defer(DeferredFunction.create(function, *args, **kwargs))
1678
1679
1680    ####################################################
1681    #The following nested classes are used for
1682    #creating jobtrees (Job.Runner),
1683    #and defining a service (Job.Service)
1684    ####################################################
1685
1686    class Runner():
1687        """
1688        Used to setup and run Toil workflow.
1689        """
1690        @staticmethod
1691        def getDefaultArgumentParser():
1692            """
1693            Get argument parser with added toil workflow options.
1694
1695            :returns: The argument parser used by a toil workflow with added Toil options.
1696            :rtype: :class:`argparse.ArgumentParser`
1697            """
1698            parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
1699            Job.Runner.addToilOptions(parser)
1700            return parser
1701
1702        @staticmethod
1703        def getDefaultOptions(jobStore):
1704            """
1705            Get default options for a toil workflow.
1706
1707            :param string jobStore: A string describing the jobStore \
1708            for the workflow.
1709            :returns: The options used by a toil workflow.
1710            :rtype: argparse.ArgumentParser values object
1711            """
1712            parser = Job.Runner.getDefaultArgumentParser()
1713            return parser.parse_args(args=[jobStore])
1714
1715        @staticmethod
1716        def addToilOptions(parser):
1717            """
1718            Adds the default toil options to an :mod:`optparse` or :mod:`argparse`
1719            parser object.
1720
1721            :param parser: Options object to add toil options to.
1722            :type parser: optparse.OptionParser or argparse.ArgumentParser
1723            """
1724            addOptions(parser)
1725
1726        @staticmethod
1727        def startToil(job, options):
1728            """
1729            Deprecated by toil.common.Toil.start. Runs the toil workflow using the given options
1730            (see Job.Runner.getDefaultOptions and Job.Runner.addToilOptions) starting with this
1731            job.
1732            :param toil.job.Job job: root job of the workflow
1733            :raises: toil.leader.FailedJobsException if at the end of function \
1734            their remain failed jobs.
1735            :return: The return value of the root job's run function.
1736            :rtype: Any
1737            """
1738            set_logging_from_options(options)
1739            with Toil(options) as toil:
1740                if not options.restart:
1741                    return toil.start(job)
1742                else:
1743                    return toil.restart()
1744
1745    class Service(Requirer, metaclass=ABCMeta):
1746        """
1747        Abstract class used to define the interface to a service.
1748
1749        Should be subclassed by the user to define services.
1750
1751        Is not executed as a job; runs within a ServiceHostJob.
1752        """
1753        def __init__(self, memory=None, cores=None, disk=None, preemptable=None, unitName=None):
1754            """
1755            Memory, core and disk requirements are specified identically to as in \
1756            :func:`toil.job.Job.__init__`.
1757            """
1758
1759            # Save the requirements in ourselves so they are visible on `self` to user code.
1760            super().__init__({'memory': memory, 'cores': cores, 'disk': disk, 'preemptable': preemptable})
1761
1762            # And the unit name
1763            self.unitName = unitName
1764
1765            # And the name for the hosting job
1766            self.jobName = self.__class__.__name__
1767
1768            # Record that we have as of yet no ServiceHostJob
1769            self.hostID = None
1770
1771        @abstractmethod
1772        def start(self, job):
1773            """
1774            Start the service.
1775
1776            :param toil.job.Job job: The underlying host job that the service is being run in.
1777                                     Can be used to register deferred functions, or to access
1778                                     the fileStore for creating temporary files.
1779
1780            :returns: An object describing how to access the service. The object must be pickleable
1781                      and will be used by jobs to access the service (see :func:`toil.job.Job.addService`).
1782            """
1783
1784        @abstractmethod
1785        def stop(self, job):
1786            """
1787            Stops the service. Function can block until complete.
1788
1789            :param toil.job.Job job: The underlying host job that the service is being run in.
1790                                     Can be used to register deferred functions, or to access
1791                                     the fileStore for creating temporary files.
1792            """
1793
1794        def check(self):
1795            """
1796            Checks the service is still running.
1797
1798            :raise exceptions.RuntimeError: If the service failed, this will cause the service job to be labeled failed.
1799            :returns: True if the service is still running, else False. If False then the service job will be terminated,
1800                and considered a success. Important point: if the service job exits due to a failure, it should raise a
1801                RuntimeError, not return False!
1802            """
1803
1804    def _addPredecessor(self, predecessorJob):
1805        """Adds a predecessor job to the set of predecessor jobs."""
1806        if predecessorJob in self._directPredecessors:
1807            raise ConflictingPredecessorError(predecessorJob, self)
1808        self._directPredecessors.add(predecessorJob)
1809
1810        # Record the need for the predecessor to finish
1811        self._description.addPredecessor()
1812
1813    @staticmethod
1814    def _isLeafVertex(job):
1815        return next(job.description.successorsAndServiceHosts(), None) is None
1816
1817    @classmethod
1818    def _loadUserModule(cls, userModule):
1819        """
1820        Imports and returns the module object represented by the given module descriptor.
1821
1822        :type userModule: ModuleDescriptor
1823        """
1824        return userModule.load()
1825
1826    @classmethod
1827    def _unpickle(cls, userModule, fileHandle, requireInstanceOf=None):
1828        """
1829        Unpickles an object graph from the given file handle while loading symbols \
1830        referencing the __main__ module from the given userModule instead.
1831
1832        :param userModule:
1833        :param fileHandle: An open, binary-mode file handle.
1834        :param requireInstanceOf: If set, require result to be an instance of this class.
1835        :returns:
1836        """
1837
1838        def filter_main(module_name, class_name):
1839            try:
1840                if module_name == '__main__':
1841                    return getattr(userModule, class_name)
1842                else:
1843                    return getattr(importlib.import_module(module_name), class_name)
1844            except:
1845                if module_name == '__main__':
1846                    logger.debug('Failed getting %s from module %s.', class_name, userModule)
1847                else:
1848                    logger.debug('Failed getting %s from module %s.', class_name, module_name)
1849                raise
1850
1851        class FilteredUnpickler(pickle.Unpickler):
1852            def find_class(self, module, name):
1853                return filter_main(module, name)
1854
1855        unpickler = FilteredUnpickler(fileHandle)
1856
1857        runnable = unpickler.load()
1858        if requireInstanceOf is not None:
1859            assert isinstance(runnable, requireInstanceOf), "Did not find a {} when expected".format(requireInstanceOf)
1860
1861        return runnable
1862
1863    def getUserScript(self):
1864        return self.userModule
1865
1866    def _fulfillPromises(self, returnValues, jobStore):
1867        """
1868        Sets the values for promises using the return values from this job's run() function.
1869        """
1870        for path, promiseFileStoreIDs in self._rvs.items():
1871            if not path:
1872                # Note that its possible for returnValues to be a promise, not an actual return
1873                # value. This is the case if the job returns a promise from another job. In
1874                # either case, we just pass it on.
1875                promisedValue = returnValues
1876            else:
1877                # If there is an path ...
1878                if isinstance(returnValues, Promise):
1879                    # ... and the value itself is a Promise, we need to created a new, narrower
1880                    # promise and pass it on.
1881                    promisedValue = Promise(returnValues.job, path)
1882                else:
1883                    # Otherwise, we just select the desired component of the return value.
1884                    promisedValue = returnValues
1885                    for index in path:
1886                        promisedValue = promisedValue[index]
1887            for promiseFileStoreID in promiseFileStoreIDs:
1888                # File may be gone if the job is a service being re-run and the accessing job is
1889                # already complete.
1890                if jobStore.fileExists(promiseFileStoreID):
1891                    with jobStore.updateFileStream(promiseFileStoreID) as fileHandle:
1892                        pickle.dump(promisedValue, fileHandle, pickle.HIGHEST_PROTOCOL)
1893
1894    # Functions associated with Job.checkJobGraphAcyclic to establish that the job graph does not
1895    # contain any cycles of dependencies:
1896
1897    def _collectAllSuccessors(self, visited):
1898        """
1899        Adds the job and all jobs reachable on a directed path from current node to the given set.
1900
1901        Only considers jobs in this job's subgraph that are newly added, not loaded from the job store.
1902        """
1903
1904        # Keep our own stack since we may have a stick in the graph long enough
1905        # to exhaust the real stack
1906        todo = [self]
1907
1908        while len(todo) > 0:
1909            job = todo[-1]
1910            todo.pop()
1911            if job not in visited:
1912                visited.add(job)
1913                for successorID in job.description.allSuccessors():
1914                    if successorID in self._registry:
1915                        # We added this successor locally
1916                        todo.append(self._registry[successorID])
1917
1918    def getTopologicalOrderingOfJobs(self):
1919        """
1920        :returns: a list of jobs such that for all pairs of indices i, j for which i < j, \
1921        the job at index i can be run before the job at index j.
1922
1923        Only considers jobs in this job's subgraph that are newly added, not loaded from the job store.
1924
1925        Ignores service jobs.
1926
1927        :rtype: list[Job]
1928        """
1929
1930        # List of Job objects in order.
1931        ordering = []
1932        # Set of IDs of visited jobs.
1933        visited = set()
1934
1935        # We need to recurse and traverse the graph without exhausting Python's
1936        # stack, so we keep our own stack of Job objects
1937        todo = [self]
1938
1939        while len(todo) > 0:
1940            job = todo[-1]
1941            todo.pop()
1942
1943            #Do not add the job to the ordering until all its predecessors have been
1944            #added to the ordering
1945            outstandingPredecessor = False
1946            for predJob in job._directPredecessors:
1947                if predJob.jobStoreID not in visited:
1948                    outstandingPredecessor = True
1949                    break
1950            if outstandingPredecessor:
1951                continue
1952
1953            if job.jobStoreID not in visited:
1954                visited.add(job.jobStoreID)
1955                ordering.append(job)
1956
1957                for otherID in itertools.chain(job.description.followOnIDs, job.description.childIDs):
1958                    if otherID in self._registry:
1959                        # Stack up descendants so we process children and then follow-ons.
1960                        # So stack up follow-ons deeper
1961                        todo.append(self._registry[otherID])
1962
1963        return ordering
1964
1965    ####################################################
1966    #Storing Jobs into the JobStore
1967    ####################################################
1968
1969    def _register(self, jobStore):
1970        """
1971        If this job lacks a JobStore-assigned ID, assign this job an ID.
1972        Must be called for each job before it is saved to the JobStore for the first time.
1973
1974        :returns: A list with either one old ID, new ID pair, or an empty list
1975        :rtype: list
1976        """
1977
1978        # TODO: This doesn't really have much to do with the registry. Rename
1979        # the registry.
1980
1981        if isinstance(self.jobStoreID, TemporaryID):
1982            # We need to get an ID.
1983
1984            # Save our fake ID
1985            fake = self.jobStoreID
1986
1987            # Replace it with a real ID
1988            jobStore.assignID(self.description)
1989
1990            # Make sure the JobDescription can do its JobStore-related setup.
1991            self.description.onRegistration(jobStore)
1992
1993            # Return the fake to real mapping
1994            return [(fake, self.description.jobStoreID)]
1995        else:
1996            # We already have an ID. No assignment or reference rewrite necessary.
1997            return []
1998
1999    def _renameReferences(self, renames):
2000        """
2001        Apply the given dict of ID renames to all references to other jobs.
2002
2003        Ignores the registry, which is shared and assumed to already be updated.
2004
2005        IDs not present in the renames dict are left as-is.
2006
2007        :param dict(TemporaryID, str) renames: Rename operations to apply.
2008        """
2009
2010        # Do renames in the description
2011        self._description.renameReferences(renames)
2012
2013    def saveBody(self, jobStore):
2014        """
2015        Save the execution data for just this job to the JobStore, and fill in
2016        the JobDescription with the information needed to retrieve it.
2017
2018        The Job's JobDescription must have already had a real jobStoreID assigned to it.
2019
2020        Does not save the JobDescription.
2021
2022        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store
2023            to save the job body into.
2024        """
2025
2026        # We can't save the job in the right place for cleanup unless the
2027        # description has a real ID.
2028        assert not isinstance(self.jobStoreID, TemporaryID), "Tried to save job {} without ID assigned!".format(self)
2029
2030        # Note that we can't accept any more requests for our return value
2031        self._disablePromiseRegistration()
2032
2033        # Clear out old Cactus compatibility fields that don't need to be
2034        # preserved and shouldn't be serialized.
2035        if hasattr(self, '_services'):
2036            delattr(self, '_services')
2037
2038        # Remember fields we will overwrite
2039        description = self._description
2040        registry = self._registry
2041        directPredecessors = self._directPredecessors
2042
2043        try:
2044            try:
2045                # Drop out the description, which the job store will manage separately
2046                self._description = None
2047                # Fix up the registry and direct predecessors for when the job is
2048                # loaded to be run: the registry should contain just the job itself and
2049                # there should be no predecessors available when the job actually runs.
2050                self._registry = {description.jobStoreID: self}
2051                self._directPredecessors = set()
2052
2053                # Save the body of the job
2054                with jobStore.writeFileStream(description.jobStoreID, cleanup=True) as (fileHandle, fileStoreID):
2055                    pickle.dump(self, fileHandle, pickle.HIGHEST_PROTOCOL)
2056            finally:
2057                # Restore important fields (before handling errors)
2058                self._directPredecessors = directPredecessors
2059                self._registry = registry
2060                self._description = description
2061        except JobPromiseConstraintError as e:
2062            # The user is passing promises without regard to predecessor constraints.
2063            if e.recipientJob is None:
2064                # Add ourselves as the recipient job that wanted the promise.
2065                e = JobPromiseConstraintError(e.promisingJob, self)
2066            raise e
2067
2068        # Find the user script.
2069        # Note that getUserScript() may have been overridden. This is intended. If we used
2070        # self.userModule directly, we'd be getting a reference to job.py if the job was
2071        # specified as a function (as opposed to a class) since that is where FunctionWrappingJob
2072        #  is defined. What we really want is the module that was loaded as __main__,
2073        # and FunctionWrappingJob overrides getUserScript() to give us just that. Only then can
2074        # filter_main() in _unpickle( ) do its job of resolving any user-defined type or function.
2075        userScript = self.getUserScript().globalize()
2076
2077        # The command connects the body of the job to the JobDescription
2078        self._description.command = ' '.join(('_toil', fileStoreID) + userScript.toCommand())
2079
2080    def _saveJobGraph(self, jobStore, saveSelf=False, returnValues=None):
2081        """
2082        Save job data and new JobDescriptions to the given job store for this
2083        job and all descending jobs, including services.
2084
2085        Used to save the initial job graph containing the root job of the workflow.
2086
2087        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store
2088            to save the jobs into.
2089        :param bool saveSelf: Set to True to save this job along with its children,
2090            follow-ons, and services, or False to just save the children, follow-ons,
2091            and services and to populate the return value.
2092        :param returnValues: The collection of values returned when executing
2093            the job (or starting the service the job is hosting). If saveSelf
2094            is not set, will be used to fulfill the job's return value promises.
2095        """
2096
2097        # Prohibit cycles and multiple roots
2098        self.checkJobGraphForDeadlocks()
2099
2100        # Make sure everybody in the registry is registered with the job store
2101        # and has an ID. Also rewrite ID references.
2102        allJobs = list(self._registry.values())
2103        # We use one big dict from fake ID to corresponding real ID to rewrite references.
2104        fakeToReal = {}
2105        for job in allJobs:
2106            # Register the job, get the old ID to new ID pair if any, and save that in the fake to real mapping
2107            fakeToReal.update(job._register(jobStore))
2108        if len(fakeToReal) > 0:
2109            # Some jobs changed ID. We need to rebuild the registry and apply the reference rewrites.
2110
2111            # Remake the registry in place
2112            self._registry.clear()
2113            self._registry.update({job.jobStoreID: job for job in allJobs})
2114
2115            for job in allJobs:
2116                # Tell all the jobs (and thus their descriptions and services)
2117                # about the renames.
2118                job._renameReferences(fakeToReal)
2119
2120        # Make sure the whole component is ready for promise registration
2121        for job in allJobs:
2122            job.prepareForPromiseRegistration(jobStore)
2123
2124        # Get an ordering on the non-service jobs which we use for pickling the
2125        # jobs in the correct order to ensure the promises are properly
2126        # established
2127        ordering = self.getTopologicalOrderingOfJobs()
2128
2129        # Set up to save last job first, so promises flow the right way
2130        ordering.reverse()
2131
2132        logger.info("Saving graph of %d jobs, %d new", len(allJobs), len(fakeToReal))
2133
2134        # Make sure we're the root
2135        assert ordering[-1] == self
2136
2137        # Don't verify the ordering length: it excludes service host jobs.
2138
2139        if not saveSelf:
2140            # Fulfil promises for return values (even if value is None)
2141            self._fulfillPromises(returnValues, jobStore)
2142
2143        for job in ordering:
2144            logger.info("Processing job %s", job.description)
2145            for serviceBatch in reversed(list(job.description.serviceHostIDsInBatches())):
2146                # For each batch of service host jobs in reverse order they start
2147                for serviceID in serviceBatch:
2148                    logger.info("Processing service %s", serviceID)
2149                    if serviceID in self._registry:
2150                        # It's a new service
2151
2152                        # Find the actual job
2153                        serviceJob = self._registry[serviceID]
2154                        logger.info("Saving service %s", serviceJob.description)
2155                        # Pickle the service body, which triggers all the promise stuff
2156                        serviceJob.saveBody(jobStore)
2157            if job != self or saveSelf:
2158                # Now pickle the job itself
2159                job.saveBody(jobStore)
2160
2161        # Now that the job data is on disk, commit the JobDescriptions in
2162        # reverse execution order, in a batch if supported.
2163        with jobStore.batch():
2164            for job in ordering:
2165                for serviceBatch in job.description.serviceHostIDsInBatches():
2166                    for serviceID in serviceBatch:
2167                        if serviceID in self._registry:
2168                            jobStore.create(self._registry[serviceID].description)
2169                if job != self or saveSelf:
2170                    jobStore.create(job.description)
2171
2172    def saveAsRootJob(self, jobStore):
2173        """
2174        Save this job to the given jobStore as the root job of the workflow.
2175
2176        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore:
2177        :return: the JobDescription describing this job.
2178        """
2179
2180        # Check if the workflow root is a checkpoint but not a leaf vertex.
2181        # All other job vertices in the graph are checked by checkNewCheckpointsAreLeafVertices
2182        if self.checkpoint and not Job._isLeafVertex(self):
2183            raise JobGraphDeadlockException(
2184                'New checkpoint job %s is not a leaf in the job graph' % self)
2185
2186        # Save the root job and all descendants and services
2187        self._saveJobGraph(jobStore, saveSelf=True)
2188
2189        # Store the name of the first job in a file in case of restart. Up to this point the
2190        # root job is not recoverable. FIXME: "root job" or "first job", which one is it?
2191        jobStore.setRootJob(self.jobStoreID)
2192
2193        # Assign the config from the JobStore as if we were loaded.
2194        # TODO: Find a better way to make this the JobStore's responsibility
2195        self.description.assignConfig(jobStore.config)
2196
2197        return self.description
2198
2199    @classmethod
2200    def loadJob(cls, jobStore, jobDescription):
2201        """
2202        Retrieves a :class:`toil.job.Job` instance from a JobStore
2203
2204        :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store.
2205        :param toil.job.JobDescription jobDescription: the JobDescription of the job to retrieve.
2206        :returns: The job referenced by the JobDescription.
2207        :rtype: toil.job.Job
2208        """
2209
2210        # Grab the command that connects the description to the job body
2211        command = jobDescription.command
2212
2213        commandTokens = command.split()
2214        assert "_toil" == commandTokens[0]
2215        userModule = ModuleDescriptor.fromCommand(commandTokens[2:])
2216        logger.debug('Loading user module %s.', userModule)
2217        userModule = cls._loadUserModule(userModule)
2218        pickleFile = commandTokens[1]
2219
2220        # Get a directory to download the job in
2221        directory = tempfile.mkdtemp()
2222        # Initialize a blank filename so the finally below can't fail due to a
2223        # missing variable
2224        filename = ''
2225
2226        try:
2227            # Get a filename to download the job to.
2228            # Don't use mkstemp because we would need to delete and replace the
2229            # file.
2230            # Don't use a NamedTemporaryFile context manager because its
2231            # context manager exit will crash if we deleted it.
2232            filename = os.path.join(directory, 'job')
2233
2234            # Download the job
2235            if pickleFile == "firstJob":
2236                jobStore.readSharedFile(pickleFile, filename)
2237            else:
2238                jobStore.readFile(pickleFile, filename)
2239
2240            # Open and unpickle
2241            with open(filename, 'rb') as fileHandle:
2242                job = cls._unpickle(userModule, fileHandle, requireInstanceOf=Job)
2243                # Fill in the current description
2244                job._description = jobDescription
2245
2246                # Set up the registry again, so children and follow-ons can be added on the worker
2247                job._registry = {job.jobStoreID: job}
2248
2249                return job
2250
2251                # TODO: We ought to just unpickle straight from a streaming read
2252        finally:
2253            # Clean up the file
2254            if os.path.exists(filename):
2255                os.unlink(filename)
2256            # Clean up the directory we put it in
2257            shutil.rmtree(directory)
2258
2259    def _run(self, jobGraph=None, fileStore=None, **kwargs):
2260        """
2261        Function which worker calls to ultimately invoke
2262        a job's Job.run method, and then handle created
2263        children/followOn jobs.
2264
2265        May be (but currently is not) overridden by specialized Toil-internal jobs.
2266
2267        Should not be overridden by non-Toil code!
2268
2269        Despite this, it has been overridden by non-Toil code, so we keep it
2270        around and use a hardened kwargs-based interface to try and tolerate
2271        bad behavior by workflows (e.g. Cactus).
2272
2273        When everyone has upgraded to a sufficiently new Cactus, we can remove
2274        this!
2275
2276        :param NoneType jobGraph: Ignored. Here for compatibility with old
2277               Cactus versions that pass two positional arguments.
2278        :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: the
2279               FileStore to use to access files when running the job. Required.
2280        """
2281        return self.run(fileStore)
2282
2283    @contextmanager
2284    def _executor(self, stats, fileStore):
2285        """
2286        This is the core wrapping method for running the job within a worker.  It sets up the stats
2287        and logging before yielding. After completion of the body, the function will finish up the
2288        stats and logging, and starts the async update process for the job.
2289
2290        Will modify the job's description with changes that need to be committed back to the JobStore.
2291        """
2292        if stats is not None:
2293            startTime = time.time()
2294            startClock = get_total_cpu_time()
2295        baseDir = os.getcwd()
2296
2297        yield
2298
2299        # If the job is not a checkpoint job, add the promise files to delete
2300        # to the list of jobStoreFileIDs to delete
2301        # TODO: why is Promise holding a global list here???
2302        if not self.checkpoint:
2303            for jobStoreFileID in Promise.filesToDelete:
2304                # Make sure to wrap the job store ID in a FileID object so the file store will accept it
2305                # TODO: talk directly to the job sotre here instead.
2306                fileStore.deleteGlobalFile(FileID(jobStoreFileID, 0))
2307        else:
2308            # Else copy them to the job description to delete later
2309            self.description.checkpointFilesToDelete = list(Promise.filesToDelete)
2310        Promise.filesToDelete.clear()
2311        # Now indicate the asynchronous update of the job can happen
2312        fileStore.startCommit(jobState=True)
2313        # Change dir back to cwd dir, if changed by job (this is a safety issue)
2314        if os.getcwd() != baseDir:
2315            os.chdir(baseDir)
2316        # Finish up the stats
2317        if stats is not None:
2318            totalCpuTime, totalMemoryUsage = get_total_cpu_time_and_memory_usage()
2319            stats.jobs.append(
2320                Expando(
2321                    time=str(time.time() - startTime),
2322                    clock=str(totalCpuTime - startClock),
2323                    class_name=self._jobName(),
2324                    memory=str(totalMemoryUsage)
2325                )
2326            )
2327
2328    def _runner(self, jobStore=None, fileStore=None, defer=None, **kwargs):
2329        """
2330        This method actually runs the job, and serialises the next jobs.
2331
2332        It marks the job as completed (by clearing its command) and creates the
2333        successor relationships to new successors, but it doesn't actually
2334        commit those updates to the current job into the JobStore.
2335
2336        We take all arguments as keyword arguments, and accept and ignore
2337        additional keyword arguments, for compatibility with workflows (*cough*
2338        Cactus *cough*) which are reaching in and overriding _runner (which
2339        they aren't supposed to do). If everything is passed as name=value it
2340        won't break as soon as we add or remove a parameter.
2341
2342        :param class jobStore: Instance of the job store
2343        :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: Instance
2344               of a cached or uncached filestore
2345        :param defer: Function yielded by open() context
2346               manager of :class:`toil.DeferredFunctionManager`, which is called to
2347               register deferred functions.
2348        :param kwargs: Catch-all to accept superfluous arguments passed by old
2349               versions of Cactus. Cactus shouldn't override this method, but it does.
2350        :return:
2351        """
2352
2353        # Make deferred function registration available during run().
2354        self._defer = defer
2355        # Make fileStore available as an attribute during run() ...
2356        self._fileStore = fileStore
2357        # ... but also pass it to _run() as an argument for backwards
2358        # compatibility with workflows that tinker around with our internals,
2359        # and send a fake jobGraph in case they still think jobGraph exists.
2360        returnValues = self._run(jobGraph=None, fileStore=fileStore)
2361
2362        # Clean up state changes made for run()
2363        self._defer = None
2364        self._fileStore = None
2365
2366
2367        # Serialize the new Jobs defined by the run method to the jobStore
2368        self._saveJobGraph(jobStore, saveSelf=False, returnValues=returnValues)
2369
2370        # Clear out the command, because the job is done.
2371        self.description.command = None
2372
2373        # That and the new child/follow-on relationships will need to be
2374        # recorded later by an update() of the JobDescription.
2375
2376
2377
2378    def _jobName(self):
2379        """
2380        :rtype : string, used as identifier of the job class in the stats report.
2381        """
2382        return self._description.displayName
2383
2384
2385class JobException(Exception):
2386    """
2387    General job exception.
2388    """
2389    def __init__(self, message):
2390        super().__init__(message)
2391
2392
2393class JobGraphDeadlockException(JobException):
2394    """
2395    An exception raised in the event that a workflow contains an unresolvable \
2396    dependency, such as a cycle. See :func:`toil.job.Job.checkJobGraphForDeadlocks`.
2397    """
2398    def __init__(self, string):
2399        super().__init__(string)
2400
2401
2402class FunctionWrappingJob(Job):
2403    """
2404    Job used to wrap a function. In its `run` method the wrapped function is called.
2405    """
2406    def __init__(self, userFunction, *args, **kwargs):
2407        """
2408        :param callable userFunction: The function to wrap. It will be called with ``*args`` and
2409               ``**kwargs`` as arguments.
2410
2411        The keywords ``memory``, ``cores``, ``disk``, ``preemptable`` and ``checkpoint`` are
2412        reserved keyword arguments that if specified will be used to determine the resources
2413        required for the job, as :func:`toil.job.Job.__init__`. If they are keyword arguments to
2414        the function they will be extracted from the function definition, but may be overridden
2415        by the user (as you would expect).
2416        """
2417        # Use the user-specified requirements, if specified, else grab the default argument
2418        # from the function, if specified, else default to None
2419        argSpec = inspect.getfullargspec(userFunction)
2420
2421        if argSpec.defaults is None:
2422            argDict = {}
2423        else:
2424            argDict = dict(list(zip(argSpec.args[-len(argSpec.defaults):], argSpec.defaults)))
2425
2426        def resolve(key, default=None, dehumanize=False):
2427            try:
2428                # First, try constructor arguments, ...
2429                value = kwargs.pop(key)
2430            except KeyError:
2431                try:
2432                    # ..., then try default value for function keyword arguments, ...
2433                    value = argDict[key]
2434                except KeyError:
2435                    # ... and finally fall back to a default value.
2436                    value = default
2437            # Optionally, convert strings with metric or binary prefixes.
2438            if dehumanize and isinstance(value, str):
2439                value = human2bytes(value)
2440            return value
2441
2442        super().__init__(memory=resolve('memory', dehumanize=True),
2443                         cores=resolve('cores', dehumanize=True),
2444                         disk=resolve('disk', dehumanize=True),
2445                         preemptable=resolve('preemptable'),
2446                         checkpoint=resolve('checkpoint', default=False),
2447                         unitName=resolve('name', default=None))
2448
2449        self.userFunctionModule = ModuleDescriptor.forModule(userFunction.__module__).globalize()
2450        self.userFunctionName = str(userFunction.__name__)
2451        self.jobName = self.userFunctionName
2452        self._args = args
2453        self._kwargs = kwargs
2454
2455    def _getUserFunction(self):
2456        logger.debug('Loading user function %s from module %s.',
2457                     self.userFunctionName,
2458                     self.userFunctionModule)
2459        userFunctionModule = self._loadUserModule(self.userFunctionModule)
2460        return getattr(userFunctionModule, self.userFunctionName)
2461
2462    def run(self,fileStore):
2463        userFunction = self._getUserFunction( )
2464        return userFunction(*self._args, **self._kwargs)
2465
2466    def getUserScript(self):
2467        return self.userFunctionModule
2468
2469    def _jobName(self):
2470        return ".".join((self.__class__.__name__,self.userFunctionModule.name,self.userFunctionName))
2471
2472
2473class JobFunctionWrappingJob(FunctionWrappingJob):
2474    """
2475    A job function is a function whose first argument is a :class:`.Job`
2476    instance that is the wrapping job for the function. This can be used to
2477    add successor jobs for the function and perform all the functions the
2478    :class:`.Job` class provides.
2479
2480    To enable the job function to get access to the
2481    :class:`toil.fileStores.abstractFileStore.AbstractFileStore` instance (see
2482    :func:`toil.job.Job.run`), it is made a variable of the wrapping job called
2483    fileStore.
2484
2485    To specify a job's resource requirements the following default keyword arguments
2486    can be specified:
2487
2488        - memory
2489        - disk
2490        - cores
2491
2492    For example to wrap a function into a job we would call::
2493
2494        Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)
2495
2496    """
2497
2498    @property
2499    def fileStore(self):
2500        return self._fileStore
2501
2502    def run(self, fileStore):
2503        userFunction = self._getUserFunction()
2504        rValue = userFunction(*((self,) + tuple(self._args)), **self._kwargs)
2505        return rValue
2506
2507
2508class PromisedRequirementFunctionWrappingJob(FunctionWrappingJob):
2509    """
2510    Handles dynamic resource allocation using :class:`toil.job.Promise` instances.
2511    Spawns child function using parent function parameters and fulfilled promised
2512    resource requirements.
2513    """
2514    def __init__(self, userFunction, *args, **kwargs):
2515        self._promisedKwargs = kwargs.copy()
2516        # Replace resource requirements in intermediate job with small values.
2517        kwargs.update(dict(disk='1M', memory='32M', cores=0.1))
2518        super().__init__(userFunction, *args, **kwargs)
2519
2520    @classmethod
2521    def create(cls, userFunction, *args, **kwargs):
2522        """
2523        Creates an encapsulated Toil job function with unfulfilled promised resource
2524        requirements. After the promises are fulfilled, a child job function is created
2525        using updated resource values. The subgraph is encapsulated to ensure that this
2526        child job function is run before other children in the workflow. Otherwise, a
2527        different child may try to use an unresolved promise return value from the parent.
2528        """
2529        return EncapsulatedJob(cls(userFunction, *args, **kwargs))
2530
2531    def run(self, fileStore):
2532        # Assumes promises are fulfilled when parent job is run
2533        self.evaluatePromisedRequirements()
2534        userFunction = self._getUserFunction()
2535        return self.addChildFn(userFunction, *self._args, **self._promisedKwargs).rv()
2536
2537    def evaluatePromisedRequirements(self):
2538        requirements = ["disk", "memory", "cores"]
2539        # Fulfill resource requirement promises
2540        for requirement in requirements:
2541            try:
2542                if isinstance(self._promisedKwargs[requirement], PromisedRequirement):
2543                    self._promisedKwargs[requirement] = self._promisedKwargs[requirement].getValue()
2544            except KeyError:
2545                pass
2546
2547
2548class PromisedRequirementJobFunctionWrappingJob(PromisedRequirementFunctionWrappingJob):
2549    """
2550    Handles dynamic resource allocation for job functions.
2551    See :class:`toil.job.JobFunctionWrappingJob`
2552    """
2553
2554    def run(self, fileStore):
2555        self.evaluatePromisedRequirements()
2556        userFunction = self._getUserFunction()
2557        return self.addChildJobFn(userFunction, *self._args, **self._promisedKwargs).rv()
2558
2559
2560class EncapsulatedJob(Job):
2561    """
2562    A convenience Job class used to make a job subgraph appear to be a single job.
2563
2564    Let A be the root job of a job subgraph and B be another job we'd like to run after A
2565    and all its successors have completed, for this use encapsulate::
2566
2567        #  Job A and subgraph, Job B
2568        A, B = A(), B()
2569        Aprime = A.encapsulate()
2570        Aprime.addChild(B)
2571        #  B will run after A and all its successors have completed, A and its subgraph of
2572        # successors in effect appear to be just one job.
2573
2574    If the job being encapsulated has predecessors (e.g. is not the root job), then the encapsulated
2575    job will inherit these predecessors. If predecessors are added to the job being encapsulated
2576    after the encapsulated job is created then the encapsulating job will NOT inherit these
2577    predecessors automatically. Care should be exercised to ensure the encapsulated job has the
2578    proper set of predecessors.
2579
2580    The return value of an encapsulatd job (as accessed by the :func:`toil.job.Job.rv` function)
2581    is the return value of the root job, e.g. A().encapsulate().rv() and A().rv() will resolve to
2582    the same value after A or A.encapsulate() has been run.
2583    """
2584    def __init__(self, job, unitName=None):
2585        """
2586        :param toil.job.Job job: the job to encapsulate.
2587        :param str unitName: human-readable name to identify this job instance.
2588        """
2589
2590        if job is not None:
2591            # Initial construction, when encapsulating a job
2592
2593            # Giving the root of the subgraph the same resources as the first job in the subgraph.
2594            super().__init__(**job.description.requirements, unitName=unitName)
2595            # Ensure that the encapsulated job has the same direct predecessors as the job
2596            # being encapsulated.
2597            for predJob in job._directPredecessors:
2598                predJob.addChild(self)
2599            self.encapsulatedJob = job
2600            Job.addChild(self, job)
2601            # Use small resource requirements for dummy Job instance.
2602            # But not too small, or the job won't have enough resources to safely start up Toil.
2603            self.encapsulatedFollowOn = Job(disk='100M', memory='512M', cores=0.1, unitName=None if unitName is None else unitName + '-followOn')
2604            Job.addFollowOn(self, self.encapsulatedFollowOn)
2605        else:
2606            # Unpickling on the worker, to be run as a no-op.
2607            # No need to try and hook things up, but nobody can add children or
2608            # follow-ons to us now either.
2609            super().__init__()
2610            self.encapsulatedJob = None
2611            self.encapsulatedFollowOn = None
2612
2613    def addChild(self, childJob):
2614        assert self.encapsulatedFollowOn is not None, \
2615            "Children cannot be added to EncapsulatedJob while it is running"
2616        return Job.addChild(self.encapsulatedFollowOn, childJob)
2617
2618    def addService(self, service, parentService=None):
2619        assert self.encapsulatedFollowOn is not None, \
2620            "Services cannot be added to EncapsulatedJob while it is running"
2621        return Job.addService(self.encapsulatedFollowOn, service, parentService=parentService)
2622
2623    def addFollowOn(self, followOnJob):
2624        assert self.encapsulatedFollowOn is not None, \
2625            "Follow-ons cannot be added to EncapsulatedJob while it is running"
2626        return Job.addFollowOn(self.encapsulatedFollowOn, followOnJob)
2627
2628    def rv(self, *path):
2629        assert self.encapsulatedJob is not None
2630        return self.encapsulatedJob.rv(*path)
2631
2632    def prepareForPromiseRegistration(self, jobStore):
2633        # This one will be called after execution when re-serializing the
2634        # (unchanged) graph of jobs rooted here.
2635        super().prepareForPromiseRegistration(jobStore)
2636        if self.encapsulatedJob is not None:
2637            # Running where the job was created.
2638            self.encapsulatedJob.prepareForPromiseRegistration(jobStore)
2639
2640    def _disablePromiseRegistration(self):
2641        assert self.encapsulatedJob is not None
2642        super()._disablePromiseRegistration()
2643        self.encapsulatedJob._disablePromiseRegistration()
2644
2645    def __reduce__(self):
2646        """
2647        Called during pickling to define the pickled representation of the job.
2648
2649        We don't want to pickle our internal references to the job we
2650        encapsulate, so we elide them here. When actually run, we're just a
2651        no-op job that can maybe chain.
2652        """
2653
2654        return self.__class__, (None,)
2655
2656    def getUserScript(self):
2657        assert self.encapsulatedJob is not None
2658        return self.encapsulatedJob.getUserScript()
2659
2660
2661class ServiceHostJob(Job):
2662    """
2663    Job that runs a service. Used internally by Toil. Users should subclass Service instead of using this.
2664    """
2665    def __init__(self, service):
2666        """
2667        This constructor should not be called by a user.
2668
2669        :param service: The service to wrap in a job.
2670        :type service: toil.job.Job.Service
2671        """
2672
2673        # Make sure the service hasn't been given a host already.
2674        assert service.hostID is None
2675
2676        # Make ourselves with name info from the Service and a
2677        # ServiceJobDescription that has the service control flags.
2678        super().__init__(**service.requirements,
2679            unitName=service.unitName, descriptionClass=ServiceJobDescription)
2680
2681        # Make sure the service knows it has a host now
2682        service.hostID = self.jobStoreID
2683
2684        # service.__module__ is the module defining the class service is an instance of.
2685        # Will need to be loaded before unpickling the Service
2686        self.serviceModule = ModuleDescriptor.forModule(service.__module__).globalize()
2687
2688        # The service to run, or None if it is still pickled.
2689        # We can't just pickle as part of ourselves because we may need to load
2690        # an additional module.
2691        self.service = service
2692        # The pickled service, or None if it isn't currently pickled.
2693        # We can't just pickle right away because we may owe promises from it.
2694        self.pickledService = None
2695
2696        # Pick up our name form the service.
2697        self.jobName = service.jobName
2698        # This references the parent job wrapper. It is initialised just before
2699        # the job is run. It is used to access the start and terminate flags.
2700        self.jobGraph = None
2701
2702    @property
2703    def fileStore(self):
2704        """
2705        Return the file store, which the Service may need.
2706        """
2707        return self._fileStore
2708
2709    def _renameReferences(self, renames):
2710        # When the job store finally hads out IDs we have to fix up the
2711        # back-reference from our Service to us.
2712        super()._renameReferences(renames)
2713        if self.service is not None:
2714            self.service.hostID = renames[self.service.hostID]
2715
2716    # Since the running service has us, make sure they don't try to tack more
2717    # stuff onto us.
2718
2719    def addChild(self, child):
2720        raise RuntimeError("Service host jobs cannot have children, follow-ons, or services")
2721
2722    def addFollowOn(self, followOn):
2723        raise RuntimeError("Service host jobs cannot have children, follow-ons, or services")
2724
2725    def addService(self, service, parentService=None):
2726        raise RuntimeError("Service host jobs cannot have children, follow-ons, or services")
2727
2728    def saveBody(self, jobStore):
2729        """
2730        Serialize the service itself before saving the host job's body.
2731        """
2732        # Save unpickled service
2733        service = self.service
2734        # Serialize service
2735        self.pickledService = pickle.dumps(self.service, protocol=pickle.HIGHEST_PROTOCOL)
2736        # Clear real service until we have the module to load it back
2737        self.service = None
2738        # Save body as normal
2739        super().saveBody(jobStore)
2740        # Restore unpickled service
2741        self.service = service
2742        self.pickledService = None
2743
2744    def run(self, fileStore):
2745        # Unpickle the service
2746        logger.debug('Loading service module %s.', self.serviceModule)
2747        userModule = self._loadUserModule(self.serviceModule)
2748        service = self._unpickle(userModule, BytesIO(self.pickledService), requireInstanceOf=Job.Service)
2749        self.pickledService = None
2750        # Make sure it has the config, since it wasn't load()-ed via the JobStore
2751        service.assignConfig(fileStore.jobStore.config)
2752        #Start the service
2753        startCredentials = service.start(self)
2754        try:
2755            #The start credentials  must be communicated to processes connecting to
2756            #the service, to do this while the run method is running we
2757            #cheat and set the return value promise within the run method
2758            self._fulfillPromises(startCredentials, fileStore.jobStore)
2759            self._rvs = {}  # Set this to avoid the return values being updated after the
2760            #run method has completed!
2761
2762            #Now flag that the service is running jobs can connect to it
2763            logger.debug("Removing the start jobStoreID to indicate that establishment of the service")
2764            assert self.description.startJobStoreID != None
2765            if fileStore.jobStore.fileExists(self.description.startJobStoreID):
2766                fileStore.jobStore.deleteFile(self.description.startJobStoreID)
2767            assert not fileStore.jobStore.fileExists(self.description.startJobStoreID)
2768
2769            #Now block until we are told to stop, which is indicated by the removal
2770            #of a file
2771            assert self.description.terminateJobStoreID != None
2772            while True:
2773                # Check for the terminate signal
2774                if not fileStore.jobStore.fileExists(self.description.terminateJobStoreID):
2775                    logger.debug("Detected that the terminate jobStoreID has been removed so exiting")
2776                    if not fileStore.jobStore.fileExists(self.description.errorJobStoreID):
2777                        raise RuntimeError("Detected the error jobStoreID has been removed so exiting with an error")
2778                    break
2779
2780                # Check the service's status and exit if failed or complete
2781                try:
2782                    if not service.check():
2783                        logger.debug("The service has finished okay, exiting")
2784                        break
2785                except RuntimeError:
2786                    logger.debug("Detected termination of the service")
2787                    raise
2788
2789                time.sleep(fileStore.jobStore.config.servicePollingInterval) #Avoid excessive polling
2790
2791            logger.debug("Service is done")
2792        finally:
2793            # The stop function is always called
2794            service.stop(self)
2795
2796    def getUserScript(self):
2797        return self.serviceModule
2798
2799
2800class Promise:
2801    """
2802    References a return value from a :meth:`toil.job.Job.run` or
2803    :meth:`toil.job.Job.Service.start` method as a *promise* before the method itself is run.
2804
2805    Let T be a job. Instances of :class:`.Promise` (termed a *promise*) are returned by T.rv(),
2806    which is used to reference the return value of T's run function. When the promise is passed
2807    to the constructor (or as an argument to a wrapped function) of a different, successor job
2808    the promise will be replaced by the actual referenced return value. This mechanism allows a
2809    return values from one job's run method to be input argument to job before the former job's
2810    run function has been executed.
2811    """
2812    _jobstore = None
2813    """
2814    Caches the job store instance used during unpickling to prevent it from being instantiated
2815    for each promise
2816
2817    :type: toil.jobStores.abstractJobStore.AbstractJobStore
2818    """
2819
2820    filesToDelete = set()
2821    """
2822    A set of IDs of files containing promised values when we know we won't need them anymore
2823    """
2824    def __init__(self, job, path):
2825        """
2826        :param Job job: the job whose return value this promise references
2827        :param path: see :meth:`.Job.rv`
2828        """
2829        self.job = job
2830        self.path = path
2831
2832    def __reduce__(self):
2833        """
2834        Called during pickling when a promise (an instance of this class) is about to be be
2835        pickled. Returns the Promise class and construction arguments that will be evaluated
2836        during unpickling, namely the job store coordinates of a file that will hold the promised
2837        return value. By the time the promise is about to be unpickled, that file should be
2838        populated.
2839        """
2840        # The allocation of the file in the job store is intentionally lazy, we only allocate an
2841        # empty file in the job store if the promise is actually being pickled. This is done so
2842        # that we do not allocate files for promises that are never used.
2843        jobStoreLocator, jobStoreFileID = self.job.registerPromise(self.path)
2844        # Returning a class object here causes the pickling machinery to attempt to instantiate
2845        # the class. We will catch that with __new__ and return an the actual return value instead.
2846        return self.__class__, (jobStoreLocator, jobStoreFileID)
2847
2848    @staticmethod
2849    def __new__(cls, *args):
2850        assert len(args) == 2
2851        if isinstance(args[0], Job):
2852            # Regular instantiation when promise is created, before it is being pickled
2853            return super().__new__(cls)
2854        else:
2855            # Attempted instantiation during unpickling, return promised value instead
2856            return cls._resolve(*args)
2857
2858    @classmethod
2859    def _resolve(cls, jobStoreLocator, jobStoreFileID):
2860        # Initialize the cached job store if it was never initialized in the current process or
2861        # if it belongs to a different workflow that was run earlier in the current process.
2862        if cls._jobstore is None or cls._jobstore.config.jobStore != jobStoreLocator:
2863            cls._jobstore = Toil.resumeJobStore(jobStoreLocator)
2864        cls.filesToDelete.add(jobStoreFileID)
2865        with cls._jobstore.readFileStream(jobStoreFileID) as fileHandle:
2866            # If this doesn't work then the file containing the promise may not exist or be
2867            # corrupted
2868            value = safeUnpickleFromStream(fileHandle)
2869            return value
2870
2871
2872class PromisedRequirement:
2873    def __init__(self, valueOrCallable, *args):
2874        """
2875        Class for dynamically allocating job function resource requirements involving
2876        :class:`toil.job.Promise` instances.
2877
2878        Use when resource requirements depend on the return value of a parent function.
2879        PromisedRequirements can be modified by passing a function that takes the
2880        :class:`.Promise` as input.
2881
2882        For example, let f, g, and h be functions. Then a Toil workflow can be
2883        defined as follows::
2884        A = Job.wrapFn(f)
2885        B = A.addChildFn(g, cores=PromisedRequirement(A.rv())
2886        C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2*x, B.rv()))
2887
2888        :param valueOrCallable: A single Promise instance or a function that
2889                                takes args as input parameters.
2890        :param args: variable length argument list
2891        :type args: int or .Promise
2892        """
2893        if hasattr(valueOrCallable, '__call__'):
2894            assert len(args) != 0, 'Need parameters for PromisedRequirement function.'
2895            func = valueOrCallable
2896        else:
2897            assert len(args) == 0, 'Define a PromisedRequirement function to handle multiple arguments.'
2898            func = lambda x: x
2899            args = [valueOrCallable]
2900
2901        self._func = dill.dumps(func)
2902        self._args = list(args)
2903
2904    def getValue(self):
2905        """
2906        Returns PromisedRequirement value
2907        """
2908        func = dill.loads(self._func)
2909        return func(*self._args)
2910
2911    @staticmethod
2912    def convertPromises(kwargs):
2913        """
2914        Returns True if reserved resource keyword is a Promise or
2915        PromisedRequirement instance. Converts Promise instance
2916        to PromisedRequirement.
2917
2918        :param kwargs: function keyword arguments
2919        :return: bool
2920        """
2921        for r in ["disk", "memory", "cores"]:
2922            if isinstance(kwargs.get(r), Promise):
2923                kwargs[r] = PromisedRequirement(kwargs[r])
2924                return True
2925            elif isinstance(kwargs.get(r), PromisedRequirement):
2926                return True
2927        return False
2928
2929
2930class UnfulfilledPromiseSentinel:
2931    """This should be overwritten by a proper promised value. Throws an
2932    exception when unpickled."""
2933    def __init__(self, fulfillingJobName, unpickled):
2934        self.fulfillingJobName = fulfillingJobName
2935
2936    @staticmethod
2937    def __setstate__(stateDict):
2938        """Only called when unpickling. This won't be unpickled unless the
2939        promise wasn't resolved, so we throw an exception."""
2940        jobName = stateDict['fulfillingJobName']
2941        raise RuntimeError("This job was passed a promise that wasn't yet resolved when it "
2942                           "ran. The job {jobName} that fulfills this promise hasn't yet "
2943                           "finished. This means that there aren't enough constraints to "
2944                           "ensure the current job always runs after {jobName}. Consider adding a "
2945                           "follow-on indirection between this job and its parent, or adding "
2946                           "this job as a child/follow-on of {jobName}.".format(jobName=jobName))
2947