1# Copyright (C) 2015-2021 Regents of the University of California
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import errno
15import logging
16import os
17import stat
18import pickle
19import random
20import re
21import shutil
22import stat
23import tempfile
24import time
25import uuid
26from contextlib import contextmanager
27
28from toil.fileStores import FileID
29from toil.job import TemporaryID
30from toil.jobStores.abstractJobStore import (AbstractJobStore,
31                                             JobStoreExistsException,
32                                             NoSuchFileException,
33                                             NoSuchJobException,
34                                             NoSuchJobStoreException)
35from toil.lib.io import AtomicFileCreate, atomic_copy, atomic_copyobj, robust_rmtree
36
37logger = logging.getLogger(__name__)
38
39
class FileJobStore(AbstractJobStore):
    """
    A job store that uses a directory on a locally attached file system. To be compatible with
    distributed batch systems, that file system must be shared by all worker nodes.

    Job IDs and file IDs are paths relative to directories under the job
    store root, so no separate index of jobs or files is kept.
    """

    # Valid chars for the creation of temporary "spray" directories.
    # Note that on case-insensitive filesystems we're twice as likely to use
    # letter directories as number directories.
    validDirs = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    # Set form of validDirs, for fast membership tests.
    validDirsSet = set(validDirs)

    # What prefix should be on the per-job job directories, to distinguish them
    # from the spray directories?
    JOB_DIR_PREFIX = 'instance-'

    # What prefix do we put on the per-job-name directories we sort jobs into?
    JOB_NAME_DIR_PREFIX = 'kind-'

    # 10Mb RAM chunks when reading/writing files
    BUFFER_SIZE = 10485760 # 10Mb
