__author__ = "Johannes Köster"
__copyright__ = "Copyright 2021, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

import re
import os
import sys
import signal
import json
import urllib
from collections import OrderedDict, namedtuple
from itertools import filterfalse, chain
from functools import partial
from operator import attrgetter
import copy
import subprocess
from pathlib import Path
from urllib.request import pathname2url, url2pathname


from snakemake.logging import logger, format_resources, format_resource_names
from snakemake.rules import Rule, Ruleorder, RuleProxy
from snakemake.exceptions import (
    CreateCondaEnvironmentException,
    RuleException,
    CreateRuleException,
    UnknownRuleException,
    NoRulesException,
    print_exception,
    WorkflowError,
)
from snakemake.shell import shell
from snakemake.dag import DAG
from snakemake.scheduler import JobScheduler
from snakemake.parser import parse
import snakemake.io
from snakemake.io import (
    protected,
    temp,
    temporary,
    ancient,
    directory,
    expand,
    dynamic,
    glob_wildcards,
    flag,
    not_iterable,
    touch,
    unpack,
    local,
    pipe,
    repeat,
    report,
    multiext,
    IOFile,
)
from snakemake.persistence import Persistence
from snakemake.utils import update_config
from snakemake.script import script
from snakemake.notebook import notebook
from snakemake.wrapper import wrapper
from snakemake.cwl import cwl
import snakemake.wrapper
from snakemake.common import (
    Mode,
    bytesto,
    ON_WINDOWS,
    is_local_file,
    parse_uri,
    Rules,
    Scatter,
    Gather,
    smart_join,
    NOTHING_TO_BE_DONE_MSG,
)
from snakemake.utils import simplify_path
from snakemake.checkpoints import Checkpoint, Checkpoints
from snakemake.resources import DefaultResources
from snakemake.caching.local import OutputFileCache as LocalOutputFileCache
from snakemake.caching.remote import OutputFileCache as RemoteOutputFileCache
from snakemake.modules import ModuleInfo, WorkflowModifier, get_name_modifier_func
from snakemake.ruleinfo import RuleInfo
from snakemake.sourcecache import (
    GenericSourceFile,
    LocalSourceFile,
    SourceCache,
    SourceFile,
    infer_source_file,
)
from snakemake.deployment.conda import Conda
from snakemake import sourcecache


class Workflow:
    def __init__(
        self,
        snakefile=None,
        jobscript=None,
        overwrite_shellcmd=None,
        overwrite_config=None,
        overwrite_workdir=None,
        overwrite_configfiles=None,
        overwrite_clusterconfig=None,
        overwrite_threads=None,
        overwrite_scatter=None,
        overwrite_groups=None,
        overwrite_resources=None,
        group_components=None,
        config_args=None,
        debug=False,
        verbose=False,
        use_conda=False,
        conda_frontend=None,
        conda_prefix=None,
        use_singularity=False,
        use_env_modules=False,
        singularity_prefix=None,
        singularity_args="",
        shadow_prefix=None,
        scheduler_type="ilp",
        scheduler_ilp_solver=None,
        mode=Mode.default,
        wrapper_prefix=None,
        printshellcmds=False,
        restart_times=None,
        attempt=1,
        default_remote_provider=None,
        default_remote_prefix="",
        run_local=True,
        default_resources=None,
        cache=None,
        nodes=1,
        cores=1,
        resources=None,
        conda_cleanup_pkgs=None,
        edit_notebook=False,
        envvars=None,
        max_inventory_wait_time=20,
        conda_not_block_search_path_envvars=False,
        execute_subworkflows=True,
        scheduler_solver_path=None,
        conda_base_path=None,
        check_envvars=True,
        max_threads=None,
        all_temp=False,
    ):
        """
        Create the controller.
        """

        self.global_resources = dict() if resources is None else resources
        self.global_resources["_cores"] = cores
        self.global_resources["_nodes"] = nodes

        self._rules = OrderedDict()
        self.first_rule = None
        self._workdir = None
        self.overwrite_workdir = overwrite_workdir
        self.workdir_init = os.path.abspath(os.curdir)
        self._ruleorder = Ruleorder()
        self._localrules = set()
        self.linemaps = dict()
        self.rule_count = 0
        self.basedir = os.path.dirname(snakefile)
        self.main_snakefile = os.path.abspath(snakefile)
        self.included = []
        self.included_stack = []
        self.jobscript = jobscript
        self.persistence = None
        self._subworkflows = dict()
        self.overwrite_shellcmd = overwrite_shellcmd
        self.overwrite_config = overwrite_config or dict()
        self.overwrite_configfiles = overwrite_configfiles
        self.overwrite_clusterconfig = overwrite_clusterconfig or dict()
        self.overwrite_threads = overwrite_threads or dict()
        self.overwrite_resources = overwrite_resources or dict()
        self.config_args = config_args
        self.immediate_submit = None
        self._onsuccess = lambda log: None
        self._onerror = lambda log: None
        self._onstart = lambda log: None
        self._wildcard_constraints = dict()
        self.debug = debug
        self.verbose = verbose
        self._rulecount = 0
        self.use_conda = use_conda
        self.conda_frontend = conda_frontend
        self.conda_prefix = conda_prefix
        self.use_singularity = use_singularity
        self.use_env_modules = use_env_modules
        self.singularity_prefix = singularity_prefix
        self.singularity_args = singularity_args
        self.shadow_prefix = shadow_prefix
        self.scheduler_type = scheduler_type
        self.scheduler_ilp_solver = scheduler_ilp_solver
        self.global_container_img = None
        self.global_is_containerized = False
        self.mode = mode
        self.wrapper_prefix = wrapper_prefix
        self.printshellcmds = printshellcmds
        self.restart_times = restart_times
        self.attempt = attempt
        self.default_remote_provider = default_remote_provider
        self.default_remote_prefix = default_remote_prefix
        self.configfiles = list(overwrite_configfiles or [])
        self.run_local = run_local
        self.report_text = None
        self.conda_cleanup_pkgs = conda_cleanup_pkgs
        self.edit_notebook = edit_notebook
        # environment variables to pass to jobs
        # These are defined via the "envvars:" syntax in the Snakefile itself
        self.envvars = set()
        self.overwrite_groups = overwrite_groups or dict()
        self.group_components = group_components or dict()
        self._scatter = dict(overwrite_scatter or dict())
        self.overwrite_scatter = overwrite_scatter or dict()
        self.conda_not_block_search_path_envvars = conda_not_block_search_path_envvars
        self.execute_subworkflows = execute_subworkflows
        self.modules = dict()
        self.sourcecache = SourceCache()
        self.scheduler_solver_path = scheduler_solver_path
        self._conda_base_path = conda_base_path
        self.check_envvars = check_envvars
        self.max_threads = max_threads
        self.all_temp = all_temp
        self.scheduler = None

        _globals = globals()
        _globals["workflow"] = self
        _globals["cluster_config"] = copy.deepcopy(self.overwrite_clusterconfig)
        _globals["rules"] = Rules()
        _globals["checkpoints"] = Checkpoints()
        _globals["scatter"] = Scatter()
        _globals["gather"] = Gather()
        _globals["github"] = sourcecache.GithubFile
        _globals["gitlab"] = sourcecache.GitlabFile
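        # The assignments above make these objects available inside every parsed
        # Snakefile. A hedged sketch of typical Snakefile usage (rule, checkpoint,
        # and module names are hypothetical):
        #
        #     rule all:
        #         input: rules.plot.output
        #
        #     def aggregate(wildcards):
        #         return checkpoints.clustering.get(**wildcards).output[0]
        #
        #     module qc:
        #         snakefile: github("owner/repo", path="workflow/Snakefile", tag="v1.0.0")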
_globals["scatter"] = Scatter() 236 _globals["gather"] = Gather() 237 _globals["github"] = sourcecache.GithubFile 238 _globals["gitlab"] = sourcecache.GitlabFile 239 240 self.vanilla_globals = dict(_globals) 241 self.modifier_stack = [WorkflowModifier(self, globals=_globals)] 242 243 self.enable_cache = False 244 if cache is not None: 245 self.enable_cache = True 246 self.cache_rules = set(cache) 247 if self.default_remote_provider is not None: 248 self.output_file_cache = RemoteOutputFileCache( 249 self.default_remote_provider 250 ) 251 else: 252 self.output_file_cache = LocalOutputFileCache() 253 else: 254 self.output_file_cache = None 255 self.cache_rules = set() 256 257 if default_resources is not None: 258 self.default_resources = default_resources 259 else: 260 # only _cores, _nodes, and _tmpdir 261 self.default_resources = DefaultResources(mode="bare") 262 263 self.iocache = snakemake.io.IOCache(max_inventory_wait_time) 264 265 self.globals["config"] = copy.deepcopy(self.overwrite_config) 266 267 if envvars is not None: 268 self.register_envvars(*envvars) 269 270 @property 271 def conda_base_path(self): 272 if self._conda_base_path: 273 return self._conda_base_path 274 if self.use_conda: 275 try: 276 return Conda().prefix_path 277 except CreateCondaEnvironmentException as e: 278 # Return no preset conda base path now and report error later in jobs. 279 return None 280 else: 281 return None 282 283 @property 284 def modifier(self): 285 return self.modifier_stack[-1] 286 287 @property 288 def globals(self): 289 return self.modifier.globals 290 291 def lint(self, json=False): 292 from snakemake.linting.rules import RuleLinter 293 from snakemake.linting.snakefiles import SnakefileLinter 294 295 json_snakefile_lints, snakefile_linted = SnakefileLinter( 296 self, self.included 297 ).lint(json=json) 298 json_rule_lints, rules_linted = RuleLinter(self, self.rules).lint(json=json) 299 300 linted = snakefile_linted or rules_linted 301 302 if json: 303 import json 304 305 print( 306 json.dumps( 307 {"snakefiles": json_snakefile_lints, "rules": json_rule_lints}, 308 indent=2, 309 ) 310 ) 311 else: 312 if not linted: 313 logger.info("Congratulations, your workflow is in a good condition!") 314 return linted 315 316 def is_cached_rule(self, rule: Rule): 317 return rule.name in self.cache_rules 318 319 def get_sources(self): 320 files = set() 321 322 def local_path(f): 323 if not isinstance(f, SourceFile) and is_local_file(f): 324 return f 325 if isinstance(f, LocalSourceFile): 326 return f.get_path_or_uri() 327 328 def norm_rule_relpath(f, rule): 329 if not os.path.isabs(f): 330 f = os.path.join(rule.basedir, f) 331 return os.path.relpath(f) 332 333 # get registered sources 334 for f in self.included: 335 f = local_path(f) 336 if f: 337 try: 338 f = os.path.relpath(f) 339 except ValueError: 340 if ON_WINDOWS: 341 pass # relpath doesn't work on win if files are on different drive 342 else: 343 raise 344 files.add(f) 345 for rule in self.rules: 346 script_path = rule.script or rule.notebook 347 if script_path: 348 script_path = norm_rule_relpath(script_path, rule) 349 files.add(script_path) 350 script_dir = os.path.dirname(script_path) 351 files.update( 352 os.path.join(dirpath, f) 353 for dirpath, _, files in os.walk(script_dir) 354 for f in files 355 ) 356 if rule.conda_env: 357 f = local_path(rule.conda_env) 358 if f: 359 # url points to a local env file 360 env_path = norm_rule_relpath(f, rule) 361 files.add(env_path) 362 363 for f in self.configfiles: 364 files.add(f) 365 366 # get git-managed 
        # TODO allow a manifest file as alternative
        try:
            out = subprocess.check_output(
                ["git", "ls-files", "--recurse-submodules", "."], stderr=subprocess.PIPE
            )
            for f in out.decode().split("\n"):
                if f:
                    files.add(os.path.relpath(f))
        except subprocess.CalledProcessError as e:
            if "fatal: not a git repository" in e.stderr.decode().lower():
                logger.warning(
                    "Unable to retrieve additional files from git. "
                    "This is not a git repository."
                )
            else:
                raise WorkflowError(
                    "Error executing git:\n{}".format(e.stderr.decode())
                )

        return files

    def check_source_sizes(self, filename, warning_size_gb=0.2):
        """Check the size of the given source file and return the filename.

        Since we encourage source packages to be small, a warning is issued
        for files larger than 200 MB (0.2 GB) by default.
        """
        gb = bytesto(os.stat(filename).st_size, "g")
        if gb > warning_size_gb:
            logger.warning(
                "File {} (size {} GB) is greater than the suggested size of {} GB. "
                "Consider uploading larger files to storage first.".format(
                    filename, gb, warning_size_gb
                )
            )
        return filename

    @property
    def subworkflows(self):
        return self._subworkflows.values()

    @property
    def rules(self):
        return self._rules.values()

    @property
    def cores(self):
        if self._cores is None:
            raise WorkflowError(
                "Workflow requires a total number of cores to be defined (e.g. because a "
                "rule defines its number of threads as a fraction of a total number of cores). "
                "Please set it with --cores N, with N being the desired number of cores. "
                "Consider using this in combination with --max-threads to avoid "
                "jobs with too many threads for your setup. Also make sure to perform "
                "a dryrun first."
            )
        return self._cores

    @property
    def _cores(self):
        return self.global_resources["_cores"]

    @property
    def nodes(self):
        return self.global_resources["_nodes"]

    @property
    def concrete_files(self):
        return (
            file
            for rule in self.rules
            for file in chain(rule.input, rule.output)
            if not callable(file) and not file.contains_wildcard()
        )

    def check(self):
        for clause in self._ruleorder:
            for rulename in clause:
                if not self.is_rule(rulename):
                    raise UnknownRuleException(
                        rulename, prefix="Error in ruleorder definition."
                    )

    def add_rule(
        self,
        name=None,
        lineno=None,
        snakefile=None,
        checkpoint=False,
        allow_overwrite=False,
    ):
        """
        Add a rule.
        """
        is_overwrite = self.is_rule(name)
        if not allow_overwrite and is_overwrite:
            raise CreateRuleException(
                "The name {} is already used by another rule".format(name)
            )
        rule = Rule(name, self, lineno=lineno, snakefile=snakefile)
        self._rules[rule.name] = rule
        if not is_overwrite:
            self.rule_count += 1
        if not self.first_rule:
            self.first_rule = rule.name
        return name

    def is_rule(self, name):
        """
        Return True if name is the name of a rule.

        Arguments
        name -- a name
        """
        return name in self._rules

    def get_rule(self, name):
        """
        Get rule by name.

        Arguments
        name -- the name of the rule
        """
        if not self._rules:
            raise NoRulesException()
        if name not in self._rules:
            raise UnknownRuleException(name)
        return self._rules[name]

    def list_rules(self, only_targets=False):
        rules = self.rules
        if only_targets:
            rules = filterfalse(Rule.has_wildcards, rules)
        for rule in rules:
            logger.rule_info(name=rule.name, docstring=rule.docstring)

    def list_resources(self):
        for resource in set(
            resource for rule in self.rules for resource in rule.resources
        ):
            if resource not in "_cores _nodes".split():
                logger.info(resource)

    def is_local(self, rule):
        return rule.group is None and (rule.name in self._localrules or rule.norun)

    def check_localrules(self):
        undefined = self._localrules - set(rule.name for rule in self.rules)
        if undefined:
            logger.warning(
                "localrules directive specifies rules that are not "
                "present in the Snakefile:\n{}\n".format(
                    "\n".join(map("\t{}".format, undefined))
                )
            )

    def inputfile(self, path):
        """Mark a file as being an input file of the workflow.

        This also means that eventual --default-remote-provider/prefix settings
        will be applied to this file. The file is returned as an _IOFile object,
        such that it can e.g. be transparently opened with _IOFile.open().
        """
        if isinstance(path, Path):
            path = str(path)
        if self.default_remote_provider is not None:
            path = self.modifier.modify_path(path)
        return IOFile(path)

    def execute(
        self,
        targets=None,
        dryrun=False,
        generate_unit_tests=None,
        touch=False,
        scheduler_type=None,
        scheduler_ilp_solver=None,
        local_cores=1,
        forcetargets=False,
        forceall=False,
        forcerun=None,
        until=None,
        omit_from=None,
        prioritytargets=None,
        quiet=False,
        keepgoing=False,
        printshellcmds=False,
        printreason=False,
        printdag=False,
        cluster=None,
        cluster_sync=None,
        jobname=None,
        immediate_submit=False,
        ignore_ambiguity=False,
        printrulegraph=False,
        printfilegraph=False,
        printd3dag=False,
        drmaa=None,
        drmaa_log_dir=None,
        kubernetes=None,
        tibanna=None,
        tibanna_sfn=None,
        google_lifesciences=None,
        google_lifesciences_regions=None,
        google_lifesciences_location=None,
        google_lifesciences_cache=False,
        tes=None,
        precommand="",
        preemption_default=None,
        preemptible_rules=None,
        tibanna_config=False,
        container_image=None,
        stats=None,
        force_incomplete=False,
        ignore_incomplete=False,
        list_version_changes=False,
        list_code_changes=False,
        list_input_changes=False,
        list_params_changes=False,
        list_untracked=False,
        list_conda_envs=False,
        summary=False,
        archive=None,
        delete_all_output=False,
        delete_temp_output=False,
        detailed_summary=False,
        latency_wait=3,
        wait_for_files=None,
        nolock=False,
        unlock=False,
        notemp=False,
        nodeps=False,
        cleanup_metadata=None,
        conda_cleanup_envs=False,
        cleanup_shadow=False,
        cleanup_scripts=True,
        subsnakemake=None,
        updated_files=None,
        keep_target_files=False,
        keep_shadow=False,
        keep_remote_local=False,
        allowed_rules=None,
        max_jobs_per_second=None,
        max_status_checks_per_second=None,
        greediness=1.0,
        no_hooks=False,
        force_use_threads=False,
        conda_create_envs_only=False,
        assume_shared_fs=True,
        cluster_status=None,
        report=None,
        report_stylesheet=None,
        export_cwl=False,
        batch=None,
        keepincomplete=False,
        keepmetadata=True,
    ):

        self.check_localrules()
        self.immediate_submit = immediate_submit
        self.cleanup_scripts = cleanup_scripts

        def rules(items):
            return map(self._rules.__getitem__, filter(self.is_rule, items))

        if keep_target_files:

            def files(items):
                return filterfalse(self.is_rule, items)

        else:

            def files(items):
                relpath = (
                    lambda f: f
                    if os.path.isabs(f) or f.startswith("root://")
                    else os.path.relpath(f)
                )
                return map(relpath, filterfalse(self.is_rule, items))

        if not targets:
            targets = [self.first_rule] if self.first_rule is not None else list()

        if prioritytargets is None:
            prioritytargets = list()
        if forcerun is None:
            forcerun = list()
        if until is None:
            until = list()
        if omit_from is None:
            omit_from = list()

        priorityrules = set(rules(prioritytargets))
        priorityfiles = set(files(prioritytargets))
        forcerules = set(rules(forcerun))
        forcefiles = set(files(forcerun))
        untilrules = set(rules(until))
        untilfiles = set(files(until))
        omitrules = set(rules(omit_from))
        omitfiles = set(files(omit_from))
        targetrules = set(
            chain(
                rules(targets),
                filterfalse(Rule.has_wildcards, priorityrules),
                filterfalse(Rule.has_wildcards, forcerules),
                filterfalse(Rule.has_wildcards, untilrules),
            )
        )
        targetfiles = set(chain(files(targets), priorityfiles, forcefiles, untilfiles))

        if ON_WINDOWS:
            targetfiles = set(tf.replace(os.sep, os.altsep) for tf in targetfiles)

        if forcetargets:
            forcefiles.update(targetfiles)
            forcerules.update(targetrules)

        rules = self.rules
        if allowed_rules:
            allowed_rules = set(allowed_rules)
            rules = [rule for rule in rules if rule.name in allowed_rules]

        if wait_for_files is not None:
            try:
                snakemake.io.wait_for_files(wait_for_files, latency_wait=latency_wait)
            except IOError as e:
                logger.error(str(e))
                return False

        dag = DAG(
            self,
            rules,
            dryrun=dryrun,
            targetfiles=targetfiles,
            targetrules=targetrules,
            # when cleaning up conda, we should enforce all possible jobs
            # since their envs shall not be deleted
            forceall=forceall or conda_cleanup_envs,
            forcefiles=forcefiles,
            forcerules=forcerules,
            priorityfiles=priorityfiles,
            priorityrules=priorityrules,
            untilfiles=untilfiles,
            untilrules=untilrules,
            omitfiles=omitfiles,
            omitrules=omitrules,
            ignore_ambiguity=ignore_ambiguity,
            force_incomplete=force_incomplete,
            ignore_incomplete=ignore_incomplete
            or printdag
            or printrulegraph
            or printfilegraph,
            notemp=notemp,
            keep_remote_local=keep_remote_local,
            batch=batch,
        )

        self.persistence = Persistence(
            nolock=nolock,
            dag=dag,
            conda_prefix=self.conda_prefix,
            singularity_prefix=self.singularity_prefix,
            shadow_prefix=self.shadow_prefix,
            warn_only=dryrun
            or printrulegraph
            or printfilegraph
            or printdag
            or summary
            or archive
            or list_version_changes
            or list_code_changes
            or list_input_changes
            or list_params_changes
            or list_untracked
            or delete_all_output
            or delete_temp_output,
        )

        if self.mode in [Mode.subprocess, Mode.cluster]:
            self.persistence.deactivate_cache()
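        # The following blocks implement one-shot maintenance modes: each
        # performs its cleanup task and returns immediately instead of
        # scheduling any jobs.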
        if cleanup_metadata:
            for f in cleanup_metadata:
                self.persistence.cleanup_metadata(f)
            return True

        if unlock:
            try:
                self.persistence.cleanup_locks()
                logger.info("Unlocking working directory.")
                return True
            except IOError:
                logger.error(
                    "Error: Unlocking the directory {} failed. Maybe "
                    "you don't have the permissions?".format(os.getcwd())
                )
                return False

        logger.info("Building DAG of jobs...")
        dag.init()
        dag.update_checkpoint_dependencies()
        dag.check_dynamic()

        try:
            self.persistence.lock()
        except IOError:
            logger.error(
                "Error: Directory cannot be locked. Please make "
                "sure that no other Snakemake process is trying to create "
                "the same files in the following directory:\n{}\n"
                "If you are sure that no other "
                "instances of snakemake are running on this directory, "
                "the remaining lock was likely caused by a kill signal or "
                "a power loss. It can be removed with "
                "the --unlock argument.".format(os.getcwd())
            )
            return False

        if cleanup_shadow:
            self.persistence.cleanup_shadow()
            return True

        if (
            self.subworkflows
            and self.execute_subworkflows
            and not printdag
            and not printrulegraph
            and not printfilegraph
        ):
            # backup globals
            globals_backup = dict(self.globals)
            # execute subworkflows
            for subworkflow in self.subworkflows:
                subworkflow_targets = subworkflow.targets(dag)
                logger.debug(
                    "Files requested from subworkflow:\n    {}".format(
                        "\n    ".join(subworkflow_targets)
                    )
                )
                updated = list()
                if subworkflow_targets:
                    logger.info("Executing subworkflow {}.".format(subworkflow.name))
                    if not subsnakemake(
                        subworkflow.snakefile,
                        workdir=subworkflow.workdir,
                        targets=subworkflow_targets,
                        cores=self._cores,
                        nodes=self.nodes,
                        configfiles=[subworkflow.configfile]
                        if subworkflow.configfile
                        else None,
                        updated_files=updated,
                    ):
                        return False
                    dag.updated_subworkflow_files.update(
                        subworkflow.target(f) for f in updated
                    )
                else:
                    logger.info(
                        "Subworkflow {}: {}".format(
                            subworkflow.name, NOTHING_TO_BE_DONE_MSG
                        )
                    )
            if self.subworkflows:
                logger.info("Executing main workflow.")
                # rescue globals
                self.globals.update(globals_backup)
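        # Note: the subworkflows above run in-process via the subsnakemake
        # callback, which re-parses Snakefiles into this module's globals;
        # presumably this is why the globals are backed up and restored around
        # their execution.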
        dag.postprocess(update_needrun=False)
        if not dryrun:
            # deactivate IOCache such that from now on we always get updated
            # size, existence and mtime information
            # ATTENTION: this may never be removed without really good reason.
            # Otherwise weird things may happen.
            self.iocache.deactivate()
            # clear and deactivate persistence cache, from now on we want to see updates
            self.persistence.deactivate_cache()

        if nodeps:
            missing_input = [
                f
                for job in dag.targetjobs
                for f in job.input
                if dag.needrun(job) and not os.path.exists(f)
            ]
            if missing_input:
                logger.error(
                    "Dependency resolution disabled (--nodeps) but missing input "
                    "files detected. If this happens on a cluster, please make sure "
                    "that you handle the dependencies yourself or turn off "
                    "--immediate-submit. Missing input files:\n{}".format(
                        "\n".join(missing_input)
                    )
                )
                return False

        updated_files.extend(f for job in dag.needrun_jobs for f in job.output)

        if generate_unit_tests:
            from snakemake import unit_tests

            path = generate_unit_tests
            deploy = []
            if self.use_conda:
                deploy.append("conda")
            if self.use_singularity:
                deploy.append("singularity")
            unit_tests.generate(
                dag, path, deploy, configfiles=self.overwrite_configfiles
            )
            return True
        elif export_cwl:
            from snakemake.cwl import dag_to_cwl
            import json

            with open(export_cwl, "w") as cwl:
                json.dump(dag_to_cwl(dag), cwl, indent=4)
            return True
        elif report:
            from snakemake.report import auto_report

            auto_report(dag, report, stylesheet=report_stylesheet)
            return True
        elif printd3dag:
            dag.d3dag()
            return True
        elif printdag:
            print(dag)
            return True
        elif printrulegraph:
            print(dag.rule_dot())
            return True
        elif printfilegraph:
            print(dag.filegraph_dot())
            return True
        elif summary:
            print("\n".join(dag.summary(detailed=False)))
            return True
        elif detailed_summary:
            print("\n".join(dag.summary(detailed=True)))
            return True
        elif archive:
            dag.archive(archive)
            return True
        elif delete_all_output:
            dag.clean(only_temp=False, dryrun=dryrun)
            return True
        elif delete_temp_output:
            dag.clean(only_temp=True, dryrun=dryrun)
            return True
        elif list_version_changes:
            items = list(chain(*map(self.persistence.version_changed, dag.jobs)))
            if items:
                print(*items, sep="\n")
            return True
        elif list_code_changes:
            items = list(chain(*map(self.persistence.code_changed, dag.jobs)))
            for j in dag.jobs:
                items.extend(list(j.outputs_older_than_script_or_notebook()))
            if items:
                print(*items, sep="\n")
            return True
        elif list_input_changes:
            items = list(chain(*map(self.persistence.input_changed, dag.jobs)))
            if items:
                print(*items, sep="\n")
            return True
        elif list_params_changes:
            items = list(chain(*map(self.persistence.params_changed, dag.jobs)))
            if items:
                print(*items, sep="\n")
            return True
        elif list_untracked:
            dag.list_untracked()
            return True

        if self.use_singularity:
            if assume_shared_fs:
                dag.pull_container_imgs(
                    dryrun=dryrun or list_conda_envs, quiet=list_conda_envs
                )
        if self.use_conda:
            if assume_shared_fs:
                dag.create_conda_envs(
                    dryrun=dryrun or list_conda_envs or conda_cleanup_envs,
                    quiet=list_conda_envs,
                )
            if conda_create_envs_only:
                return True

        if list_conda_envs:
            print("environment", "container", "location", sep="\t")
            for env in set(job.conda_env for job in dag.jobs):
                if env:
                    print(
                        env.file.simplify_path(),
                        env.container_img_url or "",
                        simplify_path(env.path),
                        sep="\t",
                    )
            return True

        if conda_cleanup_envs:
            self.persistence.conda_cleanup_envs()
            return True
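        # From here on, jobs are actually scheduled. All executor-specific
        # settings (cluster, DRMAA, Kubernetes, Tibanna, Google Life Sciences,
        # TES, ...) are forwarded to the JobScheduler, which selects the
        # matching executor. cluster_config below refers to the module-level
        # global injected in __init__ above.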
        self.scheduler = JobScheduler(
            self,
            dag,
            local_cores=local_cores,
            dryrun=dryrun,
            touch=touch,
            cluster=cluster,
            cluster_status=cluster_status,
            cluster_config=cluster_config,
            cluster_sync=cluster_sync,
            jobname=jobname,
            max_jobs_per_second=max_jobs_per_second,
            max_status_checks_per_second=max_status_checks_per_second,
            quiet=quiet,
            keepgoing=keepgoing,
            drmaa=drmaa,
            drmaa_log_dir=drmaa_log_dir,
            kubernetes=kubernetes,
            tibanna=tibanna,
            tibanna_sfn=tibanna_sfn,
            google_lifesciences=google_lifesciences,
            google_lifesciences_regions=google_lifesciences_regions,
            google_lifesciences_location=google_lifesciences_location,
            google_lifesciences_cache=google_lifesciences_cache,
            tes=tes,
            preemption_default=preemption_default,
            preemptible_rules=preemptible_rules,
            precommand=precommand,
            tibanna_config=tibanna_config,
            container_image=container_image,
            printreason=printreason,
            printshellcmds=printshellcmds,
            latency_wait=latency_wait,
            greediness=greediness,
            force_use_threads=force_use_threads,
            assume_shared_fs=assume_shared_fs,
            keepincomplete=keepincomplete,
            keepmetadata=keepmetadata,
            scheduler_type=scheduler_type,
            scheduler_ilp_solver=scheduler_ilp_solver,
        )

        if not dryrun:
            if len(dag):
                shell_exec = shell.get_executable()
                if shell_exec is not None:
                    logger.info("Using shell: {}".format(shell_exec))
                if cluster or cluster_sync or drmaa:
                    logger.resources_info(
                        "Provided cluster nodes: {}".format(self.nodes)
                    )
                elif kubernetes or tibanna or google_lifesciences:
                    logger.resources_info("Provided cloud nodes: {}".format(self.nodes))
                else:
                    if self._cores is not None:
                        warning = (
                            ""
                            if self._cores > 1
                            else " (use --cores to define parallelism)"
                        )
                        logger.resources_info(
                            "Provided cores: {}{}".format(self._cores, warning)
                        )
                        logger.resources_info(
                            "Rules claiming more threads will be scaled down."
                        )

                provided_resources = format_resources(self.global_resources)
                if provided_resources:
                    logger.resources_info("Provided resources: " + provided_resources)

                if self.run_local and any(rule.group for rule in self.rules):
                    logger.info("Group jobs: inactive (local execution)")

                if not self.use_conda and any(rule.conda_env for rule in self.rules):
                    logger.info("Conda environments: ignored")

                if not self.use_singularity and any(
                    rule.container_img for rule in self.rules
                ):
                    logger.info("Singularity containers: ignored")

                if self.mode == Mode.default:
                    logger.run_info("\n".join(dag.stats()))
            else:
                logger.info(NOTHING_TO_BE_DONE_MSG)
        else:
            # the dryrun case
            if len(dag):
                logger.run_info("\n".join(dag.stats()))
            else:
                logger.info(NOTHING_TO_BE_DONE_MSG)
                return True
            if quiet:
                # in case of dryrun and quiet, just print above info and exit
                return True

        if not dryrun and not no_hooks:
            self._onstart(logger.get_logfile())

        success = self.scheduler.schedule()

        if not immediate_submit and not dryrun:
            dag.cleanup_workdir()

        if success:
            if dryrun:
                if len(dag):
                    logger.run_info("\n".join(dag.stats()))
                logger.info(
                    "This was a dry-run (flag -n). The order of jobs "
                    "does not reflect the order of execution."
                )
                logger.remove_logfile()
            else:
                if stats:
                    self.scheduler.stats.to_json(stats)
                logger.logfile_hint()
            if not dryrun and not no_hooks:
                self._onsuccess(logger.get_logfile())
            return True
        else:
            if not dryrun and not no_hooks:
                self._onerror(logger.get_logfile())
            logger.logfile_hint()
            return False

    @property
    def current_basedir(self):
        """Basedir of the currently parsed Snakefile."""
        assert self.included_stack
        snakefile = self.included_stack[-1]
        basedir = snakefile.get_basedir()
        if isinstance(basedir, LocalSourceFile):
            return basedir.abspath()
        else:
            return basedir

    def source_path(self, rel_path):
        """Return the path to a source file of the workflow, derived from the
        given path relative to the calling Snakefile."""
        # TODO download to disk (use source cache) in case of remote file
        import inspect

        frame = inspect.currentframe().f_back
        calling_file = frame.f_code.co_filename
        calling_dir = os.path.dirname(calling_file)
        path = smart_join(calling_dir, rel_path)
        return self.sourcecache.get_path(infer_source_file(path))

    @property
    def snakefile(self):
        import inspect

        frame = inspect.currentframe().f_back
        return frame.f_code.co_filename

    def register_envvars(self, *envvars):
        """
        Register environment variables that shall be passed to jobs.
        If used multiple times, the union is taken.
        """
        undefined = set(var for var in envvars if var not in os.environ)
        if self.check_envvars and undefined:
            raise WorkflowError(
                "The following environment variables are requested by the workflow but undefined. "
                "Please make sure that they are correctly defined before running Snakemake:\n"
                "{}".format("\n".join(undefined))
            )
        self.envvars.update(envvars)

    def containerize(self):
        from snakemake.deployment.containerize import containerize

        containerize(self)

    def include(
        self,
        snakefile,
        overwrite_first_rule=False,
        print_compilation=False,
        overwrite_shellcmd=None,
    ):
        """
        Include a snakefile.
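
        A hedged sketch of the corresponding Snakefile directive (the path is
        hypothetical); the included file may also come from a remote source:

            include: "rules/qc.smk"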
1157 """ 1158 basedir = self.current_basedir if self.included_stack else None 1159 snakefile = infer_source_file(snakefile, basedir) 1160 1161 if not self.modifier.allow_rule_overwrite and snakefile in self.included: 1162 logger.info("Multiple includes of {} ignored".format(snakefile)) 1163 return 1164 self.included.append(snakefile) 1165 self.included_stack.append(snakefile) 1166 1167 first_rule = self.first_rule 1168 code, linemap, rulecount = parse( 1169 snakefile, 1170 self, 1171 overwrite_shellcmd=self.overwrite_shellcmd, 1172 rulecount=self._rulecount, 1173 ) 1174 self._rulecount = rulecount 1175 1176 if print_compilation: 1177 print(code) 1178 1179 if isinstance(snakefile, LocalSourceFile): 1180 # insert the current directory into sys.path 1181 # this allows to import modules from the workflow directory 1182 sys.path.insert(0, snakefile.get_basedir().get_path_or_uri()) 1183 1184 self.linemaps[snakefile] = linemap 1185 1186 exec(compile(code, snakefile.get_path_or_uri(), "exec"), self.globals) 1187 1188 if not overwrite_first_rule: 1189 self.first_rule = first_rule 1190 self.included_stack.pop() 1191 1192 def onstart(self, func): 1193 """Register onstart function.""" 1194 self._onstart = func 1195 1196 def onsuccess(self, func): 1197 """Register onsuccess function.""" 1198 self._onsuccess = func 1199 1200 def onerror(self, func): 1201 """Register onerror function.""" 1202 self._onerror = func 1203 1204 def global_wildcard_constraints(self, **content): 1205 """Register global wildcard constraints.""" 1206 self._wildcard_constraints.update(content) 1207 # update all rules so far 1208 for rule in self.rules: 1209 rule.update_wildcard_constraints() 1210 1211 def scattergather(self, **content): 1212 """Register scattergather defaults.""" 1213 self._scatter.update(content) 1214 self._scatter.update(self.overwrite_scatter) 1215 1216 # add corresponding wildcard constraint 1217 self.global_wildcard_constraints(scatteritem="\d+-of-\d+") 1218 1219 def func(*args, **wildcards): 1220 n = self._scatter[key] 1221 return expand( 1222 *args, 1223 scatteritem=map("{{}}-of-{}".format(n).format, range(1, n + 1)), 1224 **wildcards 1225 ) 1226 1227 for key in content: 1228 setattr(self.globals["scatter"], key, func) 1229 setattr(self.globals["gather"], key, func) 1230 1231 def workdir(self, workdir): 1232 """Register workdir.""" 1233 if self.overwrite_workdir is None: 1234 os.makedirs(workdir, exist_ok=True) 1235 self._workdir = workdir 1236 os.chdir(workdir) 1237 1238 def configfile(self, fp): 1239 """Update the global config with data from the given file.""" 1240 global config 1241 if not self.modifier.skip_configfile: 1242 if os.path.exists(fp): 1243 self.configfiles.append(fp) 1244 c = snakemake.io.load_configfile(fp) 1245 update_config(config, c) 1246 if self.overwrite_config: 1247 logger.info( 1248 "Config file {} is extended by additional config specified via the command line.".format( 1249 fp 1250 ) 1251 ) 1252 update_config(config, self.overwrite_config) 1253 elif not self.overwrite_configfiles: 1254 raise WorkflowError( 1255 "Workflow defines configfile {} but it is not present or accessible.".format( 1256 fp 1257 ) 1258 ) 1259 1260 def pepfile(self, path): 1261 global pep 1262 1263 try: 1264 import peppy 1265 except ImportError: 1266 raise WorkflowError("For PEP support, please install peppy.") 1267 1268 self.pepfile = path 1269 pep = peppy.Project(self.pepfile) 1270 1271 def pepschema(self, schema): 1272 global pep 1273 1274 try: 1275 import eido 1276 except ImportError: 1277 raise 
WorkflowError("For PEP schema support, please install eido.") 1278 1279 if is_local_file(schema) and not os.path.isabs(schema): 1280 # schema is relative to current Snakefile 1281 schema = self.current_basedir.join(schema).get_path_or_uri() 1282 if self.pepfile is None: 1283 raise WorkflowError("Please specify a PEP with the pepfile directive.") 1284 eido.validate_project(project=pep, schema=schema, exclude_case=True) 1285 1286 def report(self, path): 1287 """Define a global report description in .rst format.""" 1288 self.report_text = self.current_basedir.join(path) 1289 1290 @property 1291 def config(self): 1292 return self.globals["config"] 1293 1294 def ruleorder(self, *rulenames): 1295 self._ruleorder.add(*map(self.modifier.modify_rulename, rulenames)) 1296 1297 def subworkflow(self, name, snakefile=None, workdir=None, configfile=None): 1298 # Take absolute path of config file, because it is relative to current 1299 # workdir, which could be changed for the subworkflow. 1300 if configfile: 1301 configfile = os.path.abspath(configfile) 1302 sw = Subworkflow(self, name, snakefile, workdir, configfile) 1303 self._subworkflows[name] = sw 1304 self.globals[name] = sw.target 1305 1306 def localrules(self, *rulenames): 1307 self._localrules.update(rulenames) 1308 1309 def rule(self, name=None, lineno=None, snakefile=None, checkpoint=False): 1310 # choose a name for an unnamed rule 1311 if name is None: 1312 name = str(len(self._rules) + 1) 1313 1314 if self.modifier.skip_rule(name): 1315 1316 def decorate(ruleinfo): 1317 # do nothing, ignore rule 1318 return ruleinfo.func 1319 1320 return decorate 1321 1322 # Optionally let the modifier change the rulename. 1323 orig_name = name 1324 name = self.modifier.modify_rulename(name) 1325 1326 name = self.add_rule( 1327 name, 1328 lineno, 1329 snakefile, 1330 checkpoint, 1331 allow_overwrite=self.modifier.allow_rule_overwrite, 1332 ) 1333 rule = self.get_rule(name) 1334 rule.is_checkpoint = checkpoint 1335 1336 def decorate(ruleinfo): 1337 nonlocal name 1338 1339 # If requested, modify ruleinfo via the modifier. 
    def rule(self, name=None, lineno=None, snakefile=None, checkpoint=False):
        # choose a name for an unnamed rule
        if name is None:
            name = str(len(self._rules) + 1)

        if self.modifier.skip_rule(name):

            def decorate(ruleinfo):
                # do nothing, ignore rule
                return ruleinfo.func

            return decorate

        # Optionally let the modifier change the rulename.
        orig_name = name
        name = self.modifier.modify_rulename(name)

        name = self.add_rule(
            name,
            lineno,
            snakefile,
            checkpoint,
            allow_overwrite=self.modifier.allow_rule_overwrite,
        )
        rule = self.get_rule(name)
        rule.is_checkpoint = checkpoint

        def decorate(ruleinfo):
            nonlocal name

            # If requested, modify ruleinfo via the modifier.
            ruleinfo.apply_modifier(self.modifier)

            if ruleinfo.wildcard_constraints:
                rule.set_wildcard_constraints(
                    *ruleinfo.wildcard_constraints[0],
                    **ruleinfo.wildcard_constraints[1]
                )
            if ruleinfo.name:
                rule.name = ruleinfo.name
                del self._rules[name]
                self._rules[ruleinfo.name] = rule
                name = rule.name
            rule.path_modifier = ruleinfo.path_modifier
            if ruleinfo.input:
                rule.set_input(*ruleinfo.input[0], **ruleinfo.input[1])
            if ruleinfo.output:
                rule.set_output(*ruleinfo.output[0], **ruleinfo.output[1])
            if ruleinfo.params:
                rule.set_params(*ruleinfo.params[0], **ruleinfo.params[1])
            # handle default resources
            if self.default_resources is not None:
                rule.resources = copy.deepcopy(self.default_resources.parsed)
            if ruleinfo.threads is not None:
                if (
                    not isinstance(ruleinfo.threads, int)
                    and not isinstance(ruleinfo.threads, float)
                    and not callable(ruleinfo.threads)
                ):
                    raise RuleException(
                        "Threads value has to be an integer, float, or a callable.",
                        rule=rule,
                    )
                if name in self.overwrite_threads:
                    rule.resources["_cores"] = self.overwrite_threads[name]
                else:
                    if isinstance(ruleinfo.threads, float):
                        ruleinfo.threads = int(ruleinfo.threads)
                    rule.resources["_cores"] = ruleinfo.threads
            if ruleinfo.shadow_depth:
                if ruleinfo.shadow_depth not in (
                    True,
                    "shallow",
                    "full",
                    "minimal",
                    "copy-minimal",
                ):
                    raise RuleException(
                        "Shadow must either be 'minimal', 'copy-minimal', 'shallow', 'full', "
                        "or True (equivalent to 'full')",
                        rule=rule,
                    )
                if ruleinfo.shadow_depth is True:
                    rule.shadow_depth = "full"
                    logger.warning(
                        "Shadow is set to True in rule {} (equivalent to 'full'). "
                        "It's encouraged to use the more explicit options "
                        "'minimal|copy-minimal|shallow|full' instead.".format(rule)
                    )
                else:
                    rule.shadow_depth = ruleinfo.shadow_depth
            if ruleinfo.resources:
                args, resources = ruleinfo.resources
                if args:
                    raise RuleException("Resources have to be named.")
                if not all(
                    map(
                        lambda r: isinstance(r, int)
                        or isinstance(r, str)
                        or callable(r),
                        resources.values(),
                    )
                ):
                    raise RuleException(
                        "Resources values have to be integers, strings, or callables (functions)",
                        rule=rule,
                    )
                rule.resources.update(resources)
            if name in self.overwrite_resources:
                rule.resources.update(self.overwrite_resources[name])

            if ruleinfo.priority:
                if not isinstance(ruleinfo.priority, int) and not isinstance(
                    ruleinfo.priority, float
                ):
                    raise RuleException(
                        "Priority values have to be numeric.", rule=rule
                    )
                rule.priority = ruleinfo.priority
            if ruleinfo.version:
                rule.version = ruleinfo.version
            if ruleinfo.log:
                rule.set_log(*ruleinfo.log[0], **ruleinfo.log[1])
            if ruleinfo.message:
                rule.message = ruleinfo.message
            if ruleinfo.benchmark:
                rule.benchmark = ruleinfo.benchmark
            if not self.run_local:
                group = self.overwrite_groups.get(name) or ruleinfo.group
                if group is not None:
                    rule.group = group
            if ruleinfo.wrapper:
                rule.conda_env = snakemake.wrapper.get_conda_env(
                    ruleinfo.wrapper, prefix=self.wrapper_prefix
                )
                # TODO retrieve suitable singularity image

            if ruleinfo.env_modules:
                # If using environment modules and they are defined for the rule,
                # ignore the conda and singularity directives below.
                # The reason is that this is likely intended in order to use
                # a software stack specifically compiled for a particular
                # HPC cluster.
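                # A hedged sketch of the Snakefile syntax (the module name is
                # hypothetical and site-specific):
                #
                #     rule index:
                #         envmodules:
                #             "bio/samtools/1.9"
                #         shell:
                #             "samtools index {input}"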
                invalid_rule = not (
                    ruleinfo.script
                    or ruleinfo.wrapper
                    or ruleinfo.shellcmd
                    or ruleinfo.notebook
                )
                if invalid_rule:
                    raise RuleException(
                        "envmodules directive is only allowed with "
                        "shell, script, notebook, or wrapper directives (not with run)",
                        rule=rule,
                    )
                from snakemake.deployment.env_modules import EnvModules

                rule.env_modules = EnvModules(*ruleinfo.env_modules)

            if ruleinfo.conda_env:
                if not (
                    ruleinfo.script
                    or ruleinfo.wrapper
                    or ruleinfo.shellcmd
                    or ruleinfo.notebook
                ):
                    raise RuleException(
                        "Conda environments are only allowed "
                        "with shell, script, notebook, or wrapper directives "
                        "(not with run).",
                        rule=rule,
                    )

                if (
                    ruleinfo.conda_env is not None
                    and is_local_file(ruleinfo.conda_env)
                    and not os.path.isabs(ruleinfo.conda_env)
                ):
                    ruleinfo.conda_env = self.current_basedir.join(
                        ruleinfo.conda_env
                    ).get_path_or_uri()
                rule.conda_env = ruleinfo.conda_env

            invalid_rule = not (
                ruleinfo.script
                or ruleinfo.wrapper
                or ruleinfo.shellcmd
                or ruleinfo.notebook
            )
            if ruleinfo.container_img:
                if invalid_rule:
                    raise RuleException(
                        "Singularity directive is only allowed "
                        "with shell, script, notebook or wrapper directives "
                        "(not with run).",
                        rule=rule,
                    )
                rule.container_img = ruleinfo.container_img
                rule.is_containerized = ruleinfo.is_containerized
            elif self.global_container_img:
                if not invalid_rule and ruleinfo.container_img != False:
                    # skip rules with run directive or empty image
                    rule.container_img = self.global_container_img
                    rule.is_containerized = self.global_is_containerized

            rule.norun = ruleinfo.norun
            if ruleinfo.name is not None:
                rule.name = ruleinfo.name
            rule.docstring = ruleinfo.docstring
            rule.run_func = ruleinfo.func
            rule.shellcmd = ruleinfo.shellcmd
            rule.script = ruleinfo.script
            rule.notebook = ruleinfo.notebook
            rule.wrapper = ruleinfo.wrapper
            rule.cwl = ruleinfo.cwl
            rule.restart_times = self.restart_times
            rule.basedir = self.current_basedir

            if ruleinfo.handover:
                if not ruleinfo.resources:
                    # give all available resources to the rule
                    rule.resources.update(
                        {
                            name: val
                            for name, val in self.global_resources.items()
                            if val is not None
                        }
                    )
                # This becomes a local rule, which might spawn jobs to a cluster,
                # depending on its configuration (e.g. nextflow config).
                self._localrules.add(rule.name)
                rule.is_handover = True

            if ruleinfo.cache is True:
                if not self.enable_cache:
                    logger.warning(
                        "Workflow defines that rule {} is eligible for caching between workflows "
                        "(use the --cache argument to enable this).".format(rule.name)
                    )
                else:
                    self.cache_rules.add(rule.name)
            elif not (ruleinfo.cache is False):
                raise WorkflowError(
                    "Invalid argument for 'cache:' directive. Only true allowed. "
                    "To deactivate caching, remove directive.",
                    rule=rule,
                )

            ruleinfo.func.__name__ = "__{}".format(rule.name)
            self.globals[ruleinfo.func.__name__] = ruleinfo.func

            rule_proxy = RuleProxy(rule)
            if orig_name is not None:
                setattr(self.globals["rules"], orig_name, rule_proxy)
            setattr(self.globals["rules"], rule.name, rule_proxy)

            if checkpoint:
                self.globals["checkpoints"].register(rule, fallback_name=orig_name)
            rule.ruleinfo = ruleinfo
            return ruleinfo.func

        return decorate
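    # The directive helpers below all follow the same pattern: each returns a
    # decorator that records its argument(s) on the RuleInfo object, which the
    # rule() decorator above then consumes.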
" 1553 "To deactivate caching, remove directive.", 1554 rule=rule, 1555 ) 1556 1557 ruleinfo.func.__name__ = "__{}".format(rule.name) 1558 self.globals[ruleinfo.func.__name__] = ruleinfo.func 1559 1560 rule_proxy = RuleProxy(rule) 1561 if orig_name is not None: 1562 setattr(self.globals["rules"], orig_name, rule_proxy) 1563 setattr(self.globals["rules"], rule.name, rule_proxy) 1564 1565 if checkpoint: 1566 self.globals["checkpoints"].register(rule, fallback_name=orig_name) 1567 rule.ruleinfo = ruleinfo 1568 return ruleinfo.func 1569 1570 return decorate 1571 1572 def docstring(self, string): 1573 def decorate(ruleinfo): 1574 ruleinfo.docstring = string 1575 return ruleinfo 1576 1577 return decorate 1578 1579 def input(self, *paths, **kwpaths): 1580 def decorate(ruleinfo): 1581 ruleinfo.input = (paths, kwpaths) 1582 return ruleinfo 1583 1584 return decorate 1585 1586 def output(self, *paths, **kwpaths): 1587 def decorate(ruleinfo): 1588 ruleinfo.output = (paths, kwpaths) 1589 return ruleinfo 1590 1591 return decorate 1592 1593 def params(self, *params, **kwparams): 1594 def decorate(ruleinfo): 1595 ruleinfo.params = (params, kwparams) 1596 return ruleinfo 1597 1598 return decorate 1599 1600 def wildcard_constraints(self, *wildcard_constraints, **kwwildcard_constraints): 1601 def decorate(ruleinfo): 1602 ruleinfo.wildcard_constraints = ( 1603 wildcard_constraints, 1604 kwwildcard_constraints, 1605 ) 1606 return ruleinfo 1607 1608 return decorate 1609 1610 def cache_rule(self, cache): 1611 def decorate(ruleinfo): 1612 ruleinfo.cache = cache 1613 return ruleinfo 1614 1615 return decorate 1616 1617 def message(self, message): 1618 def decorate(ruleinfo): 1619 ruleinfo.message = message 1620 return ruleinfo 1621 1622 return decorate 1623 1624 def benchmark(self, benchmark): 1625 def decorate(ruleinfo): 1626 ruleinfo.benchmark = benchmark 1627 return ruleinfo 1628 1629 return decorate 1630 1631 def conda(self, conda_env): 1632 def decorate(ruleinfo): 1633 ruleinfo.conda_env = conda_env 1634 return ruleinfo 1635 1636 return decorate 1637 1638 def container(self, container_img): 1639 def decorate(ruleinfo): 1640 # Explicitly set container_img to False if None is passed, indicating that 1641 # no container image shall be used, also not a global one. 
            ruleinfo.container_img = (
                container_img if container_img is not None else False
            )
            ruleinfo.is_containerized = False
            return ruleinfo

        return decorate

    def containerized(self, container_img):
        def decorate(ruleinfo):
            ruleinfo.container_img = container_img
            ruleinfo.is_containerized = True
            return ruleinfo

        return decorate

    def envmodules(self, *env_modules):
        def decorate(ruleinfo):
            ruleinfo.env_modules = env_modules
            return ruleinfo

        return decorate

    def global_container(self, container_img):
        self.global_container_img = container_img
        self.global_is_containerized = False

    def global_containerized(self, container_img):
        self.global_container_img = container_img
        self.global_is_containerized = True

    def threads(self, threads):
        def decorate(ruleinfo):
            ruleinfo.threads = threads
            return ruleinfo

        return decorate

    def shadow(self, shadow_depth):
        def decorate(ruleinfo):
            ruleinfo.shadow_depth = shadow_depth
            return ruleinfo

        return decorate

    def resources(self, *args, **resources):
        def decorate(ruleinfo):
            ruleinfo.resources = (args, resources)
            return ruleinfo

        return decorate

    def priority(self, priority):
        def decorate(ruleinfo):
            ruleinfo.priority = priority
            return ruleinfo

        return decorate

    def version(self, version):
        def decorate(ruleinfo):
            ruleinfo.version = version
            return ruleinfo

        return decorate

    def group(self, group):
        def decorate(ruleinfo):
            ruleinfo.group = group
            return ruleinfo

        return decorate

    def log(self, *logs, **kwlogs):
        def decorate(ruleinfo):
            ruleinfo.log = (logs, kwlogs)
            return ruleinfo

        return decorate

    def handover(self, value):
        def decorate(ruleinfo):
            ruleinfo.handover = value
            return ruleinfo

        return decorate

    def shellcmd(self, cmd):
        def decorate(ruleinfo):
            ruleinfo.shellcmd = cmd
            return ruleinfo

        return decorate

    def script(self, script):
        def decorate(ruleinfo):
            ruleinfo.script = script
            return ruleinfo

        return decorate

    def notebook(self, notebook):
        def decorate(ruleinfo):
            ruleinfo.notebook = notebook
            return ruleinfo

        return decorate

    def wrapper(self, wrapper):
        def decorate(ruleinfo):
            ruleinfo.wrapper = wrapper
            return ruleinfo

        return decorate

    def cwl(self, cwl):
        def decorate(ruleinfo):
            ruleinfo.cwl = cwl
            return ruleinfo

        return decorate

    def norun(self):
        def decorate(ruleinfo):
            ruleinfo.norun = True
            return ruleinfo

        return decorate

    def name(self, name):
        def decorate(ruleinfo):
            ruleinfo.name = name
            return ruleinfo

        return decorate

    def run(self, func):
        return RuleInfo(func)

    def module(
        self,
        name,
        snakefile=None,
        meta_wrapper=None,
        config=None,
        skip_validation=False,
        replace_prefix=None,
    ):
        self.modules[name] = ModuleInfo(
            self,
            name,
            snakefile=snakefile,
            meta_wrapper=meta_wrapper,
            config=config,
            skip_validation=skip_validation,
            replace_prefix=replace_prefix,
        )
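    # userule() below backs the `use rule` statement. A hedged sketch of the
    # Snakefile syntax (module and rule names are hypothetical):
    #
    #     module other_workflow:
    #         snakefile: "other/Snakefile"
    #
    #     use rule * from other_workflow as other_*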
    def userule(self, rules=None, from_module=None, name_modifier=None, lineno=None):
        def decorate(maybe_ruleinfo):
            if from_module is not None:
                try:
                    module = self.modules[from_module]
                except KeyError:
                    raise WorkflowError(
                        "Module {} has not been registered with a 'module' statement "
                        "before being used in a 'use rule' statement.".format(
                            from_module
                        )
                    )
                module.use_rules(
                    rules,
                    name_modifier,
                    ruleinfo=None if callable(maybe_ruleinfo) else maybe_ruleinfo,
                )
            else:
                # local inheritance
                if len(rules) > 1:
                    raise WorkflowError(
                        "A 'use rule' statement inheriting from a rule in the same "
                        "module must declare a single rule, but multiple rules are declared."
                    )
                orig_rule = self._rules[rules[0]]
                ruleinfo = maybe_ruleinfo if not callable(maybe_ruleinfo) else None
                with WorkflowModifier(
                    self,
                    rulename_modifier=get_name_modifier_func(rules, name_modifier),
                    ruleinfo_overwrite=ruleinfo,
                ):
                    self.rule(
                        name=name_modifier,
                        lineno=lineno,
                        snakefile=self.included_stack[-1],
                    )(orig_rule.ruleinfo)

        return decorate

    @staticmethod
    def _empty_decorator(f):
        return f


class Subworkflow:
    def __init__(self, workflow, name, snakefile, workdir, configfile):
        self.workflow = workflow
        self.name = name
        self._snakefile = snakefile
        self._workdir = workdir
        self.configfile = configfile

    @property
    def snakefile(self):
        if self._snakefile is None:
            return os.path.abspath(os.path.join(self.workdir, "Snakefile"))
        if not os.path.isabs(self._snakefile):
            return os.path.abspath(os.path.join(self.workflow.basedir, self._snakefile))
        return self._snakefile

    @property
    def workdir(self):
        workdir = "." if self._workdir is None else self._workdir
        if not os.path.isabs(workdir):
            return os.path.abspath(os.path.join(self.workflow.basedir, workdir))
        return workdir

    def target(self, paths):
        if not_iterable(paths):
            path = paths
            path = (
                path
                if os.path.isabs(path) or path.startswith("root://")
                else os.path.join(self.workdir, path)
            )
            return flag(path, "subworkflow", self)
        return [self.target(path) for path in paths]

    def targets(self, dag):
        def relpath(f):
            if f.startswith(self.workdir):
                return os.path.relpath(f, start=self.workdir)
            # do not adjust absolute targets outside of workdir
            return f

        return [
            relpath(f)
            for job in dag.jobs
            for f in job.subworkflow_input
            if job.subworkflow_input[f] is self
        ]


def srcdir(path):
    """Return the absolute path, relative to the source directory of the current Snakefile."""
    if not workflow.included_stack:
        return None
    return workflow.current_basedir.join(path).get_path_or_uri()
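
# A hedged usage sketch for srcdir() (inside a Snakefile; the path is
# hypothetical):
#
#     rule plot:
#         input:
#             script=srcdir("scripts/plot.py")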