1# Copyright (C) 2015-2021 Regents of the University of California
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import os
15import hashlib
16import itertools
17import logging
18import pickle
19import re
20import reprlib
21import stat
22import time
23import urllib.error
24import urllib.parse
25import urllib.request
26import uuid
27from contextlib import contextmanager
28from io import BytesIO
29
30import boto.sdb
31import boto.s3.connection
32from typing import Optional
33from boto.exception import SDBResponseError
34from botocore.exceptions import ClientError
35
36import toil.lib.encryption as encryption
37from toil.fileStores import FileID
38from toil.jobStores.abstractJobStore import (AbstractJobStore,
39                                             ConcurrentFileModificationException,
40                                             JobStoreExistsException,
41                                             NoSuchFileException,
42                                             NoSuchJobException,
43                                             NoSuchJobStoreException)
44from toil.jobStores.aws.utils import (SDBHelper,
45                                      bucket_location_to_region,
46                                      uploadFromPath,
47                                      uploadFile,
48                                      copyKeyMultipart,
49                                      fileSizeAndTime,
50                                      monkeyPatchSdbConnection,
51                                      no_such_sdb_domain,
52                                      region_to_bucket_location,
53                                      retry_s3,
54                                      retry_sdb,
55                                      retryable_s3_errors,
56                                      sdb_unavailable)
57from toil.jobStores.utils import (ReadablePipe,
58                                  ReadableTransformingPipe,
59                                  WritablePipe)
60from toil.lib.compatibility import compat_bytes
61from toil.lib.ec2 import establish_boto3_session
62from toil.lib.ec2nodes import EC2Regions
63from toil.lib.exceptions import panic
64from toil.lib.memoize import strict_bool
65from toil.lib.io import AtomicFileCreate
66from toil.lib.objects import InnerClass
67from toil.lib.retry import retry
68
69boto3_session = establish_boto3_session()
70s3_boto3_resource = boto3_session.resource('s3')
71s3_boto3_client = boto3_session.client('s3')
72logger = logging.getLogger(__name__)
73
74# Sometimes we have to wait for multipart uploads to become real. How long
75# should we wait?
76CONSISTENCY_TICKS = 5
77CONSISTENCY_TIME = 1
78
79
80class ChecksumError(Exception):
81    """Raised when a download from AWS does not contain the correct data."""
82
83
84class AWSJobStore(AbstractJobStore):
85    """
86    A job store that uses Amazon's S3 for file storage and SimpleDB for storing job info and
87    enforcing strong consistency on the S3 file storage. There will be SDB domains for jobs and
88    files and a versioned S3 bucket for file contents. Job objects are pickled, compressed,
89    partitioned into chunks of 1024 bytes and each chunk is stored as a an attribute of the SDB
90    item representing the job. UUIDs are used to identify jobs and files.
91    """
92
93    # Dots in bucket names should be avoided because bucket names are used in HTTPS bucket
94    # URLs where the may interfere with the certificate common name. We use a double
95    # underscore as a separator instead.
96    #
97    bucketNameRe = re.compile(r'^[a-z0-9][a-z0-9-]+[a-z0-9]$')
98
99    # See http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
100    #
101    minBucketNameLen = 3
102    maxBucketNameLen = 63
103    maxNameLen = 10
104    nameSeparator = '--'
105
106    def __init__(self, locator, partSize=50 << 20):
107        """
108        Create a new job store in AWS or load an existing one from there.
109
110        :param int partSize: The size of each individual part used for multipart operations like
111               upload and copy, must be >= 5 MiB but large enough to not exceed 10k parts for the
112               whole file
113        """
114        super(AWSJobStore, self).__init__()
115        region, namePrefix = locator.split(':')
116        regions = EC2Regions.keys()
117        if region not in regions:
118            raise ValueError(f'Region "{region}" is not one of: {regions}')
119        if not self.bucketNameRe.match(namePrefix):
120            raise ValueError("Invalid name prefix '%s'. Name prefixes must contain only digits, "
121                             "hyphens or lower-case letters and must not start or end in a "
122                             "hyphen." % namePrefix)
123        # Reserve 13 for separator and suffix
124        if len(namePrefix) > self.maxBucketNameLen - self.maxNameLen - len(self.nameSeparator):
125            raise ValueError("Invalid name prefix '%s'. Name prefixes may not be longer than 50 "
126                             "characters." % namePrefix)
127        if '--' in namePrefix:
128            raise ValueError("Invalid name prefix '%s'. Name prefixes may not contain "
129                             "%s." % (namePrefix, self.nameSeparator))
130        logger.debug("Instantiating %s for region %s and name prefix '%s'",
131                     self.__class__, region, namePrefix)
132        self.locator = locator
133        self.region = region
134        self.namePrefix = namePrefix
135        self.partSize = partSize
136        self.jobsDomain = None
137        self.filesDomain = None
138        self.filesBucket = None
139        self.db = self._connectSimpleDB()
140
141        self.s3_resource = boto3_session.resource('s3', region_name=self.region)
142        self.s3_client = self.s3_resource.meta.client
143
144    def initialize(self, config):
145        if self._registered:
146            raise JobStoreExistsException(self.locator)
147        self._registered = None
148        try:
149            self._bind(create=True)
150        except:
151            with panic(logger):
152                self.destroy()
153        else:
154            super(AWSJobStore, self).initialize(config)
155            # Only register after job store has been full initialized
156            self._registered = True
157
158    @property
159    def sseKeyPath(self):
160        return self.config.sseKey
161
162    def resume(self):
163        if not self._registered:
164            raise NoSuchJobStoreException(self.locator)
165        self._bind(create=False)
166        super(AWSJobStore, self).resume()
167
168    def _bind(self, create=False, block=True, check_versioning_consistency=True):
169        def qualify(name):
170            assert len(name) <= self.maxNameLen
171            return self.namePrefix + self.nameSeparator + name
172
173        # The order in which this sequence of events happens is important.  We can easily handle the
174        # inability to bind a domain, but it is a little harder to handle some cases of binding the
175        # jobstore bucket.  Maintaining this order allows for an easier `destroy` method.
176        if self.jobsDomain is None:
177            self.jobsDomain = self._bindDomain(qualify('jobs'), create=create, block=block)
178        if self.filesDomain is None:
179            self.filesDomain = self._bindDomain(qualify('files'), create=create, block=block)
180        if self.filesBucket is None:
181            self.filesBucket = self._bindBucket(qualify('files'),
182                                                create=create,
183                                                block=block,
184                                                versioning=True,
185                                                check_versioning_consistency=check_versioning_consistency)
186
187    @property
188    def _registered(self):
189        """
190        A optional boolean property indicating whether this job store is registered. The
191        registry is the authority on deciding if a job store exists or not. If True, this job
192        store exists, if None the job store is transitioning from True to False or vice versa,
193        if False the job store doesn't exist.
194
195        :type: bool|None
196        """
197        # The weird mapping of the SDB item attribute value to the property value is due to
198        # backwards compatibility. 'True' becomes True, that's easy. Toil < 3.3.0 writes this at
199        # the end of job store creation. Absence of either the registry, the item or the
200        # attribute becomes False, representing a truly absent, non-existing job store. An
201        # attribute value of 'False', which is what Toil < 3.3.0 writes at the *beginning* of job
202        # store destruction, indicates a job store in transition, reflecting the fact that 3.3.0
203        # may leak buckets or domains even though the registry reports 'False' for them. We
204        # can't handle job stores that were partially created by 3.3.0, though.
205        registry_domain = self._bindDomain(domain_name='toil-registry',
206                                           create=False,
207                                           block=False)
208        if registry_domain is None:
209            return False
210        else:
211            for attempt in retry_sdb():
212                with attempt:
213                    attributes = registry_domain.get_attributes(item_name=self.namePrefix,
214                                                                attribute_name='exists',
215                                                                consistent_read=True)
216                    try:
217                        exists = attributes['exists']
218                    except KeyError:
219                        return False
220                    else:
221                        if exists == 'True':
222                            return True
223                        elif exists == 'False':
224                            return None
225                        else:
226                            assert False
227
228    @_registered.setter
229    def _registered(self, value):
230
231        registry_domain = self._bindDomain(domain_name='toil-registry',
232                                           # Only create registry domain when registering or
233                                           # transitioning a store
234                                           create=value is not False,
235                                           block=False)
236        if registry_domain is None and value is False:
237            pass
238        else:
239            for attempt in retry_sdb():
240                with attempt:
241                    if value is False:
242                        registry_domain.delete_attributes(item_name=self.namePrefix)
243                    else:
244                        if value is True:
245                            attributes = dict(exists='True')
246                        elif value is None:
247                            attributes = dict(exists='False')
248                        else:
249                            assert False
250                        registry_domain.put_attributes(item_name=self.namePrefix,
251                                                       attributes=attributes)
252
253    def _checkItem(self, item, enforce: bool = True):
254        """
255        Make sure that the given SimpleDB item actually has the attributes we think it should.
256
257        Throw otherwise.
258
259        If enforce is false, log but don't throw.
260        """
261
262        if "overlargeID" not in item:
263            logger.error("overlargeID attribute isn't present: either SimpleDB entry is "
264                         "corrupt or jobstore is from an extremely old Toil: %s", item)
265            if enforce:
266                raise RuntimeError("encountered SimpleDB entry missing required attribute "
267                                   "'overlargeID'; is your job store ancient?")
268
269    def _awsJobFromItem(self, item):
270        self._checkItem(item)
271        if item.get("overlargeID", None):
272            assert self.fileExists(item["overlargeID"])
273            # This is an overlarge job, download the actual attributes
274            # from the file store
275            logger.debug("Loading overlarge job from S3.")
276            with self.readFileStream(item["overlargeID"]) as fh:
277                binary = fh.read()
278        else:
279            binary, _ = SDBHelper.attributesToBinary(item)
280            assert binary is not None
281        job = pickle.loads(binary)
282        if job is not None:
283            job.assignConfig(self.config)
284        return job
285
286    def _awsJobToItem(self, job):
287        binary = pickle.dumps(job, protocol=pickle.HIGHEST_PROTOCOL)
288        if len(binary) > SDBHelper.maxBinarySize(extraReservedChunks=1):
289            # Store as an overlarge job in S3
290            with self.writeFileStream() as (writable, fileID):
291                writable.write(binary)
292            item = SDBHelper.binaryToAttributes(None)
293            item["overlargeID"] = fileID
294        else:
295            item = SDBHelper.binaryToAttributes(binary)
296            item["overlargeID"] = ""
297        return item
298
299    jobsPerBatchInsert = 25
300
301    @contextmanager
302    def batch(self):
303        self._batchedUpdates = []
304        yield
305        batches = [self._batchedUpdates[i:i + self.jobsPerBatchInsert] for i in
306                   range(0, len(self._batchedUpdates), self.jobsPerBatchInsert)]
307
308        for batch in batches:
309            items = {compat_bytes(jobDescription.jobStoreID): self._awsJobToItem(jobDescription) for jobDescription in batch}
310            for attempt in retry_sdb():
311                with attempt:
312                    assert self.jobsDomain.batch_put_attributes(items)
313        self._batchedUpdates = None
314
315    def assignID(self, jobDescription):
316        jobStoreID = self._newJobID()
317        logger.debug("Assigning ID to job %s for '%s'",
318                     jobStoreID, '<no command>' if jobDescription.command is None else jobDescription.command)
319        jobDescription.jobStoreID = jobStoreID
320
321    def create(self, jobDescription):
322        if hasattr(self, "_batchedUpdates") and self._batchedUpdates is not None:
323            self._batchedUpdates.append(jobDescription)
324        else:
325            self.update(jobDescription)
326        return jobDescription
327
328    def exists(self, jobStoreID):
329        for attempt in retry_sdb():
330            with attempt:
331                return bool(self.jobsDomain.get_attributes(
332                    item_name=compat_bytes(jobStoreID),
333                    attribute_name=[SDBHelper.presenceIndicator()],
334                    consistent_read=True))
335
336    def jobs(self):
337        result = None
338        for attempt in retry_sdb():
339            with attempt:
340                result = list(self.jobsDomain.select(
341                    consistent_read=True,
342                    query="select * from `%s`" % self.jobsDomain.name))
343        assert result is not None
344        for jobItem in result:
345            yield self._awsJobFromItem(jobItem)
346
347    def load(self, jobStoreID):
348        item = None
349        for attempt in retry_sdb():
350            with attempt:
351                item = self.jobsDomain.get_attributes(compat_bytes(jobStoreID), consistent_read=True)
352        if not item:
353            raise NoSuchJobException(jobStoreID)
354        job = self._awsJobFromItem(item)
355        if job is None:
356            raise NoSuchJobException(jobStoreID)
357        logger.debug("Loaded job %s", jobStoreID)
358        return job
359
360    def update(self, jobDescription):
361        logger.debug("Updating job %s", jobDescription.jobStoreID)
362        item = self._awsJobToItem(jobDescription)
363        for attempt in retry_sdb():
364            with attempt:
365                assert self.jobsDomain.put_attributes(compat_bytes(jobDescription.jobStoreID), item)
366
367    itemsPerBatchDelete = 25
368
369    def delete(self, jobStoreID):
370        # remove job and replace with jobStoreId.
371        logger.debug("Deleting job %s", jobStoreID)
372
373        # If the job is overlarge, delete its file from the filestore
374        item = None
375        for attempt in retry_sdb():
376            with attempt:
377                item = self.jobsDomain.get_attributes(compat_bytes(jobStoreID), consistent_read=True)
378        # If the overlargeID has fallen off, maybe we partially deleted the
379        # attributes of the item? Or raced on it? Or hit SimpleDB being merely
380        # eventually consistent? We should still be able to get rid of it.
381        self._checkItem(item, enforce = False)
382        if item.get("overlargeID", None):
383            logger.debug("Deleting job from filestore")
384            self.deleteFile(item["overlargeID"])
385        for attempt in retry_sdb():
386            with attempt:
387                self.jobsDomain.delete_attributes(item_name=compat_bytes(jobStoreID))
388        items = None
389        for attempt in retry_sdb():
390            with attempt:
391                items = list(self.filesDomain.select(
392                    consistent_read=True,
393                    query="select version from `%s` where ownerID='%s'" % (
394                        self.filesDomain.name, jobStoreID)))
395        assert items is not None
396        if items:
397            logger.debug("Deleting %d file(s) associated with job %s", len(items), jobStoreID)
398            n = self.itemsPerBatchDelete
399            batches = [items[i:i + n] for i in range(0, len(items), n)]
400            for batch in batches:
401                itemsDict = {item.name: None for item in batch}
402                for attempt in retry_sdb():
403                    with attempt:
404                        self.filesDomain.batch_delete_attributes(itemsDict)
405            for item in items:
406                version = item.get('version')
407                for attempt in retry_s3():
408                    with attempt:
409                        if version:
410                            self.s3_client.delete_object(Bucket=self.filesBucket.name,
411                                                         Key=compat_bytes(item.name),
412                                                         VersionId=version)
413                        else:
414                            self.s3_client.delete_object(Bucket=self.filesBucket.name,
415                                                         Key=compat_bytes(item.name))
416
417    def getEmptyFileStoreID(self, jobStoreID=None, cleanup=False, basename=None):
418        info = self.FileInfo.create(jobStoreID if cleanup else None)
419        with info.uploadStream() as _:
420            # Empty
421            pass
422        info.save()
423        logger.debug("Created %r.", info)
424        return info.fileID
425
426    def _importFile(self, otherCls, url, sharedFileName=None, hardlink=False, symlink=False):
427        if issubclass(otherCls, AWSJobStore):
428            srcObj = self._getObjectForUrl(url, existing=True)
429            size = srcObj.content_length
430            if sharedFileName is None:
431                info = self.FileInfo.create(srcObj.key)
432            else:
433                self._requireValidSharedFileName(sharedFileName)
434                jobStoreFileID = self._sharedFileID(sharedFileName)
435                info = self.FileInfo.loadOrCreate(jobStoreFileID=jobStoreFileID,
436                                                  ownerID=str(self.sharedFileOwnerID),
437                                                  encrypted=None)
438            info.copyFrom(srcObj)
439            info.save()
440            return FileID(info.fileID, size) if sharedFileName is None else None
441        else:
442            return super(AWSJobStore, self)._importFile(otherCls, url,
443                                                        sharedFileName=sharedFileName)
444
445    def _exportFile(self, otherCls, jobStoreFileID, url):
446        if issubclass(otherCls, AWSJobStore):
447            dstObj = self._getObjectForUrl(url)
448            info = self.FileInfo.loadOrFail(jobStoreFileID)
449            info.copyTo(dstObj)
450        else:
451            super(AWSJobStore, self)._defaultExportFile(otherCls, jobStoreFileID, url)
452
453    @classmethod
454    def getSize(cls, url):
455        return cls._getObjectForUrl(url, existing=True).content_length
456
457    @classmethod
458    def _readFromUrl(cls, url, writable):
459        srcObj = cls._getObjectForUrl(url, existing=True)
460        srcObj.download_fileobj(writable)
461        return (
462            srcObj.content_length,
463            False  # executable bit is always False
464        )
465
466    @classmethod
467    def _writeToUrl(cls, readable, url, executable=False):
468        dstObj = cls._getObjectForUrl(url)
469
470        logger.debug("Uploading %s", dstObj.key)
471        # uploadFile takes care of using multipart upload if the file is larger than partSize (default to 5MB)
472        uploadFile(readable=readable,
473                   resource=s3_boto3_resource,
474                   bucketName=dstObj.bucket_name,
475                   fileID=dstObj.key,
476                   partSize=5 * 1000 * 1000)
477
478    @staticmethod
479    def _getObjectForUrl(url, existing: Optional[bool] = None):
480        """
481        Extracts a key (object) from a given s3:// URL.
482
483        :param bool existing: If True, key is expected to exist. If False, key is expected not to
484               exists and it will be created. If None, the key will be created if it doesn't exist.
485
486        :rtype: S3.Object
487        """
488        keyName = url.path[1:]
489        bucketName = url.netloc
490
491        botoargs = {}
492        host = os.environ.get('TOIL_S3_HOST', None)
493        port = os.environ.get('TOIL_S3_PORT', None)
494        protocol = 'https'
495        if os.environ.get('TOIL_S3_USE_SSL', True) == 'False':
496            protocol = 'http'
497        if host:
498            botoargs['endpoint_url'] = f'{protocol}://{host}' + f':{port}' if port else ''
499
500        # TODO: OrdinaryCallingFormat equivalent in boto3?
501        # if botoargs:
502        #     botoargs['calling_format'] = boto.s3.connection.OrdinaryCallingFormat()
503
504        # Get the bucket's region to avoid a redirect per request
505        region = AWSJobStore.getBucketRegion(bucketName)
506        s3 = boto3_session.resource('s3', region_name=region, **botoargs)
507        obj = s3.Object(bucketName, keyName)
508        objExists = True
509
510        try:
511            obj.load()
512        except ClientError as e:
513            if e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 404:
514                objExists = False
515            else:
516                raise
517        if existing is True and not objExists:
518            raise RuntimeError(f"Key '{keyName}' does not exist in bucket '{bucketName}'.")
519        elif existing is False and objExists:
520            raise RuntimeError("Key '{keyName}' exists in bucket '{bucketName}'.")
521
522        if not objExists:
523            obj.put()  # write an empty file
524        return obj
525
526    @classmethod
527    def _supportsUrl(cls, url, export=False):
528        return url.scheme.lower() == 's3'
529
530    def writeFile(self, localFilePath, jobStoreID=None, cleanup=False):
531        info = self.FileInfo.create(jobStoreID if cleanup else None)
532        info.upload(localFilePath, not self.config.disableJobStoreChecksumVerification)
533        info.save()
534        logger.debug("Wrote %r of from %r", info, localFilePath)
535        return info.fileID
536
537    @contextmanager
538    def writeFileStream(self, jobStoreID=None, cleanup=False, basename=None, encoding=None, errors=None):
539        info = self.FileInfo.create(jobStoreID if cleanup else None)
540        with info.uploadStream(encoding=encoding, errors=errors) as writable:
541            yield writable, info.fileID
542        info.save()
543        logger.debug("Wrote %r.", info)
544
545    @contextmanager
546    def writeSharedFileStream(self, sharedFileName, isProtected=None, encoding=None, errors=None):
547        self._requireValidSharedFileName(sharedFileName)
548        info = self.FileInfo.loadOrCreate(jobStoreFileID=self._sharedFileID(sharedFileName),
549                                          ownerID=str(self.sharedFileOwnerID),
550                                          encrypted=isProtected)
551        with info.uploadStream(encoding=encoding, errors=errors) as writable:
552            yield writable
553        info.save()
554        logger.debug("Wrote %r for shared file %r.", info, sharedFileName)
555
556    def updateFile(self, jobStoreFileID, localFilePath):
557        info = self.FileInfo.loadOrFail(jobStoreFileID)
558        info.upload(localFilePath, not self.config.disableJobStoreChecksumVerification)
559        info.save()
560        logger.debug("Wrote %r from path %r.", info, localFilePath)
561
562    @contextmanager
563    def updateFileStream(self, jobStoreFileID, encoding=None, errors=None):
564        info = self.FileInfo.loadOrFail(jobStoreFileID)
565        with info.uploadStream(encoding=encoding, errors=errors) as writable:
566            yield writable
567        info.save()
568        logger.debug("Wrote %r from stream.", info)
569
570    def fileExists(self, jobStoreFileID):
571        return self.FileInfo.exists(jobStoreFileID)
572
573    def getFileSize(self, jobStoreFileID):
574        if not self.fileExists(jobStoreFileID):
575            return 0
576        info = self.FileInfo.loadOrFail(jobStoreFileID)
577        return info.getSize()
578
579    def readFile(self, jobStoreFileID, localFilePath, symlink=False):
580        info = self.FileInfo.loadOrFail(jobStoreFileID)
581        logger.debug("Reading %r into %r.", info, localFilePath)
582        info.download(localFilePath, not self.config.disableJobStoreChecksumVerification)
583        if getattr(jobStoreFileID, 'executable', False):
584            os.chmod(localFilePath, os.stat(localFilePath).st_mode | stat.S_IXUSR)
585
586    @contextmanager
587    def readFileStream(self, jobStoreFileID, encoding=None, errors=None):
588        info = self.FileInfo.loadOrFail(jobStoreFileID)
589        logger.debug("Reading %r into stream.", info)
590        with info.downloadStream(encoding=encoding, errors=errors) as readable:
591            yield readable
592
593    @contextmanager
594    def readSharedFileStream(self, sharedFileName, encoding=None, errors=None):
595        self._requireValidSharedFileName(sharedFileName)
596        jobStoreFileID = self._sharedFileID(sharedFileName)
597        info = self.FileInfo.loadOrFail(jobStoreFileID, customName=sharedFileName)
598        logger.debug("Reading %r for shared file %r into stream.", info, sharedFileName)
599        with info.downloadStream(encoding=encoding, errors=errors) as readable:
600            yield readable
601
602    def deleteFile(self, jobStoreFileID):
603        info = self.FileInfo.load(jobStoreFileID)
604        if info is None:
605            logger.debug("File %s does not exist, skipping deletion.", jobStoreFileID)
606        else:
607            info.delete()
608
609    def writeStatsAndLogging(self, statsAndLoggingString):
610        info = self.FileInfo.create(str(self.statsFileOwnerID))
611        with info.uploadStream(multipart=False) as writeable:
612            if isinstance(statsAndLoggingString, str):
613                # This stream is for binary data, so encode any non-encoded things
614                statsAndLoggingString = statsAndLoggingString.encode('utf-8', errors='ignore')
615            writeable.write(statsAndLoggingString)
616        info.save()
617
618    def readStatsAndLogging(self, callback, readAll=False):
619        itemsProcessed = 0
620
621        for info in self._readStatsAndLogging(callback, self.statsFileOwnerID):
622            info._ownerID = self.readStatsFileOwnerID
623            info.save()
624            itemsProcessed += 1
625
626        if readAll:
627            for _ in self._readStatsAndLogging(callback, self.readStatsFileOwnerID):
628                itemsProcessed += 1
629
630        return itemsProcessed
631
632    def _readStatsAndLogging(self, callback, ownerId):
633        items = None
634        for attempt in retry_sdb():
635            with attempt:
636                items = list(self.filesDomain.select(
637                    consistent_read=True,
638                    query="select * from `%s` where ownerID='%s'" % (
639                        self.filesDomain.name, str(ownerId))))
640        assert items is not None
641        for item in items:
642            info = self.FileInfo.fromItem(item)
643            with info.downloadStream() as readable:
644                callback(readable)
645            yield info
646
647    # TODO: Make this retry more specific?
648    #  example: https://github.com/DataBiosphere/toil/issues/3378
649    @retry()
650    def getPublicUrl(self, jobStoreFileID):
651        info = self.FileInfo.loadOrFail(jobStoreFileID)
652        if info.content is not None:
653            with info.uploadStream(allowInlining=False) as f:
654                f.write(info.content)
655
656        self.filesBucket.Object(compat_bytes(jobStoreFileID)).Acl().put(ACL='public-read')
657
658        url = self.s3_client.generate_presigned_url('get_object',
659                                                    Params={'Bucket': self.filesBucket.name,
660                                                            'Key': compat_bytes(jobStoreFileID),
661                                                            'VersionId': info.version},
662                                                    ExpiresIn=self.publicUrlExpiration.total_seconds())
663
664        # boto doesn't properly remove the x-amz-security-token parameter when
665        # query_auth is False when using an IAM role (see issue #2043). Including the
666        # x-amz-security-token parameter without the access key results in a 403,
667        # even if the resource is public, so we need to remove it.
668        scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
669        params = urllib.parse.parse_qs(query)
670        if 'x-amz-security-token' in params:
671            del params['x-amz-security-token']
672        if 'AWSAccessKeyId' in params:
673            del params['AWSAccessKeyId']
674        if 'Signature' in params:
675            del params['Signature']
676        query = urllib.parse.urlencode(params, doseq=True)
677        url = urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
678        return url
679
680    def getSharedPublicUrl(self, sharedFileName):
681        self._requireValidSharedFileName(sharedFileName)
682        return self.getPublicUrl(self._sharedFileID(sharedFileName))
683
684    def _connectSimpleDB(self):
685        """
686        :rtype: SDBConnection
687        """
688        db = boto.sdb.connect_to_region(self.region)
689        if db is None:
690            raise ValueError("Could not connect to SimpleDB. Make sure '%s' is a valid SimpleDB region." % self.region)
691        monkeyPatchSdbConnection(db)
692        return db
693
694    def _bindBucket(self,
695                    bucket_name: str,
696                    create: bool = False,
697                    block: bool = True,
698                    versioning: bool = False,
699                    check_versioning_consistency: bool = True):
700        """
701        Return the Boto Bucket object representing the S3 bucket with the given name. If the
702        bucket does not exist and `create` is True, it will be created.
703
704        :param str bucket_name: the name of the bucket to bind to
705
706        :param bool create: Whether to create bucket the if it doesn't exist
707
708        :param bool block: If False, return None if the bucket doesn't exist. If True, wait until
709               bucket appears. Ignored if `create` is True.
710
711        :rtype: Bucket|None
712        :raises botocore.exceptions.ClientError: If `block` is True and the bucket still doesn't exist after the
713                retry timeout expires.
714        """
715        assert self.minBucketNameLen <= len(bucket_name) <= self.maxBucketNameLen
716        assert self.bucketNameRe.match(bucket_name)
717        logger.debug("Binding to job store bucket '%s'.", bucket_name)
718
719        def bucket_creation_pending(error):
720            # https://github.com/BD2KGenomics/toil/issues/955
721            # https://github.com/BD2KGenomics/toil/issues/995
722            # https://github.com/BD2KGenomics/toil/issues/1093
723
724            # BucketAlreadyOwnedByYou == 409
725            # OperationAborted == 409
726            # NoSuchBucket == 404
727            return (isinstance(error, ClientError) and
728                    error.response.get('ResponseMetadata', {}).get('HTTPStatusCode') in (404, 409))
729
730        bucketExisted = True
731        for attempt in retry_s3(predicate=bucket_creation_pending):
732            with attempt:
733                try:
734                    # the head_bucket() call makes sure that the bucket exists and the user can access it
735                    self.s3_client.head_bucket(Bucket=bucket_name)
736
737                    bucket = self.s3_resource.Bucket(bucket_name)
738                except ClientError as e:
739                    error_code = e.response.get('ResponseMetadata', {}).get('HTTPStatusCode')
740                    if error_code == 404:
741                        bucketExisted = False
742                        logger.debug("Bucket '%s' does not exist.", bucket_name)
743                        if create:
744                            logger.debug("Creating bucket '%s'.", bucket_name)
745                            location = region_to_bucket_location(self.region)
746                            bucket = self.s3_resource.create_bucket(
747                                Bucket=bucket_name,
748                                CreateBucketConfiguration={'LocationConstraint': location})
749                            # Wait until the bucket exists before checking the region and adding tags
750                            bucket.wait_until_exists()
751
752                            # It is possible for create_bucket to return but
753                            # for an immediate request for the bucket region to
754                            # produce an S3ResponseError with code
755                            # NoSuchBucket. We let that kick us back up to the
756                            # main retry loop.
757                            assert self.getBucketRegion(bucket_name) == self.region
758
759                            owner_tag = os.environ.get('TOIL_OWNER_TAG')
760                            if owner_tag:
761                                bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
762                                bucket_tagging.put(Tagging={'TagSet': [{'Key': 'Owner', 'Value': owner_tag}]})
763                        elif block:
764                            raise
765                        else:
766                            return None
767                    elif error_code == 301:
768                        # This is raised if the user attempts to get a bucket in a region outside
769                        # the specified one, if the specified one is not `us-east-1`.  The us-east-1
770                        # server allows a user to use buckets from any region.
771                        raise BucketLocationConflictException(self.getBucketRegion(bucket_name))
772                    else:
773                        raise
774                else:
775                    bucketRegion = self.getBucketRegion(bucket_name)
776                    if bucketRegion != self.region:
777                        raise BucketLocationConflictException(bucketRegion)
778
779                if versioning and not bucketExisted:
780                    # only call this method on bucket creation
781                    bucket.Versioning().enable()
782                    # Now wait until versioning is actually on. Some uploads
783                    # would come back with no versions; maybe they were
784                    # happening too fast and this setting isn't sufficiently
785                    # consistent?
786                    time.sleep(1)
787                    while not self._getBucketVersioning(bucket_name):
788                        logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
789                        time.sleep(1)
790                elif check_versioning_consistency:
791                    # now test for versioning consistency
792                    # we should never see any of these errors since 'versioning' should always be true
793                    bucket_versioning = self._getBucketVersioning(bucket_name)
794                    if bucket_versioning != versioning:
795                        assert False, 'Cannot modify versioning on existing bucket'
796                    elif bucket_versioning is None:
797                        assert False, 'Cannot use a bucket with versioning suspended'
798                if bucketExisted:
799                    logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
800                else:
801                    logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")
802
803                return bucket
804
805    def _bindDomain(self, domain_name, create=False, block=True):
806        """
807        Return the Boto Domain object representing the SDB domain of the given name. If the
808        domain does not exist and `create` is True, it will be created.
809
810        :param str domain_name: the name of the domain to bind to
811
812        :param bool create: True if domain should be created if it doesn't exist
813
814        :param bool block: If False, return None if the domain doesn't exist. If True, wait until
815               domain appears. This parameter is ignored if create is True.
816
817        :rtype: Domain|None
818        :raises SDBResponseError: If `block` is True and the domain still doesn't exist after the
819                retry timeout expires.
820        """
821        logger.debug("Binding to job store domain '%s'.", domain_name)
822        retryargs = dict(predicate=lambda e: no_such_sdb_domain(e) or sdb_unavailable(e))
823        if not block:
824            retryargs['timeout'] = 15
825        for attempt in retry_sdb(**retryargs):
826            with attempt:
827                try:
828                    return self.db.get_domain(domain_name)
829                except SDBResponseError as e:
830                    if no_such_sdb_domain(e):
831                        if create:
832                            return self.db.create_domain(domain_name)
833                        elif block:
834                            raise
835                        else:
836                            return None
837                    else:
838                        raise
839
840    def _newJobID(self):
841        return str(uuid.uuid4())
842
843    # A dummy job ID under which all shared files are stored
844    sharedFileOwnerID = uuid.UUID('891f7db6-e4d9-4221-a58e-ab6cc4395f94')
845
846    # A dummy job ID under which all unread stats files are stored
847    statsFileOwnerID = uuid.UUID('bfcf5286-4bc7-41ef-a85d-9ab415b69d53')
848
849    # A dummy job ID under which all read stats files are stored
850    readStatsFileOwnerID = uuid.UUID('e77fc3aa-d232-4255-ae04-f64ee8eb0bfa')
851
852    def _sharedFileID(self, sharedFileName):
853        return str(uuid.uuid5(self.sharedFileOwnerID, sharedFileName))
854
855    @InnerClass
856    class FileInfo(SDBHelper):
857        """
858        Represents a file in this job store.
859        """
860        outer = None
861        """
862        :type: AWSJobStore
863        """
864
865        def __init__(self, fileID, ownerID, encrypted,
866                     version=None, content=None, numContentChunks=0, checksum=None):
867            """
868            :type fileID: str
869            :param fileID: the file's ID
870
871            :type ownerID: str
872            :param ownerID: ID of the entity owning this file, typically a job ID aka jobStoreID
873
874            :type encrypted: bool
875            :param encrypted: whether the file is stored in encrypted form
876
877            :type version: str|None
878            :param version: a non-empty string containing the most recent version of the S3
879            object storing this file's content, None if the file is new, or empty string if the
880            file is inlined.
881
882            :type content: str|None
883            :param content: this file's inlined content
884
885            :type numContentChunks: int
886            :param numContentChunks: the number of SDB domain attributes occupied by this files
887
888            :type checksum: str|None
889            :param checksum: the checksum of the file, if available. Formatted
890            as <algorithm>$<lowercase hex hash>.
891
892            inlined content. Note that an inlined empty string still occupies one chunk.
893            """
894            super(AWSJobStore.FileInfo, self).__init__()
895            self._fileID = fileID
896            self._ownerID = ownerID
897            self.encrypted = encrypted
898            self._version = version
899            self._previousVersion = version
900            self._content = content
901            self._checksum = checksum
902            self._numContentChunks = numContentChunks
903
904        @property
905        def fileID(self):
906            return self._fileID
907
908        @property
909        def ownerID(self):
910            return self._ownerID
911
912        @property
913        def version(self):
914            return self._version
915
916        @version.setter
917        def version(self, version):
918            # Version should only change once
919            assert self._previousVersion == self._version
920            self._version = version
921            if version:
922                self.content = None
923
924        @property
925        def previousVersion(self):
926            return self._previousVersion
927
928        @property
929        def content(self):
930            return self._content
931
932        @property
933        def checksum(self):
934            return self._checksum
935
936        @checksum.setter
937        def checksum(self, checksum):
938            self._checksum = checksum
939
940        @content.setter
941        def content(self, content):
942            assert content is None or isinstance(content, bytes)
943            self._content = content
944            if content is not None:
945                self.version = ''
946
947        @classmethod
948        def create(cls, ownerID):
949            return cls(str(uuid.uuid4()), ownerID, encrypted=cls.outer.sseKeyPath is not None)
950
951        @classmethod
952        def presenceIndicator(cls):
953            return 'encrypted'
954
955        @classmethod
956        def exists(cls, jobStoreFileID):
957            for attempt in retry_sdb():
958                with attempt:
959                    return bool(cls.outer.filesDomain.get_attributes(
960                        item_name=compat_bytes(jobStoreFileID),
961                        attribute_name=[cls.presenceIndicator()],
962                        consistent_read=True))
963
964        @classmethod
965        def load(cls, jobStoreFileID):
966            for attempt in retry_sdb():
967                with attempt:
968                    self = cls.fromItem(
969                        cls.outer.filesDomain.get_attributes(item_name=compat_bytes(jobStoreFileID),
970                                                             consistent_read=True))
971                    return self
972
973        @classmethod
974        def loadOrCreate(cls, jobStoreFileID, ownerID, encrypted):
975            self = cls.load(jobStoreFileID)
976            if encrypted is None:
977                encrypted = cls.outer.sseKeyPath is not None
978            if self is None:
979                self = cls(jobStoreFileID, ownerID, encrypted=encrypted)
980            else:
981                assert self.fileID == jobStoreFileID
982                assert self.ownerID == ownerID
983                self.encrypted = encrypted
984            return self
985
986        @classmethod
987        def loadOrFail(cls, jobStoreFileID, customName=None):
988            """
989            :rtype: AWSJobStore.FileInfo
990            :return: an instance of this class representing the file with the given ID
991            :raises NoSuchFileException: if given file does not exist
992            """
993            self = cls.load(jobStoreFileID)
994            if self is None:
995                raise NoSuchFileException(jobStoreFileID, customName=customName)
996            else:
997                return self
998
999        @classmethod
1000        def fromItem(cls, item):
1001            """
1002            Convert an SDB item to an instance of this class.
1003
1004            :type item: Item
1005            """
1006            assert item is not None
1007
1008            # Strings come back from SDB as unicode
1009            def strOrNone(s):
1010                return s if s is None else str(s)
1011
1012            # ownerID and encrypted are the only mandatory attributes
1013            ownerID = strOrNone(item.get('ownerID'))
1014            encrypted = item.get('encrypted')
1015            if ownerID is None:
1016                assert encrypted is None
1017                return None
1018            else:
1019                version = strOrNone(item['version'])
1020                checksum = strOrNone(item.get('checksum'))
1021                encrypted = strict_bool(encrypted)
1022                content, numContentChunks = cls.attributesToBinary(item)
1023                if encrypted:
1024                    sseKeyPath = cls.outer.sseKeyPath
1025                    if sseKeyPath is None:
1026                        raise AssertionError('Content is encrypted but no key was provided.')
1027                    if content is not None:
1028                        content = encryption.decrypt(content, sseKeyPath)
1029                self = cls(fileID=item.name, ownerID=ownerID, encrypted=encrypted, version=version,
1030                           content=content, numContentChunks=numContentChunks, checksum=checksum)
1031                return self
1032
1033        def toItem(self):
1034            """
1035            Convert this instance to an attribute dictionary suitable for SDB put_attributes().
1036
1037            :rtype: (dict,int)
1038
1039            :return: the attributes dict and an integer specifying the the number of chunk
1040                     attributes in the dictionary that are used for storing inlined content.
1041            """
1042            content = self.content
1043            assert content is None or isinstance(content, bytes)
1044            if self.encrypted and content is not None:
1045                sseKeyPath = self.outer.sseKeyPath
1046                if sseKeyPath is None:
1047                    raise AssertionError('Encryption requested but no key was provided.')
1048                content = encryption.encrypt(content, sseKeyPath)
1049            assert content is None or isinstance(content, bytes)
1050            attributes = self.binaryToAttributes(content)
1051            numChunks = attributes['numChunks']
1052            attributes.update(dict(ownerID=self.ownerID,
1053                                   encrypted=self.encrypted,
1054                                   version=self.version or '',
1055                                   checksum=self.checksum or ''))
1056            return attributes, numChunks
1057
1058        @classmethod
1059        def _reservedAttributes(cls):
1060            return 3 + super(AWSJobStore.FileInfo, cls)._reservedAttributes()
1061
1062        @staticmethod
1063        def maxInlinedSize():
1064            return 256
1065
1066        def save(self):
1067            attributes, numNewContentChunks = self.toItem()
1068            # False stands for absence
1069            expected = ['version', False if self.previousVersion is None else self.previousVersion]
1070            try:
1071                for attempt in retry_sdb():
1072                    with attempt:
1073                        assert self.outer.filesDomain.put_attributes(item_name=compat_bytes(self.fileID),
1074                                                                     attributes=attributes,
1075                                                                     expected_value=expected)
1076                # clean up the old version of the file if necessary and safe
1077                if self.previousVersion and (self.previousVersion != self.version):
1078                    for attempt in retry_s3():
1079                        with attempt:
1080                            self.outer.s3_client.delete_object(Bucket=self.outer.filesBucket.name,
1081                                                               Key=compat_bytes(self.fileID),
1082                                                               VersionId=self.previousVersion)
1083                self._previousVersion = self._version
1084                if numNewContentChunks < self._numContentChunks:
1085                    residualChunks = range(numNewContentChunks, self._numContentChunks)
1086                    attributes = [self._chunkName(i) for i in residualChunks]
1087                    for attempt in retry_sdb():
1088                        with attempt:
1089                            self.outer.filesDomain.delete_attributes(compat_bytes(self.fileID),
1090                                                                     attributes=attributes)
1091                self._numContentChunks = numNewContentChunks
1092            except SDBResponseError as e:
1093                if e.error_code == 'ConditionalCheckFailed':
1094                    raise ConcurrentFileModificationException(self.fileID)
1095                else:
1096                    raise
1097
1098        def upload(self, localFilePath, calculateChecksum=True):
1099            file_size, file_time = fileSizeAndTime(localFilePath)
1100            if file_size <= self.maxInlinedSize():
1101                with open(localFilePath, 'rb') as f:
1102                    self.content = f.read()
1103                # Clear out any old checksum in case of overwrite
1104                self.checksum = ''
1105            else:
1106                headerArgs = self._s3EncryptionArgs()
1107                # Create a new Resource in case it needs to be on its own thread
1108                resource = boto3_session.resource('s3', region_name=self.outer.region)
1109
1110                self.checksum = self._get_file_checksum(localFilePath) if calculateChecksum else None
1111                self.version = uploadFromPath(localFilePath,
1112                                              resource=resource,
1113                                              bucketName=self.outer.filesBucket.name,
1114                                              fileID=compat_bytes(self.fileID),
1115                                              headerArgs=headerArgs,
1116                                              partSize=self.outer.partSize)
1117
1118        def _start_checksum(self, to_match=None, algorithm='sha1'):
1119            """
1120            Get a hasher that can be used with _update_checksum and
1121            _finish_checksum.
1122
1123            If to_match is set, it is a precomputed checksum which we expect
1124            the result to match.
1125
1126            The right way to compare checksums is to feed in the checksum to be
1127            matched, so we can see its algorithm, instead of getting a new one
1128            and comparing. If a checksum to match is fed in, _finish_checksum()
1129            will raise a ChecksumError if it isn't matched.
1130            """
1131
1132            # If we have an expexted result it will go here.
1133            expected = None
1134
1135            if to_match is not None:
1136                parts = to_match.split('$')
1137                algorithm = parts[0]
1138                expected = parts[1]
1139
1140            wrapped = getattr(hashlib, algorithm)()
1141            logger.debug(f'Starting {algorithm} checksum to match {expected}')
1142            return algorithm, wrapped, expected
1143
1144        def _update_checksum(self, checksum_in_progress, data):
1145            """
1146            Update a checksum in progress from _start_checksum with new data.
1147            """
1148            checksum_in_progress[1].update(data)
1149
1150        def _finish_checksum(self, checksum_in_progress):
1151            """
1152            Complete a checksum in progress from _start_checksum and return the
1153            checksum result string.
1154            """
1155
1156            result_hash = checksum_in_progress[1].hexdigest()
1157
1158            logger.debug(f'Completed checksum with hash {result_hash} vs. expected {checksum_in_progress[2]}')
1159            if checksum_in_progress[2] is not None:
1160                # We expected a particular hash
1161                if result_hash != checksum_in_progress[2]:
1162                    raise ChecksumError('Checksum mismatch. Expected: %s Actual: %s' %
1163                        (checksum_in_progress[2], result_hash))
1164
1165            return '$'.join([checksum_in_progress[0], result_hash])
1166
1167        def _get_file_checksum(self, localFilePath, to_match=None):
1168            with open(localFilePath, 'rb') as f:
1169                hasher = self._start_checksum(to_match=to_match)
1170                contents = f.read(1024 * 1024)
1171                while contents != b'':
1172                    self._update_checksum(hasher, contents)
1173                    contents = f.read(1024 * 1024)
1174                return self._finish_checksum(hasher)
1175
1176        @contextmanager
1177        def uploadStream(self, multipart=True, allowInlining=True, encoding=None, errors=None):
1178            """
1179            Context manager that gives out a binary or text mode upload stream to upload data.
1180            """
1181
1182            # Note that we have to handle already having a content or a version
1183            # if we are overwriting something.
1184
1185            # But make sure we don't have both.
1186            assert not (bool(self.version) and self.content is not None)
1187
1188            info = self
1189            store = self.outer
1190
1191            class MultiPartPipe(WritablePipe):
1192                def readFrom(self, readable):
1193                    # Get the first block of data we want to put
1194                    buf = readable.read(store.partSize)
1195                    assert isinstance(buf, bytes)
1196
1197                    if allowInlining and len(buf) <= info.maxInlinedSize():
1198                        logger.debug('Inlining content of %d bytes', len(buf))
1199                        info.content = buf
1200                        # There will be no checksum
1201                        info.checksum = ''
1202                    else:
1203                        # We will compute a checksum
1204                        hasher = info._start_checksum()
1205                        logger.debug('Updating checksum with %d bytes', len(buf))
1206                        info._update_checksum(hasher, buf)
1207
1208                        client = store.s3_client
1209                        bucket_name = store.filesBucket.name
1210                        headerArgs = info._s3EncryptionArgs()
1211
1212                        for attempt in retry_s3():
1213                            with attempt:
1214                                logger.debug('Starting multipart upload')
1215                                # low-level clients are thread safe
1216                                upload = client.create_multipart_upload(Bucket=bucket_name,
1217                                                                        Key=compat_bytes(info.fileID),
1218                                                                        **headerArgs)
1219                                uploadId = upload['UploadId']
1220                                parts = []
1221                                logger.debug('Multipart upload started as %s', uploadId)
1222
1223                        for i in range(CONSISTENCY_TICKS):
1224                            # Sometimes we can create a multipart upload and not see it. Wait around for it.
1225                            response = client.list_multipart_uploads(Bucket=bucket_name,
1226                                                                     MaxUploads=1,
1227                                                                     Prefix=compat_bytes(info.fileID))
1228                            if len(response['Uploads']) != 0 and response['Uploads'][0]['UploadId'] == uploadId:
1229                                logger.debug('Multipart upload visible as %s', uploadId)
1230                                break
1231                            else:
1232                                logger.debug('Multipart upload %s is not visible; we see %s', uploadId, response['Uploads'])
1233                                time.sleep(CONSISTENCY_TIME * 2 ** i)
1234
1235                        try:
1236                            for part_num in itertools.count():
1237                                for attempt in retry_s3():
1238                                    with attempt:
1239                                        logger.debug('Uploading part %d of %d bytes to %s', part_num + 1, len(buf), uploadId)
1240                                        # TODO: include the Content-MD5 header:
1241                                        #  https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.complete_multipart_upload
1242                                        part = client.upload_part(Bucket=bucket_name,
1243                                                                  Key=compat_bytes(info.fileID),
1244                                                                  PartNumber=part_num + 1,
1245                                                                  UploadId=uploadId,
1246                                                                  Body=BytesIO(buf),
1247                                                                  **headerArgs)
1248
1249                                        parts.append({"PartNumber": part_num + 1, "ETag": part["ETag"]})
1250
1251                                # Get the next block of data we want to put
1252                                buf = readable.read(info.outer.partSize)
1253                                assert isinstance(buf, bytes)
1254                                if len(buf) == 0:
1255                                    # Don't allow any part other than the very first to be empty.
1256                                    break
1257                                info._update_checksum(hasher, buf)
1258                        except:
1259                            with panic(log=logger):
1260                                for attempt in retry_s3():
1261                                    with attempt:
1262                                        client.abort_multipart_upload(Bucket=bucket_name,
1263                                                                      Key=compat_bytes(info.fileID),
1264                                                                      UploadId=uploadId)
1265
1266                        else:
1267
1268                            while not store._getBucketVersioning(store.filesBucket.name):
1269                                logger.warning('Versioning does not appear to be enabled yet. Deferring multipart '
1270                                               'upload completion...')
1271                                time.sleep(1)
1272
1273                            # Save the checksum
1274                            info.checksum = info._finish_checksum(hasher)
1275
1276                            for attempt in retry_s3():
1277                                with attempt:
1278                                    logger.debug('Attempting to complete upload...')
1279                                    completed = client.complete_multipart_upload(
1280                                        Bucket=bucket_name,
1281                                        Key=compat_bytes(info.fileID),
1282                                        UploadId=uploadId,
1283                                        MultipartUpload={"Parts": parts})
1284
1285                                    logger.debug('Completed upload object of type %s: %s', str(type(completed)),
1286                                                 repr(completed))
1287                                    info.version = completed['VersionId']
1288                                    logger.debug('Completed upload with version %s', str(info.version))
1289
1290                            if info.version is None:
1291                                # Somehow we don't know the version. Try and get it.
1292                                for attempt in retry_s3(predicate=lambda e: retryable_s3_errors(e) or isinstance(e, AssertionError)):
1293                                    with attempt:
1294                                        version = client.head_object(Bucket=bucket_name,
1295                                                                     Key=compat_bytes(info.fileID),
1296                                                                     **headerArgs).get('VersionId', None)
1297                                        logger.warning('Loaded key for upload with no version and got version %s',
1298                                                       str(version))
1299                                        info.version = version
1300                                        assert info.version is not None
1301
1302                    # Make sure we actually wrote something, even if an empty file
1303                    assert (bool(info.version) or info.content is not None)
1304
1305            class SinglePartPipe(WritablePipe):
1306                def readFrom(self, readable):
1307                    buf = readable.read()
1308                    assert isinstance(buf, bytes)
1309                    dataLength = len(buf)
1310                    if allowInlining and dataLength <= info.maxInlinedSize():
1311                        logger.debug('Inlining content of %d bytes', len(buf))
1312                        info.content = buf
1313                        # There will be no checksum
1314                        info.checksum = ''
1315                    else:
1316                        # We will compute a checksum
1317                        hasher = info._start_checksum()
1318                        info._update_checksum(hasher, buf)
1319                        info.checksum = info._finish_checksum(hasher)
1320
1321                        bucket_name = store.filesBucket.name
1322                        headerArgs = info._s3EncryptionArgs()
1323                        client = store.s3_client
1324
1325                        buf = BytesIO(buf)
1326
1327                        while not store._getBucketVersioning(bucket_name):
1328                            logger.warning('Versioning does not appear to be enabled yet. Deferring single part '
1329                                           'upload...')
1330                            time.sleep(1)
1331
1332                        for attempt in retry_s3():
1333                            with attempt:
1334                                logger.debug('Uploading single part of %d bytes', dataLength)
1335                                client.upload_fileobj(Bucket=bucket_name,
1336                                                      Key=compat_bytes(info.fileID),
1337                                                      Fileobj=buf,
1338                                                      ExtraArgs=headerArgs)
1339
1340                                # use head_object with the SSE headers to access versionId and content_length attributes
1341                                headObj = client.head_object(Bucket=bucket_name,
1342                                                             Key=compat_bytes(info.fileID),
1343                                                             **headerArgs)
1344                                assert dataLength == headObj.get('ContentLength', None)
1345                                info.version = headObj.get('VersionId', None)
1346                                logger.debug('Upload received version %s', str(info.version))
1347
1348                        if info.version is None:
1349                            # Somehow we don't know the version
1350                            for attempt in retry_s3(predicate=lambda e: retryable_s3_errors(e) or isinstance(e, AssertionError)):
1351                                with attempt:
1352                                    headObj = client.head_object(Bucket=bucket_name,
1353                                                                 Key=compat_bytes(info.fileID),
1354                                                                 **headerArgs)
1355                                    info.version = headObj.get('VersionId', None)
1356                                    logger.warning('Reloaded key with no version and got version %s', str(info.version))
1357                                    assert info.version is not None
1358
1359                    # Make sure we actually wrote something, even if an empty file
1360                    assert (bool(info.version) or info.content is not None)
1361
1362            if multipart:
1363                pipe = MultiPartPipe(encoding=encoding, errors=errors)
1364            else:
1365                pipe = SinglePartPipe(encoding=encoding, errors=errors)
1366
1367            with pipe as writable:
1368                yield writable
1369
1370            if not pipe.reader_done:
1371                logger.debug('Version: {} Content: {}'.format(self.version, self.content))
1372                raise RuntimeError('Escaped context manager without written data being read!')
1373
1374            # We check our work to make sure we have exactly one of embedded
1375            # content or a real object version.
1376
1377            if self.content is None:
1378                if not bool(self.version):
1379                    logger.debug('Version: {} Content: {}'.format(self.version, self.content))
1380                    raise RuntimeError('No content added and no version created')
1381            else:
1382                if bool(self.version):
1383                    logger.debug('Version: {} Content: {}'.format(self.version, self.content))
1384                    raise RuntimeError('Content added and version created')
1385
1386        def copyFrom(self, srcObj):
1387            """
1388            Copies contents of source key into this file.
1389
1390            :param S3.Object srcObj: The key (object) that will be copied from
1391            """
1392            assert srcObj.content_length is not None
1393            if srcObj.content_length <= self.maxInlinedSize():
1394                self.content = srcObj.get().get('Body').read()
1395            else:
1396                # Create a new Resource in case it needs to be on its own thread
1397                resource = boto3_session.resource('s3', region_name=self.outer.region)
1398                self.version = copyKeyMultipart(resource,
1399                                                srcBucketName=compat_bytes(srcObj.bucket_name),
1400                                                srcKeyName=compat_bytes(srcObj.key),
1401                                                srcKeyVersion=compat_bytes(srcObj.version_id),
1402                                                dstBucketName=compat_bytes(self.outer.filesBucket.name),
1403                                                dstKeyName=compat_bytes(self._fileID),
1404                                                sseAlgorithm='AES256',
1405                                                sseKey=self._getSSEKey())
1406
1407        def copyTo(self, dstObj):
1408            """
1409            Copies contents of this file to the given key.
1410
1411            :param S3.Object dstObj: The key (object) to copy this file's content to
1412            """
1413            if self.content is not None:
1414                for attempt in retry_s3():
1415                    with attempt:
1416                        dstObj.put(Body=self.content)
1417            elif self.version:
1418                # Create a new Resource in case it needs to be on its own thread
1419                resource = boto3_session.resource('s3', region_name=self.outer.region)
1420
1421                for attempt in retry_s3():
1422                    # encrypted = True if self.outer.sseKeyPath else False
1423                    with attempt:
1424                        copyKeyMultipart(resource,
1425                                         srcBucketName=compat_bytes(self.outer.filesBucket.name),
1426                                         srcKeyName=compat_bytes(self.fileID),
1427                                         srcKeyVersion=compat_bytes(self.version),
1428                                         dstBucketName=compat_bytes(dstObj.bucket_name),
1429                                         dstKeyName=compat_bytes(dstObj.key),
1430                                         copySourceSseAlgorithm='AES256',
1431                                         copySourceSseKey=self._getSSEKey())
1432            else:
1433                assert False
1434
1435        def download(self, localFilePath, verifyChecksum=True):
1436            if self.content is not None:
1437                with AtomicFileCreate(localFilePath) as tmpPath:
1438                    with open(tmpPath, 'wb') as f:
1439                        f.write(self.content)
1440            elif self.version:
1441                headerArgs = self._s3EncryptionArgs()
1442                obj = self.outer.filesBucket.Object(compat_bytes(self.fileID))
1443
1444                for attempt in retry_s3(predicate=lambda e: retryable_s3_errors(e) or isinstance(e, ChecksumError)):
1445                    with attempt:
1446                        with AtomicFileCreate(localFilePath) as tmpPath:
1447                            obj.download_file(Filename=tmpPath, ExtraArgs={'VersionId': self.version, **headerArgs})
1448
1449                        if verifyChecksum and self.checksum:
1450                            try:
1451                                # This automatically compares the result and matches the algorithm.
1452                                self._get_file_checksum(localFilePath, self.checksum)
1453                            except ChecksumError as e:
1454                                # Annotate checksum mismatches with file name
1455                                raise ChecksumError('Checksums do not match for file %s.' % localFilePath) from e
1456                                # The error will get caught and result in a retry of the download until we run out of retries.
1457                                # TODO: handle obviously truncated downloads by resuming instead.
1458            else:
1459                assert False
1460
1461        @contextmanager
1462        def downloadStream(self, verifyChecksum=True, encoding=None, errors=None):
1463            """
1464            Context manager that gives out a download stream to download data.
1465            """
1466            info = self
1467
1468            class DownloadPipe(ReadablePipe):
1469                def writeTo(self, writable):
1470                    if info.content is not None:
1471                        writable.write(info.content)
1472                    elif info.version:
1473                        headerArgs = info._s3EncryptionArgs()
1474                        obj = info.outer.filesBucket.Object(compat_bytes(info.fileID))
1475                        for attempt in retry_s3():
1476                            with attempt:
1477                                obj.download_fileobj(writable, ExtraArgs={'VersionId': info.version, **headerArgs})
1478                    else:
1479                        assert False
1480
1481            class HashingPipe(ReadableTransformingPipe):
1482                """
1483                Class which checksums all the data read through it. If it
1484                reaches EOF and the checksum isn't correct, raises
1485                ChecksumError.
1486
1487                Assumes info actually has a checksum.
1488                """
1489
1490                def transform(self, readable, writable):
1491                    hasher = info._start_checksum(to_match=info.checksum)
1492                    contents = readable.read(1024 * 1024)
1493                    while contents != b'':
1494                        info._update_checksum(hasher, contents)
1495                        try:
1496                            writable.write(contents)
1497                        except BrokenPipeError:
1498                            # Read was stopped early by user code.
1499                            # Can't check the checksum.
1500                            return
1501                        contents = readable.read(1024 * 1024)
1502                    # We reached EOF in the input.
1503                    # Finish checksumming and verify.
1504                    info._finish_checksum(hasher)
1505                    # Now stop so EOF happens in the output.
1506
1507            if verifyChecksum and self.checksum:
1508                with DownloadPipe() as readable:
1509                    # Interpose a pipe to check the hash
1510                    with HashingPipe(readable, encoding=encoding, errors=errors) as verified:
1511                        yield verified
1512            else:
1513                # Readable end of pipe produces text mode output if encoding specified
1514                with DownloadPipe(encoding=encoding, errors=errors) as readable:
1515                    # No true checksum available, so don't hash
1516                    yield readable
1517
1518        def delete(self):
1519            store = self.outer
1520            if self.previousVersion is not None:
1521                for attempt in retry_sdb():
1522                    with attempt:
1523                        store.filesDomain.delete_attributes(
1524                            compat_bytes(self.fileID),
1525                            expected_values=['version', self.previousVersion])
1526                if self.previousVersion:
1527                    for attempt in retry_s3():
1528                        with attempt:
1529                            store.s3_client.delete_object(Bucket=store.filesBucket.name,
1530                                                          Key=compat_bytes(self.fileID),
1531                                                          VersionId=self.previousVersion)
1532
1533        def getSize(self):
1534            """
1535            Return the size of the referenced item in bytes.
1536            """
1537            if self.content is not None:
1538                return len(self.content)
1539            elif self.version:
1540                for attempt in retry_s3():
1541                    with attempt:
1542                        obj = self.outer.filesBucket.Object(compat_bytes(self.fileID))
1543                        return obj.content_length
1544            else:
1545                return 0
1546
1547        def _getSSEKey(self) -> Optional[bytes]:
1548            sseKeyPath = self.outer.sseKeyPath
1549            if sseKeyPath:
1550                with open(sseKeyPath, 'rb') as f:
1551                    sseKey = f.read()
1552                return sseKey
1553
1554        def _s3EncryptionArgs(self):
1555            # the keys of the returned dictionary are unpacked to the corresponding boto3 optional
1556            # parameters and will be used to set the http headers
1557            if self.encrypted:
1558                sseKey = self._getSSEKey()
1559                assert sseKey is not None, 'Content is encrypted but no key was provided.'
1560                assert len(sseKey) == 32
1561                # boto3 encodes the key and calculates the MD5 for us
1562                return {'SSECustomerAlgorithm': 'AES256', 'SSECustomerKey': sseKey}
1563            else:
1564                return {}
1565
1566        def __repr__(self):
1567            r = custom_repr
1568            d = (('fileID', r(self.fileID)),
1569                 ('ownerID', r(self.ownerID)),
1570                 ('encrypted', r(self.encrypted)),
1571                 ('version', r(self.version)),
1572                 ('previousVersion', r(self.previousVersion)),
1573                 ('content', r(self.content)),
1574                 ('checksum', r(self.checksum)),
1575                 ('_numContentChunks', r(self._numContentChunks)))
1576            return "{}({})".format(type(self).__name__,
1577                                   ', '.join('%s=%s' % (k, v) for k, v in d))
1578
1579    versionings = dict(Enabled=True, Disabled=False, Suspended=None)
1580
1581    def _getBucketVersioning(self, bucket_name):
1582        """
1583        The status attribute of BucketVersioning can be 'Enabled', 'Suspended' or None (Disabled)
1584        which we map to True, None and False respectively. Note that we've never seen a versioning
1585        status of 'Disabled', only the None return value. Calling BucketVersioning.suspend() will
1586        cause BucketVersioning.status to then return 'Suspended' even on a new bucket that never
1587        had versioning enabled.
1588
1589        :param bucket_name: str
1590        """
1591        for attempt in retry_s3():
1592            with attempt:
1593                status = self.s3_resource.BucketVersioning(bucket_name).status
1594                return self.versionings.get(status) if status else False
1595
1596    @staticmethod
1597    def getBucketRegion(bucket_name: str):
1598        for attempt in retry_s3():
1599            with attempt:
1600                loc = s3_boto3_client.get_bucket_location(Bucket=bucket_name)
1601                return bucket_location_to_region(loc.get('LocationConstraint', None))
1602
1603    # TODO: Make this retry more specific?
1604    #  example: https://github.com/DataBiosphere/toil/issues/3378
1605    @retry()
1606    def destroy(self):
1607        # FIXME: Destruction of encrypted stores only works after initialize() or .resume()
1608        # See https://github.com/BD2KGenomics/toil/issues/1041
1609        try:
1610            self._bind(create=False, block=False, check_versioning_consistency=False)
1611        except BucketLocationConflictException:
1612            # If the unique jobstore bucket name existed, _bind would have raised a
1613            # BucketLocationConflictException before calling destroy.  Calling _bind here again
1614            # would reraise the same exception so we need to catch and ignore that exception.
1615            pass
1616        # TODO: Add other failure cases to be ignored here.
1617        self._registered = None
1618        if self.filesBucket is not None:
1619            self._delete_bucket(self.filesBucket)
1620            self.filesBucket = None
1621        for name in 'filesDomain', 'jobsDomain':
1622            domain = getattr(self, name)
1623            if domain is not None:
1624                self._delete_domain(domain)
1625                setattr(self, name, None)
1626        self._registered = False
1627
1628    def _delete_domain(self, domain):
1629        for attempt in retry_sdb():
1630            with attempt:
1631                try:
1632                    domain.delete()
1633                except SDBResponseError as e:
1634                    if not no_such_sdb_domain(e):
1635                        raise
1636
1637    def _delete_bucket(self, bucket):
1638        """
1639        :param bucket: S3.Bucket
1640        """
1641        for attempt in retry_s3():
1642            with attempt:
1643                try:
1644                    uploads = s3_boto3_client.list_multipart_uploads(Bucket=bucket.name).get('Uploads')
1645                    if uploads:
1646                        for u in uploads:
1647                            s3_boto3_client.abort_multipart_upload(Bucket=bucket.name,
1648                                                                   Key=u["Key"],
1649                                                                   UploadId=u["UploadId"])
1650
1651                    bucket.objects.all().delete()
1652                    bucket.object_versions.delete()
1653                    bucket.delete()
1654                except s3_boto3_client.exceptions.NoSuchBucket:
1655                    pass
1656                except ClientError as e:
1657                    if e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') != 404:
1658                        raise
1659
1660
1661aRepr = reprlib.Repr()
1662aRepr.maxstring = 38  # so UUIDs don't get truncated (36 for UUID plus 2 for quotes)
1663custom_repr = aRepr.repr
1664
1665
1666class BucketLocationConflictException(Exception):
1667    def __init__(self, bucketRegion):
1668        super(BucketLocationConflictException, self).__init__(
1669            'A bucket with the same name as the jobstore was found in another region (%s). '
1670            'Cannot proceed as the unique bucket name is already in use.' % bucketRegion)
1671