1# Copyright (C) 2015-2021 Regents of the University of California
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import enum
15import logging
16import os
17import shutil
18from abc import ABC, abstractmethod
19from contextlib import contextmanager
20from typing import Any, Optional, Tuple, Union, Dict, NamedTuple
21
22from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY,
23                                        DEFAULT_BATCH_SYSTEM)
24from toil.common import Toil, cacheDirName, Config
25from toil.deferred import DeferredFunctionManager
26from toil.fileStores.abstractFileStore import AbstractFileStore
27from toil.lib.threading import LastProcessStandingArena
28
29try:
30    from toil.cwl.cwltoil import CWL_INTERNAL_JOBS
31except ImportError:
32    # CWL extra not installed
33    CWL_INTERNAL_JOBS = ()
34
#: Placeholder stored in UpdatedBatchJobInfo.exitStatus whenever the real exit status is unknown.
EXIT_STATUS_UNAVAILABLE_VALUE = 255
logger = logging.getLogger(__name__)


class UpdatedBatchJobInfo(NamedTuple):
    """
    Record describing a batch job whose status changed, as produced by
    AbstractBatchSystem.getUpdatedBatchJob.
    """
    jobID: str
    # Integer exit status of the job; 0 means success.
    # EXIT_STATUS_UNAVAILABLE_VALUE stands in when no status is known (e.g. the job was lost).
    exitStatus: int
    # Exit reason when known; one of the BatchJobExitReason enum values.
    exitReason: Union[int, None]
    wallTime: Union[float, int, None]
47
48
# Information required for worker cleanup on shutdown of the batch system.
class WorkerCleanupInfo(NamedTuple):
    """
    Everything BatchSystemSupport.workerCleanup needs to clean up a worker node.

    workflowID is concatenated with strings (e.g. ``workflowID + '-cleanup'``) and
    cleanWorkDir is compared against 'always', 'onSuccess' and 'onError', so both
    are strings rather than the int/bool previously declared.
    """
    workDir: str  # workdir path (where the cache would go)
    workflowID: str  # used to identify files specific to this workflow
    cleanWorkDir: str  # cleanup policy, e.g. 'always', 'onSuccess', 'onError'
54
55
class BatchJobExitReason(enum.Enum):
    """
    Why a batch job stopped running; reported through UpdatedBatchJobInfo.exitReason.

    Members are deliberately left unannotated: the typing specification says enum
    members must not carry explicit type annotations (their type is inferred from
    the assigned value).
    """
    FINISHED = 1  # Successfully finished.
    FAILED = 2  # Job finished, but failed.
    LOST = 3  # Preemptable failure (job's executing host went away).
    KILLED = 4  # Job killed before finishing.
    ERROR = 5  # Internal error.
    MEMLIMIT = 6  # Job hit batch system imposed memory limit
