# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import logging
import os
import shutil
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Any, Optional, Tuple, Union, Dict, NamedTuple

from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY,
                                        DEFAULT_BATCH_SYSTEM)
from toil.common import Toil, cacheDirName, Config
from toil.deferred import DeferredFunctionManager
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.lib.threading import LastProcessStandingArena

try:
    from toil.cwl.cwltoil import CWL_INTERNAL_JOBS
except ImportError:
    # CWL extra not installed
    CWL_INTERNAL_JOBS = ()

# Value to use as exitStatus in UpdatedBatchJobInfo.exitStatus when status is not available.
EXIT_STATUS_UNAVAILABLE_VALUE = 255
logger = logging.getLogger(__name__)


class UpdatedBatchJobInfo(NamedTuple):
    """Record describing a batch job whose status has been updated."""

    # ID of the job, as handed out when the job was issued to the batch system.
    jobID: int
    # The exit status (integer value) of the job. 0 implies successful.
    # EXIT_STATUS_UNAVAILABLE_VALUE is used when the exit status is not
    # available (e.g. job is lost).
    exitStatus: int
    # The exit reason, if available. One of the BatchJobExitReason enum
    # members. Written as a forward reference because the enum is defined
    # further down in this module.
    exitReason: Optional['BatchJobExitReason']
    # Wall-clock run time of the job, or None if it is not available.
    wallTime: Union[float, int, None]


# Information required for worker cleanup on shutdown of the batch system.
class WorkerCleanupInfo(NamedTuple):
    """Information required for worker cleanup on shutdown of the batch system."""

    # Work directory path (where the cache would go).
    workDir: str
    # Used to identify files specific to this workflow. This is a string: it
    # is concatenated with '-cleanup' to name the cleanup arena.
    workflowID: str
    # Cleanup policy. Compared against 'always', 'onSuccess' and 'onError'
    # when deciding whether to remove the workflow directory.
    cleanWorkDir: str


class BatchJobExitReason(enum.Enum):
    """Reasons a batch job may have stopped running."""

    # Members are deliberately left unannotated so that type checkers treat
    # them as enum members rather than as plain attributes.
    FINISHED = 1  # Successfully finished.
    FAILED = 2  # Job finished, but failed.
    LOST = 3  # Preemptable failure (job's executing host went away).
    KILLED = 4  # Job killed before finishing.
    ERROR = 5  # Internal error.
    MEMLIMIT = 6  # Job hit batch system imposed memory limit


