# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import pickle
import re
import requests
import subprocess
import sys
import tempfile
import time
import uuid

from argparse import _ArgumentGroup, ArgumentParser, ArgumentDefaultsHelpFormatter
from typing import Optional, Callable, Any, List, Tuple, Union

from toil import logProcessContext, lookupEnvVar
from toil.batchSystems.options import (add_all_batchsystem_options,
                                       set_batchsystem_config_defaults,
                                       set_batchsystem_options)
from toil.lib.conversions import human2bytes, bytes2human
from toil.lib.retry import retry
from toil.provisioners import (add_provisioner_options,
                               parse_node_types,
                               check_valid_node_types,
                               cluster_factory)
from toil.lib.aws import zone_to_region
from toil.realtimeLogger import RealtimeLogger
from toil.statsAndLogging import (add_logging_options,
                                  root_logger,
                                  set_logging_from_options)
from toil.version import dockerRegistry, dockerTag, version

# aim to pack autoscaling jobs within a 30 minute block before provisioning a new node
defaultTargetTime = 1800
UUID_LENGTH = 32
logger = logging.getLogger(__name__)

class Config:
    """Class to represent configuration operations for a Toil workflow run."""
    def __init__(self):
        # Core options
        self.workflowID: Optional[str] = None
        """This attribute uniquely identifies the job store and therefore the workflow. It is
        necessary in order to distinguish between two consecutive workflows for which
        self.jobStore is the same, e.g. when a job store name is reused after a previous run has
        finished successfully and its job store has been cleaned up."""
        self.workflowAttemptNumber = None
        self.jobStore = None
        self.logLevel: str = logging.getLevelName(root_logger.getEffectiveLevel())
        self.workDir: Optional[str] = None
        self.noStdOutErr: bool = False
        self.stats: bool = False

        # Because the stats option needs the jobStore to persist past the end of the run,
        # the clean default value depends on the specified stats option and is determined in setOptions
        self.clean = None
        self.cleanWorkDir: Optional[bool] = None
        self.clusterStats = None

        # Restarting the workflow options
        self.restart: bool = False

        # Batch system options
        set_batchsystem_config_defaults(self)

        # Autoscaling options
        self.provisioner = None
        self.nodeTypes = []
        self.minNodes = None
        self.maxNodes = [10]
        self.targetTime = defaultTargetTime
        self.betaInertia = 0.1
        self.scaleInterval = 60
        self.preemptableCompensation = 0.0
        self.nodeStorage = 50
        self.nodeStorageOverrides = []
        self.metrics: bool = False

        # Parameters to limit service jobs, thereby preventing deadlock scheduling scenarios
        self.maxPreemptableServiceJobs: int = sys.maxsize
        self.maxServiceJobs: int = sys.maxsize
        self.deadlockWait: Union[float, int] = 60  # Number of seconds we must be stuck with all services before declaring a deadlock
        self.deadlockCheckInterval: Union[float, int] = 30  # Minimum polling delay for deadlocks
        self.statePollingWait: Union[float, int] = 1  # Number of seconds to wait before querying job state

        # Resource requirements
        self.defaultMemory: int = 2147483648
        self.defaultCores: Union[float, int] = 1
        self.defaultDisk: int = 2147483648
        self.readGlobalFileMutableByDefault: bool = False
        self.defaultPreemptable: bool = False
        self.maxCores: int = sys.maxsize
        self.maxMemory: int = sys.maxsize
        self.maxDisk: int = sys.maxsize

        # Retrying/rescuing jobs
        self.retryCount: int = 1
        self.enableUnlimitedPreemptableRetries: bool = False
        self.doubleMem: bool = False
        self.maxJobDuration: int = sys.maxsize
        self.rescueJobsFrequency: int = 3600

        # Misc
        self.disableCaching: bool = False
        self.disableChaining: bool = False
        self.disableJobStoreChecksumVerification: bool = False
        self.maxLogFileSize: int = 64000
        self.writeLogs = None
        self.writeLogsGzip = None
        self.writeLogsFromAllJobs: bool = False
        self.sseKey: Optional[str] = None
        self.servicePollingInterval: int = 60
        self.useAsync: bool = True
        self.forceDockerAppliance: bool = False
        self.runCwlInternalJobsOnWorkers: bool = False
        self.statusWait: int = 3600
        self.disableProgress: bool = False

        # Debug options
        self.debugWorker: bool = False
        self.disableWorkerOutputCapture: bool = False
        self.badWorker = 0.0
        self.badWorkerFailInterval = 0.01

        # CWL
        self.cwl: bool = False

    def setOptions(self, options) -> None:
        """Update this config object from the given options object."""
        def set_option(option_name: str,
                       parsing_function: Optional[Callable] = None,
                       check_function: Optional[Callable] = None,
                       default: Any = None) -> None:
            option_value = getattr(options, option_name, default)

            if option_value is not None:
                if parsing_function is not None:
                    option_value = parsing_function(option_value)
                if check_function is not None:
                    try:
                        check_function(option_value)
                    except AssertionError:
                        raise RuntimeError(f"The {option_name} option has an invalid value: {option_value}")
                setattr(self, option_name, option_value)
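        # For example, set_option("maxCores", int, iC(1)) reads options.maxCores,
        # converts it with int(), validates it with iC(1), and stores the result on self.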

        # Function to parse integer from string expressed in different formats
        h2b = lambda x: human2bytes(str(x))

        def parse_jobstore(jobstore_uri: str):
            name, rest = Toil.parseLocator(jobstore_uri)
            if name == 'file':
                # We need to resolve relative paths early, on the leader, because the worker process
                # may have a different working directory than the leader, e.g. under Mesos.
                return Toil.buildLocator(name, os.path.abspath(rest))
            else:
                return jobstore_uri
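        # e.g. parse_jobstore("file:./store") becomes "file:/<leader's working dir>/store",
        # while non-file locators such as "aws:us-west-2:prefix" pass through unchanged.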

        def parse_str_list(s: str):
            return [str(x) for x in s.split(",")]

        def parse_int_list(s: str):
            return [int(x) for x in s.split(",")]
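        # e.g. parse_int_list("2,4,8") -> [2, 4, 8]; parse_str_list("aws,gce") -> ['aws', 'gce'].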

        # Core options
        set_option("jobStore", parsing_function=parse_jobstore)
        # TODO: LOG LEVEL STRING
        set_option("workDir")
        if self.workDir is not None:
            self.workDir: Optional[str] = os.path.abspath(self.workDir)
            if not os.path.exists(self.workDir):
                raise RuntimeError(f"The path provided to --workDir ({self.workDir}) does not exist.")

            if len(self.workDir) > 80:
                logger.warning(f'Length of workDir path "{self.workDir}" is {len(self.workDir)} characters.  '
                               f'Consider setting a shorter path with --workDir or setting TMPDIR to something '
                               f'like "/tmp" to avoid overly long paths.')

        set_option("noStdOutErr")
        set_option("stats")
        set_option("cleanWorkDir")
        set_option("clean")
        if self.stats:
            if self.clean != "never" and self.clean is not None:
                raise RuntimeError("Contradicting options passed: Clean flag is set to %s "
                                   "despite the stats flag requiring "
                                   "the jobStore to be intact at the end of the run. "
                                   "Set clean to 'never'." % self.clean)
            self.clean = "never"
        elif self.clean is None:
            self.clean = "onSuccess"
        set_option('clusterStats')
        set_option("restart")

        # Batch system options
        set_option("batchSystem")
        set_batchsystem_options(self.batchSystem, set_option)
        set_option("disableAutoDeployment")
        set_option("scale", float, fC(0.0))
        set_option("parasolCommand")
        set_option("parasolMaxBatches", int, iC(1))
        set_option("linkImports")
        set_option("moveExports")
        set_option("allocate_mem")
        set_option("mesosMasterAddress")
        set_option("kubernetesHostPath")
        set_option("environment", parseSetEnv)

        # Autoscaling options
        set_option("provisioner")
        set_option("nodeTypes", parse_node_types)
        set_option("minNodes", parse_int_list)
        set_option("maxNodes", parse_int_list)
        set_option("targetTime", int)
        if self.targetTime <= 0:
            raise RuntimeError(f'targetTime ({self.targetTime}) must be a positive integer!')
        set_option("betaInertia", float)
        if not 0.0 <= self.betaInertia <= 0.9:
            raise RuntimeError(f'betaInertia ({self.betaInertia}) must be between 0.0 and 0.9!')
        set_option("scaleInterval", float)
        set_option("metrics")
        set_option("preemptableCompensation", float)
        if not 0.0 <= self.preemptableCompensation <= 1.0:
            raise RuntimeError(f'preemptableCompensation ({self.preemptableCompensation}) must be between 0.0 and 1.0!')
        set_option("nodeStorage", int)

        def check_nodestorage_overrides(overrides: List[str]) -> None:
            for override in overrides:
                tokens = override.split(":")
                assert len(tokens) == 2, \
                    'Each component of --nodeStorageOverrides must be of the form <instance type>:<storage in GiB>'
                assert any(tokens[0] in n[0] for n in self.nodeTypes), \
                    'instance type in --nodeStorageOverrides must be used in --nodeTypes'
                assert tokens[1].isdigit(), \
                    'storage must be an integer in --nodeStorageOverrides'
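        # e.g. "t2.large:100" requests a 100 GiB root volume for node types that include t2.large.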
        set_option("nodeStorageOverrides", parse_str_list, check_function=check_nodestorage_overrides)

        # Parameters to limit service jobs / detect deadlocks
        set_option("maxServiceJobs", int)
        set_option("maxPreemptableServiceJobs", int)
        set_option("deadlockWait", int)
        set_option("deadlockCheckInterval", int)
        set_option("statePollingWait", int)

        # Resource requirements
        set_option("defaultMemory", h2b, iC(1))
        set_option("defaultCores", float, fC(1.0))
        set_option("defaultDisk", h2b, iC(1))
        set_option("readGlobalFileMutableByDefault")
        set_option("maxCores", int, iC(1))
        set_option("maxMemory", h2b, iC(1))
        set_option("maxDisk", h2b, iC(1))
        set_option("defaultPreemptable")

        # Retrying/rescuing jobs
        set_option("retryCount", int, iC(1))
        set_option("enableUnlimitedPreemptableRetries")
        set_option("doubleMem")
        set_option("maxJobDuration", int, iC(1))
        set_option("rescueJobsFrequency", int, iC(1))

        # Misc
        set_option("maxLocalJobs", int)
        set_option("disableCaching")
        set_option("disableChaining")
        set_option("disableJobStoreChecksumVerification")
        set_option("maxLogFileSize", h2b, iC(1))
        set_option("writeLogs")
        set_option("writeLogsGzip")
        set_option("writeLogsFromAllJobs")
        set_option("runCwlInternalJobsOnWorkers")
        set_option("disableProgress")

        assert not (self.writeLogs and self.writeLogsGzip), \
            "Cannot use both --writeLogs and --writeLogsGzip at the same time."
        assert not self.writeLogsFromAllJobs or self.writeLogs or self.writeLogsGzip, \
            "To enable --writeLogsFromAllJobs, either --writeLogs or --writeLogsGzip must be set."

        def check_sse_key(sse_key: str) -> None:
            with open(sse_key) as f:
                assert len(f.readline().rstrip()) == 32, 'SSE key appears to be invalid.'

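        # The SSE key file must hold a single 32-character line; for example, a (hypothetical)
        # key file could be generated with `openssl rand -hex 16 > sse.key`.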
        set_option("sseKey", check_function=check_sse_key)
        set_option("servicePollingInterval", float, fC(0.0))
        set_option("forceDockerAppliance")

        # Debug options
        set_option("debugWorker")
        set_option("disableWorkerOutputCapture")
        set_option("badWorker", float, fC(0.0, 1.0))
        set_option("badWorkerFailInterval", float, fC(0.0))

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    def __hash__(self):
        return self.__dict__.__hash__()


JOBSTORE_HELP = ("The location of the job store for the workflow.  "
                 "A job store holds persistent information about the jobs, stats, and files in a "
                 "workflow. If the workflow is run with a distributed batch system, the job "
                 "store must be accessible by all worker nodes. Depending on the desired "
                 "job store implementation, the location should be formatted according to "
                 "one of the following schemes:\n\n"
                 "file:<path> where <path> points to a directory on the file system\n\n"
                 "aws:<region>:<prefix> where <region> is the name of an AWS region like "
                 "us-west-2 and <prefix> will be prepended to the names of any top-level "
                 "AWS resources in use by the job store, e.g. S3 buckets.\n\n"
                 "google:<project_id>:<prefix> TODO: explain\n\n"
                 "For backwards compatibility, you may also specify ./foo (equivalent to "
                 "file:./foo or just file:foo) or /bar (equivalent to file:/bar).")
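# Example locators (values are illustrative): "file:/tmp/my-jobstore", "aws:us-west-2:my-prefix".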


def parser_with_common_options(provisioner_options=False, jobstore_option=True):
    parser = ArgumentParser(prog='Toil', formatter_class=ArgumentDefaultsHelpFormatter)

    if provisioner_options:
        add_provisioner_options(parser)

    if jobstore_option:
        parser.add_argument('jobStore', type=str, help=JOBSTORE_HELP)

    # always add these
    add_logging_options(parser)
    parser.add_argument("--version", action='version', version=version)
    parser.add_argument("--tempDirRoot", dest="tempDirRoot", type=str, default=tempfile.gettempdir(),
                        help="Path to where temporary directories containing all temp files are created.  "
                             "Defaults to the system temporary directory reported by 'tempfile.gettempdir()'.")
    return parser
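# A minimal usage sketch:
#     parser = parser_with_common_options(provisioner_options=True)
#     options = parser.parse_args()
#     set_logging_from_options(options)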


def addOptions(parser: ArgumentParser, config: Config = Config()):
    if not (isinstance(parser, ArgumentParser) or isinstance(parser, _ArgumentGroup)):
        raise ValueError(f"Unanticipated class: {parser.__class__}.  Must be: argparse.ArgumentParser or ArgumentGroup.")

    add_logging_options(parser)
    parser.register("type", "bool", parseBool)  # Custom type for arg=True/False.

    # Core options
    core_options = parser.add_argument_group(
        title="Toil core options.",
        description="Options to specify the location of the Toil workflow and "
                    "turn on stats collation about the performance of jobs."
    )
    core_options.add_argument('jobStore', type=str, help=JOBSTORE_HELP)
    core_options.add_argument("--workDir", dest="workDir", default=None,
                              help="Absolute path to directory where temporary files generated during the Toil "
                                   "run should be placed. Standard output and error from batch system jobs "
                                   "(unless --noStdOutErr) will be placed in this directory. A cache directory "
                                   "may be placed in this directory. Temp files and folders will be placed in a "
                                   "directory toil-<workflowID> within workDir. The workflowID is generated by "
                                   "Toil and will be reported in the workflow logs. Default is determined by the "
                                   "variables (TMPDIR, TEMP, TMP) via mkdtemp. This directory needs to exist on "
                                   "all machines running jobs; if capturing standard output and error from batch "
                                   "system jobs is desired, it will generally need to be on a shared file system. "
                                   "When sharing a cache between containers on a host, this directory must be "
                                   "shared between the containers.")
    core_options.add_argument("--noStdOutErr", dest="noStdOutErr", action="store_true", default=None,
                              help="Do not capture standard output and error from batch system jobs.")
    core_options.add_argument("--stats", dest="stats", action="store_true", default=None,
                              help="Records statistics about the toil workflow to be used by 'toil stats'.")
    clean_choices = ['always', 'onError', 'never', 'onSuccess']
    core_options.add_argument("--clean", dest="clean", choices=clean_choices, default=None,
                              help=f"Determines the deletion of the jobStore upon completion of the program.  "
                                   f"Choices: {clean_choices}.  The --stats option requires information from the "
                                   f"jobStore upon completion so the jobStore will never be deleted with that flag.  "
                                   f"If you wish to be able to restart the run, choose 'never' or 'onSuccess'.  "
                                   f"Default is 'never' if stats is enabled, and 'onSuccess' otherwise.")
    core_options.add_argument("--cleanWorkDir", dest="cleanWorkDir", choices=clean_choices, default='always',
                              help=f"Determines deletion of temporary worker directory upon completion of a job.  "
                                   f"Choices: {clean_choices}.  Default = always.  WARNING: This option should be "
                                   f"changed for debugging only.  Running a full pipeline with this option could "
                                   f"fill your disk with excessive intermediate data.")
    core_options.add_argument("--clusterStats", dest="clusterStats", nargs='?', action='store', default=None,
                              const=os.getcwd(),
                              help="If enabled, writes out JSON resource usage statistics to a file.  "
                                   "The default location for this file is the current working directory, but an "
                                   "absolute path can also be passed to specify where this file should be written. "
                                   "This option only applies when using scalable batch systems.")

    # Restarting the workflow options
    restart_options = parser.add_argument_group(
        title="Toil options for restarting an existing workflow.",
        description="Allows the restart of an existing workflow"
    )
    restart_options.add_argument("--restart", dest="restart", default=None, action="store_true",
                                 help="If --restart is specified then will attempt to restart existing workflow "
                                      "at the location pointed to by the --jobStore option. Will raise an exception "
                                      "if the workflow does not exist.")

    # Batch system options
    batchsystem_options = parser.add_argument_group(
        title="Toil options for specifying the batch system.",
        description="Allows the specification of the batch system."
    )
    batchsystem_options.add_argument("--statePollingWait", dest="statePollingWait", default=1, type=int,
                                     help="Time, in seconds, to wait before doing a scheduler query for job state.  "
                                          "Return cached results if within the waiting period.")
    add_all_batchsystem_options(batchsystem_options)

    # Auto scaling options
    autoscaling_options = parser.add_argument_group(
        title="Toil options for autoscaling the cluster of worker nodes.",
        description="Allows the specification of the minimum and maximum number of nodes in an autoscaled cluster, "
                    "as well as parameters to control the level of provisioning."
    )
    provisioner_choices = ['aws', 'gce', None]
    # TODO: Better consolidate this provisioner arg and the one in provisioners/__init__.py?
    autoscaling_options.add_argument('--provisioner', '-p', dest="provisioner", choices=provisioner_choices,
                                     help=f"The provisioner for cluster auto-scaling.  This is the main Toil "
                                          f"'--provisioner' option, and defaults to None for running on single "
                                          f"machine and non-auto-scaling batch systems.  The currently supported "
                                          f"choices are {provisioner_choices}.  The default is {config.provisioner}.")
    autoscaling_options.add_argument('--nodeTypes', default=None,
                                     help="Specifies a list of comma-separated node types, each of which is "
                                          "composed of slash-separated instance types, and an optional spot "
                                          "bid set off by a colon, making the node type preemptable. Instance "
                                          "types may appear in multiple node types, and the same node type "
                                          "may appear as both preemptable and non-preemptable.\n"
                                          "Valid argument specifying two node types:\n"
                                          "\tc5.4xlarge/c5a.4xlarge:0.42,t2.large\n"
                                          "Node types:\n"
                                          "\tc5.4xlarge/c5a.4xlarge:0.42 and t2.large\n"
                                          "Instance types:\n"
                                          "\tc5.4xlarge, c5a.4xlarge, and t2.large\n"
                                          "Semantics:\n"
                                          "\tBid $0.42/hour for either c5.4xlarge or c5a.4xlarge instances,\n"
                                          "\ttreated interchangeably, while they are available at that price,\n"
                                          "\tand buy t2.large instances at full price")
    autoscaling_options.add_argument('--minNodes', default=None,
                                     help="Minimum number of nodes of each type in the cluster, if using "
                                          "auto-scaling.  This should be provided as a comma-separated list of the "
                                          "same length as the list of node types. default=0")
    autoscaling_options.add_argument('--maxNodes', default=None,
                                     help=f"Maximum number of nodes of each type in the cluster, if using autoscaling, "
                                          f"provided as a comma-separated list.  The first value is used as a default "
                                          f"if the list length is less than the number of nodeTypes.  "
                                          f"default={config.maxNodes[0]}")
    autoscaling_options.add_argument("--targetTime", dest="targetTime", default=None,
                                     help=f"Sets how rapidly you aim to complete jobs in seconds. Shorter times mean "
                                          f"more aggressive parallelization. The autoscaler attempts to scale up/down "
                                          f"so that it expects all queued jobs will complete within targetTime "
                                          f"seconds.  default={config.targetTime}")
    autoscaling_options.add_argument("--betaInertia", dest="betaInertia", default=None,
                                     help=f"A smoothing parameter to prevent unnecessary oscillations in the number "
                                          f"of provisioned nodes. This controls an exponentially weighted moving "
                                          f"average of the estimated number of nodes. A value of 0.0 disables any "
                                          f"smoothing, and a value of 0.9 will smooth so much that few changes will "
                                          f"ever be made.  Must be between 0.0 and 0.9.  default={config.betaInertia}")
    autoscaling_options.add_argument("--scaleInterval", dest="scaleInterval", default=None,
                                     help=f"The interval (seconds) between assessing if the scale of "
                                          f"the cluster needs to change. default={config.scaleInterval}")
    autoscaling_options.add_argument("--preemptableCompensation", dest="preemptableCompensation", default=None,
                                     help=f"The preference of the autoscaler to replace preemptable nodes with "
                                          f"non-preemptable nodes, when preemptable nodes cannot be started for some "
                                          f"reason. Defaults to {config.preemptableCompensation}. This value must be "
                                          f"between 0.0 and 1.0, inclusive.  A value of 0.0 disables such "
                                          f"compensation, a value of 0.5 compensates two missing preemptable nodes "
                                          f"with a non-preemptable one. A value of 1.0 replaces every missing "
                                          f"preemptable node with a non-preemptable one.")
    autoscaling_options.add_argument("--nodeStorage", dest="nodeStorage", default=50,
                                     help="Specify the size, in gigabytes, of the root volume of worker nodes when "
                                          "they are launched. You may want to set this if your jobs require a lot of "
                                          "disk space.  (default: %(default)s).")
    autoscaling_options.add_argument('--nodeStorageOverrides', default=None,
                                     help="Comma-separated list of nodeType:nodeStorage that are used to override "
                                          "the default value from --nodeStorage for the specified nodeType(s).  "
                                          "This is useful for heterogeneous jobs where some tasks require much more "
                                          "disk than others.")
    autoscaling_options.add_argument("--metrics", dest="metrics", default=False, action="store_true",
                                     help="Enable the prometheus/grafana dashboard for monitoring CPU/RAM usage, "
                                          "queue size, and issued jobs.")

    # Parameters to limit service jobs / detect service deadlocks
    if not config.cwl:
        service_options = parser.add_argument_group(
            title="Toil options for limiting the number of service jobs and detecting service deadlocks",
            description="Allows the specification of the maximum number of service jobs in a cluster.  By keeping "
                        "this limited we can avoid nodes occupied with services causing deadlocks."
        )
        service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=None, type=int,
                                     help=f"The maximum number of service jobs that can be run concurrently, "
                                          f"excluding service jobs running on preemptable nodes.  "
                                          f"default={config.maxServiceJobs}")
        service_options.add_argument("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None,
                                     type=int,
                                     help=f"The maximum number of service jobs that can run concurrently on "
                                          f"preemptable nodes.  default={config.maxPreemptableServiceJobs}")
        service_options.add_argument("--deadlockWait", dest="deadlockWait", default=None, type=int,
                                     help=f"Time, in seconds, to tolerate the workflow running only the same service "
                                          f"jobs, with no jobs to use them, before declaring the workflow to be "
                                          f"deadlocked and stopping.  default={config.deadlockWait}")
        service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=None, type=int,
                                     help=f"Time, in seconds, to wait between checks to see if the workflow is stuck "
                                          f"running only service jobs, with no jobs to use them. Should be shorter "
                                          f"than --deadlockWait. May need to be increased if the batch system cannot "
                                          f"enumerate running jobs quickly enough, or if polling for running jobs is "
                                          f"placing an unacceptable load on a shared cluster.  "
                                          f"default={config.deadlockCheckInterval}")

    # Resource requirements
    resource_options = parser.add_argument_group(
        title="Toil options for cores/memory requirements.",
        description="The options to specify default cores/memory requirements (if not specified by the jobs "
                    "themselves), and to limit the total amount of memory/cores requested from the batch system."
    )
    resource_help_msg = ('The {} amount of {} to request for a job.  '
                         'Only applicable to jobs that do not specify an explicit value for this requirement.  '
                         '{}.  '
                         'Default is {}.')
    cpu_note = 'Fractions of a core (for example 0.1) are supported on some batch systems [mesos, single_machine]'
    disk_mem_note = 'Standard suffixes like K, Ki, M, Mi, G or Gi are supported'
    resource_options.add_argument('--defaultMemory', dest='defaultMemory', default=None, metavar='INT',
                                  help=resource_help_msg.format('default', 'memory', disk_mem_note,
                                                                bytes2human(config.defaultMemory)))
    resource_options.add_argument('--defaultCores', dest='defaultCores', default=None, metavar='FLOAT',
                                  help=resource_help_msg.format('default', 'cpu', cpu_note, str(config.defaultCores)))
    resource_options.add_argument('--defaultDisk', dest='defaultDisk', default=None, metavar='INT',
                                  help=resource_help_msg.format('default', 'disk', disk_mem_note,
                                                                bytes2human(config.defaultDisk)))
    resource_options.add_argument('--defaultPreemptable', dest='defaultPreemptable', metavar='BOOL',
                                  type='bool', nargs='?', const=True, default=False,
                                  help='Make all jobs able to run on preemptable (spot) nodes by default.')
    resource_options.add_argument('--maxCores', dest='maxCores', default=None, metavar='INT',
                                  help=resource_help_msg.format('max', 'cpu', cpu_note, str(config.maxCores)))
    resource_options.add_argument('--maxMemory', dest='maxMemory', default=None, metavar='INT',
                                  help=resource_help_msg.format('max', 'memory', disk_mem_note,
                                                                bytes2human(config.maxMemory)))
    resource_options.add_argument('--maxDisk', dest='maxDisk', default=None, metavar='INT',
                                  help=resource_help_msg.format('max', 'disk', disk_mem_note,
                                                                bytes2human(config.maxDisk)))

    # Retrying/rescuing jobs
    job_options = parser.add_argument_group(
        title="Toil options for rescuing/killing/restarting jobs.",
        description="The options for jobs that either run too long/fail or get lost (some batch systems have issues!)."
    )
    job_options.add_argument("--retryCount", dest="retryCount", default=None,
                             help=f"Number of times to retry a failing job before giving up and "
                                  f"labeling the job failed. default={config.retryCount}")
    job_options.add_argument("--enableUnlimitedPreemptableRetries", dest="enableUnlimitedPreemptableRetries",
                             action='store_true', default=False,
                             help="If set, preemptable failures (or any failure due to an instance getting "
                                  "unexpectedly terminated) will not count towards job failures and --retryCount.")
    job_options.add_argument("--doubleMem", dest="doubleMem", action='store_true', default=False,
                             help="If set, batch jobs which die due to reaching the memory limit on batch schedulers "
                                  "will have their memory doubled and they will be retried. The remaining "
                                  "retry count will be reduced by 1. Currently supported by LSF.")
    job_options.add_argument("--maxJobDuration", dest="maxJobDuration", default=None,
                             help=f"Maximum runtime of a job (in seconds) before we kill it (this is a lower bound, "
                                  f"and the actual time before killing the job may be longer).  "
                                  f"default={config.maxJobDuration}")
    job_options.add_argument("--rescueJobsFrequency", dest="rescueJobsFrequency", default=None,
                             help=f"Period of time to wait (in seconds) between checking for missing/overlong jobs, "
                                  f"that is, jobs which get lost by the batch system. Expert parameter.  "
                                  f"default={config.rescueJobsFrequency}")
    # Debug options
    debug_options = parser.add_argument_group(
        title="Toil debug options.",
        description="Debug options for finding problems or helping with testing."
    )
    debug_options.add_argument("--debugWorker", default=False, action="store_true",
                               help="Experimental no forking mode for local debugging.  Specifically, workers "
                                    "are not forked and stderr/stdout are not redirected to the log.")
    debug_options.add_argument("--disableWorkerOutputCapture", default=False, action="store_true",
                               help="Let worker output go to worker's standard out/error instead of per-job logs.")
    debug_options.add_argument("--badWorker", dest="badWorker", default=None,
                               help=f"For testing purposes randomly kill --badWorker proportion of jobs using "
                                    f"SIGKILL.  default={config.badWorker}")
    debug_options.add_argument("--badWorkerFailInterval", dest="badWorkerFailInterval", default=None,
                               help=f"When killing the job pick uniformly within the interval from 0.0 to "
                                    f"--badWorkerFailInterval seconds after the worker starts.  "
                                    f"default={config.badWorkerFailInterval}")

    # Misc options
    misc_options = parser.add_argument_group(
        title="Toil miscellaneous options.",
        description="Everything else."
    )
    misc_options.add_argument('--disableCaching', dest='disableCaching', type='bool', nargs='?', const=True,
                              default=False,
                              help='Disables caching in the file store. This flag must be set to use '
                                   'a batch system that does not support cleanup, such as Parasol.')
    misc_options.add_argument('--disableChaining', dest='disableChaining', action='store_true', default=False,
                              help="Disables chaining of jobs (chaining uses one job's resource allocation "
                                   "for its successor job if possible).")
    misc_options.add_argument("--disableJobStoreChecksumVerification", dest="disableJobStoreChecksumVerification",
                              default=False, action="store_true",
                              help="Disables checksum verification for files transferred to/from the job store.  "
                                   "Checksum verification is a safety check to ensure the data is not corrupted "
                                   "during transfer. Currently only supported for non-streaming AWS files.")
    misc_options.add_argument("--maxLogFileSize", dest="maxLogFileSize", default=None,
                              help=f"The maximum size of a job log file to keep (in bytes), log files larger than "
                                   f"this will be truncated to the last X bytes. Setting this option to zero will "
                                   f"prevent any truncation. Setting this option to a negative value will truncate "
                                   f"from the beginning.  Default={bytes2human(config.maxLogFileSize)}")
    misc_options.add_argument("--writeLogs", dest="writeLogs", nargs='?', action='store', default=None,
                              const=os.getcwd(),
                              help="Write worker logs received by the leader into their own files at the specified "
                                   "path. Any non-empty standard output and error from failed batch system jobs will "
                                   "also be written into files at this path.  The current working directory will be "
                                   "used if a path is not specified explicitly. Note: By default only the logs of "
                                   "failed jobs are returned to the leader. Set log level to 'debug' or enable "
                                   "'--writeLogsFromAllJobs' to get logs back from successful jobs, and adjust "
                                   "'maxLogFileSize' to control the truncation limit for worker logs.")
    misc_options.add_argument("--writeLogsGzip", dest="writeLogsGzip", nargs='?', action='store', default=None,
                              const=os.getcwd(),
                              help="Identical to --writeLogs except the log files are gzipped on the leader.")
    misc_options.add_argument("--writeLogsFromAllJobs", dest="writeLogsFromAllJobs", action='store_true',
                              default=False,
                              help="Whether to write logs from all jobs (including the successful ones) without "
                                   "necessarily setting the log level to 'debug'. Ensure that either --writeLogs "
                                   "or --writeLogsGzip is set if enabling this option.")
    misc_options.add_argument("--realTimeLogging", dest="realTimeLogging", action="store_true", default=False,
                              help="Enable real-time logging from workers to masters")
    misc_options.add_argument("--sseKey", dest="sseKey", default=None,
                              help="Path to file containing a 32-character key to be used for server-side encryption "
                                   "on awsJobStore or googleJobStore. SSE will not be used if this flag is not passed.")
    misc_options.add_argument("--setEnv", '-e', metavar='NAME=VALUE or NAME', dest="environment", default=[],
                              action="append",
                              help="Set an environment variable early on in the worker. If VALUE is omitted, it will "
                                   "be looked up in the current environment. Independently of this option, the worker "
                                   "will try to emulate the leader's environment before running a job, except for "
                                   "some variables known to vary across systems.  Using this option, a variable can "
                                   "be injected into the worker process itself before it is started.")
    misc_options.add_argument("--servicePollingInterval", dest="servicePollingInterval", default=None,
                              help=f"Interval of time service jobs wait between polling for the existence of the "
                                   f"keep-alive flag.  Default: {config.servicePollingInterval}")
    misc_options.add_argument('--forceDockerAppliance', dest='forceDockerAppliance', action='store_true', default=False,
                              help='Disables sanity checking the existence of the docker image specified by '
                                   'TOIL_APPLIANCE_SELF, which Toil uses to provision mesos for autoscaling.')
    misc_options.add_argument('--disableProgress', dest='disableProgress', action='store_true', default=False,
                              help="Disables the progress bar shown when standard error is a terminal.")


def parseBool(val):
    if val.lower() in ['true', 't', 'yes', 'y', 'on', '1']:
        return True
    elif val.lower() in ['false', 'f', 'no', 'n', 'off', '0']:
        return False
    else:
        raise RuntimeError("Could not interpret \"%s\" as a boolean value" % val)
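# e.g. parseBool("Yes") -> True, parseBool("off") -> False; any other string raises RuntimeError.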


def getNodeID() -> str:
    """
    Return the unique ID of the current node (host). The resulting string will be convertible to a uuid.UUID.

    Tries several methods until success. The returned ID should be identical across calls from different processes on
    the same node at least until the next OS reboot.

    The last resort method is uuid.getnode(), which in some rare OS configurations may return a random ID each time it
    is called. However, this method should never be reached on a Linux system, because reading from
    /proc/sys/kernel/random/boot_id will be tried prior to that. If uuid.getnode() is reached, it will be called twice,
    and a warning logged (with an empty ID returned) if the two values are not identical.
    """
    for idSourceFile in ["/var/lib/dbus/machine-id", "/proc/sys/kernel/random/boot_id"]:
        if os.path.exists(idSourceFile):
            try:
                with open(idSourceFile, "r") as inp:
                    nodeID = inp.readline().strip()
            except EnvironmentError:
                logger.warning(f"Exception when trying to read ID file {idSourceFile}.  "
                               f"Will try next method to get node ID.", exc_info=True)
            else:
                if len(nodeID.split()) == 1:
                    logger.debug(f"Obtained node ID {nodeID} from file {idSourceFile}")
                    break
                else:
                    logger.warning(f"Node ID {nodeID} from file {idSourceFile} contains spaces.  "
                                   f"Will try next method to get node ID.")
    else:
        nodeIDs = []
        for i_call in range(2):
            nodeID = str(uuid.getnode()).strip()
            if len(nodeID.split()) == 1:
                nodeIDs.append(nodeID)
            else:
                logger.warning(f"Node ID {nodeID} from uuid.getnode() contains spaces")
        nodeID = ""
        if len(nodeIDs) == 2:
            if nodeIDs[0] == nodeIDs[1]:
                nodeID = nodeIDs[0]
                logger.debug(f"Obtained node ID {nodeID} from uuid.getnode()")
            else:
                logger.warning(f"Different node IDs {nodeIDs} received from repeated calls to uuid.getnode().  "
                               f"You should use another method to generate node ID.")
    if not nodeID:
        logger.warning("Failed to generate stable node ID, returning empty string. If you see this message with a "
                       "work dir on a shared file system when using workers running on multiple nodes, you might "
                       "experience cryptic job failures")
    if nodeID and len(nodeID.replace('-', '')) < UUID_LENGTH:
        # Some platforms (Mac) give us not enough actual hex characters.
        # Repeat them so the result is convertible to a uuid.UUID
        nodeID = nodeID.replace('-', '')
        num_repeats = UUID_LENGTH // len(nodeID) + 1
        nodeID = nodeID * num_repeats
        nodeID = nodeID[:UUID_LENGTH]
    return nodeID
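# The returned value is meant to be usable as a UUID, e.g. uuid.UUID(hex=getNodeID()) on a Linux host.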


class Toil:
    """
    A context manager that represents a Toil workflow, specifically the batch system, job store,
    and its configuration.
    """
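    # A minimal usage sketch (MyRootJob stands in for a user-defined toil.job.Job):
    #
    #     options = Job.Runner.getDefaultArgumentParser().parse_args()
    #     with Toil(options) as toil:
    #         output = toil.restart() if options.restart else toil.start(MyRootJob())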

    def __init__(self, options):
        """
        Initialize a Toil object from the given options. Note that this is very light-weight and
        that the bulk of the work is done when the context is entered.

        :param argparse.Namespace options: command line options specified by the user
        """
        super(Toil, self).__init__()
        self.options = options
        self.config = None
        """
        :type: toil.common.Config
        """
        self._jobStore = None
        """
        :type: toil.jobStores.abstractJobStore.AbstractJobStore
        """
        self._batchSystem = None
        """
        :type: toil.batchSystems.abstractBatchSystem.AbstractBatchSystem
        """
        self._provisioner = None
        """
        :type: toil.provisioners.abstractProvisioner.AbstractProvisioner
        """
        self._jobCache = dict()
        self._inContextManager = False
        self._inRestart = False

    def __enter__(self):
        """
        Derive configuration from the command line options, load the job store and, on restart,
        consolidate the derived configuration with the one from the previous invocation of the
        workflow.
        """
        set_logging_from_options(self.options)
        config = Config()
        config.setOptions(self.options)
        jobStore = self.getJobStore(config.jobStore)
        if not config.restart:
            config.workflowAttemptNumber = 0
            jobStore.initialize(config)
        else:
            jobStore.resume()
            # Merge configuration from job store with command line options
            config = jobStore.config
            config.setOptions(self.options)
            config.workflowAttemptNumber += 1
            jobStore.writeConfig()
        self.config = config
        self._jobStore = jobStore
        self._inContextManager = True
        return self

    # noinspection PyUnusedLocal
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Clean up after a workflow invocation. Depending on the configuration, delete the job store.
        """
        try:
            if (exc_type is not None and self.config.clean == "onError" or
                    exc_type is None and self.config.clean == "onSuccess" or
                    self.config.clean == "always"):

                try:
                    if self.config.restart and not self._inRestart:
                        pass
                    else:
                        self._jobStore.destroy()
                        logger.info("Successfully deleted the job store: %s" % str(self._jobStore))
                except:
                    logger.info("Failed to delete the job store: %s" % str(self._jobStore))
                    raise
        except Exception as e:
            if exc_type is None:
                raise
            else:
                logger.exception('The following error was raised during clean up:')
        self._inContextManager = False
        self._inRestart = False
        return False  # let exceptions through

    def start(self, rootJob):
        """
        Invoke a Toil workflow with the given job as the root for an initial run. This method
        must be called in the body of a ``with Toil(...) as toil:`` statement. This method should
        not be called more than once for a workflow that has not finished.

        :param toil.job.Job rootJob: The root job of the workflow
        :return: The root job's return value
        """
        self._assertContextManagerUsed()
        self.writePIDFile()
        if self.config.restart:
            raise ToilRestartException('A Toil workflow can only be started once. Use '
                                       'Toil.restart() to resume it.')

        self._batchSystem = self.createBatchSystem(self.config)
        self._setupAutoDeployment(rootJob.getUserScript())
        try:
            self._setBatchSystemEnvVars()
            self._serialiseEnv()
            self._cacheAllJobs()

            # Pickle the promised return value of the root job, then write the pickled promise to
            # a shared file, where we can find and unpickle it at the end of the workflow.
            # Unpickling the promise will automatically substitute the promise for the actual
            # return value.
            with self._jobStore.writeSharedFileStream('rootJobReturnValue') as fH:
                rootJob.prepareForPromiseRegistration(self._jobStore)
                promise = rootJob.rv()
                pickle.dump(promise, fH, protocol=pickle.HIGHEST_PROTOCOL)

            # Setup the first JobDescription and cache it
            rootJobDescription = rootJob.saveAsRootJob(self._jobStore)
            self._cacheJob(rootJobDescription)

            self._setProvisioner()
            return self._runMainLoop(rootJobDescription)
        finally:
            self._shutdownBatchSystem()

    def restart(self):
        """
        Restarts a workflow that has been interrupted.

        :return: The root job's return value
        """
        self._inRestart = True
        self._assertContextManagerUsed()
        self.writePIDFile()
        if not self.config.restart:
            raise ToilRestartException('A Toil workflow must be initiated with Toil.start(), '
                                       'not restart().')

        from toil.job import JobException
        try:
            self._jobStore.loadRootJob()
        except JobException:
            logger.warning(
                'Requested restart but the workflow has already been completed; allowing exports to rerun.')
            return self._jobStore.getRootJobReturnValue()

        self._batchSystem = self.createBatchSystem(self.config)
        self._setupAutoDeployment()
        try:
            self._setBatchSystemEnvVars()
            self._serialiseEnv()
            self._cacheAllJobs()
            self._setProvisioner()
            rootJobDescription = self._jobStore.clean(jobCache=self._jobCache)
            return self._runMainLoop(rootJobDescription)
        finally:
            self._shutdownBatchSystem()

    def _setProvisioner(self):
        if self.config.provisioner is None:
            self._provisioner = None
        else:
            self._provisioner = cluster_factory(provisioner=self.config.provisioner,
                                                clusterName=None,
                                                zone=None,  # read from instance meta-data
                                                nodeStorage=self.config.nodeStorage,
                                                nodeStorageOverrides=self.config.nodeStorageOverrides,
                                                sseKey=self.config.sseKey)
            self._provisioner.setAutoscaledNodeTypes(self.config.nodeTypes)

    @classmethod
    def getJobStore(cls, locator):
        """
        Create an instance of the concrete job store implementation that matches the given locator.

        :param str locator: The location of the job store to be represented by the instance

        :return: an instance of a concrete subclass of AbstractJobStore
        :rtype: toil.jobStores.abstractJobStore.AbstractJobStore
        """
        name, rest = cls.parseLocator(locator)
        if name == 'file':
            from toil.jobStores.fileJobStore import FileJobStore
            return FileJobStore(rest)
        elif name == 'aws':
            from toil.jobStores.aws.jobStore import AWSJobStore
            return AWSJobStore(rest)
        elif name == 'google':
            from toil.jobStores.googleJobStore import GoogleJobStore
            return GoogleJobStore(rest)
        else:
            raise RuntimeError("Unknown job store implementation '%s'" % name)

    @staticmethod
    def parseLocator(locator):
        if locator[0] in '/.' or ':' not in locator:
            return 'file', locator
        else:
            try:
                name, rest = locator.split(':', 1)
            except ValueError:
                raise RuntimeError('Invalid job store locator syntax.')
            else:
                return name, rest
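    # e.g. parseLocator('/data/store') -> ('file', '/data/store'),
    #      parseLocator('aws:us-west-2:prefix') -> ('aws', 'us-west-2:prefix').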

    @staticmethod
    def buildLocator(name, rest):
        assert ':' not in name
        return f'{name}:{rest}'
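    # e.g. buildLocator('file', '/data/store') -> 'file:/data/store', the inverse of parseLocator.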
929
930    @classmethod
931    def resumeJobStore(cls, locator):
932        jobStore = cls.getJobStore(locator)
933        jobStore.resume()
934        return jobStore

    @staticmethod
    def createBatchSystem(config):
        """
        Creates an instance of the batch system specified in the given config.

        :param toil.common.Config config: the current configuration

        :rtype: batchSystems.abstractBatchSystem.AbstractBatchSystem

        :return: an instance of a concrete subclass of AbstractBatchSystem
        """
        kwargs = dict(config=config,
                      maxCores=config.maxCores,
                      maxMemory=config.maxMemory,
                      maxDisk=config.maxDisk)

        from toil.batchSystems.registry import BATCH_SYSTEM_FACTORY_REGISTRY

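        # Illustrative sketch (not the actual registry contents): each registry
        # value is assumed to be a zero-argument factory that imports and returns
        # the batch system *class*, which is then instantiated with **kwargs below,
        # along the lines of:
        #
        #   def single_machine_factory():
        #       from toil.batchSystems.singleMachine import SingleMachineBatchSystem
        #       return SingleMachineBatchSystem
        #
        #   BATCH_SYSTEM_FACTORY_REGISTRY['single_machine'] = single_machine_factory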
        try:
            batch_system = BATCH_SYSTEM_FACTORY_REGISTRY[config.batchSystem]()
        except KeyError:
            raise RuntimeError(f'Unrecognized batch system: {config.batchSystem}')

        if not config.disableCaching and not batch_system.supportsWorkerCleanup():
            raise RuntimeError(f'{config.batchSystem} currently does not support shared caching, because it '
                               'does not support cleaning up a worker after the last job '
                               'finishes. Set the --disableCaching flag if you want to '
                               'use this batch system.')
        logger.debug('Using the %s', re.sub("([a-z])([A-Z])", r"\g<1> \g<2>", batch_system.__name__).lower())

        return batch_system(**kwargs)

    def _setupAutoDeployment(self, userScript=None):
        """
        Determine the user script, save it to the job store and inject a reference to the saved
        copy into the batch system such that it can auto-deploy the resource on the worker
        nodes.

        :param toil.resource.ModuleDescriptor userScript: the module descriptor referencing the
               user script. If None, it will be looked up in the job store.
        """
        if userScript is not None:
            # This branch is hit when a workflow is being started
            if userScript.belongsToToil:
                logger.debug('User script %s belongs to Toil. No need to auto-deploy it.', userScript)
                userScript = None
            else:
                if (self._batchSystem.supportsAutoDeployment() and
                        not self.config.disableAutoDeployment):
                    # Note that by saving the ModuleDescriptor, and not the Resource, we allow for
                    # redeploying a potentially modified user script on workflow restarts.
                    with self._jobStore.writeSharedFileStream('userScript') as f:
                        pickle.dump(userScript, f, protocol=pickle.HIGHEST_PROTOCOL)
                else:
                    from toil.batchSystems.singleMachine import \
                        SingleMachineBatchSystem
                    if not isinstance(self._batchSystem, SingleMachineBatchSystem):
                        logger.warning('Batch system does not support auto-deployment. The user '
                                       'script %s will have to be present at the same location on '
                                       'every worker.', userScript)
                    userScript = None
        else:
            # This branch is hit on restarts
            if (self._batchSystem.supportsAutoDeployment() and
                not self.config.disableAutoDeployment):
                # We could deploy a user script
                from toil.jobStores.abstractJobStore import NoSuchFileException
                try:
                    with self._jobStore.readSharedFileStream('userScript') as f:
                        userScript = safeUnpickleFromStream(f)
                except NoSuchFileException:
                    logger.debug('User script neither set explicitly nor present in the job store.')
                    userScript = None
        if userScript is None:
            logger.debug('No user script to auto-deploy.')
        else:
            logger.debug('Saving user script %s as a resource', userScript)
            userScriptResource = userScript.saveAsResourceTo(self._jobStore)
            logger.debug('Injecting user script %s into batch system.', userScriptResource)
            self._batchSystem.setUserScript(userScriptResource)

    def importFile(self, srcUrl, sharedFileName=None, symlink=False):
        """
        Imports the file at the given URL into the job store.

        See :func:`toil.jobStores.abstractJobStore.AbstractJobStore.importFile` for a
        full description.
        """
        self._assertContextManagerUsed()
        return self._jobStore.importFile(srcUrl, sharedFileName=sharedFileName, symlink=symlink)

    def exportFile(self, jobStoreFileID, dstUrl):
        """
        Exports the given file from the job store to the destination pointed at by dstUrl.

        See :func:`toil.jobStores.abstractJobStore.AbstractJobStore.exportFile` for a
        full description.
        """
        self._assertContextManagerUsed()
        self._jobStore.exportFile(jobStoreFileID, dstUrl)
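    # Usage sketch (hypothetical paths): inside the 'with Toil(options) as toil:'
    # context, files are staged in and out of the job store by URL, e.g.
    #   file_id = toil.importFile('file:///data/input.txt')
    #   ...
    #   toil.exportFile(file_id, 'file:///data/output.txt')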

    def _setBatchSystemEnvVars(self):
        """
        Sets the environment variables required by the job store and those passed on the command line.
        """
        for envDict in (self._jobStore.getEnv(), self.config.environment):
            for k, v in envDict.items():
                self._batchSystem.setEnv(k, v)

    def _serialiseEnv(self):
        """
        Puts the environment in a globally accessible pickle file.
        """
        # Dump out the environment of this process in the environment pickle file.
        with self._jobStore.writeSharedFileStream("environment.pickle") as fileHandle:
            pickle.dump(dict(os.environ), fileHandle, pickle.HIGHEST_PROTOCOL)
        logger.debug("Wrote the environment for the jobs to the environment file.")

    def _cacheAllJobs(self):
        """
        Downloads all jobs in the current job store into self._jobCache.
        """
        logger.debug('Caching all jobs in job store')
        self._jobCache = {jobDesc.jobStoreID: jobDesc for jobDesc in self._jobStore.jobs()}
        logger.debug('%d jobs downloaded.', len(self._jobCache))

    def _cacheJob(self, job):
        """
        Adds the given job to the current job cache.

        :param toil.job.JobDescription job: job to be added to the current job cache
        """
        self._jobCache[job.jobStoreID] = job

    @staticmethod
    def getToilWorkDir(configWorkDir: Optional[str] = None) -> str:
        """
        Returns a path to a writable directory under which per-workflow
        directories exist.  This directory is always required to exist on a
        machine, even if the Toil worker has not run yet.  If your workers and
        leader have different temp directories, you may need to set
        TOIL_WORKDIR.

        :param str configWorkDir: Value passed to the program using the --workDir flag
        :return: Path to the Toil work directory, constant across all machines
        :rtype: str
        """
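        # Resolution order (first match wins): TOIL_WORKDIR_OVERRIDE, then the
        # --workDir value, then TOIL_WORKDIR, then the system temp directory.
        # For example, with TOIL_WORKDIR=/scratch/toil exported and no --workDir
        # given (and no override set), this returns '/scratch/toil', provided
        # that directory exists.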
        workDir = os.getenv('TOIL_WORKDIR_OVERRIDE') or configWorkDir or os.getenv('TOIL_WORKDIR') or tempfile.gettempdir()
        if not os.path.exists(workDir):
            raise RuntimeError(f'The directory specified by --workDir or TOIL_WORKDIR ({workDir}) does not exist.')
        return workDir

    @classmethod
    def getLocalWorkflowDir(cls, workflowID, configWorkDir=None):
        """
        Returns a path to the directory where worker directories and the cache will be located
        for this workflow on this machine.

        :param str workflowID: Unique identifier for the current workflow run
        :param str configWorkDir: Value passed to the program using the --workDir flag
        :return: Path to the local workflow directory on this machine
        :rtype: str
        """
        # Get the global Toil work directory. This ensures that it exists.
        base = cls.getToilWorkDir(configWorkDir=configWorkDir)

        # Create a directory unique to each host in case workDir is on a shared FS.
        # This prevents workers on different nodes from erasing each other's directories.
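        # Illustrative example: uuid.uuid5(uuid.UUID(getNodeID()), workflowID) is a
        # deterministic hash of (host node ID, workflow ID), so every worker process
        # on the same host computes the same directory name, while different hosts
        # sharing a --workDir get distinct directories.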
        workflowDir: str = os.path.join(base, str(uuid.uuid5(uuid.UUID(getNodeID()), workflowID)).replace('-', ''))
        try:
            # Directory creation is atomic
            os.mkdir(workflowDir)
        except OSError as err:
            if err.errno != 17:  # 17 == errno.EEXIST
                raise
            # The directory already exists because a previous worker set it up.
        else:
            logger.debug('Created the workflow directory for this machine at %s', workflowDir)
        return workflowDir

    def _runMainLoop(self, rootJob):
        """
        Runs the main loop with the given job.

        :param toil.job.Job rootJob: The root job for the workflow.
        :rtype: Any
        """
        logProcessContext(self.config)

        with RealtimeLogger(self._batchSystem,
                            level=self.options.logLevel if self.options.realTimeLogging else None):
            # FIXME: common should not import from leader
            from toil.leader import Leader
            return Leader(config=self.config,
                          batchSystem=self._batchSystem,
                          provisioner=self._provisioner,
                          jobStore=self._jobStore,
                          rootJob=rootJob,
                          jobCache=self._jobCache).run()

    def _shutdownBatchSystem(self):
        """
        Shuts down the current batch system if it has been created.
        """
        assert self._batchSystem is not None

        startTime = time.time()
        logger.debug('Shutting down batch system ...')
        self._batchSystem.shutdown()
        logger.debug('... finished shutting down the batch system in %s seconds.',
                     time.time() - startTime)

    def _assertContextManagerUsed(self):
        if not self._inContextManager:
            raise ToilContextManagerException()

    def writePIDFile(self):
        """
        Write the PID of this process to a file in the job store.

        Overwriting the current contents of pid.log is intentional: other methods rely
        on always having the most current PID available, and so far there is no reason
        to store any old PIDs.
        """
        with self._jobStore.writeSharedFileStream('pid.log') as f:
            f.write(str(os.getpid()).encode('utf-8'))


class ToilRestartException(Exception):
    def __init__(self, message):
        super(ToilRestartException, self).__init__(message)


class ToilContextManagerException(Exception):
    def __init__(self):
        super(ToilContextManagerException, self).__init__(
            'This method cannot be called outside the "with Toil(...)" context manager.')


class ToilMetrics:
    def __init__(self, provisioner=None):
        clusterName = 'none'
        region = 'us-west-2'
        if provisioner is not None:
            clusterName = provisioner.clusterName
            if provisioner._zone is not None:
                if provisioner.cloud == 'aws':
                    # Convert the availability zone (e.g. us-west-2a) to its region (us-west-2)
                    region = zone_to_region(provisioner._zone)
                else:
                    region = provisioner._zone

        registry = lookupEnvVar(name='docker registry',
                                envName='TOIL_DOCKER_REGISTRY',
                                defaultValue=dockerRegistry)

        self.mtailImage = "%s/toil-mtail:%s" % (registry, dockerTag)
        self.grafanaImage = "%s/toil-grafana:%s" % (registry, dockerTag)
        self.prometheusImage = "%s/toil-prometheus:%s" % (registry, dockerTag)

        self.startDashboard(clusterName=clusterName, zone=region)

        # Always restart the mtail container, because metrics should start from scratch
        # for each workflow
        try:
            subprocess.check_call(["docker", "rm", "-f", "toil_mtail"])
        except subprocess.CalledProcessError:
            pass

        try:
            self.mtailProc = subprocess.Popen(["docker", "run", "--rm", "--interactive",
                                               "--net=host",
                                               "--name", "toil_mtail",
                                               "-p", "3903:3903",
                                               self.mtailImage],
                                              stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        except OSError:
            # Popen raises OSError (not CalledProcessError) if docker cannot be executed.
            logger.warning("Could not start toil metrics server.")
            self.mtailProc = None
        except KeyboardInterrupt:
            self.mtailProc.terminate()

        # On single machine, launch a node exporter instance to monitor CPU/RAM usage.
        # On AWS this is handled by the EC2 init script
        self.nodeExporterProc = None
        if not provisioner:
            try:
                self.nodeExporterProc = subprocess.Popen(["docker", "run", "--rm",
                                                          "--net=host",
                                                          "-p", "9100:9100",
                                                          "-v", "/proc:/host/proc",
                                                          "-v", "/sys:/host/sys",
                                                          "-v", "/:/rootfs",
                                                          "quay.io/prometheus/node-exporter:0.15.2",
                                                          "-collector.procfs", "/host/proc",
                                                          "-collector.sysfs", "/host/sys",
                                                          "-collector.filesystem.ignored-mount-points",
                                                          "^/(sys|proc|dev|host|etc)($|/)"])
            except OSError:
                logger.warning(
                    "Couldn't start node exporter, won't get RAM and CPU usage for dashboard.")
                self.nodeExporterProc = None
            except KeyboardInterrupt:
                self.nodeExporterProc.terminate()

    @staticmethod
    def _containerRunning(containerName):
        try:
            output = subprocess.check_output(["docker", "inspect", "-f",
                                              "{{.State.Running}}", containerName]).decode('utf-8')
            result = output.strip() == "true"
        except subprocess.CalledProcessError:
            result = False
        return result

    def startDashboard(self, clusterName, zone):
        try:
            if not self._containerRunning("toil_prometheus"):
                try:
                    subprocess.check_call(["docker", "rm", "-f", "toil_prometheus"])
                except subprocess.CalledProcessError:
                    pass
                subprocess.check_call(["docker", "run",
                                       "--name", "toil_prometheus",
                                       "--net=host",
                                       "-d",
                                       "-p", "9090:9090",
                                       self.prometheusImage,
                                       clusterName,
                                       zone])

            if not self._containerRunning("toil_grafana"):
                try:
                    subprocess.check_call(["docker", "rm", "-f", "toil_grafana"])
                except subprocess.CalledProcessError:
                    pass
                subprocess.check_call(["docker", "run",
                                       "--name", "toil_grafana",
                                       "-d", "-p=3000:3000",
                                       self.grafanaImage])
        except subprocess.CalledProcessError:
            logger.warning("Could not start prometheus/grafana dashboard.")
            return

        try:
            self.add_prometheus_data_source()
        except requests.exceptions.ConnectionError:
            logger.debug("Could not add data source to Grafana dashboard - no metrics will be displayed.")

    @retry(errors=[requests.exceptions.ConnectionError])
    def add_prometheus_data_source(self):
        requests.post(
            'http://localhost:3000/api/datasources',
            auth=('admin', 'admin'),
            data='{"name":"DS_PROMETHEUS","type":"prometheus", "url":"http://localhost:9090", "access":"direct"}',
            headers={'content-type': 'application/json', "access": "direct"}
        )

    def log(self, message):
        if self.mtailProc:
            self.mtailProc.stdin.write((message + "\n").encode("utf-8"))
            self.mtailProc.stdin.flush()

    # Note: The mtail configuration (dashboard/mtail/toil.mtail) depends on these messages
    # remaining intact

    def logMissingJob(self):
        self.log("missing_job")

    def logClusterSize(self, nodeType, currentSize, desiredSize):
        self.log("current_size '%s' %i" % (nodeType, currentSize))
        self.log("desired_size '%s' %i" % (nodeType, desiredSize))

    def logQueueSize(self, queueSize):
        self.log("queue_size %i" % queueSize)

    def logIssuedJob(self, jobType):
        self.log("issued_job %s" % jobType)

    def logFailedJob(self, jobType):
        self.log("failed_job %s" % jobType)

    def logCompletedJob(self, jobType):
        self.log("completed_job %s" % jobType)

    def shutdown(self):
        if self.mtailProc:
            self.mtailProc.kill()
        if self.nodeExporterProc:
            self.nodeExporterProc.kill()


def parseSetEnv(l):
    """
    Parses a list of strings of the form "NAME=VALUE" or just "NAME" into a dictionary. Strings
    of the latter form will result in dictionary entries whose value is None.

    :type l: list[str]
    :rtype: dict[str,str]

    >>> parseSetEnv([])
    {}
    >>> parseSetEnv(['a'])
    {'a': None}
    >>> parseSetEnv(['a='])
    {'a': ''}
    >>> parseSetEnv(['a=b'])
    {'a': 'b'}
    >>> parseSetEnv(['a=a', 'a=b'])
    {'a': 'b'}
    >>> parseSetEnv(['a=b', 'c=d'])
    {'a': 'b', 'c': 'd'}
    >>> parseSetEnv(['a=b=c'])
    {'a': 'b=c'}
    >>> parseSetEnv([''])
    Traceback (most recent call last):
    ...
    ValueError: Empty name
    >>> parseSetEnv(['=1'])
    Traceback (most recent call last):
    ...
    ValueError: Empty name
    """
    d = dict()
    for i in l:
        try:
            k, v = i.split('=', 1)
        except ValueError:
            k, v = i, None
        if not k:
            raise ValueError('Empty name')
        d[k] = v
    return d


def iC(minValue, maxValue=sys.maxsize):
    # Returns a function that checks whether a given int lies in the half-open interval [minValue, maxValue)
    assert isinstance(minValue, int) and isinstance(maxValue, int)
    return lambda x: minValue <= x < maxValue


def fC(minValue, maxValue=None):
    # Returns a function that checks whether a given float lies in the half-open interval
    # [minValue, maxValue), or in [minValue, infinity) if maxValue is None
    assert isinstance(minValue, float)
    if maxValue is None:
        return lambda x: minValue <= x
    else:
        assert isinstance(maxValue, float)
        return lambda x: minValue <= x < maxValue
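# Illustrative examples of the validators above (assuming they are used to check
# option values elsewhere in this module):
#   iC(1)(0)           -> False  (below the minimum)
#   iC(1)(1)           -> True   (the minimum is inclusive)
#   iC(1, 4)(4)        -> False  (the maximum is exclusive)
#   fC(0.0)(0.5)       -> True
#   fC(0.0, 1.0)(1.0)  -> False  (the maximum is exclusive)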


def cacheDirName(workflowID):
    """
    :return: Name of the cache directory.
    """
    return f'cache-{workflowID}'


def getDirSizeRecursively(dirPath: str) -> int:
    """
    Return the cumulative number of bytes occupied by the files
    on disk in the directory and its subdirectories.

    If the function is unable to access a file or directory (due to insufficient
    permissions, or due to the file or directory having been removed while this
    function was attempting to traverse it), the error will be handled
    internally, and a (possibly 0) lower bound on the size of the directory
    will be returned.

    The environment variable 'BLOCKSIZE'='512' is set instead of the much cleaner
    --block-size=1 because the BSD/macOS version of du does not support that flag.

    :param str dirPath: A valid path to a directory or file.
    :return: Total size, in bytes, of the file or directory at dirPath.
    """

    # du is often faster than using os.lstat(), sometimes significantly so.

    # The call 'du -s /some/path' reports the number of 512-byte blocks allocated
    # when run with the environment variable BLOCKSIZE='512' set, and we multiply
    # that count by 512 to get the size in bytes.
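    # Worked example: if 'du -s' prints '8  /some/path' under BLOCKSIZE='512',
    # the reported size is 8 * 512 = 4096 bytes.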

    try:
        return int(subprocess.check_output(['du', '-s', dirPath],
                                           env=dict(os.environ, BLOCKSIZE='512')).decode('utf-8').split()[0]) * 512
    except subprocess.CalledProcessError:
        # Something was inaccessible or went away
        return 0


def getFileSystemSize(dirPath: str) -> Tuple[int, int]:
    """
    Return the free space and total size of the file system hosting `dirPath`.

    :param str dirPath: A valid path to a directory.
    :return: free space and total size of file system
    :rtype: tuple
    """
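    # Worked example of the statvfs arithmetic below: with f_frsize == 4096 and
    # f_bavail == 1_000_000, the free space is 4096 * 1_000_000 = 4,096,000,000
    # bytes (roughly 4.1 GB).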
    assert os.path.exists(dirPath)
    diskStats = os.statvfs(dirPath)
    freeSpace = diskStats.f_frsize * diskStats.f_bavail
    diskSize = diskStats.f_frsize * diskStats.f_blocks
    return freeSpace, diskSize


def safeUnpickleFromStream(stream):
    data = stream.read()
    return pickle.loads(data)