1# Copyright (C) 2015-2021 Regents of the University of California 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import logging 15import os 16import pickle 17import re 18import requests 19import subprocess 20import sys 21import tempfile 22import time 23import uuid 24 25from argparse import _ArgumentGroup, ArgumentParser, ArgumentDefaultsHelpFormatter 26from typing import Optional, Callable, Any, List, Tuple, Union 27 28from toil import logProcessContext, lookupEnvVar 29from toil.batchSystems.options import (add_all_batchsystem_options, 30 set_batchsystem_config_defaults, 31 set_batchsystem_options) 32from toil.lib.conversions import human2bytes, bytes2human 33from toil.lib.retry import retry 34from toil.provisioners import (add_provisioner_options, 35 parse_node_types, 36 check_valid_node_types, 37 cluster_factory) 38from toil.lib.aws import zone_to_region 39from toil.realtimeLogger import RealtimeLogger 40from toil.statsAndLogging import (add_logging_options, 41 root_logger, 42 set_logging_from_options) 43from toil.version import dockerRegistry, dockerTag, version 44 45# aim to pack autoscaling jobs within a 30 minute block before provisioning a new node 46defaultTargetTime = 1800 47UUID_LENGTH = 32 48logger = logging.getLogger(__name__) 49 50 51class Config: 52 """Class to represent configuration operations for a toil workflow run.""" 53 def __init__(self): 54 # Core options 55 self.workflowID: Optional[str] = None 56 """This attribute uniquely identifies the job store and therefore the workflow. It is 57 necessary in order to distinguish between two consecutive workflows for which 58 self.jobStore is the same, e.g. when a job store name is reused after a previous run has 59 finished successfully and its job store has been clean up.""" 60 self.workflowAttemptNumber = None 61 self.jobStore = None 62 self.logLevel: str = logging.getLevelName(root_logger.getEffectiveLevel()) 63 self.workDir: Optional[str] = None 64 self.noStdOutErr: bool = False 65 self.stats: bool = False 66 67 # Because the stats option needs the jobStore to persist past the end of the run, 68 # the clean default value depends the specified stats option and is determined in setOptions 69 self.clean = None 70 self.cleanWorkDir: Optional[bool] = None 71 self.clusterStats = None 72 73 # Restarting the workflow options 74 self.restart: bool = False 75 76 # Batch system options 77 set_batchsystem_config_defaults(self) 78 79 # Autoscaling options 80 self.provisioner = None 81 self.nodeTypes = [] 82 self.minNodes = None 83 self.maxNodes = [10] 84 self.targetTime = defaultTargetTime 85 self.betaInertia = 0.1 86 self.scaleInterval = 60 87 self.preemptableCompensation = 0.0 88 self.nodeStorage = 50 89 self.nodeStorageOverrides = [] 90 self.metrics: bool = False 91 92 # Parameters to limit service jobs, so preventing deadlock scheduling scenarios 93 self.maxPreemptableServiceJobs: int = sys.maxsize 94 self.maxServiceJobs: int = sys.maxsize 95 self.deadlockWait: Union[float, int] = 60 # Number of seconds we must be stuck with all services before declaring a deadlock 96 self.deadlockCheckInterval: Union[float, int] = 30 # Minimum polling delay for deadlocks 97 self.statePollingWait: Union[float, int] = 1 # Number of seconds to wait before querying job state 98 99 # Resource requirements 100 self.defaultMemory: int = 2147483648 101 self.defaultCores: Union[float, int] = 1 102 self.defaultDisk: int = 2147483648 103 self.readGlobalFileMutableByDefault: bool = False 104 self.defaultPreemptable: bool = False 105 self.maxCores: int = sys.maxsize 106 self.maxMemory: int = sys.maxsize 107 self.maxDisk: int = sys.maxsize 108 109 # Retrying/rescuing jobs 110 self.retryCount: int = 1 111 self.enableUnlimitedPreemptableRetries: bool = False 112 self.doubleMem: bool = False 113 self.maxJobDuration: int = sys.maxsize 114 self.rescueJobsFrequency: int = 3600 115 116 # Misc 117 self.disableCaching: bool = False 118 self.disableChaining: bool = False 119 self.disableJobStoreChecksumVerification: bool = False 120 self.maxLogFileSize: int = 64000 121 self.writeLogs = None 122 self.writeLogsGzip = None 123 self.writeLogsFromAllJobs: bool = False 124 self.sseKey: str = None 125 self.servicePollingInterval: int = 60 126 self.useAsync: bool = True 127 self.forceDockerAppliance: bool = False 128 self.runCwlInternalJobsOnWorkers: bool = False 129 self.statusWait: int = 3600 130 self.disableProgress: bool = False 131 132 # Debug options 133 self.debugWorker: bool = False 134 self.disableWorkerOutputCapture: bool = False 135 self.badWorker = 0.0 136 self.badWorkerFailInterval = 0.01 137 138 # CWL 139 self.cwl: bool = False 140 141 def setOptions(self, options) -> None: 142 """Creates a config object from the options object.""" 143 def set_option(option_name: str, 144 parsing_function: Optional[Callable] = None, 145 check_function: Optional[Callable] = None, 146 default: Any = None) -> None: 147 option_value = getattr(options, option_name, default) 148 149 if option_value is not None: 150 if parsing_function is not None: 151 option_value = parsing_function(option_value) 152 if check_function is not None: 153 try: 154 check_function(option_value) 155 except AssertionError: 156 raise RuntimeError(f"The {option_name} option has an invalid value: {option_value}") 157 setattr(self, option_name, option_value) 158 159 # Function to parse integer from string expressed in different formats 160 h2b = lambda x: human2bytes(str(x)) 161 162 def parse_jobstore(jobstore_uri: str): 163 name, rest = Toil.parseLocator(jobstore_uri) 164 if name == 'file': 165 # We need to resolve relative paths early, on the leader, because the worker process 166 # may have a different working directory than the leader, e.g. under Mesos. 167 return Toil.buildLocator(name, os.path.abspath(rest)) 168 else: 169 return jobstore_uri 170 171 def parse_str_list(s: str): 172 return [str(x) for x in s.split(",")] 173 174 def parse_int_list(s: str): 175 return [int(x) for x in s.split(",")] 176 177 # Core options 178 set_option("jobStore", parsing_function=parse_jobstore) 179 # TODO: LOG LEVEL STRING 180 set_option("workDir") 181 if self.workDir is not None: 182 self.workDir: Optional[str] = os.path.abspath(self.workDir) 183 if not os.path.exists(self.workDir): 184 raise RuntimeError(f"The path provided to --workDir ({self.workDir}) does not exist.") 185 186 if len(self.workDir) > 80: 187 logger.warning(f'Length of workDir path "{self.workDir}" is {len(self.workDir)} characters. ' 188 f'Consider setting a shorter path with --workPath or setting TMPDIR to something ' 189 f'like "/tmp" to avoid overly long paths.') 190 191 set_option("noStdOutErr") 192 set_option("stats") 193 set_option("cleanWorkDir") 194 set_option("clean") 195 if self.stats: 196 if self.clean != "never" and self.clean is not None: 197 raise RuntimeError("Contradicting options passed: Clean flag is set to %s " 198 "despite the stats flag requiring " 199 "the jobStore to be intact at the end of the run. " 200 "Set clean to \'never\'" % self.clean) 201 self.clean = "never" 202 elif self.clean is None: 203 self.clean = "onSuccess" 204 set_option('clusterStats') 205 set_option("restart") 206 207 # Batch system options 208 set_option("batchSystem") 209 set_batchsystem_options(self.batchSystem, set_option) 210 set_option("disableAutoDeployment") 211 set_option("scale", float, fC(0.0)) 212 set_option("parasolCommand") 213 set_option("parasolMaxBatches", int, iC(1)) 214 set_option("linkImports") 215 set_option("moveExports") 216 set_option("allocate_mem") 217 set_option("mesosMasterAddress") 218 set_option("kubernetesHostPath") 219 set_option("environment", parseSetEnv) 220 221 # Autoscaling options 222 set_option("provisioner") 223 set_option("nodeTypes", parse_node_types) 224 set_option("minNodes", parse_int_list) 225 set_option("maxNodes", parse_int_list) 226 set_option("targetTime", int) 227 if self.targetTime <= 0: 228 raise RuntimeError(f'targetTime ({self.targetTime}) must be a positive integer!') 229 set_option("betaInertia", float) 230 if not 0.0 <= self.betaInertia <= 0.9: 231 raise RuntimeError(f'betaInertia ({self.betaInertia}) must be between 0.0 and 0.9!') 232 set_option("scaleInterval", float) 233 set_option("metrics") 234 set_option("preemptableCompensation", float) 235 if not 0.0 <= self.preemptableCompensation <= 1.0: 236 raise RuntimeError(f'preemptableCompensation ({self.preemptableCompensation}) must be between 0.0 and 1.0!') 237 set_option("nodeStorage", int) 238 239 def check_nodestoreage_overrides(overrides: List[str]) -> None: 240 for override in overrides: 241 tokens = override.split(":") 242 assert len(tokens) == 2, \ 243 'Each component of --nodeStorageOverrides must be of the form <instance type>:<storage in GiB>' 244 assert any(tokens[0] in n[0] for n in self.nodeTypes), \ 245 'instance type in --nodeStorageOverrides must be used in --nodeTypes' 246 assert tokens[1].isdigit(), \ 247 'storage must be an integer in --nodeStorageOverrides' 248 set_option("nodeStorageOverrides", parse_str_list, check_function=check_nodestoreage_overrides) 249 250 # Parameters to limit service jobs / detect deadlocks 251 set_option("maxServiceJobs", int) 252 set_option("maxPreemptableServiceJobs", int) 253 set_option("deadlockWait", int) 254 set_option("deadlockCheckInterval", int) 255 set_option("statePollingWait", int) 256 257 # Resource requirements 258 set_option("defaultMemory", h2b, iC(1)) 259 set_option("defaultCores", float, fC(1.0)) 260 set_option("defaultDisk", h2b, iC(1)) 261 set_option("readGlobalFileMutableByDefault") 262 set_option("maxCores", int, iC(1)) 263 set_option("maxMemory", h2b, iC(1)) 264 set_option("maxDisk", h2b, iC(1)) 265 set_option("defaultPreemptable") 266 267 # Retrying/rescuing jobs 268 set_option("retryCount", int, iC(1)) 269 set_option("enableUnlimitedPreemptableRetries") 270 set_option("doubleMem") 271 set_option("maxJobDuration", int, iC(1)) 272 set_option("rescueJobsFrequency", int, iC(1)) 273 274 # Misc 275 set_option("maxLocalJobs", int) 276 set_option("disableCaching") 277 set_option("disableChaining") 278 set_option("disableJobStoreChecksumVerification") 279 set_option("maxLogFileSize", h2b, iC(1)) 280 set_option("writeLogs") 281 set_option("writeLogsGzip") 282 set_option("writeLogsFromAllJobs") 283 set_option("runCwlInternalJobsOnWorkers") 284 set_option("disableProgress") 285 286 assert not (self.writeLogs and self.writeLogsGzip), \ 287 "Cannot use both --writeLogs and --writeLogsGzip at the same time." 288 assert not self.writeLogsFromAllJobs or self.writeLogs or self.writeLogsGzip, \ 289 "To enable --writeLogsFromAllJobs, either --writeLogs or --writeLogsGzip must be set." 290 291 def check_sse_key(sse_key: str) -> None: 292 with open(sse_key) as f: 293 assert len(f.readline().rstrip()) == 32, 'SSE key appears to be invalid.' 294 295 set_option("sseKey", check_function=check_sse_key) 296 set_option("servicePollingInterval", float, fC(0.0)) 297 set_option("forceDockerAppliance") 298 299 # Debug options 300 set_option("debugWorker") 301 set_option("disableWorkerOutputCapture") 302 set_option("badWorker", float, fC(0.0, 1.0)) 303 set_option("badWorkerFailInterval", float, fC(0.0)) 304 305 def __eq__(self, other): 306 return self.__dict__ == other.__dict__ 307 308 def __hash__(self): 309 return self.__dict__.__hash__() 310 311 312JOBSTORE_HELP = ("The location of the job store for the workflow. " 313 "A job store holds persistent information about the jobs, stats, and files in a " 314 "workflow. If the workflow is run with a distributed batch system, the job " 315 "store must be accessible by all worker nodes. Depending on the desired " 316 "job store implementation, the location should be formatted according to " 317 "one of the following schemes:\n\n" 318 "file:<path> where <path> points to a directory on the file systen\n\n" 319 "aws:<region>:<prefix> where <region> is the name of an AWS region like " 320 "us-west-2 and <prefix> will be prepended to the names of any top-level " 321 "AWS resources in use by job store, e.g. S3 buckets.\n\n " 322 "google:<project_id>:<prefix> TODO: explain\n\n" 323 "For backwards compatibility, you may also specify ./foo (equivalent to " 324 "file:./foo or just file:foo) or /bar (equivalent to file:/bar).") 325 326 327def parser_with_common_options(provisioner_options=False, jobstore_option=True): 328 parser = ArgumentParser(prog='Toil', formatter_class=ArgumentDefaultsHelpFormatter) 329 330 if provisioner_options: 331 add_provisioner_options(parser) 332 333 if jobstore_option: 334 parser.add_argument('jobStore', type=str, help=JOBSTORE_HELP) 335 336 # always add these 337 add_logging_options(parser) 338 parser.add_argument("--version", action='version', version=version) 339 parser.add_argument("--tempDirRoot", dest="tempDirRoot", type=str, default=tempfile.gettempdir(), 340 help="Path to where temporary directory containing all temp files are created, " 341 "by default generates a fresh tmp dir with 'tempfile.gettempdir()'.") 342 return parser 343 344 345def addOptions(parser: ArgumentParser, config: Config = Config()): 346 if not (isinstance(parser, ArgumentParser) or isinstance(parser, _ArgumentGroup)): 347 raise ValueError(f"Unanticipated class: {parser.__class__}. Must be: argparse.ArgumentParser or ArgumentGroup.") 348 349 add_logging_options(parser) 350 parser.register("type", "bool", parseBool) # Custom type for arg=True/False. 351 352 # Core options 353 core_options = parser.add_argument_group( 354 title="Toil core options.", 355 description="Options to specify the location of the Toil workflow and " 356 "turn on stats collation about the performance of jobs." 357 ) 358 core_options.add_argument('jobStore', type=str, help=JOBSTORE_HELP) 359 core_options.add_argument("--workDir", dest="workDir", default=None, 360 help="Absolute path to directory where temporary files generated during the Toil " 361 "run should be placed. Standard output and error from batch system jobs " 362 "(unless --noStdOutErr) will be placed in this directory. A cache directory " 363 "may be placed in this directory. Temp files and folders will be placed in a " 364 "directory toil-<workflowID> within workDir. The workflowID is generated by " 365 "Toil and will be reported in the workflow logs. Default is determined by the " 366 "variables (TMPDIR, TEMP, TMP) via mkdtemp. This directory needs to exist on " 367 "all machines running jobs; if capturing standard output and error from batch " 368 "system jobs is desired, it will generally need to be on a shared file system. " 369 "When sharing a cache between containers on a host, this directory must be " 370 "shared between the containers.") 371 core_options.add_argument("--noStdOutErr", dest="noStdOutErr", action="store_true", default=None, 372 help="Do not capture standard output and error from batch system jobs.") 373 core_options.add_argument("--stats", dest="stats", action="store_true", default=None, 374 help="Records statistics about the toil workflow to be used by 'toil stats'.") 375 clean_choices = ['always', 'onError', 'never', 'onSuccess'] 376 core_options.add_argument("--clean", dest="clean", choices=clean_choices, default=None, 377 help=f"Determines the deletion of the jobStore upon completion of the program. " 378 f"Choices: {clean_choices}. The --stats option requires information from the " 379 f"jobStore upon completion so the jobStore will never be deleted with that flag. " 380 f"If you wish to be able to restart the run, choose \'never\' or \'onSuccess\'. " 381 f"Default is \'never\' if stats is enabled, and \'onSuccess\' otherwise.") 382 core_options.add_argument("--cleanWorkDir", dest="cleanWorkDir", choices=clean_choices, default='always', 383 help=f"Determines deletion of temporary worker directory upon completion of a job. " 384 f"Choices: {clean_choices}. Default = always. WARNING: This option should be " 385 f"changed for debugging only. Running a full pipeline with this option could " 386 f"fill your disk with excessive intermediate data.") 387 core_options.add_argument("--clusterStats", dest="clusterStats", nargs='?', action='store', default=None, 388 const=os.getcwd(), 389 help="If enabled, writes out JSON resource usage statistics to a file. " 390 "The default location for this file is the current working directory, but an " 391 "absolute path can also be passed to specify where this file should be written. " 392 "This options only applies when using scalable batch systems.") 393 394 # Restarting the workflow options 395 restart_options = parser.add_argument_group( 396 title="Toil options for restarting an existing workflow.", 397 description="Allows the restart of an existing workflow" 398 ) 399 restart_options.add_argument("--restart", dest="restart", default=None, action="store_true", 400 help="If --restart is specified then will attempt to restart existing workflow " 401 "at the location pointed to by the --jobStore option. Will raise an exception " 402 "if the workflow does not exist") 403 404 # Batch system options 405 batchsystem_options = parser.add_argument_group( 406 title="Toil options for specifying the batch system.", 407 description="Allows the specification of the batch system." 408 ) 409 batchsystem_options.add_argument("--statePollingWait", dest="statePollingWait", default=1, type=int, 410 help="Time, in seconds, to wait before doing a scheduler query for job state. " 411 "Return cached results if within the waiting period.") 412 add_all_batchsystem_options(batchsystem_options) 413 414 # Auto scaling options 415 autoscaling_options = parser.add_argument_group( 416 title="Toil options for autoscaling the cluster of worker nodes.", 417 description="Allows the specification of the minimum and maximum number of nodes in an autoscaled cluster, " 418 "as well as parameters to control the level of provisioning." 419 ) 420 provisioner_choices = ['aws', 'gce', None] 421 # TODO: Better consolidate this provisioner arg and the one in provisioners/__init__.py? 422 autoscaling_options.add_argument('--provisioner', '-p', dest="provisioner", choices=provisioner_choices, 423 help=f"The provisioner for cluster auto-scaling. This is the main Toil " 424 f"'--provisioner' option, and defaults to None for running on single " 425 f"machine and non-auto-scaling batch systems. The currently supported " 426 f"choices are {provisioner_choices}. The default is {config.provisioner}.") 427 autoscaling_options.add_argument('--nodeTypes', default=None, 428 help="Specifies a list of comma-separated node types, each of which is " 429 "composed of slash-separated instance types, and an optional spot " 430 "bid set off by a colon, making the node type preemptable. Instance " 431 "types may appear in multiple node types, and the same node type " 432 "may appear as both preemptable and non-preemptable.\n" 433 "Valid argument specifying two node types:\n" 434 "\tc5.4xlarge/c5a.4xlarge:0.42,t2.large\n" 435 "Node types:\n" 436 "\tc5.4xlarge/c5a.4xlarge:0.42 and t2.large\n" 437 "Instance types:\n" 438 "\tc5.4xlarge, c5a.4xlarge, and t2.large\n" 439 "Semantics:\n" 440 "\tBid $0.42/hour for either c5.4xlarge or c5a.4xlarge instances,\n" 441 "\ttreated interchangeably, while they are available at that price,\n" 442 "\tand buy t2.large instances at full price") 443 autoscaling_options.add_argument('--minNodes', default=None, 444 help="Mininum number of nodes of each type in the cluster, if using " 445 "auto-scaling. This should be provided as a comma-separated list of the " 446 "same length as the list of node types. default=0") 447 autoscaling_options.add_argument('--maxNodes', default=None, 448 help=f"Maximum number of nodes of each type in the cluster, if using autoscaling, " 449 f"provided as a comma-separated list. The first value is used as a default " 450 f"if the list length is less than the number of nodeTypes. " 451 f"default={config.maxNodes[0]}") 452 autoscaling_options.add_argument("--targetTime", dest="targetTime", default=None, 453 help=f"Sets how rapidly you aim to complete jobs in seconds. Shorter times mean " 454 f"more aggressive parallelization. The autoscaler attempts to scale up/down " 455 f"so that it expects all queued jobs will complete within targetTime " 456 f"seconds. default={config.targetTime}") 457 autoscaling_options.add_argument("--betaInertia", dest="betaInertia", default=None, 458 help=f"A smoothing parameter to prevent unnecessary oscillations in the number " 459 f"of provisioned nodes. This controls an exponentially weighted moving " 460 f"average of the estimated number of nodes. A value of 0.0 disables any " 461 f"smoothing, and a value of 0.9 will smooth so much that few changes will " 462 f"ever be made. Must be between 0.0 and 0.9. default={config.betaInertia}") 463 autoscaling_options.add_argument("--scaleInterval", dest="scaleInterval", default=None, 464 help=f"The interval (seconds) between assessing if the scale of " 465 f"the cluster needs to change. default={config.scaleInterval}") 466 autoscaling_options.add_argument("--preemptableCompensation", dest="preemptableCompensation", default=None, 467 help=f"The preference of the autoscaler to replace preemptable nodes with " 468 f"non-preemptable nodes, when preemptable nodes cannot be started for some " 469 f"reason. Defaults to {config.preemptableCompensation}. This value must be " 470 f"between 0.0 and 1.0, inclusive. A value of 0.0 disables such " 471 f"compensation, a value of 0.5 compensates two missing preemptable nodes " 472 f"with a non-preemptable one. A value of 1.0 replaces every missing " 473 f"pre-emptable node with a non-preemptable one.") 474 autoscaling_options.add_argument("--nodeStorage", dest="nodeStorage", default=50, 475 help="Specify the size of the root volume of worker nodes when they are launched " 476 "in gigabytes. You may want to set this if your jobs require a lot of disk " 477 "space. (default: %(default)s).") 478 autoscaling_options.add_argument('--nodeStorageOverrides', default=None, 479 help="Comma-separated list of nodeType:nodeStorage that are used to override " 480 "the default value from --nodeStorage for the specified nodeType(s). " 481 "This is useful for heterogeneous jobs where some tasks require much more " 482 "disk than others.") 483 autoscaling_options.add_argument("--metrics", dest="metrics", default=False, action="store_true", 484 help="Enable the prometheus/grafana dashboard for monitoring CPU/RAM usage, " 485 "queue size, and issued jobs.") 486 487 # Parameters to limit service jobs / detect service deadlocks 488 if not config.cwl: 489 service_options = parser.add_argument_group( 490 title="Toil options for limiting the number of service jobs and detecting service deadlocks", 491 description="Allows the specification of the maximum number of service jobs in a cluster. By keeping " 492 "this limited we can avoid nodes occupied with services causing deadlocks." 493 ) 494 service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=None, type=int, 495 help=f"The maximum number of service jobs that can be run concurrently, " 496 f"excluding service jobs running on preemptable nodes. " 497 f"default={config.maxServiceJobs}") 498 service_options.add_argument("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None, 499 type=int, 500 help=f"The maximum number of service jobs that can run concurrently on " 501 f"preemptable nodes. default={config.maxPreemptableServiceJobs}") 502 service_options.add_argument("--deadlockWait", dest="deadlockWait", default=None, type=int, 503 help=f"Time, in seconds, to tolerate the workflow running only the same service " 504 f"jobs, with no jobs to use them, before declaring the workflow to be " 505 f"deadlocked and stopping. default={config.deadlockWait}") 506 service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=None, type=int, 507 help="Time, in seconds, to wait between checks to see if the workflow is stuck " 508 "running only service jobs, with no jobs to use them. Should be shorter " 509 "than --deadlockWait. May need to be increased if the batch system cannot " 510 "enumerate running jobs quickly enough, or if polling for running jobs is " 511 "placing an unacceptable load on a shared cluster. " 512 "default={config.deadlockCheckInterval}") 513 514 # Resource requirements 515 resource_options = parser.add_argument_group( 516 title="Toil options for cores/memory requirements.", 517 description="The options to specify default cores/memory requirements (if not specified by the jobs " 518 "themselves), and to limit the total amount of memory/cores requested from the batch system." 519 ) 520 resource_help_msg = ('The {} amount of {} to request for a job. ' 521 'Only applicable to jobs that do not specify an explicit value for this requirement. ' 522 '{}. ' 523 'Default is {}.') 524 cpu_note = 'Fractions of a core (for example 0.1) are supported on some batch systems [mesos, single_machine]' 525 disk_mem_note = 'Standard suffixes like K, Ki, M, Mi, G or Gi are supported' 526 resource_options.add_argument('--defaultMemory', dest='defaultMemory', default=None, metavar='INT', 527 help=resource_help_msg.format('default', 'memory', disk_mem_note, 528 bytes2human(config.defaultMemory))) 529 resource_options.add_argument('--defaultCores', dest='defaultCores', default=None, metavar='FLOAT', 530 help=resource_help_msg.format('default', 'cpu', cpu_note, str(config.defaultCores))) 531 resource_options.add_argument('--defaultDisk', dest='defaultDisk', default=None, metavar='INT', 532 help=resource_help_msg.format('default', 'disk', disk_mem_note, 533 bytes2human(config.defaultDisk))) 534 resource_options.add_argument('--defaultPreemptable', dest='defaultPreemptable', metavar='BOOL', 535 type='bool', nargs='?', const=True, default=False, 536 help='Make all jobs able to run on preemptable (spot) nodes by default.') 537 resource_options.add_argument('--maxCores', dest='maxCores', default=None, metavar='INT', 538 help=resource_help_msg.format('max', 'cpu', cpu_note, str(config.maxCores))) 539 resource_options.add_argument('--maxMemory', dest='maxMemory', default=None, metavar='INT', 540 help=resource_help_msg.format('max', 'memory', disk_mem_note, 541 bytes2human(config.maxMemory))) 542 resource_options.add_argument('--maxDisk', dest='maxDisk', default=None, metavar='INT', 543 help=resource_help_msg.format('max', 'disk', disk_mem_note, 544 bytes2human(config.maxDisk))) 545 546 # Retrying/rescuing jobs 547 job_options = parser.add_argument_group( 548 title="Toil options for rescuing/killing/restarting jobs.", 549 description="The options for jobs that either run too long/fail or get lost (some batch systems have issues!)." 550 ) 551 job_options.add_argument("--retryCount", dest="retryCount", default=None, 552 help=f"Number of times to retry a failing job before giving up and " 553 f"labeling job failed. default={config.retryCount}") 554 job_options.add_argument("--enableUnlimitedPreemptableRetries", dest="enableUnlimitedPreemptableRetries", 555 action='store_true', default=False, 556 help="If set, preemptable failures (or any failure due to an instance getting " 557 "unexpectedly terminated) will not count towards job failures and --retryCount.") 558 job_options.add_argument("--doubleMem", dest="doubleMem", action='store_true', default=False, 559 help="If set, batch jobs which die to reaching memory limit on batch schedulers " 560 "will have their memory doubled and they will be retried. The remaining " 561 "retry count will be reduced by 1. Currently supported by LSF.") 562 job_options.add_argument("--maxJobDuration", dest="maxJobDuration", default=None, 563 help=f"Maximum runtime of a job (in seconds) before we kill it (this is a lower bound, " 564 f"and the actual time before killing the job may be longer). " 565 f"default={config.maxJobDuration}") 566 job_options.add_argument("--rescueJobsFrequency", dest="rescueJobsFrequency", default=None, 567 help=f"Period of time to wait (in seconds) between checking for missing/overlong jobs, " 568 f"that is jobs which get lost by the batch system. Expert parameter. " 569 f"default={config.rescueJobsFrequency}") 570 571 # Debug options 572 debug_options = parser.add_argument_group( 573 title="Toil debug options.", 574 description="Debug options for finding problems or helping with testing." 575 ) 576 debug_options.add_argument("--debugWorker", default=False, action="store_true", 577 help="Experimental no forking mode for local debugging. Specifically, workers " 578 "are not forked and stderr/stdout are not redirected to the log.") 579 debug_options.add_argument("--disableWorkerOutputCapture", default=False, action="store_true", 580 help="Let worker output go to worker's standard out/error instead of per-job logs.") 581 debug_options.add_argument("--badWorker", dest="badWorker", default=None, 582 help=f"For testing purposes randomly kill --badWorker proportion of jobs using " 583 f"SIGKILL. default={config.badWorker}") 584 debug_options.add_argument("--badWorkerFailInterval", dest="badWorkerFailInterval", default=None, 585 help=f"When killing the job pick uniformly within the interval from 0.0 to " 586 f"--badWorkerFailInterval seconds after the worker starts. " 587 f"default={config.badWorkerFailInterval}") 588 589 # Misc options 590 misc_options = parser.add_argument_group( 591 title="Toil miscellaneous options.", 592 description="Everything else." 593 ) 594 misc_options.add_argument('--disableCaching', dest='disableCaching', type='bool', nargs='?', const=True, 595 default=False, 596 help='Disables caching in the file store. This flag must be set to use ' 597 'a batch system that does not support cleanup, such as Parasol.') 598 misc_options.add_argument('--disableChaining', dest='disableChaining', action='store_true', default=False, 599 help="Disables chaining of jobs (chaining uses one job's resource allocation " 600 "for its successor job if possible).") 601 misc_options.add_argument("--disableJobStoreChecksumVerification", dest="disableJobStoreChecksumVerification", 602 default=False, action="store_true", 603 help="Disables checksum verification for files transferred to/from the job store. " 604 "Checksum verification is a safety check to ensure the data is not corrupted " 605 "during transfer. Currently only supported for non-streaming AWS files.") 606 misc_options.add_argument("--maxLogFileSize", dest="maxLogFileSize", default=None, 607 help=f"The maximum size of a job log file to keep (in bytes), log files larger than " 608 f"this will be truncated to the last X bytes. Setting this option to zero will " 609 f"prevent any truncation. Setting this option to a negative value will truncate " 610 f"from the beginning. Default={bytes2human(config.maxLogFileSize)}") 611 misc_options.add_argument("--writeLogs", dest="writeLogs", nargs='?', action='store', default=None, 612 const=os.getcwd(), 613 help="Write worker logs received by the leader into their own files at the specified " 614 "path. Any non-empty standard output and error from failed batch system jobs will " 615 "also be written into files at this path. The current working directory will be " 616 "used if a path is not specified explicitly. Note: By default only the logs of " 617 "failed jobs are returned to leader. Set log level to 'debug' or enable " 618 "'--writeLogsFromAllJobs' to get logs back from successful jobs, and adjust " 619 "'maxLogFileSize' to control the truncation limit for worker logs.") 620 misc_options.add_argument("--writeLogsGzip", dest="writeLogsGzip", nargs='?', action='store', default=None, 621 const=os.getcwd(), 622 help="Identical to --writeLogs except the logs files are gzipped on the leader.") 623 misc_options.add_argument("--writeLogsFromAllJobs", dest="writeLogsFromAllJobs", action='store_true', 624 default=False, 625 help="Whether to write logs from all jobs (including the successful ones) without " 626 "necessarily setting the log level to 'debug'. Ensure that either --writeLogs " 627 "or --writeLogsGzip is set if enabling this option.") 628 misc_options.add_argument("--realTimeLogging", dest="realTimeLogging", action="store_true", default=False, 629 help="Enable real-time logging from workers to masters") 630 misc_options.add_argument("--sseKey", dest="sseKey", default=None, 631 help="Path to file containing 32 character key to be used for server-side encryption on " 632 "awsJobStore or googleJobStore. SSE will not be used if this flag is not passed.") 633 misc_options.add_argument("--setEnv", '-e', metavar='NAME=VALUE or NAME', dest="environment", default=[], 634 action="append", 635 help="Set an environment variable early on in the worker. If VALUE is omitted, it will " 636 "be looked up in the current environment. Independently of this option, the worker " 637 "will try to emulate the leader's environment before running a job, except for " 638 "some variables known to vary across systems. Using this option, a variable can " 639 "be injected into the worker process itself before it is started.") 640 misc_options.add_argument("--servicePollingInterval", dest="servicePollingInterval", default=None, 641 help=f"Interval of time service jobs wait between polling for the existence of the " 642 f"keep-alive flag. Default: {config.servicePollingInterval}") 643 misc_options.add_argument('--forceDockerAppliance', dest='forceDockerAppliance', action='store_true', default=False, 644 help='Disables sanity checking the existence of the docker image specified by ' 645 'TOIL_APPLIANCE_SELF, which Toil uses to provision mesos for autoscaling.') 646 misc_options.add_argument('--disableProgress', dest='disableProgress', action='store_true', default=False, 647 help="Disables the progress bar shown when standard error is a terminal.") 648 649 650def parseBool(val): 651 if val.lower() in ['true', 't', 'yes', 'y', 'on', '1']: 652 return True 653 elif val.lower() in ['false', 'f', 'no', 'n', 'off', '0']: 654 return False 655 else: 656 raise RuntimeError("Could not interpret \"%s\" as a boolean value" % val) 657 658 659def getNodeID() -> str: 660 """ 661 Return unique ID of the current node (host). The resulting string will be convertable to a uuid.UUID. 662 663 Tries several methods until success. The returned ID should be identical across calls from different processes on 664 the same node at least until the next OS reboot. 665 666 The last resort method is uuid.getnode() that in some rare OS configurations may return a random ID each time it is 667 called. However, this method should never be reached on a Linux system, because reading from 668 /proc/sys/kernel/random/boot_id will be tried prior to that. If uuid.getnode() is reached, it will be called twice, 669 and exception raised if the values are not identical. 670 """ 671 for idSourceFile in ["/var/lib/dbus/machine-id", "/proc/sys/kernel/random/boot_id"]: 672 if os.path.exists(idSourceFile): 673 try: 674 with open(idSourceFile, "r") as inp: 675 nodeID = inp.readline().strip() 676 except EnvironmentError: 677 logger.warning(f"Exception when trying to read ID file {idSourceFile}. " 678 f"Will try next method to get node ID.", exc_info=True) 679 else: 680 if len(nodeID.split()) == 1: 681 logger.debug(f"Obtained node ID {nodeID} from file {idSourceFile}") 682 break 683 else: 684 logger.warning(f"Node ID {nodeID} from file {idSourceFile} contains spaces. " 685 f"Will try next method to get node ID.") 686 else: 687 nodeIDs = [] 688 for i_call in range(2): 689 nodeID = str(uuid.getnode()).strip() 690 if len(nodeID.split()) == 1: 691 nodeIDs.append(nodeID) 692 else: 693 logger.warning(f"Node ID {nodeID} from uuid.getnode() contains spaces") 694 nodeID = "" 695 if len(nodeIDs) == 2: 696 if nodeIDs[0] == nodeIDs[1]: 697 nodeID = nodeIDs[0] 698 else: 699 logger.warning(f"Different node IDs {nodeIDs} received from repeated calls to uuid.getnode(). " 700 f"You should use another method to generate node ID.") 701 702 logger.debug(f"Obtained node ID {nodeID} from uuid.getnode()") 703 if not nodeID: 704 logger.warning("Failed to generate stable node ID, returning empty string. If you see this message with a " 705 "work dir on a shared file system when using workers running on multiple nodes, you might " 706 "experience cryptic job failures") 707 if len(nodeID.replace('-', '')) < UUID_LENGTH: 708 # Some platforms (Mac) give us not enough actual hex characters. 709 # Repeat them so the result is convertable to a uuid.UUID 710 nodeID = nodeID.replace('-', '') 711 num_repeats = UUID_LENGTH // len(nodeID) + 1 712 nodeID = nodeID * num_repeats 713 nodeID = nodeID[:UUID_LENGTH] 714 return nodeID 715 716 717class Toil: 718 """ 719 A context manager that represents a Toil workflow, specifically the batch system, job store, 720 and its configuration. 721 """ 722 723 def __init__(self, options): 724 """ 725 Initialize a Toil object from the given options. Note that this is very light-weight and 726 that the bulk of the work is done when the context is entered. 727 728 :param argparse.Namespace options: command line options specified by the user 729 """ 730 super(Toil, self).__init__() 731 self.options = options 732 self.config = None 733 """ 734 :type: toil.common.Config 735 """ 736 self._jobStore = None 737 """ 738 :type: toil.jobStores.abstractJobStore.AbstractJobStore 739 """ 740 self._batchSystem = None 741 """ 742 :type: toil.batchSystems.abstractBatchSystem.AbstractBatchSystem 743 """ 744 self._provisioner = None 745 """ 746 :type: toil.provisioners.abstractProvisioner.AbstractProvisioner 747 """ 748 self._jobCache = dict() 749 self._inContextManager = False 750 self._inRestart = False 751 752 def __enter__(self): 753 """ 754 Derive configuration from the command line options, load the job store and, on restart, 755 consolidate the derived configuration with the one from the previous invocation of the 756 workflow. 757 """ 758 set_logging_from_options(self.options) 759 config = Config() 760 config.setOptions(self.options) 761 jobStore = self.getJobStore(config.jobStore) 762 if not config.restart: 763 config.workflowAttemptNumber = 0 764 jobStore.initialize(config) 765 else: 766 jobStore.resume() 767 # Merge configuration from job store with command line options 768 config = jobStore.config 769 config.setOptions(self.options) 770 config.workflowAttemptNumber += 1 771 jobStore.writeConfig() 772 self.config = config 773 self._jobStore = jobStore 774 self._inContextManager = True 775 return self 776 777 # noinspection PyUnusedLocal 778 def __exit__(self, exc_type, exc_val, exc_tb): 779 """ 780 Clean up after a workflow invocation. Depending on the configuration, delete the job store. 781 """ 782 try: 783 if (exc_type is not None and self.config.clean == "onError" or 784 exc_type is None and self.config.clean == "onSuccess" or 785 self.config.clean == "always"): 786 787 try: 788 if self.config.restart and not self._inRestart: 789 pass 790 else: 791 self._jobStore.destroy() 792 logger.info("Successfully deleted the job store: %s" % str(self._jobStore)) 793 except: 794 logger.info("Failed to delete the job store: %s" % str(self._jobStore)) 795 raise 796 except Exception as e: 797 if exc_type is None: 798 raise 799 else: 800 logger.exception('The following error was raised during clean up:') 801 self._inContextManager = False 802 self._inRestart = False 803 return False # let exceptions through 804 805 def start(self, rootJob): 806 """ 807 Invoke a Toil workflow with the given job as the root for an initial run. This method 808 must be called in the body of a ``with Toil(...) as toil:`` statement. This method should 809 not be called more than once for a workflow that has not finished. 810 811 :param toil.job.Job rootJob: The root job of the workflow 812 :return: The root job's return value 813 """ 814 self._assertContextManagerUsed() 815 self.writePIDFile() 816 if self.config.restart: 817 raise ToilRestartException('A Toil workflow can only be started once. Use ' 818 'Toil.restart() to resume it.') 819 820 self._batchSystem = self.createBatchSystem(self.config) 821 self._setupAutoDeployment(rootJob.getUserScript()) 822 try: 823 self._setBatchSystemEnvVars() 824 self._serialiseEnv() 825 self._cacheAllJobs() 826 827 # Pickle the promised return value of the root job, then write the pickled promise to 828 # a shared file, where we can find and unpickle it at the end of the workflow. 829 # Unpickling the promise will automatically substitute the promise for the actual 830 # return value. 831 with self._jobStore.writeSharedFileStream('rootJobReturnValue') as fH: 832 rootJob.prepareForPromiseRegistration(self._jobStore) 833 promise = rootJob.rv() 834 pickle.dump(promise, fH, protocol=pickle.HIGHEST_PROTOCOL) 835 836 # Setup the first JobDescription and cache it 837 rootJobDescription = rootJob.saveAsRootJob(self._jobStore) 838 self._cacheJob(rootJobDescription) 839 840 self._setProvisioner() 841 return self._runMainLoop(rootJobDescription) 842 finally: 843 self._shutdownBatchSystem() 844 845 def restart(self): 846 """ 847 Restarts a workflow that has been interrupted. 848 849 :return: The root job's return value 850 """ 851 self._inRestart = True 852 self._assertContextManagerUsed() 853 self.writePIDFile() 854 if not self.config.restart: 855 raise ToilRestartException('A Toil workflow must be initiated with Toil.start(), ' 856 'not restart().') 857 858 from toil.job import JobException 859 try: 860 self._jobStore.loadRootJob() 861 except JobException: 862 logger.warning( 863 'Requested restart but the workflow has already been completed; allowing exports to rerun.') 864 return self._jobStore.getRootJobReturnValue() 865 866 self._batchSystem = self.createBatchSystem(self.config) 867 self._setupAutoDeployment() 868 try: 869 self._setBatchSystemEnvVars() 870 self._serialiseEnv() 871 self._cacheAllJobs() 872 self._setProvisioner() 873 rootJobDescription = self._jobStore.clean(jobCache=self._jobCache) 874 return self._runMainLoop(rootJobDescription) 875 finally: 876 self._shutdownBatchSystem() 877 878 def _setProvisioner(self): 879 if self.config.provisioner is None: 880 self._provisioner = None 881 else: 882 self._provisioner = cluster_factory(provisioner=self.config.provisioner, 883 clusterName=None, 884 zone=None, # read from instance meta-data 885 nodeStorage=self.config.nodeStorage, 886 nodeStorageOverrides=self.config.nodeStorageOverrides, 887 sseKey=self.config.sseKey) 888 self._provisioner.setAutoscaledNodeTypes(self.config.nodeTypes) 889 890 @classmethod 891 def getJobStore(cls, locator): 892 """ 893 Create an instance of the concrete job store implementation that matches the given locator. 894 895 :param str locator: The location of the job store to be represent by the instance 896 897 :return: an instance of a concrete subclass of AbstractJobStore 898 :rtype: toil.jobStores.abstractJobStore.AbstractJobStore 899 """ 900 name, rest = cls.parseLocator(locator) 901 if name == 'file': 902 from toil.jobStores.fileJobStore import FileJobStore 903 return FileJobStore(rest) 904 elif name == 'aws': 905 from toil.jobStores.aws.jobStore import AWSJobStore 906 return AWSJobStore(rest) 907 elif name == 'google': 908 from toil.jobStores.googleJobStore import GoogleJobStore 909 return GoogleJobStore(rest) 910 else: 911 raise RuntimeError("Unknown job store implementation '%s'" % name) 912 913 @staticmethod 914 def parseLocator(locator): 915 if locator[0] in '/.' or ':' not in locator: 916 return 'file', locator 917 else: 918 try: 919 name, rest = locator.split(':', 1) 920 except ValueError: 921 raise RuntimeError('Invalid job store locator syntax.') 922 else: 923 return name, rest 924 925 @staticmethod 926 def buildLocator(name, rest): 927 assert ':' not in name 928 return f'{name}:{rest}' 929 930 @classmethod 931 def resumeJobStore(cls, locator): 932 jobStore = cls.getJobStore(locator) 933 jobStore.resume() 934 return jobStore 935 936 @staticmethod 937 def createBatchSystem(config): 938 """ 939 Creates an instance of the batch system specified in the given config. 940 941 :param toil.common.Config config: the current configuration 942 943 :rtype: batchSystems.abstractBatchSystem.AbstractBatchSystem 944 945 :return: an instance of a concrete subclass of AbstractBatchSystem 946 """ 947 kwargs = dict(config=config, 948 maxCores=config.maxCores, 949 maxMemory=config.maxMemory, 950 maxDisk=config.maxDisk) 951 952 from toil.batchSystems.registry import BATCH_SYSTEM_FACTORY_REGISTRY 953 954 try: 955 batch_system = BATCH_SYSTEM_FACTORY_REGISTRY[config.batchSystem]() 956 except: 957 raise RuntimeError(f'Unrecognized batch system: {config.batchSystem}') 958 959 if not config.disableCaching and not batch_system.supportsWorkerCleanup(): 960 raise RuntimeError(f'{config.batchSystem} currently does not support shared caching, because it ' 961 'does not support cleaning up a worker after the last job ' 962 'finishes. Set the --disableCaching flag if you want to ' 963 'use this batch system.') 964 logger.debug('Using the %s' % re.sub("([a-z])([A-Z])", r"\g<1> \g<2>", batch_system.__name__).lower()) 965 966 return batch_system(**kwargs) 967 968 def _setupAutoDeployment(self, userScript=None): 969 """ 970 Determine the user script, save it to the job store and inject a reference to the saved 971 copy into the batch system such that it can auto-deploy the resource on the worker 972 nodes. 973 974 :param toil.resource.ModuleDescriptor userScript: the module descriptor referencing the 975 user script. If None, it will be looked up in the job store. 976 """ 977 if userScript is not None: 978 # This branch is hit when a workflow is being started 979 if userScript.belongsToToil: 980 logger.debug('User script %s belongs to Toil. No need to auto-deploy it.', userScript) 981 userScript = None 982 else: 983 if (self._batchSystem.supportsAutoDeployment() and 984 not self.config.disableAutoDeployment): 985 # Note that by saving the ModuleDescriptor, and not the Resource we allow for 986 # redeploying a potentially modified user script on workflow restarts. 987 with self._jobStore.writeSharedFileStream('userScript') as f: 988 pickle.dump(userScript, f, protocol=pickle.HIGHEST_PROTOCOL) 989 else: 990 from toil.batchSystems.singleMachine import \ 991 SingleMachineBatchSystem 992 if not isinstance(self._batchSystem, SingleMachineBatchSystem): 993 logger.warning('Batch system does not support auto-deployment. The user ' 994 'script %s will have to be present at the same location on ' 995 'every worker.', userScript) 996 userScript = None 997 else: 998 # This branch is hit on restarts 999 if (self._batchSystem.supportsAutoDeployment() and 1000 not self.config.disableAutoDeployment): 1001 # We could deploy a user script 1002 from toil.jobStores.abstractJobStore import NoSuchFileException 1003 try: 1004 with self._jobStore.readSharedFileStream('userScript') as f: 1005 userScript = safeUnpickleFromStream(f) 1006 except NoSuchFileException: 1007 logger.debug('User script neither set explicitly nor present in the job store.') 1008 userScript = None 1009 if userScript is None: 1010 logger.debug('No user script to auto-deploy.') 1011 else: 1012 logger.debug('Saving user script %s as a resource', userScript) 1013 userScriptResource = userScript.saveAsResourceTo(self._jobStore) 1014 logger.debug('Injecting user script %s into batch system.', userScriptResource) 1015 self._batchSystem.setUserScript(userScriptResource) 1016 1017 def importFile(self, srcUrl, sharedFileName=None, symlink=False): 1018 """ 1019 Imports the file at the given URL into job store. 1020 1021 See :func:`toil.jobStores.abstractJobStore.AbstractJobStore.importFile` for a 1022 full description 1023 """ 1024 self._assertContextManagerUsed() 1025 return self._jobStore.importFile(srcUrl, sharedFileName=sharedFileName, symlink=symlink) 1026 1027 def exportFile(self, jobStoreFileID, dstUrl): 1028 """ 1029 Exports file to destination pointed at by the destination URL. 1030 1031 See :func:`toil.jobStores.abstractJobStore.AbstractJobStore.exportFile` for a 1032 full description 1033 """ 1034 self._assertContextManagerUsed() 1035 self._jobStore.exportFile(jobStoreFileID, dstUrl) 1036 1037 def _setBatchSystemEnvVars(self): 1038 """ 1039 Sets the environment variables required by the job store and those passed on command line. 1040 """ 1041 for envDict in (self._jobStore.getEnv(), self.config.environment): 1042 for k, v in envDict.items(): 1043 self._batchSystem.setEnv(k, v) 1044 1045 def _serialiseEnv(self): 1046 """ 1047 Puts the environment in a globally accessible pickle file. 1048 """ 1049 # Dump out the environment of this process in the environment pickle file. 1050 with self._jobStore.writeSharedFileStream("environment.pickle") as fileHandle: 1051 pickle.dump(dict(os.environ), fileHandle, pickle.HIGHEST_PROTOCOL) 1052 logger.debug("Written the environment for the jobs to the environment file") 1053 1054 def _cacheAllJobs(self): 1055 """ 1056 Downloads all jobs in the current job store into self.jobCache. 1057 """ 1058 logger.debug('Caching all jobs in job store') 1059 self._jobCache = {jobDesc.jobStoreID: jobDesc for jobDesc in self._jobStore.jobs()} 1060 logger.debug('{} jobs downloaded.'.format(len(self._jobCache))) 1061 1062 def _cacheJob(self, job): 1063 """ 1064 Adds given job to current job cache. 1065 1066 :param toil.job.JobDescription job: job to be added to current job cache 1067 """ 1068 self._jobCache[job.jobStoreID] = job 1069 1070 @staticmethod 1071 def getToilWorkDir(configWorkDir: Optional[str] = None) -> str: 1072 """ 1073 Returns a path to a writable directory under which per-workflow 1074 directories exist. This directory is always required to exist on a 1075 machine, even if the Toil worker has not run yet. If your workers and 1076 leader have different temp directories, you may need to set 1077 TOIL_WORKDIR. 1078 1079 :param str configWorkDir: Value passed to the program using the --workDir flag 1080 :return: Path to the Toil work directory, constant across all machines 1081 :rtype: str 1082 """ 1083 workDir = os.getenv('TOIL_WORKDIR_OVERRIDE') or configWorkDir or os.getenv('TOIL_WORKDIR') or tempfile.gettempdir() 1084 if not os.path.exists(workDir): 1085 raise RuntimeError(f'The directory specified by --workDir or TOIL_WORKDIR ({workDir}) does not exist.') 1086 return workDir 1087 1088 @classmethod 1089 def getLocalWorkflowDir(cls, workflowID, configWorkDir=None): 1090 """ 1091 Returns a path to the directory where worker directories and the cache will be located 1092 for this workflow on this machine. 1093 1094 :param str configWorkDir: Value passed to the program using the --workDir flag 1095 :return: Path to the local workflow directory on this machine 1096 :rtype: str 1097 """ 1098 # Get the global Toil work directory. This ensures that it exists. 1099 base = cls.getToilWorkDir(configWorkDir=configWorkDir) 1100 1101 # Create a directory unique to each host in case workDir is on a shared FS. 1102 # This prevents workers on different nodes from erasing each other's directories. 1103 workflowDir: str = os.path.join(base, str(uuid.uuid5(uuid.UUID(getNodeID()), workflowID)).replace('-', '')) 1104 try: 1105 # Directory creation is atomic 1106 os.mkdir(workflowDir) 1107 except OSError as err: 1108 if err.errno != 17: 1109 # The directory exists if a previous worker set it up. 1110 raise 1111 else: 1112 logger.debug('Created the workflow directory for this machine at %s' % workflowDir) 1113 return workflowDir 1114 1115 def _runMainLoop(self, rootJob): 1116 """ 1117 Runs the main loop with the given job. 1118 :param toil.job.Job rootJob: The root job for the workflow. 1119 :rtype: Any 1120 """ 1121 logProcessContext(self.config) 1122 1123 with RealtimeLogger(self._batchSystem, 1124 level=self.options.logLevel if self.options.realTimeLogging else None): 1125 # FIXME: common should not import from leader 1126 from toil.leader import Leader 1127 return Leader(config=self.config, 1128 batchSystem=self._batchSystem, 1129 provisioner=self._provisioner, 1130 jobStore=self._jobStore, 1131 rootJob=rootJob, 1132 jobCache=self._jobCache).run() 1133 1134 def _shutdownBatchSystem(self): 1135 """ 1136 Shuts down current batch system if it has been created. 1137 """ 1138 assert self._batchSystem is not None 1139 1140 startTime = time.time() 1141 logger.debug('Shutting down batch system ...') 1142 self._batchSystem.shutdown() 1143 logger.debug('... finished shutting down the batch system in %s seconds.' 1144 % (time.time() - startTime)) 1145 1146 def _assertContextManagerUsed(self): 1147 if not self._inContextManager: 1148 raise ToilContextManagerException() 1149 1150 def writePIDFile(self): 1151 """ 1152 Write a the pid of this process to a file in the jobstore. 1153 1154 Overwriting the current contents of pid.log is a feature, not a bug of this method. 1155 Other methods will rely on always having the most current pid available. 1156 So far there is no reason to store any old pids. 1157 """ 1158 with self._jobStore.writeSharedFileStream('pid.log') as f: 1159 f.write(str(os.getpid()).encode('utf-8')) 1160 1161 1162class ToilRestartException(Exception): 1163 def __init__(self, message): 1164 super(ToilRestartException, self).__init__(message) 1165 1166 1167class ToilContextManagerException(Exception): 1168 def __init__(self): 1169 super(ToilContextManagerException, self).__init__( 1170 'This method cannot be called outside the "with Toil(...)" context manager.') 1171 1172 1173class ToilMetrics: 1174 def __init__(self, provisioner=None): 1175 clusterName = 'none' 1176 region = 'us-west-2' 1177 if provisioner is not None: 1178 clusterName = provisioner.clusterName 1179 if provisioner._zone is not None: 1180 if provisioner.cloud == 'aws': 1181 # Remove AZ name 1182 region = zone_to_region(provisioner._zone) 1183 else: 1184 region = provisioner._zone 1185 1186 registry = lookupEnvVar(name='docker registry', 1187 envName='TOIL_DOCKER_REGISTRY', 1188 defaultValue=dockerRegistry) 1189 1190 self.mtailImage = "%s/toil-mtail:%s" % (registry, dockerTag) 1191 self.grafanaImage = "%s/toil-grafana:%s" % (registry, dockerTag) 1192 self.prometheusImage = "%s/toil-prometheus:%s" % (registry, dockerTag) 1193 1194 self.startDashboard(clusterName=clusterName, zone=region) 1195 1196 # Always restart the mtail container, because metrics should start from scratch 1197 # for each workflow 1198 try: 1199 subprocess.check_call(["docker", "rm", "-f", "toil_mtail"]) 1200 except subprocess.CalledProcessError: 1201 pass 1202 1203 try: 1204 self.mtailProc = subprocess.Popen(["docker", "run", "--rm", "--interactive", 1205 "--net=host", 1206 "--name", "toil_mtail", 1207 "-p", "3903:3903", 1208 self.mtailImage], 1209 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 1210 except subprocess.CalledProcessError: 1211 logger.warning("Could not start toil metrics server.") 1212 self.mtailProc = None 1213 except KeyboardInterrupt: 1214 self.mtailProc.terminate() 1215 1216 # On single machine, launch a node exporter instance to monitor CPU/RAM usage. 1217 # On AWS this is handled by the EC2 init script 1218 self.nodeExporterProc = None 1219 if not provisioner: 1220 try: 1221 self.nodeExporterProc = subprocess.Popen(["docker", "run", "--rm", 1222 "--net=host", 1223 "-p", "9100:9100", 1224 "-v", "/proc:/host/proc", 1225 "-v", "/sys:/host/sys", 1226 "-v", "/:/rootfs", 1227 "quay.io/prometheus/node-exporter:0.15.2", 1228 "-collector.procfs", "/host/proc", 1229 "-collector.sysfs", "/host/sys", 1230 "-collector.filesystem.ignored-mount-points", 1231 "^/(sys|proc|dev|host|etc)($|/)"]) 1232 except subprocess.CalledProcessError: 1233 logger.warning( 1234 "Couldn't start node exporter, won't get RAM and CPU usage for dashboard.") 1235 self.nodeExporterProc = None 1236 except KeyboardInterrupt: 1237 self.nodeExporterProc.terminate() 1238 1239 @staticmethod 1240 def _containerRunning(containerName): 1241 try: 1242 result = subprocess.check_output(["docker", "inspect", "-f", 1243 "'{{.State.Running}}'", containerName]).decode('utf-8') == "true" 1244 except subprocess.CalledProcessError: 1245 result = False 1246 return result 1247 1248 def startDashboard(self, clusterName, zone): 1249 try: 1250 if not self._containerRunning("toil_prometheus"): 1251 try: 1252 subprocess.check_call(["docker", "rm", "-f", "toil_prometheus"]) 1253 except subprocess.CalledProcessError: 1254 pass 1255 subprocess.check_call(["docker", "run", 1256 "--name", "toil_prometheus", 1257 "--net=host", 1258 "-d", 1259 "-p", "9090:9090", 1260 self.prometheusImage, 1261 clusterName, 1262 zone]) 1263 1264 if not self._containerRunning("toil_grafana"): 1265 try: 1266 subprocess.check_call(["docker", "rm", "-f", "toil_grafana"]) 1267 except subprocess.CalledProcessError: 1268 pass 1269 subprocess.check_call(["docker", "run", 1270 "--name", "toil_grafana", 1271 "-d", "-p=3000:3000", 1272 self.grafanaImage]) 1273 except subprocess.CalledProcessError: 1274 logger.warning("Could not start prometheus/grafana dashboard.") 1275 return 1276 1277 try: 1278 self.add_prometheus_data_source() 1279 except requests.exceptions.ConnectionError: 1280 logger.debug("Could not add data source to Grafana dashboard - no metrics will be displayed.") 1281 1282 @retry(errors=[requests.exceptions.ConnectionError]) 1283 def add_prometheus_data_source(self): 1284 requests.post( 1285 'http://localhost:3000/api/datasources', 1286 auth=('admin', 'admin'), 1287 data='{"name":"DS_PROMETHEUS","type":"prometheus", "url":"http://localhost:9090", "access":"direct"}', 1288 headers={'content-type': 'application/json', "access": "direct"} 1289 ) 1290 1291 def log(self, message): 1292 if self.mtailProc: 1293 self.mtailProc.stdin.write((message + "\n").encode("utf-8")) 1294 self.mtailProc.stdin.flush() 1295 1296 # Note: The mtail configuration (dashboard/mtail/toil.mtail) depends on these messages 1297 # remaining intact 1298 1299 def logMissingJob(self): 1300 self.log("missing_job") 1301 1302 def logClusterSize(self, nodeType, currentSize, desiredSize): 1303 self.log("current_size '%s' %i" % (nodeType, currentSize)) 1304 self.log("desired_size '%s' %i" % (nodeType, desiredSize)) 1305 1306 def logQueueSize(self, queueSize): 1307 self.log("queue_size %i" % queueSize) 1308 1309 def logIssuedJob(self, jobType): 1310 self.log("issued_job %s" % jobType) 1311 1312 def logFailedJob(self, jobType): 1313 self.log("failed_job %s" % jobType) 1314 1315 def logCompletedJob(self, jobType): 1316 self.log("completed_job %s" % jobType) 1317 1318 def shutdown(self): 1319 if self.mtailProc: 1320 self.mtailProc.kill() 1321 if self.nodeExporterProc: 1322 self.nodeExporterProc.kill() 1323 1324 1325def parseSetEnv(l): 1326 """ 1327 Parses a list of strings of the form "NAME=VALUE" or just "NAME" into a dictionary. Strings 1328 of the latter from will result in dictionary entries whose value is None. 1329 1330 :type l: list[str] 1331 :rtype: dict[str,str] 1332 1333 >>> parseSetEnv([]) 1334 {} 1335 >>> parseSetEnv(['a']) 1336 {'a': None} 1337 >>> parseSetEnv(['a=']) 1338 {'a': ''} 1339 >>> parseSetEnv(['a=b']) 1340 {'a': 'b'} 1341 >>> parseSetEnv(['a=a', 'a=b']) 1342 {'a': 'b'} 1343 >>> parseSetEnv(['a=b', 'c=d']) 1344 {'a': 'b', 'c': 'd'} 1345 >>> parseSetEnv(['a=b=c']) 1346 {'a': 'b=c'} 1347 >>> parseSetEnv(['']) 1348 Traceback (most recent call last): 1349 ... 1350 ValueError: Empty name 1351 >>> parseSetEnv(['=1']) 1352 Traceback (most recent call last): 1353 ... 1354 ValueError: Empty name 1355 """ 1356 d = dict() 1357 for i in l: 1358 try: 1359 k, v = i.split('=', 1) 1360 except ValueError: 1361 k, v = i, None 1362 if not k: 1363 raise ValueError('Empty name') 1364 d[k] = v 1365 return d 1366 1367 1368def iC(minValue, maxValue=sys.maxsize): 1369 # Returns function that checks if a given int is in the given half-open interval 1370 assert isinstance(minValue, int) and isinstance(maxValue, int) 1371 return lambda x: minValue <= x < maxValue 1372 1373 1374def fC(minValue, maxValue=None): 1375 # Returns function that checks if a given float is in the given half-open interval 1376 assert isinstance(minValue, float) 1377 if maxValue is None: 1378 return lambda x: minValue <= x 1379 else: 1380 assert isinstance(maxValue, float) 1381 return lambda x: minValue <= x < maxValue 1382 1383 1384def cacheDirName(workflowID): 1385 """ 1386 :return: Name of the cache directory. 1387 """ 1388 return f'cache-{workflowID}' 1389 1390 1391def getDirSizeRecursively(dirPath: str) -> int: 1392 """ 1393 This method will return the cumulative number of bytes occupied by the files 1394 on disk in the directory and its subdirectories. 1395 1396 If the method is unable to access a file or directory (due to insufficient 1397 permissions, or due to the file or directory having been removed while this 1398 function was attempting to traverse it), the error will be handled 1399 internally, and a (possibly 0) lower bound on the size of the directory 1400 will be returned. 1401 1402 The environment variable 'BLOCKSIZE'='512' is set instead of the much cleaner 1403 --block-size=1 because Apple can't handle it. 1404 1405 :param str dirPath: A valid path to a directory or file. 1406 :return: Total size, in bytes, of the file or directory at dirPath. 1407 """ 1408 1409 # du is often faster than using os.lstat(), sometimes significantly so. 1410 1411 # The call: 'du -s /some/path' should give the number of 512-byte blocks 1412 # allocated with the environment variable: BLOCKSIZE='512' set, and we 1413 # multiply this by 512 to return the filesize in bytes. 1414 1415 try: 1416 return int(subprocess.check_output(['du', '-s', dirPath], 1417 env=dict(os.environ, BLOCKSIZE='512')).decode('utf-8').split()[0]) * 512 1418 except subprocess.CalledProcessError: 1419 # Something was inaccessible or went away 1420 return 0 1421 1422 1423def getFileSystemSize(dirPath: str) -> Tuple[int, int]: 1424 """ 1425 Return the free space, and total size of the file system hosting `dirPath`. 1426 1427 :param str dirPath: A valid path to a directory. 1428 :return: free space and total size of file system 1429 :rtype: tuple 1430 """ 1431 assert os.path.exists(dirPath) 1432 diskStats = os.statvfs(dirPath) 1433 freeSpace = diskStats.f_frsize * diskStats.f_bavail 1434 diskSize = diskStats.f_frsize * diskStats.f_blocks 1435 return freeSpace, diskSize 1436 1437 1438def safeUnpickleFromStream(stream): 1439 string = stream.read() 1440 return pickle.loads(string) 1441