63
64
class AbstractBatchSystem(ABC):
    """
    An abstract (as far as Python currently allows) base class to represent the interface the batch
    system must provide to Toil.
    """
    @classmethod
    @abstractmethod
    def supportsAutoDeployment(cls) -> bool:
        """
        Whether this batch system supports auto-deployment of the user script itself. If it does,
        the :meth:`.setUserScript` can be invoked to set the resource object representing the user
        script.

        Note to implementors: If your implementation returns True here, it should also override
        :meth:`.setUserScript`, whose default implementation raises NotImplementedError.

        :rtype: bool
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def supportsWorkerCleanup(cls) -> bool:
        """
        Indicates whether this batch system invokes
        :meth:`BatchSystemSupport.workerCleanup` after the last job for a
        particular workflow invocation finishes. Note that the term *worker*
        refers to an entire node, not just a worker process. A worker process
        may run more than one job sequentially, and more than one concurrent
        worker process may exist on a worker node, for the same workflow. The
        batch system is said to *shut down* after the last worker process
        terminates.

        :rtype: bool
        """
        raise NotImplementedError()

    def setUserScript(self, userScript) -> None:
        """
        Set the user script for this workflow. This method must be called before the first job is
        issued to this batch system, and only if :meth:`.supportsAutoDeployment` returns True,
        otherwise it will raise an exception.

        This default implementation always raises NotImplementedError.

        :param toil.resource.Resource userScript: the resource object representing the user script
               or module and the modules it depends on.
        """
        raise NotImplementedError()

    @abstractmethod
    def issueBatchJob(self, jobDesc):
        """
        Issues a job with the specified command to the batch system and returns a unique jobID.

        :param jobDesc: a toil.job.JobDescription describing the job to issue

        :return: a unique jobID that can be used to reference the newly issued job
        :rtype: int
        """
        raise NotImplementedError()

    @abstractmethod
    def killBatchJobs(self, jobIDs) -> None:
        """
        Kills the given job IDs. After returning, the killed jobs will not
        appear in the results of getRunningBatchJobIDs. The killed job will not
        be returned from getUpdatedBatchJob.

        :param jobIDs: list of IDs of jobs to kill
        :type jobIDs: list[int]
        """
        raise NotImplementedError()

    # FIXME: Return value should be a set (then also fix the tests)

    @abstractmethod
    def getIssuedBatchJobIDs(self):
        """
        Gets all currently issued jobs

        :return: A list of jobs (as jobIDs) currently issued (may be running, or may be
                 waiting to be run). Despite the result being a list, the ordering should not
                 be depended upon.
        :rtype: list[str]
        """
        raise NotImplementedError()

    @abstractmethod
    def getRunningBatchJobIDs(self):
        """
        Gets a map of jobs as jobIDs that are currently running (not just waiting)
        and how long they have been running, in seconds.

        :return: dictionary with currently running jobID keys and how many seconds they have
                 been running as the value
        :rtype: dict[int,float]
        """
        raise NotImplementedError()

    @abstractmethod
    def getUpdatedBatchJob(self, maxWait):
        """
        Returns information about job that has updated its status (i.e. ceased
        running, either successfully or with an error). Each such job will be
        returned exactly once.

        Does not return info for jobs killed by killBatchJobs, although they
        may cause None to be returned earlier than maxWait.

        :param float maxWait: the number of seconds to block, waiting for a result

        :rtype: UpdatedBatchJobInfo or None
        :return: If a result is available, returns UpdatedBatchJobInfo.
                 Otherwise it returns None. wallTime is the number of seconds (a strictly
                 positive float) in wall-clock time the job ran for, or None if this
                 batch system does not support tracking wall time.
        """
        raise NotImplementedError()

    def getSchedulingStatusMessage(self) -> Optional[str]:
        """
        Get a log message fragment for the user about anything that might be
        going wrong in the batch system, if available.

        If no useful message is available, return None.

        This can be used to report what resource is the limiting factor when
        scheduling jobs, for example. If the leader thinks the workflow is
        stuck, the message can be displayed to the user to help them diagnose
        why it might be stuck.

        :rtype: str or None
        :return: User-directed message about scheduling state.
        """

        # Default implementation returns None.
        # Override to provide scheduling status information.
        return None

    @abstractmethod
    def shutdown(self) -> None:
        """
        Called at the completion of a toil invocation.
        Should cleanly terminate all worker threads.
        """
        raise NotImplementedError()

    def setEnv(self, name, value=None) -> None:
        """
        Set an environment variable for the worker process before it is launched. The worker
        process will typically inherit the environment of the machine it is running on but this
        method makes it possible to override specific variables in that inherited environment
        before the worker is launched. Note that this mechanism is different to the one used by
        the worker internally to set up the environment of a job. A call to this method affects
        all jobs issued after this method returns. Note to implementors: This means that you
        would typically need to copy the variables before enqueuing a job.

        If no value is provided it will be looked up from the current environment.

        This default implementation always raises NotImplementedError.
        """
        raise NotImplementedError()

    @classmethod
    def setOptions(cls, setOption) -> None:
        """
        Process command line or configuration options relevant to this batch system.
        The default implementation does nothing; batch systems with their own options
        should override it.

        :param setOption: A function with signature setOption(varName, parsingFn=None, checkFn=None, default=None)
           used to update run configuration
        """

    def getWorkerContexts(self) -> list:
        """
        Get a list of picklable context manager objects to wrap worker work in,
        in order.

        Can be used to ask the Toil worker to do things in-process (such as
        configuring environment variables, hot-deploying user scripts, or
        cleaning up a node) that would otherwise require a wrapping "executor"
        process.

        The default implementation returns a new empty list on every call.

        :rtype: list
        """
        return []
247
248
class BatchSystemSupport(AbstractBatchSystem):
    """
    Partial implementation of AbstractBatchSystem, support methods.

    Stores the configuration and per-job resource limits, collects worker
    environment overrides from setEnv, and provides helpers for resource
    checking, batch-system log file naming and worker cleanup.
    """

    def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int):
        """
        Initializes initial state of the object

        :param toil.common.Config config: object is setup by the toilSetup script and
          has configuration parameters for the jobtree. You can add code
          to that script to get parameters for your batch system.

        :param float maxCores: the maximum number of cores the batch system can
          request for any one job

        :param int maxMemory: the maximum amount of memory the batch system can
          request for any one job, in bytes

        :param int maxDisk: the maximum amount of disk space the batch system can
          request for any one job, in bytes
        """
        super(BatchSystemSupport, self).__init__()
        self.config = config
        self.maxCores = maxCores
        self.maxMemory = maxMemory
        self.maxDisk = maxDisk
        # Worker environment overrides accumulated by setEnv().
        self.environment: Dict[str, str] = {}
        # Bundle of the config values that workerCleanup() needs.
        self.workerCleanupInfo = WorkerCleanupInfo(workDir=self.config.workDir,
                                                   workflowID=self.config.workflowID,
                                                   cleanWorkDir=self.config.cleanWorkDir)

    def checkResourceRequest(self, memory: int, cores: float, disk: int, job_name: str = '', detail: str = '') -> None:
        """
        Check resource request is not greater than that available or allowed.

        Resources are checked in the order cores, memory, disk; the first one
        over its limit raises.

        :param int memory: amount of memory being requested, in bytes

        :param float cores: number of cores being requested

        :param int disk: amount of disk space being requested, in bytes

        :param str job_name: Name of the job being checked, for generating a useful error report.

        :param str detail: Batch-system-specific message to include in the error. It is
               separated from the generated message by a space unless it already starts
               with whitespace.

        :raise InsufficientSystemResources: raised when a resource is requested in an amount
               greater than allowed
        """
        batch_system = self.__class__.__name__ or 'this batch system'
        for resource, requested, available in [('cores', cores, self.maxCores),
                                               ('memory', memory, self.maxMemory),
                                               ('disk', disk, self.maxDisk)]:
            assert requested is not None
            if requested > available:
                unit = 'bytes of ' if resource in ('disk', 'memory') else ''
                subject = f'The job {job_name} is r' if job_name else 'R'
                if resource == 'disk':
                    msg = (f'{subject}equesting {requested} {unit}{resource} for temporary space, '
                           f'more than the maximum of {available} {unit}{resource} of free space on '
                           f'{self.config.workDir} that {batch_system} was configured with, or enforced '
                           f'by --max{resource.capitalize()}.  Try setting/changing the toil option '
                           f'"--workDir" or changing the base temporary directory by setting TMPDIR.')
                else:
                    msg = (f'{subject}equesting {requested} {unit}{resource}, more than the maximum of '
                           f'{available} {unit}{resource} that {batch_system} was configured with, '
                           f'or enforced by --max{resource.capitalize()}.')
                if detail:
                    # The generated message ends with a period; keep the detail from
                    # running straight into it.
                    msg += detail if detail[0].isspace() else ' ' + detail

                raise InsufficientSystemResources(msg)

    def setEnv(self, name, value=None) -> None:
        """
        Set an environment variable for the worker process before it is launched. The worker
        process will typically inherit the environment of the machine it is running on but this
        method makes it possible to override specific variables in that inherited environment
        before the worker is launched. Note that this mechanism is different to the one used by
        the worker internally to set up the environment of a job. A call to this method affects
        all jobs issued after this method returns. Note to implementors: This means that you
        would typically need to copy the variables before enqueuing a job.

        If no value is provided it will be looked up from the current environment.

        :param str name: the environment variable to be set on the worker.

        :param str value: if given, the environment variable given by name will be set to this value.
               if None, the variable's current value will be used as the value on the worker

        :raise RuntimeError: if value is None and the name cannot be found in the environment
        """
        if value is None:
            try:
                value = os.environ[name]
            except KeyError:
                # The KeyError adds nothing beyond this message.
                raise RuntimeError(f"{name} does not exist in current environment") from None
        self.environment[name] = value

    def formatStdOutErrPath(self, toil_job_id: int, cluster_job_id: str, std: str) -> str:
        """
        Format path for batch system standard output/error and other files
        generated by the batch system itself.

        Files will be written to the Toil work directory (which may
        be on a shared file system) with names containing both the Toil and
        batch system job IDs, for ease of debugging job failures.

        :param int toil_job_id: The unique id that Toil gives a job.
        :param str cluster_job_id: What the cluster, for example, GridEngine, uses as its internal job id.
        :param str std: The provenance of the stream (for example: 'err' for 'stderr' or 'out' for 'stdout')

        :return: Formatted filename; however if self.config.noStdOutErr is true,
             returns '/dev/null' or equivalent (os.devnull).
        :rtype: str
        """
        if self.config.noStdOutErr:
            return os.devnull

        fileName: str = f'toil_{self.config.workflowID}.{toil_job_id}.{cluster_job_id}.{std}.log'
        workDir: str = Toil.getToilWorkDir(self.config.workDir)
        return os.path.join(workDir, fileName)

    @staticmethod
    def workerCleanup(info: WorkerCleanupInfo) -> None:
        """
        Cleans up the worker node on batch system shutdown. Also see :meth:`supportsWorkerCleanup`.

        Runs deferred-function cleanup and shuts down the file store for the
        workflow directory, then removes that directory if cleanWorkDir is
        'always', or if it is 'onSuccess'/'onError' and the directory held
        nothing but (at most) the cache directory.

        :param WorkerCleanupInfo info: A named tuple consisting of all the relevant information
               for cleaning up the worker.
        """
        assert isinstance(info, WorkerCleanupInfo)
        workflowDir = Toil.getLocalWorkflowDir(info.workflowID, info.workDir)
        DeferredFunctionManager.cleanupWorker(workflowDir)
        # NOTE(review): the listing is taken before the file store is shut
        # down, so the emptiness test below sees the directory as it was
        # before shutdownFileStore ran; presumably intentional — confirm.
        workflowDirContents = os.listdir(workflowDir)
        AbstractFileStore.shutdownFileStore(workflowDir, info.workflowID)
        if (info.cleanWorkDir == 'always'
                or (info.cleanWorkDir in ('onSuccess', 'onError')
                    and workflowDirContents in ([], [cacheDirName(info.workflowID)]))):
            shutil.rmtree(workflowDir, ignore_errors=True)
387
388
class BatchSystemLocalSupport(BatchSystemSupport):
    """
    Adds a local queue for helper jobs, useful for CWL & others

    Helper jobs are run on a separate instance of the default batch system,
    created in __init__ with config.maxLocalJobs as its core limit.
    """

    def __init__(self, config, maxCores, maxMemory, maxDisk):
        super(BatchSystemLocalSupport, self).__init__(config, maxCores, maxMemory, maxDisk)
        self.localBatch = BATCH_SYSTEM_FACTORY_REGISTRY[DEFAULT_BATCH_SYSTEM]()(
                config, config.maxLocalJobs, maxMemory, maxDisk)

    def handleLocalJob(self, jobDesc: Any) -> Optional[int]:
        """
        To be called by issueBatchJob.

        Returns the jobID if the jobDesc has been submitted to the local queue,
        otherwise returns None.

        A job goes to the local queue when config.runCwlInternalJobsOnWorkers
        is false and its name starts with one of CWL_INTERNAL_JOBS (an empty
        tuple when the CWL extra is not installed, so nothing matches).
        """
        if (not self.config.runCwlInternalJobsOnWorkers
                and jobDesc.jobName.startswith(CWL_INTERNAL_JOBS)):
            return self.localBatch.issueBatchJob(jobDesc)
        else:
            return None

    def killLocalJobs(self, jobIDs) -> None:
        """
        To be called by killBatchJobs. Will kill all local jobs that match the
        provided jobIDs.
        """
        self.localBatch.killBatchJobs(jobIDs)

    def getIssuedLocalJobIDs(self):
        """To be called by getIssuedBatchJobIDs"""
        return self.localBatch.getIssuedBatchJobIDs()

    def getRunningLocalJobIDs(self):
        """To be called by getRunningBatchJobIDs()."""
        return self.localBatch.getRunningBatchJobIDs()

    def getUpdatedLocalJob(self, maxWait: float) -> Optional[UpdatedBatchJobInfo]:
        """
        To be called by getUpdatedBatchJob()

        Delegates to the local batch system's getUpdatedBatchJob, so the result
        is an UpdatedBatchJobInfo or None (not a plain 3-tuple as previously
        annotated).
        """
        return self.localBatch.getUpdatedBatchJob(maxWait)

    def getNextJobID(self) -> int:
        """
        Must be used to get job IDs so that the local and batch jobs do not
        conflict.

        Allocates from the local batch system's jobIndex counter while holding
        its jobIndexLock.
        """
        with self.localBatch.jobIndexLock:
            jobID = self.localBatch.jobIndex
            self.localBatch.jobIndex += 1
        return jobID

    def shutdownLocal(self) -> None:
        """To be called from shutdown()"""
        self.localBatch.shutdown()
445
class BatchSystemCleanupSupport(BatchSystemLocalSupport):
    """
    Adds cleanup support when the last running job leaves a node, for batch
    systems that can't provide it using the backing scheduler.

    Cleanup is delegated to the workers: each one is handed a
    WorkerCleanupContext, and whichever worker leaves it last cleans the node.
    """

    def __init__(self, config, maxCores, maxMemory, maxDisk):
        # No extra state of our own; everything lives in the parent classes.
        super().__init__(config, maxCores, maxMemory, maxDisk)

    @classmethod
    def supportsWorkerCleanup(cls):
        # Workers perform the cleanup themselves through WorkerCleanupContext.
        return True

    def getWorkerContexts(self):
        # Start from whatever contexts the parent classes want workers to use...
        contexts = super().getWorkerContexts()
        # ...then add one holding a copy of our cleanup info, so the worker
        # registers for (and possibly performs) node cleanup.
        contexts.append(WorkerCleanupContext(self.workerCleanupInfo))
        return contexts
469
class WorkerCleanupContext:
    """
    Context manager that lets the last worker on a node perform node cleanup.

    Used by :class:`BatchSystemCleanupSupport`; it is wrapped around the
    worker's work. Every worker enters a shared arena on the way in, and the
    one that turns out to be the last to leave runs
    :meth:`BatchSystemSupport.workerCleanup`.
    """

    def __init__(self, workerCleanupInfo):
        """
        Store the cleanup info to use when this worker is the last one out.

        :param WorkerCleanupInfo workerCleanupInfo: Info to use to clean up the worker if we are
                                                    the last to exit the context manager.
        """
        self.workerCleanupInfo = workerCleanupInfo
        # Created on entry; stays None until __enter__ runs.
        self.arena = None

    def __enter__(self):
        info = self.workerCleanupInfo
        # The arena tracks the concurrent workers so we can tell who leaves last.
        arena_dir = Toil.getToilWorkDir(info.workDir)
        arena_name = info.workflowID + '-cleanup'
        self.arena = LastProcessStandingArena(arena_dir, arena_name)
        self.arena.enter()

    def __exit__(self, type, value, traceback):
        # leave() only yields to the final worker standing; that worker cleans up.
        for _ in self.arena.leave():
            logger.debug('Cleaning up worker')
            BatchSystemSupport.workerCleanup(self.workerCleanupInfo)
        # Never suppress an exception raised inside the with-block.
        return False
504
class NodeInfo(object):
    """
    Describes the load and resources of a single worker node.

    The coresUsed attribute is a floating point value between 0 (all cores idle) and 1 (all cores
    busy), reflecting the CPU load of the node.

    The memoryUsed attribute is a floating point value between 0 (no memory used) and 1 (all memory
    used), reflecting the memory pressure on the node.

    The coresTotal and memoryTotal attributes are the node's resources, not just the used resources.

    The requestedCores and requestedMemory attributes are all the resources that Toil Jobs have reserved on the
    node, regardless of whether the resources are actually being used by the Jobs.

    The workers attribute is an integer reflecting the number of workers currently active
    on the node.
    """
    def __init__(self, coresUsed, memoryUsed, coresTotal, memoryTotal,
                 requestedCores, requestedMemory, workers):
        self.coresUsed = coresUsed
        self.memoryUsed = memoryUsed

        self.coresTotal = coresTotal
        self.memoryTotal = memoryTotal

        self.requestedCores = requestedCores
        self.requestedMemory = requestedMemory

        self.workers = workers

    def __repr__(self):
        # Show every attribute so node state is readable in logs and debuggers.
        return (f'{type(self).__name__}(coresUsed={self.coresUsed!r}, '
                f'memoryUsed={self.memoryUsed!r}, coresTotal={self.coresTotal!r}, '
                f'memoryTotal={self.memoryTotal!r}, requestedCores={self.requestedCores!r}, '
                f'requestedMemory={self.requestedMemory!r}, workers={self.workers!r})')
533
534
class AbstractScalableBatchSystem(AbstractBatchSystem):
    """
    A batch system that supports a variable number of worker nodes. Used by
    :class:`toil.provisioners.clusterScaler.ClusterScaler` to scale the number of worker nodes in
    the cluster up or down depending on overall load.
    """

    @abstractmethod
    def getNodes(self, preemptable=None):
        """
        Returns a dictionary mapping node identifiers of preemptable or non-preemptable nodes to
        NodeInfo objects, one for each node.

        :param bool preemptable: If True (False) only (non-)preemptable nodes will be returned.
               If None, all nodes will be returned.

        :rtype: dict[str,NodeInfo]
        """
        raise NotImplementedError()

    @abstractmethod
    def nodeInUse(self, nodeIP):
        """
        Can be used to determine if a worker node is running any tasks. If the node doesn't
        exist, this function should simply return False.

        :param str nodeIP: The worker node's private IP address

        :return: True if the worker node has been issued any tasks, else False
        :rtype: bool
        """
        raise NotImplementedError()

    @abstractmethod
    @contextmanager
    def nodeFiltering(self, filter):
        """
        Used to prevent races in autoscaling where
        1) nodes have reported to the autoscaler as having no jobs
        2) scaler decides to terminate these nodes. In parallel the batch system assigns jobs to the same nodes
        3) scaler terminates nodes, resulting in job failures for all jobs on that node.

        Enter this context manager prior to node termination so that nodes being considered for
        termination are not assigned new jobs while it is active. The filtering should be
        removed once the context manager exits, after node termination is done.

        :param filter: A callable used as a filter on nodes considered when assigning new jobs.
            After this context manager exits the filter should be removed
        :rtype: None
        """
        raise NotImplementedError()

    @abstractmethod
    def ignoreNode(self, nodeAddress):
        """
        Stop sending jobs to this node. Used in autoscaling
        when the autoscaler is ready to terminate a node, but
        jobs are still running. This allows the node to be terminated
        after the current jobs have finished.

        :param str nodeAddress: IP address of node to ignore.
        :rtype: None
        """
        raise NotImplementedError()

    @abstractmethod
    def unignoreNode(self, nodeAddress):
        """
        Stop ignoring this address, presumably after
        a node with this address has been terminated. This allows for the
        possibility of a new node having the same address as a terminated one.

        :param str nodeAddress: IP address of the node to stop ignoring.
        :rtype: None
        """
        raise NotImplementedError()
608
609
class InsufficientSystemResources(Exception):
    """Raised when a job asks for more cores, memory or disk than the batch system allows."""
612