61
62    def __init__(self, path, fanOut=1000):
63        """
64        :param str path: Path to directory holding the job store
65        :param int fanOut: Number of items to have in a directory before making
66                           subdirectories
67        """
68        super(FileJobStore, self).__init__()
69        self.jobStoreDir = os.path.abspath(path)
70        logger.debug("Path to job store directory is '%s'.", self.jobStoreDir)
71
72        # Directory where actual job files go, and their job-associated temp files
73        self.jobsDir = os.path.join(self.jobStoreDir, 'jobs')
74        # Directory where stats files go
75        self.statsDir = os.path.join(self.jobStoreDir, 'stats')
76        # Directory where non-job-associated files for the file store go
77        self.filesDir = os.path.join(self.jobStoreDir, 'files/no-job')
78        # Directory where job-associated files for the file store go.
79        # Each per-job directory in here will have separate directories for
80        # files to clean up and files to not clean up when the job is deleted.
81        self.jobFilesDir = os.path.join(self.jobStoreDir, 'files/for-job')
82        # Directory where shared files go
83        self.sharedFilesDir = os.path.join(self.jobStoreDir, 'files/shared')
84
85        self.fanOut = fanOut
86
87        self.linkImports = None
88        self.moveExports = None
89
90    def __repr__(self):
91        return f'FileJobStore({self.jobStoreDir})'
92
93    def initialize(self, config):
94        try:
95            os.mkdir(self.jobStoreDir)
96        except OSError as e:
97            if e.errno == errno.EEXIST:
98                raise JobStoreExistsException(self.jobStoreDir)
99            else:
100                raise
101        os.makedirs(self.jobsDir, exist_ok=True)
102        os.makedirs(self.statsDir, exist_ok=True)
103        os.makedirs(self.filesDir, exist_ok=True)
104        os.makedirs(self.jobFilesDir, exist_ok=True)
105        os.makedirs(self.sharedFilesDir, exist_ok=True)
106        self.linkImports = config.linkImports
107        self.moveExports = config.moveExports
108        super(FileJobStore, self).initialize(config)
109
110    def resume(self):
111        if not os.path.isdir(self.jobStoreDir):
112            raise NoSuchJobStoreException(self.jobStoreDir)
113        super(FileJobStore, self).resume()
114
115    def destroy(self):
116        if os.path.exists(self.jobStoreDir):
117            robust_rmtree(self.jobStoreDir)
118
119    ##########################################
120    # The following methods deal with creating/loading/updating/writing/checking for the
121    # existence of jobs
122    ##########################################
123
124    def assignID(self, jobDescription):
125        # Get the job's name. We want to group jobs with the same name together.
126        # This will be e.g. the function name for wrapped-function jobs.
127        # Make sure to render it filename-safe
128        usefulFilename = self._makeStringFilenameSafe(jobDescription.jobName)
129
130        # Make a unique temp directory under a directory for this job name,
131        # possibly sprayed across multiple levels of subdirectories.
132        absJobDir = tempfile.mkdtemp(prefix=self.JOB_DIR_PREFIX,
133                                     dir=self._getArbitraryJobsDirForName(usefulFilename))
134
135        jobDescription.jobStoreID = self._getJobIdFromDir(absJobDir)
136
137    def create(self, jobDescription):
138        if hasattr(self, "_batchedUpdates") and self._batchedUpdates is not None:
139            # Save it later
140            self._batchedUpdates.append(jobDescription)
141        else:
142            # Save it now
143            self.update(jobDescription)
144        return jobDescription
145
146    @contextmanager
147    def batch(self):
148        self._batchedUpdates = []
149        yield
150        for jobDescription in self._batchedUpdates:
151            self.update(jobDescription)
152        self._batchedUpdates = None
153
154    def _waitForExists(self, jobStoreID, maxTries=35, sleepTime=1):
155        """
156        Spin-wait and block for a job to appear before returning
157        False if it does not.
158        """
159        return self._waitForFile(self._getJobFileName(jobStoreID), maxTries=maxTries, sleepTime=sleepTime)
160
161    def _waitForFile(self, fileName, maxTries=35, sleepTime=1):
162        """
163        Spin-wait and block for a file or directory to appear before returning
164        False if it does not.
165
166        The total max wait time is maxTries * sleepTime. The current default is
167        tuned to match Linux NFS defaults where the client's cache of the directory
168        listing on the server is supposed to become coherent within 30 sec.
169        Delayes beyond that would probably indicate a pathologically slow file system
170        that just should not be used for the jobStore.
171
172        The warning will be sent to the log only on the first retry.
173
174        In practice, the need for retries happens rarely, but it does happen
175        over the course of large workflows with a jobStore on a busy NFS.
176        """
177        for iTry in range(1,maxTries+1):
178            if os.path.exists(fileName):
179                return True
180            if iTry >= maxTries:
181                return False
182            elif iTry == 1:
183                logger.warning(("Path `{}` does not exist (yet). We will try #{} more times with {}s "
184                        "intervals.").format(fileName, maxTries - iTry, sleepTime))
185            time.sleep(sleepTime)
186        return False
187
188    def exists(self, jobStoreID):
189        return os.path.exists(self._getJobFileName(jobStoreID))
190
191    def getPublicUrl(self, jobStoreFileID):
192        self._checkJobStoreFileID(jobStoreFileID)
193        jobStorePath = self._getFilePathFromId(jobStoreFileID)
194        if os.path.exists(jobStorePath):
195            return 'file:' + jobStorePath
196        else:
197            raise NoSuchFileException(jobStoreFileID)
198
199    def getSharedPublicUrl(self, sharedFileName):
200        jobStorePath = os.path.join(self.sharedFilesDir, sharedFileName)
201        if not os.path.exists(jobStorePath):
202            raise NoSuchFileException(sharedFileName)
203        return 'file:' + jobStorePath
204
    def load(self, jobStoreID):
        """
        Load the JobDescription for the given job from disk.

        :param str jobStoreID: ID of the job (path relative to self.jobsDir).
        :raise NoSuchJobException: if the job file does not appear within the
                                   retry window (via _checkJobStoreIdExists).
        :return: the unpickled job, with the current config attached.
        """
        self._checkJobStoreIdExists(jobStoreID)
        # Load a valid version of the job
        jobFile = self._getJobFileName(jobStoreID)
        with open(jobFile, 'rb') as fileHandle:
            job = pickle.load(fileHandle)

        # Pass along the current config, which is the JobStore's responsibility.
        job.assignConfig(self.config)

        # The following cleans up any issues resulting from the failure of the
        # job during writing by the batch system.
        # A leftover "<jobFile>.new" means an update() was interrupted mid-write,
        # so the version we just read is the last complete one; drop the
        # partial file and let the job recover from the failed attempt.
        if os.path.isfile(jobFile + ".new"):
            logger.warning("There was a .new file for the job: %s", jobStoreID)
            os.remove(jobFile + ".new")
            job.setupJobAfterFailure()
        return job