class AbstractBatchSystem(ABC):
    """
    An abstract (as far as Python currently allows) base class to represent the interface the batch
    system must provide to Toil.
    """
    @classmethod
    @abstractmethod
    def supportsAutoDeployment(cls):
        """
        Whether this batch system supports auto-deployment of the user script itself. If it does,
        the :meth:`.setUserScript` can be invoked to set the resource object representing the user
        script.

        Note to implementors: If your implementation returns True here, it should also override
        :meth:`.setUserScript`.

        :rtype: bool
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def supportsWorkerCleanup(cls):
        """
        Indicates whether this batch system invokes
        :meth:`BatchSystemSupport.workerCleanup` after the last job for a
        particular workflow invocation finishes. Note that the term *worker*
        refers to an entire node, not just a worker process. A worker process
        may run more than one job sequentially, and more than one concurrent
        worker process may exist on a worker node, for the same workflow. The
        batch system is said to *shut down* after the last worker process
        terminates.

        :rtype: bool
        """
        raise NotImplementedError()

    def setUserScript(self, userScript):
        """
        Set the user script for this workflow. This method must be called before the first job is
        issued to this batch system, and only if :meth:`.supportsAutoDeployment` returns True,
        otherwise it will raise an exception.

        :param toil.resource.Resource userScript: the resource object representing the user script
               or module and the modules it depends on.
        """
        raise NotImplementedError()

    @abstractmethod
    def issueBatchJob(self, jobDesc):
        """
        Issues a job with the specified command to the batch system and returns a unique jobID.

        :param toil.job.JobDescription jobDesc: description of the job to issue

        :return: a unique jobID that can be used to reference the newly issued job
        :rtype: int
        """
        raise NotImplementedError()

    @abstractmethod
    def killBatchJobs(self, jobIDs):
        """
        Kills the given job IDs. After returning, the killed jobs will not
        appear in the results of getRunningBatchJobIDs. The killed job will not
        be returned from getUpdatedBatchJob.

        :param jobIDs: list of IDs of jobs to kill
        :type jobIDs: list[int]
        """
        raise NotImplementedError()

    # FIXME: Return value should be a set (then also fix the tests)

    @abstractmethod
    def getIssuedBatchJobIDs(self):
        """
        Gets all currently issued jobs

        :return: A list of jobs (as jobIDs) currently issued (may be running, or may be
                 waiting to be run). Despite the result being a list, the ordering should not
                 be depended upon.
        :rtype: list[int]
        """
        raise NotImplementedError()

    @abstractmethod
    def getRunningBatchJobIDs(self):
        """
        Gets a map of jobs as jobIDs that are currently running (not just waiting)
        and how long they have been running, in seconds.

        :return: dictionary with currently running jobID keys and how many seconds they have
                 been running as the value
        :rtype: dict[int,float]
        """
        raise NotImplementedError()

    @abstractmethod
    def getUpdatedBatchJob(self, maxWait):
        """
        Returns information about job that has updated its status (i.e. ceased
        running, either successfully or with an error). Each such job will be
        returned exactly once.

        Does not return info for jobs killed by killBatchJobs, although they
        may cause None to be returned earlier than maxWait.

        :param float maxWait: the number of seconds to block, waiting for a result

        :rtype: UpdatedBatchJobInfo or None
        :return: If a result is available, returns UpdatedBatchJobInfo.
                 Otherwise it returns None. wallTime is the number of seconds (a strictly
                 positive float) in wall-clock time the job ran for, or None if this
                 batch system does not support tracking wall time.
        """
        raise NotImplementedError()

    def getSchedulingStatusMessage(self):
        """
        Get a log message fragment for the user about anything that might be
        going wrong in the batch system, if available.

        If no useful message is available, return None.

        This can be used to report what resource is the limiting factor when
        scheduling jobs, for example. If the leader thinks the workflow is
        stuck, the message can be displayed to the user to help them diagnose
        why it might be stuck.

        :rtype: str or None
        :return: User-directed message about scheduling state.
        """

        # Default implementation returns None.
        # Override to provide scheduling status information.
        return None

    @abstractmethod
    def shutdown(self):
        """
        Called at the completion of a toil invocation.
        Should cleanly terminate all worker threads.
        """
        raise NotImplementedError()

    def setEnv(self, name, value=None):
        """
        Set an environment variable for the worker process before it is launched. The worker
        process will typically inherit the environment of the machine it is running on but this
        method makes it possible to override specific variables in that inherited environment
        before the worker is launched. Note that this mechanism is different to the one used by
        the worker internally to set up the environment of a job. A call to this method affects
        all jobs issued after this method returns. Note to implementors: This means that you
        would typically need to copy the variables before enqueuing a job.

        If no value is provided it will be looked up from the current environment.
        """
        raise NotImplementedError()

    @classmethod
    def setOptions(cls, setOption):
        """
        Process command line or configuration options relevant to this batch system.

        The default implementation does nothing.

        :param setOption: A function with signature setOption(varName, parsingFn=None, checkFn=None, default=None)
            used to update run configuration
        """

    def getWorkerContexts(self):
        """
        Get a list of picklable context manager objects to wrap worker work in,
        in order.

        Can be used to ask the Toil worker to do things in-process (such as
        configuring environment variables, hot-deploying user scripts, or
        cleaning up a node) that would otherwise require a wrapping "executor"
        process.

        :rtype: list
        """
        return []


class BatchSystemSupport(AbstractBatchSystem):
    """
    Partial implementation of AbstractBatchSystem, support methods.
    """

    def __init__(self, config: 'Config', maxCores: float, maxMemory: int, maxDisk: int):
        """
        Initializes initial state of the object

        :param toil.common.Config config: object is setup by the toilSetup script and
          has configuration parameters for the jobtree. You can add code
          to that script to get parameters for your batch system.

        :param float maxCores: the maximum number of cores the batch system can
          request for any one job

        :param int maxMemory: the maximum amount of memory the batch system can
          request for any one job, in bytes

        :param int maxDisk: the maximum amount of disk space the batch system can
          request for any one job, in bytes
        """
        super().__init__()
        self.config = config
        self.maxCores = maxCores
        self.maxMemory = maxMemory
        self.maxDisk = maxDisk
        # Variables recorded through setEnv().
        self.environment: Dict[str, str] = {}
        self.workerCleanupInfo = WorkerCleanupInfo(workDir=self.config.workDir,
                                                   workflowID=self.config.workflowID,
                                                   cleanWorkDir=self.config.cleanWorkDir)

    def checkResourceRequest(self, memory: int, cores: float, disk: int, job_name: str = '', detail: str = ''):
        """
        Check resource request is not greater than that available or allowed.

        :param int memory: amount of memory being requested, in bytes

        :param float cores: number of cores being requested

        :param int disk: amount of disk space being requested, in bytes

        :param str job_name: Name of the job being checked, for generating a useful error report.

        :param str detail: Batch-system-specific message to include in the error. It is
               appended to the generated message, separated from it by a space unless it
               already starts with whitespace.

        :raise InsufficientSystemResources: raised when a resource is requested in an amount
               greater than allowed
        """
        batch_system = self.__class__.__name__ or 'this batch system'
        # The sentence opener depends only on the job name, so build it once.
        prefix = f'The job {job_name} is r' if job_name else 'R'
        limits = (('cores', cores, self.maxCores),
                  ('memory', memory, self.maxMemory),
                  ('disk', disk, self.maxDisk))
        for resource, requested, available in limits:
            assert requested is not None
            if requested <= available:
                continue

            unit = 'bytes of ' if resource in ('disk', 'memory') else ''
            if resource == 'disk':
                msg = (f'{prefix}equesting {requested} {unit}{resource} for temporary space, '
                       f'more than the maximum of {available} {unit}{resource} of free space on '
                       f'{self.config.workDir} that {batch_system} was configured with, or enforced '
                       f'by --max{resource.capitalize()}. Try setting/changing the toil option '
                       f'"--workDir" or changing the base temporary directory by setting TMPDIR.')
            else:
                msg = (f'{prefix}equesting {requested} {unit}{resource}, more than the maximum of '
                       f'{available} {unit}{resource} that {batch_system} was configured with, '
                       f'or enforced by --max{resource.capitalize()}.')
            if detail:
                # Keep the detail from running into the final period of the message.
                msg += detail if detail[0].isspace() else f' {detail}'

            raise InsufficientSystemResources(msg)

    def setEnv(self, name, value=None):
        """
        Set an environment variable for the worker process before it is launched. The worker
        process will typically inherit the environment of the machine it is running on but this
        method makes it possible to override specific variables in that inherited environment
        before the worker is launched. Note that this mechanism is different to the one used by
        the worker internally to set up the environment of a job. A call to this method affects
        all jobs issued after this method returns. Note to implementors: This means that you
        would typically need to copy the variables before enqueuing a job.

        If no value is provided it will be looked up from the current environment.

        :param str name: the environment variable to be set on the worker.

        :param str value: if given, the environment variable given by name will be set to this value.
               if None, the variable's current value will be used as the value on the worker

        :raise RuntimeError: if value is None and the name cannot be found in the environment
        """
        if value is None:
            try:
                value = os.environ[name]
            except KeyError as err:
                raise RuntimeError(f"{name} does not exist in current environment") from err
        self.environment[name] = value

    def formatStdOutErrPath(self, toil_job_id: int, cluster_job_id: str, std: str) -> str:
        """
        Format path for batch system standard output/error and other files
        generated by the batch system itself.

        Files will be written to the Toil work directory (which may
        be on a shared file system) with names containing both the Toil and
        batch system job IDs, for ease of debugging job failures.

        :param int toil_job_id: The unique id that Toil gives a job.
        :param str cluster_job_id: What the cluster, for example, GridEngine, uses as its
               internal job id.
        :param str std: The provenance of the stream (for example: 'err' for 'stderr' or
               'out' for 'stdout')

        :rtype: str
        :return: Formatted filename; however if self.config.noStdOutErr is true,
                 returns '/dev/null' or equivalent.
        """
        if self.config.noStdOutErr:
            return os.devnull

        fileName: str = f'toil_{self.config.workflowID}.{toil_job_id}.{cluster_job_id}.{std}.log'
        workDir: str = Toil.getToilWorkDir(self.config.workDir)
        return os.path.join(workDir, fileName)

    @staticmethod
    def workerCleanup(info: WorkerCleanupInfo) -> None:
        """
        Cleans up the worker node on batch system shutdown. Also see :meth:`supportsWorkerCleanup`.

        :param WorkerCleanupInfo info: A named tuple consisting of all the relevant information
               for cleaning up the worker.
        """
        assert isinstance(info, WorkerCleanupInfo)
        workflowDir = Toil.getLocalWorkflowDir(info.workflowID, info.workDir)
        DeferredFunctionManager.cleanupWorker(workflowDir)
        # The directory listing is taken before the file store is shut down.
        workflowDirContents = os.listdir(workflowDir)
        AbstractFileStore.shutdownFileStore(workflowDir, info.workflowID)
        # 'always' removes the directory unconditionally; the other two
        # policies remove it only if it was empty or held just the cache
        # directory when listed above.
        onlyCacheLeft = workflowDirContents in ([], [cacheDirName(info.workflowID)])
        if (info.cleanWorkDir == 'always'
                or (info.cleanWorkDir in ('onSuccess', 'onError') and onlyCacheLeft)):
            shutil.rmtree(workflowDir, ignore_errors=True)


class BatchSystemLocalSupport(BatchSystemSupport):
    """
    Adds a local queue for helper jobs, useful for CWL & others
    """

    def __init__(self, config, maxCores, maxMemory, maxDisk):
        super().__init__(config, maxCores, maxMemory, maxDisk)
        # The registry entry is a factory: calling it yields the batch system
        # class, which is then instantiated with config.maxLocalJobs as its
        # core limit.
        self.localBatch = BATCH_SYSTEM_FACTORY_REGISTRY[DEFAULT_BATCH_SYSTEM]()(
            config, config.maxLocalJobs, maxMemory, maxDisk)

    def handleLocalJob(self, jobDesc):  # type: (Any) -> Optional[int]
        """
        To be called by issueBatchJobs.

        Returns the jobID if the jobDesc has been submitted to the local queue,
        otherwise returns None
        """
        if (not self.config.runCwlInternalJobsOnWorkers
                and jobDesc.jobName.startswith(CWL_INTERNAL_JOBS)):
            return self.localBatch.issueBatchJob(jobDesc)
        else:
            return None

    def killLocalJobs(self, jobIDs):
        """
        To be called by killBatchJobs. Will kill all local jobs that match the
        provided jobIDs.
        """
        self.localBatch.killBatchJobs(jobIDs)

    def getIssuedLocalJobIDs(self):
        """To be called by getIssuedBatchJobIDs"""
        return self.localBatch.getIssuedBatchJobIDs()

    def getRunningLocalJobIDs(self):
        """To be called by getRunningBatchJobIDs()."""
        return self.localBatch.getRunningBatchJobIDs()

    def getUpdatedLocalJob(self, maxWait):
        # type: (int) -> Optional[UpdatedBatchJobInfo]
        """To be called by getUpdatedBatchJob()"""
        return self.localBatch.getUpdatedBatchJob(maxWait)

    def getNextJobID(self):  # type: () -> int
        """
        Must be used to get job IDs so that the local and batch jobs do not
        conflict.
        """
        with self.localBatch.jobIndexLock:
            jobID = self.localBatch.jobIndex
            self.localBatch.jobIndex += 1
        return jobID

    def shutdownLocal(self):  # type: () -> None
        """To be called from shutdown()"""
        self.localBatch.shutdown()


class BatchSystemCleanupSupport(BatchSystemLocalSupport):
    """
    Adds cleanup support when the last running job leaves a node, for batch
    systems that can't provide it using the backing scheduler.
    """

    def __init__(self, config, maxCores, maxMemory, maxDisk):
        super().__init__(config, maxCores, maxMemory, maxDisk)

    @classmethod
    def supportsWorkerCleanup(cls):
        return True

    def getWorkerContexts(self):
        # Tell worker to register for and invoke cleanup

        # Create a context manager that has a copy of our cleanup info
        context = WorkerCleanupContext(self.workerCleanupInfo)

        # Send it along so the worker works inside of it
        contexts = super().getWorkerContexts()
        contexts.append(context)
        return contexts


class WorkerCleanupContext:
    """
    Context manager used by :class:`BatchSystemCleanupSupport` to implement
    cleanup on a node after the last worker is done working.

    Gets wrapped around the worker's work.
    """

    def __init__(self, workerCleanupInfo):
        """
        Wrap the given workerCleanupInfo in a context manager.

        :param WorkerCleanupInfo workerCleanupInfo: Info to use to clean up the worker if we are
               the last to exit the context manager.
        """
        self.workerCleanupInfo = workerCleanupInfo
        # Created in __enter__.
        self.arena = None

    def __enter__(self):
        # Set up an arena so we know who is the last worker to leave
        self.arena = LastProcessStandingArena(Toil.getToilWorkDir(self.workerCleanupInfo.workDir),
                                              self.workerCleanupInfo.workflowID + '-cleanup')
        self.arena.enter()

    def __exit__(self, type, value, traceback):
        for _ in self.arena.leave():
            # We are the last concurrent worker to finish.
            # Do batch system cleanup.
            logger.debug('Cleaning up worker')
            BatchSystemSupport.workerCleanup(self.workerCleanupInfo)
        # We have nothing to say about exceptions
        return False


class NodeInfo(object):
    """
    The coresUsed attribute is a floating point value between 0 (all cores idle) and 1 (all cores
    busy), reflecting the CPU load of the node.

    The memoryUsed attribute is a floating point value between 0 (no memory used) and 1 (all memory
    used), reflecting the memory pressure on the node.

    The coresTotal and memoryTotal attributes are the node's resources, not just the used resources

    The requestedCores and requestedMemory attributes are all the resources that Toil Jobs have reserved on the
    node, regardless of whether the resources are actually being used by the Jobs.

    The workers attribute is an integer reflecting the number of workers currently active
    on the node.
    """
    def __init__(self, coresUsed, memoryUsed, coresTotal, memoryTotal,
                 requestedCores, requestedMemory, workers):
        self.coresUsed = coresUsed
        self.memoryUsed = memoryUsed

        self.coresTotal = coresTotal
        self.memoryTotal = memoryTotal

        self.requestedCores = requestedCores
        self.requestedMemory = requestedMemory

        self.workers = workers


class AbstractScalableBatchSystem(AbstractBatchSystem):
    """
    A batch system that supports a variable number of worker nodes. Used by :class:`toil.
    provisioners.clusterScaler.ClusterScaler` to scale the number of worker nodes in the cluster
    up or down depending on overall load.
    """

    @abstractmethod
    def getNodes(self, preemptable=None):
        """
        Returns a dictionary mapping node identifiers of preemptable or non-preemptable nodes to
        NodeInfo objects, one for each node.

        :param bool preemptable: If True (False) only (non-)preemptable nodes will be returned.
               If None, all nodes will be returned.

        :rtype: dict[str,NodeInfo]
        """
        raise NotImplementedError()

    @abstractmethod
    def nodeInUse(self, nodeIP):
        """
        Can be used to determine if a worker node is running any tasks. If the node doesn't
        exist, this function should simply return False.

        :param str nodeIP: The worker nodes private IP address

        :return: True if the worker node has been issued any tasks, else False
        :rtype: bool
        """
        raise NotImplementedError()

    @abstractmethod
    @contextmanager
    def nodeFiltering(self, filter):
        """
        Used to prevent races in autoscaling where
        1) nodes have reported to the autoscaler as having no jobs
        2) scaler decides to terminate these nodes. In parallel the batch system assigns jobs to the same nodes
        3) scaler terminates nodes, resulting in job failures for all jobs on that node.

        Call this method prior to node termination to ensure that nodes being considered for termination are not
        assigned new jobs. Call the method again passing None as the filter to disable the filtering
        after node termination is done.

        :param filter: This will be used as a filter on nodes considered when assigning new jobs.
            After this context manager exits the filter should be removed
        :rtype: None
        """
        raise NotImplementedError()

    @abstractmethod
    def ignoreNode(self, nodeAddress):
        """
        Stop sending jobs to this node. Used in autoscaling
        when the autoscaler is ready to terminate a node, but
        jobs are still running. This allows the node to be terminated
        after the current jobs have finished.

        :param str nodeAddress: IP address of node to ignore.
        :rtype: None
        """
        raise NotImplementedError()

    @abstractmethod
    def unignoreNode(self, nodeAddress):
        """
        Stop ignoring this address, presumably after
        a node with this address has been terminated. This allows for the
        possibility of a new node having the same address as a terminated one.
        """
        raise NotImplementedError()


class InsufficientSystemResources(Exception):
    """Raised when a job requests more of a resource than the batch system allows."""