__authors__ = "Johannes Köster, Sven Nahnsen"
__copyright__ = "Copyright 2021, Johannes Köster, Sven Nahnsen"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

from tempfile import TemporaryDirectory
from pathlib import Path
import os
import shutil
import stat

from snakemake.logging import logger
from snakemake.jobs import Job
from snakemake.exceptions import WorkflowError
from snakemake.caching.hash import ProvenanceHashMap
from snakemake.caching import LOCATION_ENVVAR, AbstractOutputFileCache


class OutputFileCache(AbstractOutputFileCache):
    """
    A cache for output files that uses a provenance hash value that
    describes all steps, parameters, and software needed to generate
    each output file.
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): cache_location is not set in this class; presumably it
        # is provided by AbstractOutputFileCache -- confirm.
        self.path = Path(self.cache_location)
        # make readable/writeable for all
        self.file_permissions = (
            stat.S_IRUSR
            | stat.S_IWUSR
            | stat.S_IRGRP
            | stat.S_IWGRP
            | stat.S_IROTH
            | stat.S_IWOTH
        )
        # directories need to have exec permission as well (for opening)
        self.dir_permissions = (
            self.file_permissions | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
        )

    def check_writeable(self, cachefile):
        """
        Call self.raise_write_error(cachefile) unless the parent directory of
        cachefile or cachefile itself is writeable.
        """
        if not (os.access(cachefile.parent, os.W_OK) or os.access(cachefile, os.W_OK)):
            self.raise_write_error(cachefile)

    def check_readable(self, cachefile):
        """
        Call self.raise_read_error(cachefile) unless cachefile is readable.
        """
        if not os.access(cachefile, os.R_OK):
            self.raise_read_error(cachefile)

    def store(self, job: Job):
        """
        Store generated job output in the cache.

        Each output file is moved into the cache and replaced by a symlink
        pointing to the cached copy.

        Raises a WorkflowError if the cache location is not writeable or if
        an expected output file does not exist.
        """

        if not os.access(self.path, os.W_OK):
            raise WorkflowError(
                "Cannot access cache location {}. Please ensure that "
                "it is present and writeable.".format(self.path)
            )
        with TemporaryDirectory(dir=self.path) as tmpdirname:
            tmpdir = Path(tmpdirname)

            for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):
                if not os.path.exists(outputfile):
                    # The message has to be formatted; otherwise the user
                    # sees a literal "{}" instead of the missing file name.
                    raise WorkflowError(
                        "Cannot move output file {} to cache. It does not exist "
                        "(maybe it was not created by the job?).".format(outputfile)
                    )
                self.check_writeable(cachefile)
                logger.info("Moving output file {} to cache.".format(outputfile))

                tmp = tmpdir / cachefile.name
                # First move is performed into a tempdir (it might involve a copy if not on the same FS).
                # This is important, such that network filesystem latency
                # does not lead to concurrent writes to the same file.
                # We can use the plain copy method of shutil, because we do not care about the metadata.
                shutil.move(outputfile, tmp, copy_function=shutil.copy)

                self.set_permissions(tmp)

                # Move to the actual path (now we are on the same FS, hence move is atomic).
                # Here we use the default copy function, also copying metadata (which is important here).
                # It will always work, because we are guaranteed to be in the same FS.
                shutil.move(tmp, cachefile)
                # now restore the outputfile via a symlink
                self.symlink(cachefile, outputfile, utime=False)

    def fetch(self, job: Job):
        """
        Retrieve cached output file and symlink to the place where the job expects its output.
        """
        for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):

            if not cachefile.exists():
                self.raise_cache_miss_exception(job)

            self.check_readable(cachefile)
            if cachefile.is_dir():
                # For directories, create a new one and symlink each entry.
                # Then, the .snakemake_timestamp of the new dir is touched
                # by the executor.
                outputfile.mkdir(parents=True, exist_ok=True)
                for f in cachefile.iterdir():
                    self.symlink(f, outputfile / f.name)
            else:
                self.symlink(cachefile, outputfile)

    def exists(self, job: Job):
        """
        Return True if job is already cached, i.e. a cache entry exists for
        every output file of the job. Existing entries are additionally
        checked for readability.
        """
        for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):

            if not cachefile.exists():
                return False

            self.check_readable(cachefile)
        return True

    def get_outputfiles_and_cachefiles(self, job: Job):
        """
        Return a generator of (outputfile, cachefile) Path pairs for the job.

        The cache file path is the cache location joined with the provenance
        hash of the job, with the extension delivered by
        self.get_outputfiles(job) applied via Path.with_suffix.
        """
        provenance_hash = self.provenance_hash_map.get_provenance_hash(job)
        base_path = self.path / provenance_hash

        return (
            (Path(outputfile), base_path.with_suffix(ext))
            for outputfile, ext in self.get_outputfiles(job)
        )

    def symlink(self, path, outputfile, utime=True):
        """
        Create outputfile as a symlink to path.

        If utime is True, the modification time of the symlink itself is
        updated. If the OS does not support that, the file is copied instead.
        """
        if os.utime in os.supports_follow_symlinks or not utime:
            logger.info("Symlinking output file {} from cache.".format(outputfile))
            os.symlink(path, outputfile)
            if utime:
                os.utime(outputfile, follow_symlinks=False)
        else:
            logger.info(
                "Copying output file {} from cache (OS does not support updating the modification date of symlinks).".format(
                    outputfile
                )
            )
            # NOTE(review): shutil.copyfile only handles regular files; confirm
            # that directory entries never reach this fallback.
            shutil.copyfile(path, outputfile)

    def set_permissions(self, entry):
        """
        Make entry readable and writeable for all. Directories additionally
        get exec permission; for a directory the permissions are applied
        recursively to everything below it.
        """
        # make readable/writeable for all
        if entry.is_dir():
            # recursively apply permissions for all contained files
            for root, dirs, files in os.walk(entry):
                root = Path(root)
                for d in dirs:
                    os.chmod(root / d, self.dir_permissions)
                for f in files:
                    os.chmod(root / f, self.file_permissions)
            os.chmod(entry, self.dir_permissions)
        else:
            os.chmod(entry, self.file_permissions)