222
223    def update(self, job):
224        assert job.jobStoreID is not None, f"Tried to update job {job} without an ID"
225        assert not isinstance(job.jobStoreID, TemporaryID), f"Tried to update job {job} without an assigned ID"
226
227        # The job is serialised to a file suffixed by ".new"
228        # We insist on creating the file; an existing .new file indicates
229        # multiple simultaneous attempts to update the job, which will lose
230        # updates.
231        # The file is then moved to its correct path.
232        # Atomicity guarantees use the fact the underlying file systems "move"
233        # function is atomic.
234        with open(self._getJobFileName(job.jobStoreID) + ".new", 'xb') as f:
235            pickle.dump(job, f)
236        # This should be atomic for the file system
237        os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID))
238
239    def delete(self, jobStoreID):
240        # The jobStoreID is the relative path to the directory containing the job,
241        # removing this directory deletes the job.
242        if self.exists(jobStoreID):
243            # Remove the job-associated files in need of cleanup, which may or
244            # may not live under the job's directory.
245            robust_rmtree(self._getJobFilesCleanupDir(jobStoreID))
246            # Remove the job's directory itself.
247            robust_rmtree(self._getJobDirFromId(jobStoreID))
248
249    def jobs(self):
250        # Walk through list of temporary directories searching for jobs.
251        # Jobs are files that start with 'job'.
252        # Note that this also catches jobWhatever.new which exists if an update
253        # is in progress.
254        for tempDir in self._jobDirectories():
255            for i in os.listdir(tempDir):
256                if i.startswith(self.JOB_DIR_PREFIX):
257                    # This is a job instance directory
258                    jobId = self._getJobIdFromDir(os.path.join(tempDir, i))
259                    try:
260                        if self.exists(jobId):
261                            yield self.load(jobId)
262                    except NoSuchJobException:
263                        # An orphaned job may leave an empty or incomplete job file which we can safely ignore
264                        pass
265
266    ##########################################
267    # Functions that deal with temporary files associated with jobs
268    ##########################################
269
270    @contextmanager
271    def optionalHardCopy(self, hardlink):
272        if hardlink:
273            saved = self.linkImports
274            self.linkImports = False
275        yield
276        if hardlink:
277            self.linkImports = saved
278
279    def _copyOrLink(self, srcURL, destPath, symlink=False):
280        # linking is not done be default because of issue #1755
281        srcPath = self._extractPathFromUrl(srcURL)
282        if self.linkImports or symlink:
283            os.symlink(os.path.realpath(srcPath), destPath)
284        else:
285            atomic_copy(srcPath, destPath)
286
    def _importFile(self, otherCls, url, sharedFileName=None, hardlink=False, symlink=False):
        """
        Import the file at the given file: URL into this job store.

        When the source is also a FileJobStore-readable local file, the file
        is copied or symlinked directly (symlinking suppressed while
        ``hardlink`` is set, via optionalHardCopy); other source classes are
        delegated to the base implementation.

        :param otherCls: job store class that can read the URL.
        :param url: parsed URL of the local file to import.
        :param str sharedFileName: if given, import as this shared file
                                   instead of minting a new file ID.
        :param bool hardlink: if True, temporarily disable symlinked imports.
        :param bool symlink: if True, allow a symlink even when linkImports
                             is off.
        :return: a FileID for the imported file, or None when imported as a
                 shared file.
        """
        if issubclass(otherCls, FileJobStore):
            if sharedFileName is None:
                # Record whether the source file had the user-executable bit.
                executable = os.stat(url.path).st_mode & stat.S_IXUSR != 0
                absPath = self._getUniqueFilePath(url.path)  # use this to get a valid path to write to in job store
                with self.optionalHardCopy(hardlink):
                    self._copyOrLink(url, absPath, symlink=symlink)
                # TODO: os.stat(absPath).st_size consistently gives values lower than
                # getDirSizeRecursively()
                return FileID(self._getFileIdFromPath(absPath), os.stat(absPath).st_size, executable)
            else:
                self._requireValidSharedFileName(sharedFileName)
                path = self._getSharedFilePath(sharedFileName)
                with self.optionalHardCopy(hardlink):
                    self._copyOrLink(url, path, symlink=symlink)
                return None
        else:
            return super(FileJobStore, self)._importFile(otherCls, url,
                                                         sharedFileName=sharedFileName)
