1__author__ = "Johannes Köster"
2__copyright__ = "Copyright 2021, Johannes Köster"
3__email__ = "johannes.koester@uni-due.de"
4__license__ = "MIT"
5
6import re
7import os
8import sys
9import signal
10import json
11from tokenize import maybe
12import urllib
13from collections import OrderedDict, namedtuple
14from itertools import filterfalse, chain
15from functools import partial
16from operator import attrgetter
17import copy
18import subprocess
19from pathlib import Path
20from urllib.request import pathname2url, url2pathname
21
22
23from snakemake.logging import logger, format_resources, format_resource_names
24from snakemake.rules import Rule, Ruleorder, RuleProxy
25from snakemake.exceptions import (
26    CreateCondaEnvironmentException,
27    RuleException,
28    CreateRuleException,
29    UnknownRuleException,
30    NoRulesException,
31    print_exception,
32    WorkflowError,
33)
34from snakemake.shell import shell
35from snakemake.dag import DAG
36from snakemake.scheduler import JobScheduler
37from snakemake.parser import parse
38import snakemake.io
39from snakemake.io import (
40    protected,
41    temp,
42    temporary,
43    ancient,
44    directory,
45    expand,
46    dynamic,
47    glob_wildcards,
48    flag,
49    not_iterable,
50    touch,
51    unpack,
52    local,
53    pipe,
54    repeat,
55    report,
56    multiext,
57    IOFile,
58)
59from snakemake.persistence import Persistence
60from snakemake.utils import update_config
61from snakemake.script import script
62from snakemake.notebook import notebook
63from snakemake.wrapper import wrapper
64from snakemake.cwl import cwl
65import snakemake.wrapper
66from snakemake.common import (
67    Mode,
68    bytesto,
69    ON_WINDOWS,
70    is_local_file,
71    parse_uri,
72    Rules,
73    Scatter,
74    Gather,
75    smart_join,
76    NOTHING_TO_BE_DONE_MSG,
77)
78from snakemake.utils import simplify_path
79from snakemake.checkpoints import Checkpoint, Checkpoints
80from snakemake.resources import DefaultResources
81from snakemake.caching.local import OutputFileCache as LocalOutputFileCache
82from snakemake.caching.remote import OutputFileCache as RemoteOutputFileCache
83from snakemake.modules import ModuleInfo, WorkflowModifier, get_name_modifier_func
84from snakemake.ruleinfo import RuleInfo
85from snakemake.sourcecache import (
86    GenericSourceFile,
87    LocalSourceFile,
88    SourceCache,
89    SourceFile,
90    infer_source_file,
91)
92from snakemake.deployment.conda import Conda
93from snakemake import sourcecache
94
95
96class Workflow:
97    def __init__(
98        self,
99        snakefile=None,
100        jobscript=None,
101        overwrite_shellcmd=None,
102        overwrite_config=None,
103        overwrite_workdir=None,
104        overwrite_configfiles=None,
105        overwrite_clusterconfig=None,
106        overwrite_threads=None,
107        overwrite_scatter=None,
108        overwrite_groups=None,
109        overwrite_resources=None,
110        group_components=None,
111        config_args=None,
112        debug=False,
113        verbose=False,
114        use_conda=False,
115        conda_frontend=None,
116        conda_prefix=None,
117        use_singularity=False,
118        use_env_modules=False,
119        singularity_prefix=None,
120        singularity_args="",
121        shadow_prefix=None,
122        scheduler_type="ilp",
123        scheduler_ilp_solver=None,
124        mode=Mode.default,
125        wrapper_prefix=None,
126        printshellcmds=False,
127        restart_times=None,
128        attempt=1,
129        default_remote_provider=None,
130        default_remote_prefix="",
131        run_local=True,
132        default_resources=None,
133        cache=None,
134        nodes=1,
135        cores=1,
136        resources=None,
137        conda_cleanup_pkgs=None,
138        edit_notebook=False,
139        envvars=None,
140        max_inventory_wait_time=20,
141        conda_not_block_search_path_envvars=False,
142        execute_subworkflows=True,
143        scheduler_solver_path=None,
144        conda_base_path=None,
145        check_envvars=True,
146        max_threads=None,
147        all_temp=False,
148    ):
149        """
150        Create the controller.
151        """
152
153        self.global_resources = dict() if resources is None else resources
154        self.global_resources["_cores"] = cores
155        self.global_resources["_nodes"] = nodes
156
157        self._rules = OrderedDict()
158        self.first_rule = None
159        self._workdir = None
160        self.overwrite_workdir = overwrite_workdir
161        self.workdir_init = os.path.abspath(os.curdir)
162        self._ruleorder = Ruleorder()
163        self._localrules = set()
164        self.linemaps = dict()
165        self.rule_count = 0
166        self.basedir = os.path.dirname(snakefile)
167        self.main_snakefile = os.path.abspath(snakefile)
168        self.included = []
169        self.included_stack = []
170        self.jobscript = jobscript
171        self.persistence = None
172        self._subworkflows = dict()
173        self.overwrite_shellcmd = overwrite_shellcmd
174        self.overwrite_config = overwrite_config or dict()
175        self.overwrite_configfiles = overwrite_configfiles
176        self.overwrite_clusterconfig = overwrite_clusterconfig or dict()
177        self.overwrite_threads = overwrite_threads or dict()
178        self.overwrite_resources = overwrite_resources or dict()
179        self.config_args = config_args
180        self.immediate_submit = None
181        self._onsuccess = lambda log: None
182        self._onerror = lambda log: None
183        self._onstart = lambda log: None
184        self._wildcard_constraints = dict()
185        self.debug = debug
186        self.verbose = verbose
187        self._rulecount = 0
188        self.use_conda = use_conda
189        self.conda_frontend = conda_frontend
190        self.conda_prefix = conda_prefix
191        self.use_singularity = use_singularity
192        self.use_env_modules = use_env_modules
193        self.singularity_prefix = singularity_prefix
194        self.singularity_args = singularity_args
195        self.shadow_prefix = shadow_prefix
196        self.scheduler_type = scheduler_type
197        self.scheduler_ilp_solver = scheduler_ilp_solver
198        self.global_container_img = None
199        self.global_is_containerized = False
200        self.mode = mode
201        self.wrapper_prefix = wrapper_prefix
202        self.printshellcmds = printshellcmds
203        self.restart_times = restart_times
204        self.attempt = attempt
205        self.default_remote_provider = default_remote_provider
206        self.default_remote_prefix = default_remote_prefix
207        self.configfiles = list(overwrite_configfiles) or []
208        self.run_local = run_local
209        self.report_text = None
210        self.conda_cleanup_pkgs = conda_cleanup_pkgs
211        self.edit_notebook = edit_notebook
212        # environment variables to pass to jobs
213        # These are defined via the "envvars:" syntax in the Snakefile itself
214        self.envvars = set()
215        self.overwrite_groups = overwrite_groups or dict()
216        self.group_components = group_components or dict()
217        self._scatter = dict(overwrite_scatter or dict())
218        self.overwrite_scatter = overwrite_scatter or dict()
219        self.conda_not_block_search_path_envvars = conda_not_block_search_path_envvars
220        self.execute_subworkflows = execute_subworkflows
221        self.modules = dict()
222        self.sourcecache = SourceCache()
223        self.scheduler_solver_path = scheduler_solver_path
224        self._conda_base_path = conda_base_path
225        self.check_envvars = check_envvars
226        self.max_threads = max_threads
227        self.all_temp = all_temp
228        self.scheduler = None
229
230        _globals = globals()
231        _globals["workflow"] = self
232        _globals["cluster_config"] = copy.deepcopy(self.overwrite_clusterconfig)
233        _globals["rules"] = Rules()
234        _globals["checkpoints"] = Checkpoints()
235        _globals["scatter"] = Scatter()
236        _globals["gather"] = Gather()
237        _globals["github"] = sourcecache.GithubFile
238        _globals["gitlab"] = sourcecache.GitlabFile
239
240        self.vanilla_globals = dict(_globals)
241        self.modifier_stack = [WorkflowModifier(self, globals=_globals)]
242
243        self.enable_cache = False
244        if cache is not None:
245            self.enable_cache = True
246            self.cache_rules = set(cache)
247            if self.default_remote_provider is not None:
248                self.output_file_cache = RemoteOutputFileCache(
249                    self.default_remote_provider
250                )
251            else:
252                self.output_file_cache = LocalOutputFileCache()
253        else:
254            self.output_file_cache = None
255            self.cache_rules = set()
256
257        if default_resources is not None:
258            self.default_resources = default_resources
259        else:
260            # only _cores, _nodes, and _tmpdir
261            self.default_resources = DefaultResources(mode="bare")
262
263        self.iocache = snakemake.io.IOCache(max_inventory_wait_time)
264
265        self.globals["config"] = copy.deepcopy(self.overwrite_config)
266
267        if envvars is not None:
268            self.register_envvars(*envvars)
269
270    @property
271    def conda_base_path(self):
272        if self._conda_base_path:
273            return self._conda_base_path
274        if self.use_conda:
275            try:
276                return Conda().prefix_path
277            except CreateCondaEnvironmentException as e:
278                # Return no preset conda base path now and report error later in jobs.
279                return None
280        else:
281            return None
282
283    @property
284    def modifier(self):
285        return self.modifier_stack[-1]
286
287    @property
288    def globals(self):
289        return self.modifier.globals
290
291    def lint(self, json=False):
292        from snakemake.linting.rules import RuleLinter
293        from snakemake.linting.snakefiles import SnakefileLinter
294
295        json_snakefile_lints, snakefile_linted = SnakefileLinter(
296            self, self.included
297        ).lint(json=json)
298        json_rule_lints, rules_linted = RuleLinter(self, self.rules).lint(json=json)
299
300        linted = snakefile_linted or rules_linted
301
302        if json:
303            import json
304
305            print(
306                json.dumps(
307                    {"snakefiles": json_snakefile_lints, "rules": json_rule_lints},
308                    indent=2,
309                )
310            )
311        else:
312            if not linted:
313                logger.info("Congratulations, your workflow is in a good condition!")
314        return linted
315
316    def is_cached_rule(self, rule: Rule):
317        return rule.name in self.cache_rules
318
319    def get_sources(self):
320        files = set()
321
322        def local_path(f):
323            if not isinstance(f, SourceFile) and is_local_file(f):
324                return f
325            if isinstance(f, LocalSourceFile):
326                return f.get_path_or_uri()
327
328        def norm_rule_relpath(f, rule):
329            if not os.path.isabs(f):
330                f = os.path.join(rule.basedir, f)
331            return os.path.relpath(f)
332
333        # get registered sources
334        for f in self.included:
335            f = local_path(f)
336            if f:
337                try:
338                    f = os.path.relpath(f)
339                except ValueError:
340                    if ON_WINDOWS:
341                        pass  # relpath doesn't work on win if files are on different drive
342                    else:
343                        raise
344                files.add(f)
345        for rule in self.rules:
346            script_path = rule.script or rule.notebook
347            if script_path:
348                script_path = norm_rule_relpath(script_path, rule)
349                files.add(script_path)
350                script_dir = os.path.dirname(script_path)
351                files.update(
352                    os.path.join(dirpath, f)
353                    for dirpath, _, files in os.walk(script_dir)
354                    for f in files
355                )
356            if rule.conda_env:
357                f = local_path(rule.conda_env)
358                if f:
359                    # url points to a local env file
360                    env_path = norm_rule_relpath(f, rule)
361                    files.add(env_path)
362
363        for f in self.configfiles:
364            files.add(f)
365
366        # get git-managed files
367        # TODO allow a manifest file as alternative
368        try:
369            out = subprocess.check_output(
370                ["git", "ls-files", "--recurse-submodules", "."], stderr=subprocess.PIPE
371            )
372            for f in out.decode().split("\n"):
373                if f:
374                    files.add(os.path.relpath(f))
375        except subprocess.CalledProcessError as e:
376            if "fatal: not a git repository" in e.stderr.decode().lower():
377                logger.warning(
378                    "Unable to retrieve additional files from git. "
379                    "This is not a git repository."
380                )
381            else:
382                raise WorkflowError(
383                    "Error executing git:\n{}".format(e.stderr.decode())
384                )
385
386        return files
387
388    def check_source_sizes(self, filename, warning_size_gb=0.2):
389        """A helper function to check the filesize, and return the file
390        to the calling function Additionally, given that we encourage these
391        packages to be small, we set a warning at 200MB (0.2GB).
392        """
393        gb = bytesto(os.stat(filename).st_size, "g")
394        if gb > warning_size_gb:
395            logger.warning(
396                "File {} (size {} GB) is greater than the {} GB suggested size "
397                "Consider uploading larger files to storage first.".format(
398                    filename, gb, warning_size_gb
399                )
400            )
401        return filename
402
403    @property
404    def subworkflows(self):
405        return self._subworkflows.values()
406
407    @property
408    def rules(self):
409        return self._rules.values()
410
411    @property
412    def cores(self):
413        if self._cores is None:
414            raise WorkflowError(
415                "Workflow requires a total number of cores to be defined (e.g. because a "
416                "rule defines its number of threads as a fraction of a total number of cores). "
417                "Please set it with --cores N with N being the desired number of cores. "
418                "Consider to use this in combination with --max-threads to avoid "
419                "jobs with too many threads for your setup. Also make sure to perform "
420                "a dryrun first."
421            )
422        return self._cores
423
424    @property
425    def _cores(self):
426        return self.global_resources["_cores"]
427
428    @property
429    def nodes(self):
430        return self.global_resources["_nodes"]
431
432    @property
433    def concrete_files(self):
434        return (
435            file
436            for rule in self.rules
437            for file in chain(rule.input, rule.output)
438            if not callable(file) and not file.contains_wildcard()
439        )
440
441    def check(self):
442        for clause in self._ruleorder:
443            for rulename in clause:
444                if not self.is_rule(rulename):
445                    raise UnknownRuleException(
446                        rulename, prefix="Error in ruleorder definition."
447                    )
448
449    def add_rule(
450        self,
451        name=None,
452        lineno=None,
453        snakefile=None,
454        checkpoint=False,
455        allow_overwrite=False,
456    ):
457        """
458        Add a rule.
459        """
460        is_overwrite = self.is_rule(name)
461        if not allow_overwrite and is_overwrite:
462            raise CreateRuleException(
463                "The name {} is already used by another rule".format(name)
464            )
465        rule = Rule(name, self, lineno=lineno, snakefile=snakefile)
466        self._rules[rule.name] = rule
467        if not is_overwrite:
468            self.rule_count += 1
469        if not self.first_rule:
470            self.first_rule = rule.name
471        return name
472
473    def is_rule(self, name):
474        """
475        Return True if name is the name of a rule.
476
477        Arguments
478        name -- a name
479        """
480        return name in self._rules
481
482    def get_rule(self, name):
483        """
484        Get rule by name.
485
486        Arguments
487        name -- the name of the rule
488        """
489        if not self._rules:
490            raise NoRulesException()
491        if not name in self._rules:
492            raise UnknownRuleException(name)
493        return self._rules[name]
494
495    def list_rules(self, only_targets=False):
496        rules = self.rules
497        if only_targets:
498            rules = filterfalse(Rule.has_wildcards, rules)
499        for rule in rules:
500            logger.rule_info(name=rule.name, docstring=rule.docstring)
501
502    def list_resources(self):
503        for resource in set(
504            resource for rule in self.rules for resource in rule.resources
505        ):
506            if resource not in "_cores _nodes".split():
507                logger.info(resource)
508
509    def is_local(self, rule):
510        return rule.group is None and (rule.name in self._localrules or rule.norun)
511
512    def check_localrules(self):
513        undefined = self._localrules - set(rule.name for rule in self.rules)
514        if undefined:
515            logger.warning(
516                "localrules directive specifies rules that are not "
517                "present in the Snakefile:\n{}\n".format(
518                    "\n".join(map("\t{}".format, undefined))
519                )
520            )
521
522    def inputfile(self, path):
523        """Mark file as being an input file of the workflow.
524
525        This also means that eventual --default-remote-provider/prefix settings
526        will be applied to this file. The file is returned as _IOFile object,
527        such that it can e.g. be transparently opened with _IOFile.open().
528        """
529        if isinstance(path, Path):
530            path = str(path)
531        if self.default_remote_provider is not None:
532            path = self.modifier.modify_path(path)
533        return IOFile(path)
534
535    def execute(
536        self,
537        targets=None,
538        dryrun=False,
539        generate_unit_tests=None,
540        touch=False,
541        scheduler_type=None,
542        scheduler_ilp_solver=None,
543        local_cores=1,
544        forcetargets=False,
545        forceall=False,
546        forcerun=None,
547        until=[],
548        omit_from=[],
549        prioritytargets=None,
550        quiet=False,
551        keepgoing=False,
552        printshellcmds=False,
553        printreason=False,
554        printdag=False,
555        cluster=None,
556        cluster_sync=None,
557        jobname=None,
558        immediate_submit=False,
559        ignore_ambiguity=False,
560        printrulegraph=False,
561        printfilegraph=False,
562        printd3dag=False,
563        drmaa=None,
564        drmaa_log_dir=None,
565        kubernetes=None,
566        tibanna=None,
567        tibanna_sfn=None,
568        google_lifesciences=None,
569        google_lifesciences_regions=None,
570        google_lifesciences_location=None,
571        google_lifesciences_cache=False,
572        tes=None,
573        precommand="",
574        preemption_default=None,
575        preemptible_rules=None,
576        tibanna_config=False,
577        container_image=None,
578        stats=None,
579        force_incomplete=False,
580        ignore_incomplete=False,
581        list_version_changes=False,
582        list_code_changes=False,
583        list_input_changes=False,
584        list_params_changes=False,
585        list_untracked=False,
586        list_conda_envs=False,
587        summary=False,
588        archive=None,
589        delete_all_output=False,
590        delete_temp_output=False,
591        detailed_summary=False,
592        latency_wait=3,
593        wait_for_files=None,
594        nolock=False,
595        unlock=False,
596        notemp=False,
597        nodeps=False,
598        cleanup_metadata=None,
599        conda_cleanup_envs=False,
600        cleanup_shadow=False,
601        cleanup_scripts=True,
602        subsnakemake=None,
603        updated_files=None,
604        keep_target_files=False,
605        keep_shadow=False,
606        keep_remote_local=False,
607        allowed_rules=None,
608        max_jobs_per_second=None,
609        max_status_checks_per_second=None,
610        greediness=1.0,
611        no_hooks=False,
612        force_use_threads=False,
613        conda_create_envs_only=False,
614        assume_shared_fs=True,
615        cluster_status=None,
616        report=None,
617        report_stylesheet=None,
618        export_cwl=False,
619        batch=None,
620        keepincomplete=False,
621        keepmetadata=True,
622    ):
623
624        self.check_localrules()
625        self.immediate_submit = immediate_submit
626        self.cleanup_scripts = cleanup_scripts
627
628        def rules(items):
629            return map(self._rules.__getitem__, filter(self.is_rule, items))
630
631        if keep_target_files:
632
633            def files(items):
634                return filterfalse(self.is_rule, items)
635
636        else:
637
638            def files(items):
639                relpath = (
640                    lambda f: f
641                    if os.path.isabs(f) or f.startswith("root://")
642                    else os.path.relpath(f)
643                )
644                return map(relpath, filterfalse(self.is_rule, items))
645
646        if not targets:
647            targets = [self.first_rule] if self.first_rule is not None else list()
648
649        if prioritytargets is None:
650            prioritytargets = list()
651        if forcerun is None:
652            forcerun = list()
653        if until is None:
654            until = list()
655        if omit_from is None:
656            omit_from = list()
657
658        priorityrules = set(rules(prioritytargets))
659        priorityfiles = set(files(prioritytargets))
660        forcerules = set(rules(forcerun))
661        forcefiles = set(files(forcerun))
662        untilrules = set(rules(until))
663        untilfiles = set(files(until))
664        omitrules = set(rules(omit_from))
665        omitfiles = set(files(omit_from))
666        targetrules = set(
667            chain(
668                rules(targets),
669                filterfalse(Rule.has_wildcards, priorityrules),
670                filterfalse(Rule.has_wildcards, forcerules),
671                filterfalse(Rule.has_wildcards, untilrules),
672            )
673        )
674        targetfiles = set(chain(files(targets), priorityfiles, forcefiles, untilfiles))
675
676        if ON_WINDOWS:
677            targetfiles = set(tf.replace(os.sep, os.altsep) for tf in targetfiles)
678
679        if forcetargets:
680            forcefiles.update(targetfiles)
681            forcerules.update(targetrules)
682
683        rules = self.rules
684        if allowed_rules:
685            allowed_rules = set(allowed_rules)
686            rules = [rule for rule in rules if rule.name in allowed_rules]
687
688        if wait_for_files is not None:
689            try:
690                snakemake.io.wait_for_files(wait_for_files, latency_wait=latency_wait)
691            except IOError as e:
692                logger.error(str(e))
693                return False
694
695        dag = DAG(
696            self,
697            rules,
698            dryrun=dryrun,
699            targetfiles=targetfiles,
700            targetrules=targetrules,
701            # when cleaning up conda, we should enforce all possible jobs
702            # since their envs shall not be deleted
703            forceall=forceall or conda_cleanup_envs,
704            forcefiles=forcefiles,
705            forcerules=forcerules,
706            priorityfiles=priorityfiles,
707            priorityrules=priorityrules,
708            untilfiles=untilfiles,
709            untilrules=untilrules,
710            omitfiles=omitfiles,
711            omitrules=omitrules,
712            ignore_ambiguity=ignore_ambiguity,
713            force_incomplete=force_incomplete,
714            ignore_incomplete=ignore_incomplete
715            or printdag
716            or printrulegraph
717            or printfilegraph,
718            notemp=notemp,
719            keep_remote_local=keep_remote_local,
720            batch=batch,
721        )
722
723        self.persistence = Persistence(
724            nolock=nolock,
725            dag=dag,
726            conda_prefix=self.conda_prefix,
727            singularity_prefix=self.singularity_prefix,
728            shadow_prefix=self.shadow_prefix,
729            warn_only=dryrun
730            or printrulegraph
731            or printfilegraph
732            or printdag
733            or summary
734            or archive
735            or list_version_changes
736            or list_code_changes
737            or list_input_changes
738            or list_params_changes
739            or list_untracked
740            or delete_all_output
741            or delete_temp_output,
742        )
743
744        if self.mode in [Mode.subprocess, Mode.cluster]:
745            self.persistence.deactivate_cache()
746
747        if cleanup_metadata:
748            for f in cleanup_metadata:
749                self.persistence.cleanup_metadata(f)
750            return True
751
752        if unlock:
753            try:
754                self.persistence.cleanup_locks()
755                logger.info("Unlocking working directory.")
756                return True
757            except IOError:
758                logger.error(
759                    "Error: Unlocking the directory {} failed. Maybe "
760                    "you don't have the permissions?"
761                )
762                return False
763
764        logger.info("Building DAG of jobs...")
765        dag.init()
766        dag.update_checkpoint_dependencies()
767        dag.check_dynamic()
768
769        try:
770            self.persistence.lock()
771        except IOError:
772            logger.error(
773                "Error: Directory cannot be locked. Please make "
774                "sure that no other Snakemake process is trying to create "
775                "the same files in the following directory:\n{}\n"
776                "If you are sure that no other "
777                "instances of snakemake are running on this directory, "
778                "the remaining lock was likely caused by a kill signal or "
779                "a power loss. It can be removed with "
780                "the --unlock argument.".format(os.getcwd())
781            )
782            return False
783
784        if cleanup_shadow:
785            self.persistence.cleanup_shadow()
786            return True
787
788        if (
789            self.subworkflows
790            and self.execute_subworkflows
791            and not printdag
792            and not printrulegraph
793            and not printfilegraph
794        ):
795            # backup globals
796            globals_backup = dict(self.globals)
797            # execute subworkflows
798            for subworkflow in self.subworkflows:
799                subworkflow_targets = subworkflow.targets(dag)
800                logger.debug(
801                    "Files requested from subworkflow:\n    {}".format(
802                        "\n    ".join(subworkflow_targets)
803                    )
804                )
805                updated = list()
806                if subworkflow_targets:
807                    logger.info("Executing subworkflow {}.".format(subworkflow.name))
808                    if not subsnakemake(
809                        subworkflow.snakefile,
810                        workdir=subworkflow.workdir,
811                        targets=subworkflow_targets,
812                        cores=self._cores,
813                        nodes=self.nodes,
814                        configfiles=[subworkflow.configfile]
815                        if subworkflow.configfile
816                        else None,
817                        updated_files=updated,
818                    ):
819                        return False
820                    dag.updated_subworkflow_files.update(
821                        subworkflow.target(f) for f in updated
822                    )
823                else:
824                    logger.info(
825                        "Subworkflow {}: {}".format(
826                            subworkflow.name, NOTHING_TO_BE_DONE_MSG
827                        )
828                    )
829            if self.subworkflows:
830                logger.info("Executing main workflow.")
831            # rescue globals
832            self.globals.update(globals_backup)
833
834        dag.postprocess(update_needrun=False)
835        if not dryrun:
836            # deactivate IOCache such that from now on we always get updated
837            # size, existence and mtime information
838            # ATTENTION: this may never be removed without really good reason.
839            # Otherwise weird things may happen.
840            self.iocache.deactivate()
841            # clear and deactivate persistence cache, from now on we want to see updates
842            self.persistence.deactivate_cache()
843
844        if nodeps:
845            missing_input = [
846                f
847                for job in dag.targetjobs
848                for f in job.input
849                if dag.needrun(job) and not os.path.exists(f)
850            ]
851            if missing_input:
852                logger.error(
853                    "Dependency resolution disabled (--nodeps) "
854                    "but missing input "
855                    "files detected. If this happens on a cluster, please make sure "
856                    "that you handle the dependencies yourself or turn off "
857                    "--immediate-submit. Missing input files:\n{}".format(
858                        "\n".join(missing_input)
859                    )
860                )
861                return False
862
863        updated_files.extend(f for job in dag.needrun_jobs for f in job.output)
864
865        if generate_unit_tests:
866            from snakemake import unit_tests
867
868            path = generate_unit_tests
869            deploy = []
870            if self.use_conda:
871                deploy.append("conda")
872            if self.use_singularity:
873                deploy.append("singularity")
874            unit_tests.generate(
875                dag, path, deploy, configfiles=self.overwrite_configfiles
876            )
877            return True
878        elif export_cwl:
879            from snakemake.cwl import dag_to_cwl
880            import json
881
882            with open(export_cwl, "w") as cwl:
883                json.dump(dag_to_cwl(dag), cwl, indent=4)
884            return True
885        elif report:
886            from snakemake.report import auto_report
887
888            auto_report(dag, report, stylesheet=report_stylesheet)
889            return True
890        elif printd3dag:
891            dag.d3dag()
892            return True
893        elif printdag:
894            print(dag)
895            return True
896        elif printrulegraph:
897            print(dag.rule_dot())
898            return True
899        elif printfilegraph:
900            print(dag.filegraph_dot())
901            return True
902        elif summary:
903            print("\n".join(dag.summary(detailed=False)))
904            return True
905        elif detailed_summary:
906            print("\n".join(dag.summary(detailed=True)))
907            return True
908        elif archive:
909            dag.archive(archive)
910            return True
911        elif delete_all_output:
912            dag.clean(only_temp=False, dryrun=dryrun)
913            return True
914        elif delete_temp_output:
915            dag.clean(only_temp=True, dryrun=dryrun)
916            return True
917        elif list_version_changes:
918            items = list(chain(*map(self.persistence.version_changed, dag.jobs)))
919            if items:
920                print(*items, sep="\n")
921            return True
922        elif list_code_changes:
923            items = list(chain(*map(self.persistence.code_changed, dag.jobs)))
924            for j in dag.jobs:
925                items.extend(list(j.outputs_older_than_script_or_notebook()))
926            if items:
927                print(*items, sep="\n")
928            return True
929        elif list_input_changes:
930            items = list(chain(*map(self.persistence.input_changed, dag.jobs)))
931            if items:
932                print(*items, sep="\n")
933            return True
934        elif list_params_changes:
935            items = list(chain(*map(self.persistence.params_changed, dag.jobs)))
936            if items:
937                print(*items, sep="\n")
938            return True
939        elif list_untracked:
940            dag.list_untracked()
941            return True
942
943        if self.use_singularity:
944            if assume_shared_fs:
945                dag.pull_container_imgs(
946                    dryrun=dryrun or list_conda_envs, quiet=list_conda_envs
947                )
948        if self.use_conda:
949            if assume_shared_fs:
950                dag.create_conda_envs(
951                    dryrun=dryrun or list_conda_envs or conda_cleanup_envs,
952                    quiet=list_conda_envs,
953                )
954            if conda_create_envs_only:
955                return True
956
957        if list_conda_envs:
958            print("environment", "container", "location", sep="\t")
959            for env in set(job.conda_env for job in dag.jobs):
960                if env:
961                    print(
962                        env.file.simplify_path(),
963                        env.container_img_url or "",
964                        simplify_path(env.path),
965                        sep="\t",
966                    )
967            return True
968
969        if conda_cleanup_envs:
970            self.persistence.conda_cleanup_envs()
971            return True
972
973        self.scheduler = JobScheduler(
974            self,
975            dag,
976            local_cores=local_cores,
977            dryrun=dryrun,
978            touch=touch,
979            cluster=cluster,
980            cluster_status=cluster_status,
981            cluster_config=cluster_config,
982            cluster_sync=cluster_sync,
983            jobname=jobname,
984            max_jobs_per_second=max_jobs_per_second,
985            max_status_checks_per_second=max_status_checks_per_second,
986            quiet=quiet,
987            keepgoing=keepgoing,
988            drmaa=drmaa,
989            drmaa_log_dir=drmaa_log_dir,
990            kubernetes=kubernetes,
991            tibanna=tibanna,
992            tibanna_sfn=tibanna_sfn,
993            google_lifesciences=google_lifesciences,
994            google_lifesciences_regions=google_lifesciences_regions,
995            google_lifesciences_location=google_lifesciences_location,
996            google_lifesciences_cache=google_lifesciences_cache,
997            tes=tes,
998            preemption_default=preemption_default,
999            preemptible_rules=preemptible_rules,
1000            precommand=precommand,
1001            tibanna_config=tibanna_config,
1002            container_image=container_image,
1003            printreason=printreason,
1004            printshellcmds=printshellcmds,
1005            latency_wait=latency_wait,
1006            greediness=greediness,
1007            force_use_threads=force_use_threads,
1008            assume_shared_fs=assume_shared_fs,
1009            keepincomplete=keepincomplete,
1010            keepmetadata=keepmetadata,
1011            scheduler_type=scheduler_type,
1012            scheduler_ilp_solver=scheduler_ilp_solver,
1013        )
1014
1015        if not dryrun:
1016            if len(dag):
1017                shell_exec = shell.get_executable()
1018                if shell_exec is not None:
1019                    logger.info("Using shell: {}".format(shell_exec))
1020                if cluster or cluster_sync or drmaa:
1021                    logger.resources_info(
1022                        "Provided cluster nodes: {}".format(self.nodes)
1023                    )
1024                elif kubernetes or tibanna or google_lifesciences:
1025                    logger.resources_info("Provided cloud nodes: {}".format(self.nodes))
1026                else:
1027                    if self._cores is not None:
1028                        warning = (
1029                            ""
1030                            if self._cores > 1
1031                            else " (use --cores to define parallelism)"
1032                        )
1033                        logger.resources_info(
1034                            "Provided cores: {}{}".format(self._cores, warning)
1035                        )
1036                        logger.resources_info(
1037                            "Rules claiming more threads " "will be scaled down."
1038                        )
1039
1040                provided_resources = format_resources(self.global_resources)
1041                if provided_resources:
1042                    logger.resources_info("Provided resources: " + provided_resources)
1043
1044                if self.run_local and any(rule.group for rule in self.rules):
1045                    logger.info("Group jobs: inactive (local execution)")
1046
1047                if not self.use_conda and any(rule.conda_env for rule in self.rules):
1048                    logger.info("Conda environments: ignored")
1049
1050                if not self.use_singularity and any(
1051                    rule.container_img for rule in self.rules
1052                ):
1053                    logger.info("Singularity containers: ignored")
1054
1055                if self.mode == Mode.default:
1056                    logger.run_info("\n".join(dag.stats()))
1057            else:
1058                logger.info(NOTHING_TO_BE_DONE_MSG)
1059        else:
1060            # the dryrun case
1061            if len(dag):
1062                logger.run_info("\n".join(dag.stats()))
1063            else:
1064                logger.info(NOTHING_TO_BE_DONE_MSG)
1065                return True
1066            if quiet:
1067                # in case of dryrun and quiet, just print above info and exit
1068                return True
1069
1070        if not dryrun and not no_hooks:
1071            self._onstart(logger.get_logfile())
1072
1073        success = self.scheduler.schedule()
1074
1075        if not immediate_submit and not dryrun:
1076            dag.cleanup_workdir()
1077
1078        if success:
1079            if dryrun:
1080                if len(dag):
1081                    logger.run_info("\n".join(dag.stats()))
1082                logger.info(
1083                    "This was a dry-run (flag -n). The order of jobs "
1084                    "does not reflect the order of execution."
1085                )
1086                logger.remove_logfile()
1087            else:
1088                if stats:
1089                    self.scheduler.stats.to_json(stats)
1090                logger.logfile_hint()
1091            if not dryrun and not no_hooks:
1092                self._onsuccess(logger.get_logfile())
1093            return True
1094        else:
1095            if not dryrun and not no_hooks:
1096                self._onerror(logger.get_logfile())
1097            logger.logfile_hint()
1098            return False
1099
1100    @property
1101    def current_basedir(self):
1102        """Basedir of currently parsed Snakefile."""
1103        assert self.included_stack
1104        snakefile = self.included_stack[-1]
1105        basedir = snakefile.get_basedir()
1106        if isinstance(basedir, LocalSourceFile):
1107            return basedir.abspath()
1108        else:
1109            return basedir
1110
1111    def source_path(self, rel_path):
1112        """Return path to source file from work dir derived from given path relative to snakefile"""
1113        # TODO download to disk (use source cache) in case of remote file
1114        import inspect
1115
1116        frame = inspect.currentframe().f_back
1117        calling_file = frame.f_code.co_filename
1118        calling_dir = os.path.dirname(calling_file)
1119        path = smart_join(calling_dir, rel_path)
1120        return self.sourcecache.get_path(infer_source_file(path))
1121
1122    @property
1123    def snakefile(self):
1124        import inspect
1125
1126        frame = inspect.currentframe().f_back
1127        return frame.f_code.co_filename
1128
1129    def register_envvars(self, *envvars):
1130        """
1131        Register environment variables that shall be passed to jobs.
1132        If used multiple times, union is taken.
1133        """
1134        undefined = set(var for var in envvars if var not in os.environ)
1135        if self.check_envvars and undefined:
1136            raise WorkflowError(
1137                "The following environment variables are requested by the workflow but undefined. "
1138                "Please make sure that they are correctly defined before running Snakemake:\n"
1139                "{}".format("\n".join(undefined))
1140            )
1141        self.envvars.update(envvars)
1142
1143    def containerize(self):
1144        from snakemake.deployment.containerize import containerize
1145
1146        containerize(self)
1147
1148    def include(
1149        self,
1150        snakefile,
1151        overwrite_first_rule=False,
1152        print_compilation=False,
1153        overwrite_shellcmd=None,
1154    ):
1155        """
1156        Include a snakefile.
1157        """
1158        basedir = self.current_basedir if self.included_stack else None
1159        snakefile = infer_source_file(snakefile, basedir)
1160
1161        if not self.modifier.allow_rule_overwrite and snakefile in self.included:
1162            logger.info("Multiple includes of {} ignored".format(snakefile))
1163            return
1164        self.included.append(snakefile)
1165        self.included_stack.append(snakefile)
1166
1167        first_rule = self.first_rule
1168        code, linemap, rulecount = parse(
1169            snakefile,
1170            self,
1171            overwrite_shellcmd=self.overwrite_shellcmd,
1172            rulecount=self._rulecount,
1173        )
1174        self._rulecount = rulecount
1175
1176        if print_compilation:
1177            print(code)
1178
1179        if isinstance(snakefile, LocalSourceFile):
1180            # insert the current directory into sys.path
1181            # this allows to import modules from the workflow directory
1182            sys.path.insert(0, snakefile.get_basedir().get_path_or_uri())
1183
1184        self.linemaps[snakefile] = linemap
1185
1186        exec(compile(code, snakefile.get_path_or_uri(), "exec"), self.globals)
1187
1188        if not overwrite_first_rule:
1189            self.first_rule = first_rule
1190        self.included_stack.pop()
1191
1192    def onstart(self, func):
1193        """Register onstart function."""
1194        self._onstart = func
1195
1196    def onsuccess(self, func):
1197        """Register onsuccess function."""
1198        self._onsuccess = func
1199
1200    def onerror(self, func):
1201        """Register onerror function."""
1202        self._onerror = func
1203
1204    def global_wildcard_constraints(self, **content):
1205        """Register global wildcard constraints."""
1206        self._wildcard_constraints.update(content)
1207        # update all rules so far
1208        for rule in self.rules:
1209            rule.update_wildcard_constraints()
1210
1211    def scattergather(self, **content):
1212        """Register scattergather defaults."""
1213        self._scatter.update(content)
1214        self._scatter.update(self.overwrite_scatter)
1215
1216        # add corresponding wildcard constraint
1217        self.global_wildcard_constraints(scatteritem="\d+-of-\d+")
1218
1219        def func(*args, **wildcards):
1220            n = self._scatter[key]
1221            return expand(
1222                *args,
1223                scatteritem=map("{{}}-of-{}".format(n).format, range(1, n + 1)),
1224                **wildcards
1225            )
1226
1227        for key in content:
1228            setattr(self.globals["scatter"], key, func)
1229            setattr(self.globals["gather"], key, func)
1230
1231    def workdir(self, workdir):
1232        """Register workdir."""
1233        if self.overwrite_workdir is None:
1234            os.makedirs(workdir, exist_ok=True)
1235            self._workdir = workdir
1236            os.chdir(workdir)
1237
1238    def configfile(self, fp):
1239        """Update the global config with data from the given file."""
1240        global config
1241        if not self.modifier.skip_configfile:
1242            if os.path.exists(fp):
1243                self.configfiles.append(fp)
1244                c = snakemake.io.load_configfile(fp)
1245                update_config(config, c)
1246                if self.overwrite_config:
1247                    logger.info(
1248                        "Config file {} is extended by additional config specified via the command line.".format(
1249                            fp
1250                        )
1251                    )
1252                    update_config(config, self.overwrite_config)
1253            elif not self.overwrite_configfiles:
1254                raise WorkflowError(
1255                    "Workflow defines configfile {} but it is not present or accessible.".format(
1256                        fp
1257                    )
1258                )
1259
1260    def pepfile(self, path):
1261        global pep
1262
1263        try:
1264            import peppy
1265        except ImportError:
1266            raise WorkflowError("For PEP support, please install peppy.")
1267
1268        self.pepfile = path
1269        pep = peppy.Project(self.pepfile)
1270
1271    def pepschema(self, schema):
1272        global pep
1273
1274        try:
1275            import eido
1276        except ImportError:
1277            raise WorkflowError("For PEP schema support, please install eido.")
1278
1279        if is_local_file(schema) and not os.path.isabs(schema):
1280            # schema is relative to current Snakefile
1281            schema = self.current_basedir.join(schema).get_path_or_uri()
1282        if self.pepfile is None:
1283            raise WorkflowError("Please specify a PEP with the pepfile directive.")
1284        eido.validate_project(project=pep, schema=schema, exclude_case=True)
1285
1286    def report(self, path):
1287        """Define a global report description in .rst format."""
1288        self.report_text = self.current_basedir.join(path)
1289
1290    @property
1291    def config(self):
1292        return self.globals["config"]
1293
1294    def ruleorder(self, *rulenames):
1295        self._ruleorder.add(*map(self.modifier.modify_rulename, rulenames))
1296
1297    def subworkflow(self, name, snakefile=None, workdir=None, configfile=None):
1298        # Take absolute path of config file, because it is relative to current
1299        # workdir, which could be changed for the subworkflow.
1300        if configfile:
1301            configfile = os.path.abspath(configfile)
1302        sw = Subworkflow(self, name, snakefile, workdir, configfile)
1303        self._subworkflows[name] = sw
1304        self.globals[name] = sw.target
1305
1306    def localrules(self, *rulenames):
1307        self._localrules.update(rulenames)
1308
1309    def rule(self, name=None, lineno=None, snakefile=None, checkpoint=False):
1310        # choose a name for an unnamed rule
1311        if name is None:
1312            name = str(len(self._rules) + 1)
1313
1314        if self.modifier.skip_rule(name):
1315
1316            def decorate(ruleinfo):
1317                # do nothing, ignore rule
1318                return ruleinfo.func
1319
1320            return decorate
1321
1322        # Optionally let the modifier change the rulename.
1323        orig_name = name
1324        name = self.modifier.modify_rulename(name)
1325
1326        name = self.add_rule(
1327            name,
1328            lineno,
1329            snakefile,
1330            checkpoint,
1331            allow_overwrite=self.modifier.allow_rule_overwrite,
1332        )
1333        rule = self.get_rule(name)
1334        rule.is_checkpoint = checkpoint
1335
1336        def decorate(ruleinfo):
1337            nonlocal name
1338
1339            # If requested, modify ruleinfo via the modifier.
1340            ruleinfo.apply_modifier(self.modifier)
1341
1342            if ruleinfo.wildcard_constraints:
1343                rule.set_wildcard_constraints(
1344                    *ruleinfo.wildcard_constraints[0],
1345                    **ruleinfo.wildcard_constraints[1]
1346                )
1347            if ruleinfo.name:
1348                rule.name = ruleinfo.name
1349                del self._rules[name]
1350                self._rules[ruleinfo.name] = rule
1351                name = rule.name
1352            rule.path_modifier = ruleinfo.path_modifier
1353            if ruleinfo.input:
1354                rule.set_input(*ruleinfo.input[0], **ruleinfo.input[1])
1355            if ruleinfo.output:
1356                rule.set_output(*ruleinfo.output[0], **ruleinfo.output[1])
1357            if ruleinfo.params:
1358                rule.set_params(*ruleinfo.params[0], **ruleinfo.params[1])
1359            # handle default resources
1360            if self.default_resources is not None:
1361                rule.resources = copy.deepcopy(self.default_resources.parsed)
1362            if ruleinfo.threads is not None:
1363                if (
1364                    not isinstance(ruleinfo.threads, int)
1365                    and not isinstance(ruleinfo.threads, float)
1366                    and not callable(ruleinfo.threads)
1367                ):
1368                    raise RuleException(
1369                        "Threads value has to be an integer, float, or a callable.",
1370                        rule=rule,
1371                    )
1372                if name in self.overwrite_threads:
1373                    rule.resources["_cores"] = self.overwrite_threads[name]
1374                else:
1375                    if isinstance(ruleinfo.threads, float):
1376                        ruleinfo.threads = int(ruleinfo.threads)
1377                    rule.resources["_cores"] = ruleinfo.threads
1378            if ruleinfo.shadow_depth:
1379                if ruleinfo.shadow_depth not in (
1380                    True,
1381                    "shallow",
1382                    "full",
1383                    "minimal",
1384                    "copy-minimal",
1385                ):
1386                    raise RuleException(
1387                        "Shadow must either be 'minimal', 'copy-minimal', 'shallow', 'full', "
1388                        "or True (equivalent to 'full')",
1389                        rule=rule,
1390                    )
1391                if ruleinfo.shadow_depth is True:
1392                    rule.shadow_depth = "full"
1393                    logger.warning(
1394                        "Shadow is set to True in rule {} (equivalent to 'full'). It's encouraged to use the more explicit options 'minimal|copy-minimal|shallow|full' instead.".format(
1395                            rule
1396                        )
1397                    )
1398                else:
1399                    rule.shadow_depth = ruleinfo.shadow_depth
1400            if ruleinfo.resources:
1401                args, resources = ruleinfo.resources
1402                if args:
1403                    raise RuleException("Resources have to be named.")
1404                if not all(
1405                    map(
1406                        lambda r: isinstance(r, int)
1407                        or isinstance(r, str)
1408                        or callable(r),
1409                        resources.values(),
1410                    )
1411                ):
1412                    raise RuleException(
1413                        "Resources values have to be integers, strings, or callables (functions)",
1414                        rule=rule,
1415                    )
1416                rule.resources.update(resources)
1417            if name in self.overwrite_resources:
1418                rule.resources.update(self.overwrite_resources[name])
1419
1420            if ruleinfo.priority:
1421                if not isinstance(ruleinfo.priority, int) and not isinstance(
1422                    ruleinfo.priority, float
1423                ):
1424                    raise RuleException(
1425                        "Priority values have to be numeric.", rule=rule
1426                    )
1427                rule.priority = ruleinfo.priority
1428            if ruleinfo.version:
1429                rule.version = ruleinfo.version
1430            if ruleinfo.log:
1431                rule.set_log(*ruleinfo.log[0], **ruleinfo.log[1])
1432            if ruleinfo.message:
1433                rule.message = ruleinfo.message
1434            if ruleinfo.benchmark:
1435                rule.benchmark = ruleinfo.benchmark
1436            if not self.run_local:
1437                group = self.overwrite_groups.get(name) or ruleinfo.group
1438                if group is not None:
1439                    rule.group = group
1440            if ruleinfo.wrapper:
1441                rule.conda_env = snakemake.wrapper.get_conda_env(
1442                    ruleinfo.wrapper, prefix=self.wrapper_prefix
1443                )
1444                # TODO retrieve suitable singularity image
1445
1446            if ruleinfo.env_modules:
1447                # If using environment modules and they are defined for the rule,
1448                # ignore conda and singularity directive below.
1449                # The reason is that this is likely intended in order to use
1450                # a software stack specifically compiled for a particular
1451                # HPC cluster.
1452                invalid_rule = not (
1453                    ruleinfo.script
1454                    or ruleinfo.wrapper
1455                    or ruleinfo.shellcmd
1456                    or ruleinfo.notebook
1457                )
1458                if invalid_rule:
1459                    raise RuleException(
1460                        "envmodules directive is only allowed with "
1461                        "shell, script, notebook, or wrapper directives (not with run)",
1462                        rule=rule,
1463                    )
1464                from snakemake.deployment.env_modules import EnvModules
1465
1466                rule.env_modules = EnvModules(*ruleinfo.env_modules)
1467
1468            if ruleinfo.conda_env:
1469                if not (
1470                    ruleinfo.script
1471                    or ruleinfo.wrapper
1472                    or ruleinfo.shellcmd
1473                    or ruleinfo.notebook
1474                ):
1475                    raise RuleException(
1476                        "Conda environments are only allowed "
1477                        "with shell, script, notebook, or wrapper directives "
1478                        "(not with run).",
1479                        rule=rule,
1480                    )
1481
1482                if (
1483                    ruleinfo.conda_env is not None
1484                    and is_local_file(ruleinfo.conda_env)
1485                    and not os.path.isabs(ruleinfo.conda_env)
1486                ):
1487                    ruleinfo.conda_env = self.current_basedir.join(
1488                        ruleinfo.conda_env
1489                    ).get_path_or_uri()
1490                rule.conda_env = ruleinfo.conda_env
1491
1492            invalid_rule = not (
1493                ruleinfo.script
1494                or ruleinfo.wrapper
1495                or ruleinfo.shellcmd
1496                or ruleinfo.notebook
1497            )
1498            if ruleinfo.container_img:
1499                if invalid_rule:
1500                    raise RuleException(
1501                        "Singularity directive is only allowed "
1502                        "with shell, script, notebook or wrapper directives "
1503                        "(not with run).",
1504                        rule=rule,
1505                    )
1506                rule.container_img = ruleinfo.container_img
1507                rule.is_containerized = ruleinfo.is_containerized
1508            elif self.global_container_img:
1509                if not invalid_rule and ruleinfo.container_img != False:
1510                    # skip rules with run directive or empty image
1511                    rule.container_img = self.global_container_img
1512                    rule.is_containerized = self.global_is_containerized
1513
1514            rule.norun = ruleinfo.norun
1515            if ruleinfo.name is not None:
1516                rule.name = ruleinfo.name
1517            rule.docstring = ruleinfo.docstring
1518            rule.run_func = ruleinfo.func
1519            rule.shellcmd = ruleinfo.shellcmd
1520            rule.script = ruleinfo.script
1521            rule.notebook = ruleinfo.notebook
1522            rule.wrapper = ruleinfo.wrapper
1523            rule.cwl = ruleinfo.cwl
1524            rule.restart_times = self.restart_times
1525            rule.basedir = self.current_basedir
1526
1527            if ruleinfo.handover:
1528                if not ruleinfo.resources:
1529                    # give all available resources to the rule
1530                    rule.resources.update(
1531                        {
1532                            name: val
1533                            for name, val in self.global_resources.items()
1534                            if val is not None
1535                        }
1536                    )
1537                # This becomes a local rule, which might spawn jobs to a cluster,
1538                # depending on its configuration (e.g. nextflow config).
1539                self._localrules.add(rule.name)
1540                rule.is_handover = True
1541
1542            if ruleinfo.cache is True:
1543                if not self.enable_cache:
1544                    logger.warning(
1545                        "Workflow defines that rule {} is eligible for caching between workflows "
1546                        "(use the --cache argument to enable this).".format(rule.name)
1547                    )
1548                else:
1549                    self.cache_rules.add(rule.name)
1550            elif not (ruleinfo.cache is False):
1551                raise WorkflowError(
1552                    "Invalid argument for 'cache:' directive. Only true allowed. "
1553                    "To deactivate caching, remove directive.",
1554                    rule=rule,
1555                )
1556
1557            ruleinfo.func.__name__ = "__{}".format(rule.name)
1558            self.globals[ruleinfo.func.__name__] = ruleinfo.func
1559
1560            rule_proxy = RuleProxy(rule)
1561            if orig_name is not None:
1562                setattr(self.globals["rules"], orig_name, rule_proxy)
1563            setattr(self.globals["rules"], rule.name, rule_proxy)
1564
1565            if checkpoint:
1566                self.globals["checkpoints"].register(rule, fallback_name=orig_name)
1567            rule.ruleinfo = ruleinfo
1568            return ruleinfo.func
1569
1570        return decorate
1571
1572    def docstring(self, string):
1573        def decorate(ruleinfo):
1574            ruleinfo.docstring = string
1575            return ruleinfo
1576
1577        return decorate
1578
1579    def input(self, *paths, **kwpaths):
1580        def decorate(ruleinfo):
1581            ruleinfo.input = (paths, kwpaths)
1582            return ruleinfo
1583
1584        return decorate
1585
1586    def output(self, *paths, **kwpaths):
1587        def decorate(ruleinfo):
1588            ruleinfo.output = (paths, kwpaths)
1589            return ruleinfo
1590
1591        return decorate
1592
1593    def params(self, *params, **kwparams):
1594        def decorate(ruleinfo):
1595            ruleinfo.params = (params, kwparams)
1596            return ruleinfo
1597
1598        return decorate
1599
1600    def wildcard_constraints(self, *wildcard_constraints, **kwwildcard_constraints):
1601        def decorate(ruleinfo):
1602            ruleinfo.wildcard_constraints = (
1603                wildcard_constraints,
1604                kwwildcard_constraints,
1605            )
1606            return ruleinfo
1607
1608        return decorate
1609
1610    def cache_rule(self, cache):
1611        def decorate(ruleinfo):
1612            ruleinfo.cache = cache
1613            return ruleinfo
1614
1615        return decorate
1616
1617    def message(self, message):
1618        def decorate(ruleinfo):
1619            ruleinfo.message = message
1620            return ruleinfo
1621
1622        return decorate
1623
1624    def benchmark(self, benchmark):
1625        def decorate(ruleinfo):
1626            ruleinfo.benchmark = benchmark
1627            return ruleinfo
1628
1629        return decorate
1630
1631    def conda(self, conda_env):
1632        def decorate(ruleinfo):
1633            ruleinfo.conda_env = conda_env
1634            return ruleinfo
1635
1636        return decorate
1637
1638    def container(self, container_img):
1639        def decorate(ruleinfo):
1640            # Explicitly set container_img to False if None is passed, indicating that
1641            # no container image shall be used, also not a global one.
1642            ruleinfo.container_img = (
1643                container_img if container_img is not None else False
1644            )
1645            ruleinfo.is_containerized = False
1646            return ruleinfo
1647
1648        return decorate
1649
1650    def containerized(self, container_img):
1651        def decorate(ruleinfo):
1652            ruleinfo.container_img = container_img
1653            ruleinfo.is_containerized = True
1654            return ruleinfo
1655
1656        return decorate
1657
1658    def envmodules(self, *env_modules):
1659        def decorate(ruleinfo):
1660            ruleinfo.env_modules = env_modules
1661            return ruleinfo
1662
1663        return decorate
1664
1665    def global_container(self, container_img):
1666        self.global_container_img = container_img
1667        self.global_is_containerized = False
1668
1669    def global_containerized(self, container_img):
1670        self.global_container_img = container_img
1671        self.global_is_containerized = True
1672
1673    def threads(self, threads):
1674        def decorate(ruleinfo):
1675            ruleinfo.threads = threads
1676            return ruleinfo
1677
1678        return decorate
1679
1680    def shadow(self, shadow_depth):
1681        def decorate(ruleinfo):
1682            ruleinfo.shadow_depth = shadow_depth
1683            return ruleinfo
1684
1685        return decorate
1686
1687    def resources(self, *args, **resources):
1688        def decorate(ruleinfo):
1689            ruleinfo.resources = (args, resources)
1690            return ruleinfo
1691
1692        return decorate
1693
1694    def priority(self, priority):
1695        def decorate(ruleinfo):
1696            ruleinfo.priority = priority
1697            return ruleinfo
1698
1699        return decorate
1700
1701    def version(self, version):
1702        def decorate(ruleinfo):
1703            ruleinfo.version = version
1704            return ruleinfo
1705
1706        return decorate
1707
1708    def group(self, group):
1709        def decorate(ruleinfo):
1710            ruleinfo.group = group
1711            return ruleinfo
1712
1713        return decorate
1714
1715    def log(self, *logs, **kwlogs):
1716        def decorate(ruleinfo):
1717            ruleinfo.log = (logs, kwlogs)
1718            return ruleinfo
1719
1720        return decorate
1721
1722    def handover(self, value):
1723        def decorate(ruleinfo):
1724            ruleinfo.handover = value
1725            return ruleinfo
1726
1727        return decorate
1728
1729    def shellcmd(self, cmd):
1730        def decorate(ruleinfo):
1731            ruleinfo.shellcmd = cmd
1732            return ruleinfo
1733
1734        return decorate
1735
1736    def script(self, script):
1737        def decorate(ruleinfo):
1738            ruleinfo.script = script
1739            return ruleinfo
1740
1741        return decorate
1742
1743    def notebook(self, notebook):
1744        def decorate(ruleinfo):
1745            ruleinfo.notebook = notebook
1746            return ruleinfo
1747
1748        return decorate
1749
1750    def wrapper(self, wrapper):
1751        def decorate(ruleinfo):
1752            ruleinfo.wrapper = wrapper
1753            return ruleinfo
1754
1755        return decorate
1756
1757    def cwl(self, cwl):
1758        def decorate(ruleinfo):
1759            ruleinfo.cwl = cwl
1760            return ruleinfo
1761
1762        return decorate
1763
1764    def norun(self):
1765        def decorate(ruleinfo):
1766            ruleinfo.norun = True
1767            return ruleinfo
1768
1769        return decorate
1770
1771    def name(self, name):
1772        def decorate(ruleinfo):
1773            ruleinfo.name = name
1774            return ruleinfo
1775
1776        return decorate
1777
1778    def run(self, func):
1779        return RuleInfo(func)
1780
1781    def module(
1782        self,
1783        name,
1784        snakefile=None,
1785        meta_wrapper=None,
1786        config=None,
1787        skip_validation=False,
1788        replace_prefix=None,
1789    ):
1790        self.modules[name] = ModuleInfo(
1791            self,
1792            name,
1793            snakefile=snakefile,
1794            meta_wrapper=meta_wrapper,
1795            config=config,
1796            skip_validation=skip_validation,
1797            replace_prefix=replace_prefix,
1798        )
1799
1800    def userule(self, rules=None, from_module=None, name_modifier=None, lineno=None):
1801        def decorate(maybe_ruleinfo):
1802            if from_module is not None:
1803                try:
1804                    module = self.modules[from_module]
1805                except KeyError:
1806                    raise WorkflowError(
1807                        "Module {} has not been registered with 'module' statement before using it in 'use rule' statement.".format(
1808                            from_module
1809                        )
1810                    )
1811                module.use_rules(
1812                    rules,
1813                    name_modifier,
1814                    ruleinfo=None if callable(maybe_ruleinfo) else maybe_ruleinfo,
1815                )
1816            else:
1817                # local inheritance
1818                if len(rules) > 1:
1819                    raise WorkflowError(
1820                        "'use rule' statement from rule in the same module must declare a single rule but multiple rules are declared."
1821                    )
1822                orig_rule = self._rules[rules[0]]
1823                ruleinfo = maybe_ruleinfo if not callable(maybe_ruleinfo) else None
1824                with WorkflowModifier(
1825                    self,
1826                    rulename_modifier=get_name_modifier_func(rules, name_modifier),
1827                    ruleinfo_overwrite=ruleinfo,
1828                ):
1829                    self.rule(
1830                        name=name_modifier,
1831                        lineno=lineno,
1832                        snakefile=self.included_stack[-1],
1833                    )(orig_rule.ruleinfo)
1834
1835        return decorate
1836
1837    @staticmethod
1838    def _empty_decorator(f):
1839        return f
1840
1841
1842class Subworkflow:
1843    def __init__(self, workflow, name, snakefile, workdir, configfile):
1844        self.workflow = workflow
1845        self.name = name
1846        self._snakefile = snakefile
1847        self._workdir = workdir
1848        self.configfile = configfile
1849
1850    @property
1851    def snakefile(self):
1852        if self._snakefile is None:
1853            return os.path.abspath(os.path.join(self.workdir, "Snakefile"))
1854        if not os.path.isabs(self._snakefile):
1855            return os.path.abspath(os.path.join(self.workflow.basedir, self._snakefile))
1856        return self._snakefile
1857
1858    @property
1859    def workdir(self):
1860        workdir = "." if self._workdir is None else self._workdir
1861        if not os.path.isabs(workdir):
1862            return os.path.abspath(os.path.join(self.workflow.basedir, workdir))
1863        return workdir
1864
1865    def target(self, paths):
1866        if not_iterable(paths):
1867            path = paths
1868            path = (
1869                path
1870                if os.path.isabs(path) or path.startswith("root://")
1871                else os.path.join(self.workdir, path)
1872            )
1873            return flag(path, "subworkflow", self)
1874        return [self.target(path) for path in paths]
1875
1876    def targets(self, dag):
1877        def relpath(f):
1878            if f.startswith(self.workdir):
1879                return os.path.relpath(f, start=self.workdir)
1880            # do not adjust absolute targets outside of workdir
1881            return f
1882
1883        return [
1884            relpath(f)
1885            for job in dag.jobs
1886            for f in job.subworkflow_input
1887            if job.subworkflow_input[f] is self
1888        ]
1889
1890
1891def srcdir(path):
1892    """Return the absolute path, relative to the source directory of the current Snakefile."""
1893    if not workflow.included_stack:
1894        return None
1895    return workflow.current_basedir.join(path).get_path_or_uri()
1896