# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import logging
import os
import pickle
import random
import re
import shutil
import stat
import tempfile
import time
import uuid
from contextlib import contextmanager

from toil.fileStores import FileID
from toil.job import TemporaryID
from toil.jobStores.abstractJobStore import (AbstractJobStore,
                                             JobStoreExistsException,
                                             NoSuchFileException,
                                             NoSuchJobException,
                                             NoSuchJobStoreException)
from toil.lib.io import AtomicFileCreate, atomic_copy, atomic_copyobj, robust_rmtree

logger = logging.getLogger(__name__)


class FileJobStore(AbstractJobStore):
    """
    A job store that uses a directory on a locally attached file system. To be compatible with
    distributed batch systems, that file system must be shared by all worker nodes.
    """

    # Valid chars for the creation of temporary "spray" directories.
    # Note that on case-insensitive filesystems we're twice as likely to use
    # letter directories as number directories.
    validDirs = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    validDirsSet = set(validDirs)

    # What prefix should be on the per-job job directories, to distinguish them
    # from the spray directories?
    JOB_DIR_PREFIX = 'instance-'

    # What prefix do we put on the per-job-name directories we sort jobs into?
    JOB_NAME_DIR_PREFIX = 'kind-'

    # 10Mb RAM chunks when reading/writing files
    BUFFER_SIZE = 10485760  # 10Mb

    def __init__(self, path, fanOut=1000):
        """
        :param str path: Path to directory holding the job store
        :param int fanOut: Number of items to have in a directory before making
                           subdirectories
        """
        super(FileJobStore, self).__init__()
        self.jobStoreDir = os.path.abspath(path)
        logger.debug("Path to job store directory is '%s'.", self.jobStoreDir)

        # Directory where actual job files go, and their job-associated temp files
        self.jobsDir = os.path.join(self.jobStoreDir, 'jobs')
        # Directory where stats files go
        self.statsDir = os.path.join(self.jobStoreDir, 'stats')
        # Directory where non-job-associated files for the file store go
        self.filesDir = os.path.join(self.jobStoreDir, 'files/no-job')
        # Directory where job-associated files for the file store go.
        # Each per-job directory in here will have separate directories for
        # files to clean up and files to not clean up when the job is deleted.
        self.jobFilesDir = os.path.join(self.jobStoreDir, 'files/for-job')
        # Directory where shared files go
        self.sharedFilesDir = os.path.join(self.jobStoreDir, 'files/shared')

        self.fanOut = fanOut

        # Filled in from the config by initialize(); None until then.
        self.linkImports = None
        self.moveExports = None

    def __repr__(self):
        return f'FileJobStore({self.jobStoreDir})'

    def initialize(self, config):
        """
        Create the job store directory tree and remember the link/move options
        from the config.

        :raises JobStoreExistsException: if the job store directory already exists.
        """
        try:
            os.mkdir(self.jobStoreDir)
        except OSError as e:
            if e.errno == errno.EEXIST:
                raise JobStoreExistsException(self.jobStoreDir)
            else:
                raise
        os.makedirs(self.jobsDir, exist_ok=True)
        os.makedirs(self.statsDir, exist_ok=True)
        os.makedirs(self.filesDir, exist_ok=True)
        os.makedirs(self.jobFilesDir, exist_ok=True)
        os.makedirs(self.sharedFilesDir, exist_ok=True)
        self.linkImports = config.linkImports
        self.moveExports = config.moveExports
        super(FileJobStore, self).initialize(config)

    def resume(self):
        """
        Connect to an existing job store directory.

        :raises NoSuchJobStoreException: if the directory does not exist.
        """
        if not os.path.isdir(self.jobStoreDir):
            raise NoSuchJobStoreException(self.jobStoreDir)
        super(FileJobStore, self).resume()

    def destroy(self):
        """Remove the entire job store directory tree, if it exists."""
        if os.path.exists(self.jobStoreDir):
            robust_rmtree(self.jobStoreDir)

    ##########################################
    # The following methods deal with creating/loading/updating/writing/checking for the
    # existence of jobs
    ##########################################

    def assignID(self, jobDescription):
        """
        Assign a new jobStoreID (a path relative to self.jobsDir) to the given
        JobDescription by creating a unique directory for it on disk.
        """
        # Get the job's name. We want to group jobs with the same name together.
        # This will be e.g. the function name for wrapped-function jobs.
        # Make sure to render it filename-safe
        usefulFilename = self._makeStringFilenameSafe(jobDescription.jobName)

        # Make a unique temp directory under a directory for this job name,
        # possibly sprayed across multiple levels of subdirectories.
        absJobDir = tempfile.mkdtemp(prefix=self.JOB_DIR_PREFIX,
                                     dir=self._getArbitraryJobsDirForName(usefulFilename))

        jobDescription.jobStoreID = self._getJobIdFromDir(absJobDir)

    def create(self, jobDescription):
        """
        Save the given JobDescription, either immediately or (inside a batch()
        context) deferred until the batch completes.

        :return: the JobDescription passed in.
        """
        if hasattr(self, "_batchedUpdates") and self._batchedUpdates is not None:
            # Save it later
            self._batchedUpdates.append(jobDescription)
        else:
            # Save it now
            self.update(jobDescription)
        return jobDescription

    @contextmanager
    def batch(self):
        """
        Context manager that defers create() calls made inside it; all deferred
        jobs are written when the context exits normally.
        """
        self._batchedUpdates = []
        try:
            yield
            for jobDescription in self._batchedUpdates:
                self.update(jobDescription)
        finally:
            # Always drop batching mode, even if the body (or a deferred
            # update) raised, so later create() calls save immediately again.
            self._batchedUpdates = None

    def _waitForExists(self, jobStoreID, maxTries=35, sleepTime=1):
        """
        Spin-wait and block for a job to appear before returning
        False if it does not.
        """
        return self._waitForFile(self._getJobFileName(jobStoreID), maxTries=maxTries, sleepTime=sleepTime)

    def _waitForFile(self, fileName, maxTries=35, sleepTime=1):
        """
        Spin-wait and block for a file or directory to appear before returning
        False if it does not.

        The total max wait time is maxTries * sleepTime. The current default is
        tuned to match Linux NFS defaults where the client's cache of the directory
        listing on the server is supposed to become coherent within 30 sec.
        Delays beyond that would probably indicate a pathologically slow file system
        that just should not be used for the jobStore.

        The warning will be sent to the log only on the first retry.

        In practice, the need for retries happens rarely, but it does happen
        over the course of large workflows with a jobStore on a busy NFS.
        """
        for iTry in range(1, maxTries + 1):
            if os.path.exists(fileName):
                return True
            if iTry >= maxTries:
                return False
            elif iTry == 1:
                logger.warning(("Path `{}` does not exist (yet). We will try #{} more times with {}s "
                                "intervals.").format(fileName, maxTries - iTry, sleepTime))
            time.sleep(sleepTime)
        return False

    def exists(self, jobStoreID):
        """Return True if the job's serialized JobDescription file exists."""
        return os.path.exists(self._getJobFileName(jobStoreID))

    def getPublicUrl(self, jobStoreFileID):
        """
        Return a file: URL for the given stored file.

        :raises NoSuchFileException: if the file does not exist.
        """
        self._checkJobStoreFileID(jobStoreFileID)
        jobStorePath = self._getFilePathFromId(jobStoreFileID)
        if os.path.exists(jobStorePath):
            return 'file:' + jobStorePath
        else:
            raise NoSuchFileException(jobStoreFileID)

    def getSharedPublicUrl(self, sharedFileName):
        """
        Return a file: URL for the given shared file.

        :raises NoSuchFileException: if the shared file does not exist.
        """
        jobStorePath = os.path.join(self.sharedFilesDir, sharedFileName)
        if not os.path.exists(jobStorePath):
            raise NoSuchFileException(sharedFileName)
        return 'file:' + jobStorePath

    def load(self, jobStoreID):
        """
        Load and unpickle the JobDescription for the given job, cleaning up any
        leftover partial update (".new" file) from a previous failure.

        :raises NoSuchJobException: if the job does not exist.
        """
        self._checkJobStoreIdExists(jobStoreID)
        # Load a valid version of the job
        jobFile = self._getJobFileName(jobStoreID)
        with open(jobFile, 'rb') as fileHandle:
            job = pickle.load(fileHandle)

        # Pass along the current config, which is the JobStore's responsibility.
        job.assignConfig(self.config)

        # The following cleans up any issues resulting from the failure of the
        # job during writing by the batch system.
        if os.path.isfile(jobFile + ".new"):
            logger.warning("There was a .new file for the job: %s", jobStoreID)
            os.remove(jobFile + ".new")
            job.setupJobAfterFailure()
        return job

    def update(self, job):
        """
        Atomically persist the given JobDescription to disk.

        The job is first serialized to a ".new" sibling file (created
        exclusively, so concurrent updates of the same job fail loudly rather
        than silently losing data) and then renamed over the job file, relying
        on the file system's atomic rename.
        """
        assert job.jobStoreID is not None, f"Tried to update job {job} without an ID"
        assert not isinstance(job.jobStoreID, TemporaryID), f"Tried to update job {job} without an assigned ID"

        with open(self._getJobFileName(job.jobStoreID) + ".new", 'xb') as f:
            pickle.dump(job, f)
        # This should be atomic for the file system
        os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID))

    def delete(self, jobStoreID):
        """
        Delete the job and its associated cleanup files, if it exists.
        """
        # The jobStoreID is the relative path to the directory containing the job,
        # removing this directory deletes the job.
        if self.exists(jobStoreID):
            # Remove the job-associated files in need of cleanup, which may or
            # may not live under the job's directory.
            robust_rmtree(self._getJobFilesCleanupDir(jobStoreID))
            # Remove the job's directory itself.
            robust_rmtree(self._getJobDirFromId(jobStoreID))

    def jobs(self):
        """
        Yield every loadable JobDescription in the store.

        Orphaned jobs with empty or incomplete job files are silently skipped.
        """
        # Walk through list of temporary directories searching for jobs.
        # Jobs are files that start with 'job'.
        # Note that this also catches jobWhatever.new which exists if an update
        # is in progress.
        for tempDir in self._jobDirectories():
            for i in os.listdir(tempDir):
                if i.startswith(self.JOB_DIR_PREFIX):
                    # This is a job instance directory
                    jobId = self._getJobIdFromDir(os.path.join(tempDir, i))
                    try:
                        if self.exists(jobId):
                            yield self.load(jobId)
                    except NoSuchJobException:
                        # An orphaned job may leave an empty or incomplete job file which we can safely ignore
                        pass

    ##########################################
    # Functions that deal with temporary files associated with jobs
    ##########################################

    @contextmanager
    def optionalHardCopy(self, hardlink):
        """
        Context manager that temporarily disables symlink-based imports when a
        hard copy was requested, restoring the previous setting on exit.
        """
        if hardlink:
            saved = self.linkImports
            self.linkImports = False
            try:
                yield
            finally:
                # Restore even if the copy raised, so one failed import does
                # not permanently change the store's import behavior.
                self.linkImports = saved
        else:
            yield

    def _copyOrLink(self, srcURL, destPath, symlink=False):
        # linking is not done by default because of issue #1755
        srcPath = self._extractPathFromUrl(srcURL)
        if self.linkImports or symlink:
            os.symlink(os.path.realpath(srcPath), destPath)
        else:
            atomic_copy(srcPath, destPath)

    def _importFile(self, otherCls, url, sharedFileName=None, hardlink=False, symlink=False):
        """
        Import a local file (file: URL) into the store, either as a uniquely
        named file (returning a FileID) or as a named shared file (returning None).
        Non-file URLs are delegated to the superclass.
        """
        if issubclass(otherCls, FileJobStore):
            if sharedFileName is None:
                executable = os.stat(url.path).st_mode & stat.S_IXUSR != 0
                absPath = self._getUniqueFilePath(url.path)  # use this to get a valid path to write to in job store
                with self.optionalHardCopy(hardlink):
                    self._copyOrLink(url, absPath, symlink=symlink)
                # TODO: os.stat(absPath).st_size consistently gives values lower than
                # getDirSizeRecursively()
                return FileID(self._getFileIdFromPath(absPath), os.stat(absPath).st_size, executable)
            else:
                self._requireValidSharedFileName(sharedFileName)
                path = self._getSharedFilePath(sharedFileName)
                with self.optionalHardCopy(hardlink):
                    self._copyOrLink(url, path, symlink=symlink)
                return None
        else:
            return super(FileJobStore, self)._importFile(otherCls, url,
                                                         sharedFileName=sharedFileName)

    def _exportFile(self, otherCls, jobStoreFileID, url):
        """
        Export a stored file to a local file: URL, moving (with a symlink left
        behind) when moveExports is set, otherwise copying atomically.
        """
        if issubclass(otherCls, FileJobStore):
            srcPath = self._getFilePathFromId(jobStoreFileID)
            destPath = self._extractPathFromUrl(url)
            executable = getattr(jobStoreFileID, 'executable', False)
            if self.moveExports:
                self._move_and_linkback(srcPath, destPath, executable=executable)
            else:
                atomic_copy(srcPath, destPath, executable=executable)
        else:
            super(FileJobStore, self)._defaultExportFile(otherCls, jobStoreFileID, url)

    def _move_and_linkback(self, srcPath, destPath, executable):
        logger.debug("moveExports option, Moving src=%s to dest=%s ; then symlinking dest to src", srcPath, destPath)
        shutil.move(srcPath, destPath)
        os.symlink(destPath, srcPath)
        if executable:
            os.chmod(destPath, os.stat(destPath).st_mode | stat.S_IXUSR)

    @classmethod
    def getSize(cls, url):
        """Return the size in bytes of the file at the given file: URL."""
        return os.stat(cls._extractPathFromUrl(url)).st_size

    @classmethod
    def _readFromUrl(cls, url, writable):
        """
        Writes the contents of a file to a source (writes url to writable)
        using a ~10Mb buffer.

        :param str url: A path as a string of the file to be read from.
        :param object writable: An open file object to write to.

        :return: tuple of (number of bytes read, whether the file is executable)
        """

        # we use a ~10Mb buffer to improve speed
        with open(cls._extractPathFromUrl(url), 'rb') as readable:
            shutil.copyfileobj(readable, writable, length=cls.BUFFER_SIZE)
            # Return the number of bytes we read when we reached EOF.
            executable = os.stat(readable.name).st_mode & stat.S_IXUSR != 0
            return readable.tell(), executable

    @classmethod
    def _writeToUrl(cls, readable, url, executable=False):
        """
        Writes the contents of a file to a source (writes readable to url)
        using a ~10Mb buffer.

        :param str url: A path as a string of the file to be written to.
        :param object readable: An open file object to read from.
        """
        # we use a ~10Mb buffer to improve speed
        atomic_copyobj(readable,
                       cls._extractPathFromUrl(url),
                       length=cls.BUFFER_SIZE,
                       executable=executable)

    @staticmethod
    def _extractPathFromUrl(url):
        """
        :return: local file path of file pointed at by the given URL
        """
        if url.netloc != '' and url.netloc != 'localhost':
            raise RuntimeError("The URL '%s' is invalid" % url.geturl())
        return url.netloc + url.path

    @classmethod
    def _supportsUrl(cls, url, export=False):
        return url.scheme.lower() == 'file'

    def _makeStringFilenameSafe(self, arbitraryString, maxLength=240):
        """
        Given an arbitrary string, produce a filename-safe though not
        necessarily unique string based on it.

        The input string may be discarded altogether and replaced with any
        other nonempty filename-safe string.

        :param str arbitraryString: An arbitrary string
        :param int maxLength: Maximum length of the result, to keep it plus
                              any prefix or suffix under the filesystem's
                              path component length limit

        :return: A filename-safe string
        """

        # Collect all the filename-safe substrings
        parts = re.findall("[A-Za-z0-9._-]+", arbitraryString)

        if not parts:
            # Guarantee a nonempty result even for wholly unsafe input
            parts = ["UNPRINTABLE"]

        # Glue it all together, and truncate to length
        return '_'.join(parts)[:maxLength]

    def writeFile(self, localFilePath, jobStoreID=None, cleanup=False):
        """
        Copy the local file into the store and return its new file ID.
        """
        absPath = self._getUniqueFilePath(localFilePath, jobStoreID, cleanup)
        relPath = self._getFileIdFromPath(absPath)
        atomic_copy(localFilePath, absPath)
        return relPath

    @contextmanager
    def writeFileStream(self, jobStoreID=None, cleanup=False, basename=None, encoding=None, errors=None):
        """
        Yield a writable stream and the new file's ID. Binary unless an
        encoding is given.
        """
        if not basename:
            basename = 'stream'
        absPath = self._getUniqueFilePath(basename, jobStoreID, cleanup)
        relPath = self._getFileIdFromPath(absPath)

        with open(absPath, 'wb' if encoding is None else 'wt', encoding=encoding, errors=errors) as f:
            # Don't yield while holding an open file descriptor to the temp
            # file. That can result in temp files still being open when we try
            # to clean ourselves up, somehow, for certain workloads.
            yield f, relPath

    def getEmptyFileStoreID(self, jobStoreID=None, cleanup=False, basename=None):
        """Create an empty file in the store and return its ID."""
        with self.writeFileStream(jobStoreID, cleanup, basename) as (fileHandle, jobStoreFileID):
            return jobStoreFileID

    def updateFile(self, jobStoreFileID, localFilePath):
        """Replace the stored file's contents with those of the local file."""
        self._checkJobStoreFileID(jobStoreFileID)
        jobStoreFilePath = self._getFilePathFromId(jobStoreFileID)

        if os.path.samefile(jobStoreFilePath, localFilePath):
            # The files are already the same file. We can't copy one over the other.
            return

        atomic_copy(localFilePath, jobStoreFilePath)

    def readFile(self, jobStoreFileID, localFilePath, symlink=False):
        """
        Materialize the stored file at localFilePath, preferring (in order) a
        symlink if allowed, a hard link if on the same device, and finally an
        atomic copy.
        """
        self._checkJobStoreFileID(jobStoreFileID)
        jobStoreFilePath = self._getFilePathFromId(jobStoreFileID)
        localDirPath = os.path.dirname(localFilePath)
        executable = getattr(jobStoreFileID, 'executable', False)

        if not symlink and os.path.islink(localFilePath):
            # We had a symlink and want to clobber it with a hardlink or copy.
            os.unlink(localFilePath)

        if os.path.exists(localFilePath) and os.path.samefile(jobStoreFilePath, localFilePath):
            # The files are already the same: same name, hardlinked, or
            # symlinked. There is nothing to do, and trying to shutil.copyfile
            # one over the other will fail.
            return

        if symlink:
            # If the reader will accept a symlink, always give them one.
            # There's less that can go wrong.
            try:
                os.symlink(jobStoreFilePath, localFilePath)
                # It worked!
                return
            except OSError as e:
                if e.errno == errno.EEXIST:
                    # Overwrite existing file, emulating shutil.copyfile().
                    os.unlink(localFilePath)
                    # It would be very unlikely to fail again for same reason but possible
                    # nonetheless in which case we should just give up.
                    os.symlink(jobStoreFilePath, localFilePath)
                    # Now we succeeded and don't need to copy
                    return
                else:
                    raise

        # If we get here, symlinking isn't an option.
        if os.stat(jobStoreFilePath).st_dev == os.stat(localDirPath).st_dev:
            # It is possible that we can hard link the file.
            # Note that even if the device numbers match, we can end up trying
            # to create a "cross-device" link.

            try:
                os.link(jobStoreFilePath, localFilePath)
                # It worked!
                return
            except OSError as e:
                if e.errno == errno.EEXIST:
                    # Overwrite existing file, emulating shutil.copyfile().
                    os.unlink(localFilePath)
                    # It would be very unlikely to fail again for same reason but possible
                    # nonetheless in which case we should just give up.
                    os.link(jobStoreFilePath, localFilePath)
                    # Now we succeeded and don't need to copy
                    return
                elif e.errno == errno.EXDEV:
                    # It's a cross-device link even though it didn't appear to be.
                    # Just keep going and hit the file copy case.
                    pass
                else:
                    logger.critical('Unexpected OSError when reading file from job store')
                    logger.critical('jobStoreFilePath: ' + jobStoreFilePath + ' ' + str(os.path.exists(jobStoreFilePath)))
                    logger.critical('localFilePath: ' + localFilePath + ' ' + str(os.path.exists(localFilePath)))
                    raise

        # If we get here, neither a symlink nor a hardlink will work.
        # Make a complete copy.
        atomic_copy(jobStoreFilePath, localFilePath, executable=executable)

    def deleteFile(self, jobStoreFileID):
        """Delete the stored file if it exists; a no-op otherwise."""
        if not self.fileExists(jobStoreFileID):
            return
        os.remove(self._getFilePathFromId(jobStoreFileID))

    def fileExists(self, jobStoreFileID):
        """
        Return True if the given file ID names an existing regular file inside
        the store's file trees.

        :raises NoSuchFileException: if the ID resolves outside the store, or
                                     to something that is not a regular file.
        """
        absPath = self._getFilePathFromId(jobStoreFileID)

        if (not absPath.startswith(self.jobsDir) and
            not absPath.startswith(self.filesDir) and
            not absPath.startswith(self.jobFilesDir)):
            # Don't even look for it, it is out of bounds.
            raise NoSuchFileException(jobStoreFileID)

        try:
            st = os.stat(absPath)
        except os.error:
            return False
        if not stat.S_ISREG(st.st_mode):
            raise NoSuchFileException(jobStoreFileID)
        return True

    def getFileSize(self, jobStoreFileID):
        """
        Return the size in bytes of the stored file, or 0 if it does not exist.

        :raises NoSuchFileException: if the ID resolves outside the store.
        """
        # Duplicate a bunch of fileExists to save on stat calls
        absPath = self._getFilePathFromId(jobStoreFileID)

        if (not absPath.startswith(self.jobsDir) and
            not absPath.startswith(self.filesDir) and
            not absPath.startswith(self.jobFilesDir)):
            # Don't even look for it, it is out of bounds.
            raise NoSuchFileException(jobStoreFileID)

        try:
            st = os.stat(absPath)
        except os.error:
            return 0
        return st.st_size

    @contextmanager
    def updateFileStream(self, jobStoreFileID, encoding=None, errors=None):
        """
        Yield a writable stream over the stored file. Binary unless an
        encoding is given.
        """
        self._checkJobStoreFileID(jobStoreFileID)
        # File objects are context managers (CM) so we could simply return what open returns.
        # However, it is better to wrap it in another CM so as to prevent users from accessing
        # the file object directly, without a with statement.
        with open(self._getFilePathFromId(jobStoreFileID), 'wb' if encoding is None else 'wt', encoding=encoding, errors=errors) as f:
            yield f

    @contextmanager
    def readFileStream(self, jobStoreFileID, encoding=None, errors=None):
        """
        Yield a readable stream over the stored file. Binary unless an
        encoding is given.
        """
        self._checkJobStoreFileID(jobStoreFileID)
        with open(self._getFilePathFromId(jobStoreFileID), 'rb' if encoding is None else 'rt', encoding=encoding, errors=errors) as f:
            yield f

    ##########################################
    # The following methods deal with shared files, i.e. files not associated
    # with specific jobs.
    ##########################################

    def _getSharedFilePath(self, sharedFileName):
        return os.path.join(self.sharedFilesDir, sharedFileName)

    @contextmanager
    def writeSharedFileStream(self, sharedFileName, isProtected=None, encoding=None, errors=None):
        """
        Yield a writable stream over the named shared file; the file is created
        atomically when the stream closes.
        """
        # the isProtected parameter has no effect on the fileStore
        self._requireValidSharedFileName(sharedFileName)
        with AtomicFileCreate(self._getSharedFilePath(sharedFileName)) as tmpSharedFilePath:
            # Pass the caller's errors handler through (previously this was
            # hard-coded to None, silently ignoring the argument).
            with open(tmpSharedFilePath, 'wb' if encoding is None else 'wt', encoding=encoding, errors=errors) as f:
                yield f

    @contextmanager
    def readSharedFileStream(self, sharedFileName, encoding=None, errors=None):
        """
        Yield a readable stream over the named shared file.

        :raises NoSuchFileException: if the shared file does not exist.
        """
        self._requireValidSharedFileName(sharedFileName)
        try:
            with open(self._getSharedFilePath(sharedFileName), 'rb' if encoding is None else 'rt', encoding=encoding, errors=errors) as f:
                yield f

        except IOError as e:
            if e.errno == errno.ENOENT:
                raise NoSuchFileException(sharedFileName)
            else:
                raise

    def writeStatsAndLogging(self, statsAndLoggingString):
        """
        Write a stats/logging blob into the stats tree, atomically renaming it
        into place once fully written.
        """
        # Temporary files are placed in the stats directory tree
        tempStatsFileName = "stats" + str(uuid.uuid4().hex) + ".new"
        tempStatsFile = os.path.join(self._getArbitraryStatsDir(), tempStatsFileName)
        writeFormat = 'w' if isinstance(statsAndLoggingString, str) else 'wb'
        with open(tempStatsFile, writeFormat) as f:
            f.write(statsAndLoggingString)
        os.rename(tempStatsFile, tempStatsFile[:-4])  # This operation is atomic

    def readStatsAndLogging(self, callback, readAll=False):
        """
        Invoke callback on each unread stats file (or every stats file when
        readAll is set), marking each processed file as read by renaming it
        with a '.new' suffix.

        :return: the number of files processed.
        """
        numberOfFilesProcessed = 0
        for tempDir in self._statsDirectories():
            for tempFile in os.listdir(tempDir):
                if tempFile.startswith('stats'):
                    absTempFile = os.path.join(tempDir, tempFile)
                    if os.path.isfile(absTempFile):
                        if readAll or not tempFile.endswith('.new'):
                            with open(absTempFile, 'rb') as fH:
                                callback(fH)
                            numberOfFilesProcessed += 1
                            newName = tempFile.rsplit('.', 1)[0] + '.new'
                            newAbsTempFile = os.path.join(tempDir, newName)
                            # Mark this item as read
                            os.rename(absTempFile, newAbsTempFile)
        return numberOfFilesProcessed

    ##########################################
    # Private methods
    ##########################################

    def _getJobDirFromId(self, jobStoreID):
        """

        Find the directory for a job, which holds its job file.

        :param str jobStoreID: ID of a job, which is a relative to self.jobsDir.
        :rtype : string, string is the absolute path to a job directory inside self.jobsDir.
        """
        return os.path.join(self.jobsDir, jobStoreID)

    def _getJobIdFromDir(self, absPath):
        """
        :param str absPath: The absolute path to a job directory under self.jobsDir which represents a job.
        :rtype : string, string is the job ID, which is a path relative to self.jobsDir
        """
        return absPath[len(self.jobsDir)+1:]

    def _getJobFileName(self, jobStoreID):
        """
        Return the path to the file containing the serialised JobDescription instance for the given
        job.

        :rtype: str
        """
        return os.path.join(self._getJobDirFromId(jobStoreID), "job")

    def _getJobFilesDir(self, jobStoreID):
        """
        Return the path to the directory that should hold files made by the
        given job that should survive its deletion.

        This directory will only be created if files are to be put in it.

        :rtype : string, string is the absolute path to the job's files
                 directory
        """

        return os.path.join(self.jobFilesDir, jobStoreID)

    def _getJobFilesCleanupDir(self, jobStoreID):
        """
        Return the path to the directory that should hold files made by the
        given job that will be deleted when the job is deleted.

        This directory will only be created if files are to be put in it.

        It may or may not be a subdirectory of the job's own directory.

        :rtype : string, string is the absolute path to the job's cleanup
                 files directory
        """

        return os.path.join(self.jobFilesDir, jobStoreID, "cleanup")

    def _checkJobStoreIdAssigned(self, jobStoreID):
        """
        Do nothing if the given job store ID has been assigned by
        :meth:`assignID`, and the corresponding job has not yet been
        deleted, even if the JobDescription hasn't yet been saved for the first
        time.

        If the ID has not been assigned, raises a NoSuchJobException.
        """

        if not self._waitForFile(self._getJobDirFromId(jobStoreID)):
            raise NoSuchJobException(jobStoreID)

    def _checkJobStoreIdExists(self, jobStoreID):
        """
        Raises a NoSuchJobException if the job with ID jobStoreID does not exist.
        """
        if not self._waitForExists(jobStoreID, 30):
            raise NoSuchJobException(jobStoreID)

    def _getFilePathFromId(self, jobStoreFileID):
        """
        :param str jobStoreFileID: The ID of a file

        :rtype : string, string is the absolute path that that file should
                 appear at on disk, under either self.jobsDir if it is to be
                 cleaned up with a job, or self.filesDir otherwise.
        """

        # We just make the file IDs paths under the job store overall.
        absPath = os.path.join(self.jobStoreDir, jobStoreFileID)

        # Don't validate here, we are called by the validation logic

        return absPath

    def _getFileIdFromPath(self, absPath):
        """
        :param str absPath: The absolute path of a file.

        :rtype : string, string is the file ID.
        """

        return absPath[len(self.jobStoreDir)+1:]

    def _checkJobStoreFileID(self, jobStoreFileID):
        """
        :raise NoSuchFileException: if the file with ID jobStoreFileID does
               not exist or is not a file
        """
        if not self.fileExists(jobStoreFileID):
            raise NoSuchFileException(jobStoreFileID)

    def _getArbitraryJobsDirForName(self, jobNameSlug):
        """
        Gets a temporary directory in a multi-level hierarchy in self.jobsDir.
        The directory is not unique and may already have other jobs' directories in it.
        We organize them at the top level by job name, to be user-inspectable.

        We make sure to prepend a string so that job names can't collide with
        spray directory names.

        :param str jobNameSlug: A partial filename derived from the job name.
                                Used as the first level of the directory hierarchy.

        :rtype : string, path to temporary directory in which to place files/directories.
        """

        if len(os.listdir(self.jobsDir)) > self.fanOut:
            # Make sure that we don't over-fill the root with too many unique job names.
            # Go in a subdirectory tree, and then go by job name and make another tree.
            return self._getDynamicSprayDir(os.path.join(self._getDynamicSprayDir(self.jobsDir),
                                            self.JOB_NAME_DIR_PREFIX + jobNameSlug))
        else:
            # Just go in the root
            return self._getDynamicSprayDir(os.path.join(self.jobsDir, self.JOB_NAME_DIR_PREFIX + jobNameSlug))

    def _getArbitraryStatsDir(self):
        """
        Gets a temporary directory in a multi-level hierarchy in self.statsDir.
        The directory is not unique and may already have other stats files in it.

        :rtype : string, path to temporary directory in which to place files/directories.
        """

        return self._getDynamicSprayDir(self.statsDir)

    def _getArbitraryFilesDir(self):
        """
        Gets a temporary directory in a multi-level hierarchy in self.filesDir.
        The directory is not unique and may already have other user files in it.

        :rtype : string, path to temporary directory in which to place files/directories.
        """

        return self._getDynamicSprayDir(self.filesDir)

    def _getDynamicSprayDir(self, root):
        """
        Gets a temporary directory in a possibly multi-level hierarchy of
        directories under the given root.

        Each time a directory in the hierarchy starts to fill up, additional
        hierarchy levels are created under it, and we randomly "spray" further
        files and directories across them.

        We can't actually enforce that we never go over our internal limit for
        files in a directory, because any number of calls to this function can
        be happening simultaneously. But we can enforce that, once too many
        files are visible on disk, only subdirectories will be created.

        The returned directory will exist, and may contain other data already.

        The caller may not create any files or directories in the returned
        directory with single-character names that are in self.validDirs.

        :param str root : directory to put the hierarchy under, which will
                          fill first.

        :rtype : string, path to temporary directory in which to place
                 files/directories.
        """
        tempDir = root

        # Make sure the root exists
        os.makedirs(tempDir, exist_ok=True)

        while len(os.listdir(tempDir)) >= self.fanOut:
            # We need to use a layer of directories under here to avoid over-packing the directory
            tempDir = os.path.join(tempDir, random.choice(self.validDirs))
            os.makedirs(tempDir, exist_ok=True)

        # When we get here, we found a sufficiently empty directory
        return tempDir

    def _walkDynamicSprayDir(self, root):
        """
        Walks over a directory tree filled in by _getDynamicSprayDir.

        Yields each directory _getDynamicSprayDir has ever returned, and no
        directories it has not returned (besides the root).

        If the caller looks in the directory, they must ignore subdirectories
        with single-character names in self.validDirs.

        :param str root : directory the hierarchy was put under

        :rtype : an iterator over directories
        """

        # Always yield the root.
        # The caller is responsible for dealing with it if it has gone away.
        yield root

        children = []

        try:
            # Look for children
            children = os.listdir(root)
        except OSError:
            # Don't care if they are gone
            pass

        for child in children:
            # Go over all the children
            if child not in self.validDirsSet:
                # Only look at our reserved names we use for fan-out
                continue

            # We made this directory, so go look in it
            childPath = os.path.join(root, child)

            # Recurse
            for item in self._walkDynamicSprayDir(childPath):
                yield item

    def _jobDirectories(self):
        """
        :rtype : an iterator to the temporary directories containing job
                 files. They may also contain directories containing more
                 job files.
        """

        # Walking the job directories is more complicated.
        # We have one layer of spray (which is sometimes bypassed, but that's OK), then a job name, then another layer.
        # We can tell the job name directories from the spray directories because they start with self.JOB_NAME_DIR_PREFIX.
        # We never look at the directories containing the job name directories,
        # so they aren't mistaken for the leaf-level per-job job directories.

        for jobHoldingDir in self._walkDynamicSprayDir(self.jobsDir):
            # For every directory in the first spray, look at children
            children = []

            try:
                children = os.listdir(jobHoldingDir)
            except OSError:
                pass

            for jobNameDir in children:
                if not jobNameDir.startswith(self.JOB_NAME_DIR_PREFIX):
                    continue

                # Now we have only the directories that are named after jobs. Look inside them.
                for inner in self._walkDynamicSprayDir(os.path.join(jobHoldingDir, jobNameDir)):
                    yield inner

    def _statsDirectories(self):
        """
        :rtype : an iterator to the temporary directories containing stats
                 files. They may also contain directories containing more
                 stats files.
        """

        return self._walkDynamicSprayDir(self.statsDir)

    def _getUniqueFilePath(self, fileName, jobStoreID=None, cleanup=False):
        """
        Create unique file name within a jobStore directory or tmp directory.

        :param fileName: A file name, which can be a full path as only the
                         basename will be used.
        :param jobStoreID: If given, the path returned will be in a directory including the job's ID as part of its path.
        :param bool cleanup: If True and jobStoreID is set, the path will be in
                             a place such that it gets deleted when the job is deleted.
        :return: The full path with a unique file name.
        """

        # Give the file a unique directory that either will be cleaned up with a job or won't.
        directory = self._getFileDirectory(jobStoreID, cleanup)
        # And then a path under it
        uniquePath = os.path.join(directory, os.path.basename(fileName))
        # No need to check if it exists already; it is in a unique directory.
        return uniquePath

    def _getFileDirectory(self, jobStoreID=None, cleanup=False):
        """
        Get a new empty directory path for a file to be stored at.


        :param str jobStoreID: If the jobStoreID is not None, the file will
               be associated with the job with that ID.

        :param bool cleanup: If cleanup is also True, this directory
               will be cleaned up when the job is deleted.

        :rtype :string, string is the absolute path to a directory to put the file in.
        """
        if jobStoreID is not None:
            # Make a temporary file within the job's files directory

            # Make sure the job is legit
            self._checkJobStoreIdAssigned(jobStoreID)
            # Find where all its created files should live, depending on if
            # they need to go away when the job is deleted or not.
            jobFilesDir = self._getJobFilesDir(jobStoreID) if not cleanup else self._getJobFilesCleanupDir(jobStoreID)

            # Lazily create the parent directory.
            # We don't want our tree filled with confusingly empty directories.
            os.makedirs(jobFilesDir, exist_ok=True)

            # Then make a temp directory inside it
            filesDir = os.path.join(jobFilesDir, 'file-' + uuid.uuid4().hex)
            os.mkdir(filesDir)
            return filesDir
        else:
            # Make a temporary file within the non-job-associated files hierarchy
            filesDir = os.path.join(self._getArbitraryFilesDir(), 'file-' + uuid.uuid4().hex)
            os.mkdir(filesDir)
            return filesDir