306
307    def _exportFile(self, otherCls, jobStoreFileID, url):
308        if issubclass(otherCls, FileJobStore):
309            srcPath = self._getFilePathFromId(jobStoreFileID)
310            destPath = self._extractPathFromUrl(url)
311            executable = getattr(jobStoreFileID, 'executable', False)
312            if self.moveExports:
313                self._move_and_linkback(srcPath, destPath, executable=executable)
314            else:
315                atomic_copy(srcPath, destPath, executable=executable)
316        else:
317            super(FileJobStore, self)._defaultExportFile(otherCls, jobStoreFileID, url)
318
319    def _move_and_linkback(self, srcPath, destPath, executable):
320        logger.debug("moveExports option, Moving src=%s to dest=%s ; then symlinking dest to src", srcPath, destPath)
321        shutil.move(srcPath, destPath)
322        os.symlink(destPath, srcPath)
323        if executable:
324            os.chmod(destPath, os.stat(destPath).st_mode | stat.S_IXUSR)
325
326    @classmethod
327    def getSize(cls, url):
328        return os.stat(cls._extractPathFromUrl(url)).st_size
329
330    @classmethod
331    def _readFromUrl(cls, url, writable):
332        """
333        Writes the contents of a file to a source (writes url to writable)
334        using a ~10Mb buffer.
335
336        :param str url: A path as a string of the file to be read from.
337        :param object writable: An open file object to write to.
338        """
339
340        # we use a ~10Mb buffer to improve speed
341        with open(cls._extractPathFromUrl(url), 'rb') as readable:
342            shutil.copyfileobj(readable, writable, length=cls.BUFFER_SIZE)
343            # Return the number of bytes we read when we reached EOF.
344            executable = os.stat(readable.name).st_mode & stat.S_IXUSR
345            return readable.tell(), executable
346
347
348    @classmethod
349    def _writeToUrl(cls, readable, url, executable=False):
350        """
351        Writes the contents of a file to a source (writes readable to url)
352        using a ~10Mb buffer.
353
354        :param str url: A path as a string of the file to be written to.
355        :param object readable: An open file object to read from.
356        """
357        # we use a ~10Mb buffer to improve speed
358        atomic_copyobj(readable,
359                       cls._extractPathFromUrl(url),
360                       length=cls.BUFFER_SIZE,
361                       executable=executable)
362
363
364    @staticmethod
365    def _extractPathFromUrl(url):
366        """
367        :return: local file path of file pointed at by the given URL
368        """
369        if url.netloc != '' and url.netloc != 'localhost':
370            raise RuntimeError("The URL '%s' is invalid" % url.geturl())
371        return url.netloc + url.path
372
373    @classmethod
374    def _supportsUrl(cls, url, export=False):
375        return url.scheme.lower() == 'file'
376
377    def _makeStringFilenameSafe(self, arbitraryString, maxLength=240):
378        """
379        Given an arbitrary string, produce a filename-safe though not
380        necessarily unique string based on it.
381
382        The input string may be discarded altogether and replaced with any
383        other nonempty filename-safe string.
384
385        :param str arbitraryString: An arbitrary string
386        :param int maxLength: Maximum length of the result, to keep it plus
387                              any prefix or suffix under the filesystem's
388                              path component length limit
389
390        :return: A filename-safe string
391        """
392
393        # We will fill this in with the filename-safe parts we find.
394        parts = []
395
396        for substring in re.findall("[A-Za-z0-9._-]+", arbitraryString):
397            # Collect all the matching substrings
398            parts.append(substring)
399
400        if len(parts) == 0:
401            parts.append("UNPRINTABLE")
402
403        # Glue it all together, and truncate to length
404        return '_'.join(parts)[:maxLength]
405
406    def writeFile(self, localFilePath, jobStoreID=None, cleanup=False):
407        absPath = self._getUniqueFilePath(localFilePath, jobStoreID, cleanup)
408        relPath = self._getFileIdFromPath(absPath)
409        atomic_copy(localFilePath, absPath)
410        return relPath
411
412    @contextmanager
413    def writeFileStream(self, jobStoreID=None, cleanup=False, basename=None, encoding=None, errors=None):
414        if not basename:
415            basename = 'stream'
416        absPath = self._getUniqueFilePath(basename, jobStoreID, cleanup)
417        relPath = self._getFileIdFromPath(absPath)
418
419        with open(absPath, 'wb' if encoding == None else 'wt', encoding=encoding, errors=errors) as f:
420            # Don't yield while holding an open file descriptor to the temp
421            # file. That can result in temp files still being open when we try
422            # to clean ourselves up, somehow, for certain workloads.
423            yield f, relPath
424
425    def getEmptyFileStoreID(self, jobStoreID=None, cleanup=False, basename=None):
426        with self.writeFileStream(jobStoreID, cleanup, basename) as (fileHandle, jobStoreFileID):
427            return jobStoreFileID
428
429    def updateFile(self, jobStoreFileID, localFilePath):
430        self._checkJobStoreFileID(jobStoreFileID)
431        jobStoreFilePath = self._getFilePathFromId(jobStoreFileID)
432
433        if os.path.samefile(jobStoreFilePath, localFilePath):
434            # The files are already the same file. We can't copy on eover the other.
435            return
436
437        atomic_copy(localFilePath, jobStoreFilePath)
438
    def readFile(self, jobStoreFileID, localFilePath, symlink=False):
        """
        Make the job store file's content available at localFilePath.

        Tries, in order: a symlink into the store (only when the caller
        allows it), a hardlink (only when source and destination appear to be
        on the same device), and finally a full atomic copy.

        :param str jobStoreFileID: ID of the file to read.
        :param str localFilePath: destination path; an existing file or
                                  symlink there is replaced.
        :param bool symlink: whether the caller accepts a symlink into the
                             job store instead of an independent copy.
        :raise NoSuchFileException: if the job store file does not exist.
        """
        self._checkJobStoreFileID(jobStoreFileID)
        jobStoreFilePath = self._getFilePathFromId(jobStoreFileID)
        localDirPath = os.path.dirname(localFilePath)
        executable = getattr(jobStoreFileID, 'executable', False)

        if not symlink and os.path.islink(localFilePath):
            # We had a symlink and want to clobber it with a hardlink or copy.
            os.unlink(localFilePath)

        if os.path.exists(localFilePath) and os.path.samefile(jobStoreFilePath, localFilePath):
            # The files are already the same: same name, hardlinked, or
            # symlinked. There is nothing to do, and trying to shutil.copyfile
            # one over the other will fail.
            return

        if symlink:
            # The reader will accept a symlink, so always give them one.
            # There's less that can go wrong.
            try:
                os.symlink(jobStoreFilePath, localFilePath)
                # It worked!
                return
            except OSError as e:
                if e.errno == errno.EEXIST:
                    # Overwrite existing file, emulating shutil.copyfile().
                    os.unlink(localFilePath)
                    # It would be very unlikely to fail again for same reason but possible
                    # nonetheless in which case we should just give up.
                    os.symlink(jobStoreFilePath, localFilePath)
                    # Now we succeeded and don't need to copy
                    return
                else:
                    raise

        # If we get here, symlinking isn't an option.
        if os.stat(jobStoreFilePath).st_dev == os.stat(localDirPath).st_dev:
            # It is possible that we can hard link the file.
            # Note that even if the device numbers match, we can end up trying
            # to create a "cross-device" link.

            try:
                os.link(jobStoreFilePath, localFilePath)
                # It worked!
                return
            except OSError as e:
                if e.errno == errno.EEXIST:
                    # Overwrite existing file, emulating shutil.copyfile().
                    os.unlink(localFilePath)
                    # It would be very unlikely to fail again for same reason but possible
                    # nonetheless in which case we should just give up.
                    os.link(jobStoreFilePath, localFilePath)
                    # Now we succeeded and don't need to copy
                    return
                elif e.errno == errno.EXDEV:
                    # It's a cross-device link even though it didn't appear to be.
                    # Just keep going and hit the file copy case.
                    pass
                else:
                    logger.critical('Unexpected OSError when reading file from job store')
                    logger.critical('jobStoreFilePath: ' + jobStoreFilePath + ' ' + str(os.path.exists(jobStoreFilePath)))
                    logger.critical('localFilePath: ' + localFilePath + ' ' + str(os.path.exists(localFilePath)))
                    raise

        # If we get here, neither a symlink nor a hardlink will work.
        # Make a complete copy.
        atomic_copy(jobStoreFilePath, localFilePath, executable=executable)
