1__authors__ = "Johannes Köster, Sven Nahnsen"
2__copyright__ = "Copyright 2021, Johannes Köster, Sven Nahnsen"
3__email__ = "johannes.koester@uni-due.de"
4__license__ = "MIT"
5
6from tempfile import TemporaryDirectory
7from pathlib import Path
8import os
9import shutil
10import stat
11
12from snakemake.logging import logger
13from snakemake.jobs import Job
14from snakemake.exceptions import WorkflowError
15from snakemake.caching.hash import ProvenanceHashMap
16from snakemake.caching import LOCATION_ENVVAR, AbstractOutputFileCache
17
18
class OutputFileCache(AbstractOutputFileCache):
    """
    A cache for output files that uses a provenance hash value that
    describes all steps, parameters, and software needed to generate
    each output file.

    Cache entries are stored directly below ``self.path`` and are named
    after the provenance hash of the job, with a per-output-file suffix
    (see ``get_outputfiles_and_cachefiles``).
    """

    def __init__(self):
        """
        Set up the cache directory path and the permission masks that are
        applied to cached entries.
        """
        super().__init__()
        # NOTE(review): cache_location is not set in this class; presumably
        # it is provided by AbstractOutputFileCache.__init__ -- confirm.
        self.path = Path(self.cache_location)
        # make readable/writeable for all
        self.file_permissions = (
            stat.S_IRUSR
            | stat.S_IWUSR
            | stat.S_IRGRP
            | stat.S_IWGRP
            | stat.S_IROTH
            | stat.S_IWOTH
        )
        # directories need to have exec permission as well (for opening)
        self.dir_permissions = (
            self.file_permissions | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
        )

    def check_writeable(self, cachefile):
        """
        Report a write error unless either the parent directory of
        ``cachefile`` or ``cachefile`` itself is writeable.
        """
        if not (os.access(cachefile.parent, os.W_OK) or os.access(cachefile, os.W_OK)):
            self.raise_write_error(cachefile)

    def check_readable(self, cachefile):
        """
        Report a read error unless ``cachefile`` is readable.
        """
        if not os.access(cachefile, os.R_OK):
            self.raise_read_error(cachefile)

    def store(self, job: Job):
        """
        Store generated job output in the cache.

        Each output file is moved into the cache (via a temporary directory
        inside the cache location) and then replaced by a symlink pointing
        to the cached entry.

        Raises:
            WorkflowError: if the cache location is not writeable or an
                expected output file does not exist.
        """

        if not os.access(self.path, os.W_OK):
            raise WorkflowError(
                "Cannot access cache location {}. Please ensure that "
                "it is present and writeable.".format(self.path)
            )
        with TemporaryDirectory(dir=self.path) as tmpdirname:
            tmpdir = Path(tmpdirname)

            for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):
                if not os.path.exists(outputfile):
                    # Fill in the placeholder so that the user sees which
                    # file is missing (it used to be emitted as a literal "{}").
                    raise WorkflowError(
                        "Cannot move output file {} to cache. It does not exist "
                        "(maybe it was not created by the job?).".format(outputfile)
                    )
                self.check_writeable(cachefile)
                logger.info("Moving output file {} to cache.".format(outputfile))

                tmp = tmpdir / cachefile.name
                # First move is performed into a tempdir (it might involve a copy if not on the same FS).
                # This is important, such that network filesystem latency
                # does not lead to concurrent writes to the same file.
                # We can use the plain copy method of shutil, because we do not care about the metadata.
                shutil.move(outputfile, tmp, copy_function=shutil.copy)

                self.set_permissions(tmp)

                # Move to the actual path (now we are on the same FS, hence move is atomic).
                # Here we use the default copy function, also copying metadata (which is important here).
                # It will always work, because we are guaranteed to be in the same FS.
                shutil.move(tmp, cachefile)
                # now restore the outputfile via a symlink
                self.symlink(cachefile, outputfile, utime=False)

    def fetch(self, job: Job):
        """
        Retrieve cached output file and symlink to the place where the job expects its output.

        A missing cache entry is reported via ``raise_cache_miss_exception``.
        """
        for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):
            if not cachefile.exists():
                self.raise_cache_miss_exception(job)

            self.check_readable(cachefile)
            if cachefile.is_dir():
                # For directories, create a new one and symlink each entry.
                # Then, the .snakemake_timestamp of the new dir is touched
                # by the executor.
                outputfile.mkdir(parents=True, exist_ok=True)
                for f in cachefile.iterdir():
                    self.symlink(f, outputfile / f.name)
            else:
                self.symlink(cachefile, outputfile)

    def exists(self, job: Job):
        """
        Return True if job is already cached, i.e. a cache entry exists for
        every output file of the job. Existing entries are additionally
        checked for readability.
        """
        for _, cachefile in self.get_outputfiles_and_cachefiles(job):
            if not cachefile.exists():
                return False

            self.check_readable(cachefile)
        return True

    def get_outputfiles_and_cachefiles(self, job: Job):
        """
        Return a generator of ``(outputfile, cachefile)`` pairs of ``Path``
        objects for the given job.

        The cache file is the provenance hash of the job below ``self.path``
        with the extension reported for the respective output file.
        """
        provenance_hash = self.provenance_hash_map.get_provenance_hash(job)
        base_path = self.path / provenance_hash

        return (
            (Path(outputfile), base_path.with_suffix(ext))
            for outputfile, ext in self.get_outputfiles(job)
        )

    def symlink(self, path, outputfile, utime=True):
        """
        Make ``outputfile`` available as a symlink to ``path``.

        If ``utime`` is set, the modification time of the symlink itself is
        updated. If the OS cannot do that without following the symlink,
        the content is copied instead.
        """
        if os.utime in os.supports_follow_symlinks or not utime:
            logger.info("Symlinking output file {} from cache.".format(outputfile))
            os.symlink(path, outputfile)
            if utime:
                os.utime(outputfile, follow_symlinks=False)
        else:
            logger.info(
                "Copying output file {} from cache (OS does not support updating the modification date of symlinks).".format(
                    outputfile
                )
            )
            # NOTE(review): copyfile handles regular files only; confirm
            # whether directory entries can reach this fallback.
            shutil.copyfile(path, outputfile)

    def set_permissions(self, entry):
        """
        Apply the cache permissions to ``entry``; for a directory this is
        done recursively for everything it contains.
        """
        # make readable/writeable for all
        if entry.is_dir():
            # recursively apply permissions for all contained files
            for root, dirs, files in os.walk(entry):
                root = Path(root)
                for d in dirs:
                    os.chmod(root / d, self.dir_permissions)
                for f in files:
                    os.chmod(root / f, self.file_permissions)
            os.chmod(entry, self.dir_permissions)
        else:
            os.chmod(entry, self.file_permissions)
157