506
507    def deleteFile(self, jobStoreFileID):
508        if not self.fileExists(jobStoreFileID):
509            return
510        os.remove(self._getFilePathFromId(jobStoreFileID))
511
512    def fileExists(self, jobStoreFileID):
513        absPath = self._getFilePathFromId(jobStoreFileID)
514
515        if (not absPath.startswith(self.jobsDir) and
516            not absPath.startswith(self.filesDir) and
517            not absPath.startswith(self.jobFilesDir)):
518            # Don't even look for it, it is out of bounds.
519            raise NoSuchFileException(jobStoreFileID)
520
521        try:
522            st = os.stat(absPath)
523        except os.error:
524            return False
525        if not stat.S_ISREG(st.st_mode):
526            raise NoSuchFileException(jobStoreFileID)
527        return True
528
529    def getFileSize(self, jobStoreFileID):
530        # Duplicate a bunch of fileExists to save on stat calls
531        absPath = self._getFilePathFromId(jobStoreFileID)
532
533        if (not absPath.startswith(self.jobsDir) and
534            not absPath.startswith(self.filesDir) and
535            not absPath.startswith(self.jobFilesDir)):
536            # Don't even look for it, it is out of bounds.
537            raise NoSuchFileException(jobStoreFileID)
538
539        try:
540            st = os.stat(absPath)
541        except os.error:
542            return 0
543        return st.st_size
544
545    @contextmanager
546    def updateFileStream(self, jobStoreFileID, encoding=None, errors=None):
547        self._checkJobStoreFileID(jobStoreFileID)
548        # File objects are context managers (CM) so we could simply return what open returns.
549        # However, it is better to wrap it in another CM so as to prevent users from accessing
550        # the file object directly, without a with statement.
551        with open(self._getFilePathFromId(jobStoreFileID), 'wb' if encoding == None else 'wt', encoding=encoding, errors=errors) as f:
552            yield f
553
554    @contextmanager
555    def readFileStream(self, jobStoreFileID, encoding=None, errors=None):
556        self._checkJobStoreFileID(jobStoreFileID)
557        with open(self._getFilePathFromId(jobStoreFileID), 'rb' if encoding == None else 'rt', encoding=encoding, errors=errors) as f:
558            yield f
559
560    ##########################################
561    # The following methods deal with shared files, i.e. files not associated
562    # with specific jobs.
563    ##########################################
564
565    def _getSharedFilePath(self, sharedFileName):
566        return os.path.join(self.sharedFilesDir, sharedFileName)
567
568    @contextmanager
569    def writeSharedFileStream(self, sharedFileName, isProtected=None, encoding=None, errors=None):
570        # the isProtected parameter has no effect on the fileStore
571        self._requireValidSharedFileName(sharedFileName)
572        with AtomicFileCreate(self._getSharedFilePath(sharedFileName)) as tmpSharedFilePath:
573            with open(tmpSharedFilePath, 'wb' if encoding == None else 'wt', encoding=encoding, errors=None) as f:
574                yield f
575
576    @contextmanager
577    def readSharedFileStream(self, sharedFileName, encoding=None, errors=None):
578        self._requireValidSharedFileName(sharedFileName)
579        try:
580            with open(self._getSharedFilePath(sharedFileName), 'rb' if encoding == None else 'rt', encoding=encoding, errors=errors) as f:
581                yield f
582
583        except IOError as e:
584            if e.errno == errno.ENOENT:
585                raise NoSuchFileException(sharedFileName)
586            else:
587                raise
588
589    def writeStatsAndLogging(self, statsAndLoggingString):
590        # Temporary files are placed in the stats directory tree
591        tempStatsFileName = "stats" + str(uuid.uuid4().hex) + ".new"
592        tempStatsFile = os.path.join(self._getArbitraryStatsDir(), tempStatsFileName)
593        writeFormat = 'w' if isinstance(statsAndLoggingString, str) else 'wb'
594        with open(tempStatsFile, writeFormat) as f:
595            f.write(statsAndLoggingString)
596        os.rename(tempStatsFile, tempStatsFile[:-4])  # This operation is atomic
597
598    def readStatsAndLogging(self, callback, readAll=False):
599        numberOfFilesProcessed = 0
600        for tempDir in self._statsDirectories():
601            for tempFile in os.listdir(tempDir):
602                if tempFile.startswith('stats'):
603                    absTempFile = os.path.join(tempDir, tempFile)
604                    if os.path.isfile(absTempFile):
605                        if readAll or not tempFile.endswith('.new'):
606                            with open(absTempFile, 'rb') as fH:
607                                callback(fH)
608                            numberOfFilesProcessed += 1
609                            newName = tempFile.rsplit('.', 1)[0] + '.new'
610                            newAbsTempFile = os.path.join(tempDir, newName)
611                            # Mark this item as read
612                            os.rename(absTempFile, newAbsTempFile)
613        return numberOfFilesProcessed
614
615    ##########################################
616    # Private methods
617    ##########################################
618
619    def _getJobDirFromId(self, jobStoreID):
620        """
621
622        Find the directory for a job, which holds its job file.
623
624        :param str jobStoreID: ID of a job, which is a relative to self.jobsDir.
625        :rtype : string, string is the absolute path to a job directory inside self.jobsDir.
626        """
627        return os.path.join(self.jobsDir, jobStoreID)
628
629    def _getJobIdFromDir(self, absPath):
630        """
631        :param str absPath: The absolute path to a job directory under self.jobsDir which represents a job.
632        :rtype : string, string is the job ID, which is a path relative to self.jobsDir
633        """
634        return absPath[len(self.jobsDir)+1:]
635
636    def _getJobFileName(self, jobStoreID):
637        """
638        Return the path to the file containing the serialised JobDescription instance for the given
639        job.
640
641        :rtype: str
642        """
643        return os.path.join(self._getJobDirFromId(jobStoreID), "job")
644
645    def _getJobFilesDir(self, jobStoreID):
646        """
647        Return the path to the directory that should hold files made by the
648        given job that should survive its deletion.
649
650        This directory will only be created if files are to be put in it.
651
652        :rtype : string, string is the absolute path to the job's files
653                 directory
654        """
655
656        return os.path.join(self.jobFilesDir, jobStoreID)
657
658    def _getJobFilesCleanupDir(self, jobStoreID):
659        """
660        Return the path to the directory that should hold files made by the
661        given job that will be deleted when the job is deleted.
662
663        This directory will only be created if files are to be put in it.
664
665        It may or may not be a subdirectory of the job's own directory.
666
667        :rtype : string, string is the absolute path to the job's cleanup
668                 files directory
669        """
670
671        return os.path.join(self.jobFilesDir, jobStoreID, "cleanup")
672
673    def _checkJobStoreIdAssigned(self, jobStoreID):
674        """
675        Do nothing if the given job store ID has been assigned by
676        :meth:`assignID`, and the corresponding job has not yet been
677        deleted, even if the JobDescription hasn't yet been saved for the first
678        time.
679
680        If the ID has not been assigned, raises a NoSuchJobException.
681        """
682
683        if not self._waitForFile(self._getJobDirFromId(jobStoreID)):
684            raise NoSuchJobException(jobStoreID)
685
686    def _checkJobStoreIdExists(self, jobStoreID):
687        """
688        Raises a NoSuchJobException if the job with ID jobStoreID does not exist.
689        """
690        if not self._waitForExists(jobStoreID, 30):
691            raise NoSuchJobException(jobStoreID)
692
693    def _getFilePathFromId(self, jobStoreFileID):
694        """
695        :param str jobStoreFileID: The ID of a file
696
697        :rtype : string, string is the absolute path that that file should
698                 appear at on disk, under either self.jobsDir if it is to be
699                 cleaned up with a job, or self.filesDir otherwise.
700        """
701
702        # We just make the file IDs paths under the job store overall.
703        absPath = os.path.join(self.jobStoreDir, jobStoreFileID)
704
705        # Don't validate here, we are called by the validation logic
706
707        return absPath
708
709    def _getFileIdFromPath(self, absPath):
710        """
711        :param str absPath: The absolute path of a file.
712
713        :rtype : string, string is the file ID.
714        """
715
716        return absPath[len(self.jobStoreDir)+1:]
717
718    def _checkJobStoreFileID(self, jobStoreFileID):
719        """
720        :raise NoSuchFileException: if the file with ID jobStoreFileID does
721                                    not exist or is not a file
722        """
723        if not self.fileExists(jobStoreFileID):
724            raise NoSuchFileException(jobStoreFileID)
725
726    def _getArbitraryJobsDirForName(self, jobNameSlug):
727        """
728        Gets a temporary directory in a multi-level hierarchy in self.jobsDir.
729        The directory is not unique and may already have other jobs' directories in it.
730        We organize them at the top level by job name, to be user-inspectable.
731
732        We make sure to prepend a string so that job names can't collide with
733        spray directory names.
734
735        :param str jobNameSlug: A partial filename derived from the job name.
736                                Used as the first level of the directory hierarchy.
737
738        :rtype : string, path to temporary directory in which to place files/directories.
739
740
741        """
742
743
744        if len(os.listdir(self.jobsDir)) > self.fanOut:
745            # Make sure that we don't over-fill the root with too many unique job names.
746            # Go in a subdirectory tree, and then go by job name and make another tree.
747            return self._getDynamicSprayDir(os.path.join(self._getDynamicSprayDir(self.jobsDir),
748                                                                                  self.JOB_NAME_DIR_PREFIX + jobNameSlug))
749        else:
750            # Just go in the root
751            return self._getDynamicSprayDir(os.path.join(self.jobsDir, self.JOB_NAME_DIR_PREFIX + jobNameSlug))
752
753    def _getArbitraryStatsDir(self):
754        """
755        Gets a temporary directory in a multi-level hierarchy in self.statsDir.
756        The directory is not unique and may already have other stats files in it.
757
758        :rtype : string, path to temporary directory in which to place files/directories.
759
760
761        """
762
763        return self._getDynamicSprayDir(self.statsDir)
764
765    def _getArbitraryFilesDir(self):
766        """
767        Gets a temporary directory in a multi-level hierarchy in self.filesDir.
768        The directory is not unique and may already have other user files in it.
769
770        :rtype : string, path to temporary directory in which to place files/directories.
771
772
773        """
774
775        return self._getDynamicSprayDir(self.filesDir)
776
777    def _getDynamicSprayDir(self, root):
778        """
779        Gets a temporary directory in a possibly multi-level hierarchy of
780        directories under the given root.
781
782        Each time a directory in the hierarchy starts to fill up, additional
783        hierarchy levels are created under it, and we randomly "spray" further
784        files and directories across them.
785
786        We can't actually enforce that we never go over our internal limit for
787        files in a directory, because any number of calls to this function can
788        be happening simultaneously. But we can enforce that, once too many
789        files are visible on disk, only subdirectories will be created.
790
791        The returned directory will exist, and may contain other data already.
792
793        The caller may not create any files or directories in the returned
794        directory with single-character names that are in self.validDirs.
795
796        :param str root : directory to put the hierarchy under, which will
797                          fill first.
798
799        :rtype : string, path to temporary directory in which to place
800                 files/directories.
801        """
802        tempDir = root
803
804        # Make sure the root exists
805        os.makedirs(tempDir, exist_ok=True)
806
807        while len(os.listdir(tempDir)) >= self.fanOut:
808            # We need to use a layer of directories under here to avoid over-packing the directory
809            tempDir = os.path.join(tempDir, random.choice(self.validDirs))
810            os.makedirs(tempDir, exist_ok=True)
811
812        # When we get here, we found a sufficiently empty directory
813        return tempDir
814
815    def _walkDynamicSprayDir(self, root):
816        """
817        Walks over a directory tree filled in by _getDynamicSprayDir.
818
819        Yields each directory _getDynamicSprayDir has ever returned, and no
820        directories it has not returned (besides the root).
821
822        If the caller looks in the directory, they must ignore subdirectories
823        with single-character names in self.validDirs.
824
825        :param str root : directory the hierarchy was put under
826
827        :rtype : an iterator over directories
828        """
829
830        # Always yield the root.
831        # The caller is responsible for dealing with it if it has gone away.
832        yield root
833
834        children = []
835
836        try:
837            # Look for children
838            children = os.listdir(root)
839        except:
840            # Don't care if they are gone
841            pass
842
843        for child in children:
844            # Go over all the children
845            if child not in self.validDirsSet:
846                # Only look at our reserved names we use for fan-out
847                continue
848
849            # We made this directory, so go look in it
850            childPath = os.path.join(root, child)
851
852            # Recurse
853            for item in self._walkDynamicSprayDir(childPath):
854                yield item
855
856    def _jobDirectories(self):
857        """
858        :rtype : an iterator to the temporary directories containing job
859                 files. They may also contain directories containing more
860                 job files.
861        """
862
863        # Walking the job directories is more complicated.
864        # We have one layer of spray (which is sometimes bypassed, but that's OK), then a job name, then another layer.
865        # We can tell the job name directories from the spray directories because they start with self.JOB_NAME_DIR_PREFIX.
866        # We never look at the directories containing the job name directories,
867        # so they aren't mistaken for the leaf-level per-job job directories.
868
869        for jobHoldingDir in self._walkDynamicSprayDir(self.jobsDir):
870            # For every directory in the first spray, look at children
871            children = []
872
873            try:
874                children = os.listdir(jobHoldingDir)
875            except:
876                pass
877
878            for jobNameDir in children:
879                if not jobNameDir.startswith(self.JOB_NAME_DIR_PREFIX):
880                    continue
881
882                # Now we have only the directories that are named after jobs. Look inside them.
883                for inner in self._walkDynamicSprayDir(os.path.join(jobHoldingDir, jobNameDir)):
884                    yield inner
885
886
887    def _statsDirectories(self):
888        """
889        :rtype : an iterator to the temporary directories containing stats
890                 files. They may also contain directories containing more
891                 stats files.
892        """
893
894        return self._walkDynamicSprayDir(self.statsDir)
895
896    def _getUniqueFilePath(self, fileName, jobStoreID=None, cleanup=False):
897        """
898        Create unique file name within a jobStore directory or tmp directory.
899
900        :param fileName: A file name, which can be a full path as only the
901        basename will be used.
902        :param jobStoreID: If given, the path returned will be in a directory including the job's ID as part of its path.
903        :param bool cleanup: If True and jobStoreID is set, the path will be in
904            a place such that it gets deleted when the job is deleted.
905        :return: The full path with a unique file name.
906        """
907
908        # Give the file a unique directory that either will be cleaned up with a job or won't.
909        directory = self._getFileDirectory(jobStoreID, cleanup)
910        # And then a path under it
911        uniquePath = os.path.join(directory, os.path.basename(fileName))
912        # No need to check if it exists already; it is in a unique directory.
913        return uniquePath
914
915    def _getFileDirectory(self, jobStoreID=None, cleanup=False):
916        """
917        Get a new empty directory path for a file to be stored at.
918
919
920        :param str jobStoreID: If the jobStoreID is not None, the file wil
921               be associated with the job with that ID.
922
923        :param bool cleanup: If cleanup is also True, this directory
924               will be cleaned up when the job is deleted.
925
926        :rtype :string, string is the absolute path to a directory to put the file in.
927        """
928        if jobStoreID != None:
929            # Make a temporary file within the job's files directory
930
931            # Make sure the job is legit
932            self._checkJobStoreIdAssigned(jobStoreID)
933            # Find where all its created files should live, depending on if
934            # they need to go away when the job is deleted or not.
935            jobFilesDir = self._getJobFilesDir(jobStoreID) if not cleanup else self._getJobFilesCleanupDir(jobStoreID)
936
937            # Lazily create the parent directory.
938            # We don't want our tree filled with confusingly empty directories.
939            os.makedirs(jobFilesDir, exist_ok=True)
940
941            # Then make a temp directory inside it
942            filesDir = os.path.join(jobFilesDir, 'file-' + uuid.uuid4().hex)
943            os.mkdir(filesDir)
944            return filesDir
945        else:
946            # Make a temporary file within the non-job-associated files hierarchy
947            filesDir = os.path.join(self._getArbitraryFilesDir(), 'file-' + uuid.uuid4().hex)
948            os.mkdir(filesDir)
949            return filesDir
950