# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import hashlib
import itertools
import logging
import pickle
import re
import reprlib
import stat
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
from contextlib import contextmanager
from io import BytesIO

import boto.sdb
import boto.s3.connection
from typing import Optional
from boto.exception import SDBResponseError
from botocore.exceptions import ClientError

import toil.lib.encryption as encryption
from toil.fileStores import FileID
from toil.jobStores.abstractJobStore import (AbstractJobStore,
                                             ConcurrentFileModificationException,
                                             JobStoreExistsException,
                                             NoSuchFileException,
                                             NoSuchJobException,
                                             NoSuchJobStoreException)
from toil.jobStores.aws.utils import (SDBHelper,
                                      bucket_location_to_region,
                                      uploadFromPath,
                                      uploadFile,
                                      copyKeyMultipart,
                                      fileSizeAndTime,
                                      monkeyPatchSdbConnection,
                                      no_such_sdb_domain,
                                      region_to_bucket_location,
                                      retry_s3,
                                      retry_sdb,
                                      retryable_s3_errors,
                                      sdb_unavailable)
from toil.jobStores.utils import (ReadablePipe,
                                  ReadableTransformingPipe,
                                  WritablePipe)
from toil.lib.compatibility import compat_bytes
from toil.lib.ec2 import establish_boto3_session
from toil.lib.ec2nodes import EC2Regions
from toil.lib.exceptions import panic
from toil.lib.memoize import strict_bool
from toil.lib.io import AtomicFileCreate
from toil.lib.objects import InnerClass
from toil.lib.retry import retry

boto3_session = establish_boto3_session()
s3_boto3_resource = boto3_session.resource('s3')
s3_boto3_client = boto3_session.client('s3')
logger = logging.getLogger(__name__)

# Sometimes we have to wait for multipart uploads to become real. How long
# should we wait?
CONSISTENCY_TICKS = 5
CONSISTENCY_TIME = 1


class ChecksumError(Exception):
    """Raised when a download from AWS does not contain the correct data."""


class AWSJobStore(AbstractJobStore):
    """
    A job store that uses Amazon's S3 for file storage and SimpleDB for storing job info and
    enforcing strong consistency on the S3 file storage. There will be SDB domains for jobs and
    files and a versioned S3 bucket for file contents. Job objects are pickled, compressed,
    partitioned into chunks of 1024 bytes and each chunk is stored as an attribute of the SDB
    item representing the job. UUIDs are used to identify jobs and files.
    """

    # Dots in bucket names should be avoided because bucket names are used in HTTPS bucket
    # URLs where they may interfere with the certificate common name. We use a double
    # hyphen as a separator instead.
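    # For illustration only (the prefix below is hypothetical, not something this module
    # defines): with a locator of 'us-west-2:my-toil-run', __init__ picks region 'us-west-2'
    # and name prefix 'my-toil-run', and _bind() then derives the qualified names
    #
    #     'my-toil-run' + '--' + 'jobs'   ->  SDB domain 'my-toil-run--jobs'
    #     'my-toil-run' + '--' + 'files'  ->  SDB domain and S3 bucket 'my-toil-run--files'
    #
    # which is also why a prefix may not itself contain the '--' separator.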
    #
    bucketNameRe = re.compile(r'^[a-z0-9][a-z0-9-]+[a-z0-9]$')

    # See http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
    #
    minBucketNameLen = 3
    maxBucketNameLen = 63
    maxNameLen = 10
    nameSeparator = '--'

    def __init__(self, locator, partSize=50 << 20):
        """
        Create a new job store in AWS or load an existing one from there.

        :param int partSize: The size of each individual part used for multipart operations like
               upload and copy, must be >= 5 MiB but large enough to not exceed 10k parts for the
               whole file
        """
        super(AWSJobStore, self).__init__()
        region, namePrefix = locator.split(':')
        regions = EC2Regions.keys()
        if region not in regions:
            raise ValueError(f'Region "{region}" is not one of: {regions}')
        if not self.bucketNameRe.match(namePrefix):
            raise ValueError("Invalid name prefix '%s'. Name prefixes must contain only digits, "
                             "hyphens or lower-case letters and must not start or end in a "
                             "hyphen." % namePrefix)
        # Reserve 13 for separator and suffix
        if len(namePrefix) > self.maxBucketNameLen - self.maxNameLen - len(self.nameSeparator):
            raise ValueError("Invalid name prefix '%s'. Name prefixes may not be longer than 50 "
                             "characters." % namePrefix)
        if '--' in namePrefix:
            raise ValueError("Invalid name prefix '%s'. Name prefixes may not contain "
                             "%s." % (namePrefix, self.nameSeparator))
        logger.debug("Instantiating %s for region %s and name prefix '%s'",
                     self.__class__, region, namePrefix)
        self.locator = locator
        self.region = region
        self.namePrefix = namePrefix
        self.partSize = partSize
        self.jobsDomain = None
        self.filesDomain = None
        self.filesBucket = None
        self.db = self._connectSimpleDB()

        self.s3_resource = boto3_session.resource('s3', region_name=self.region)
        self.s3_client = self.s3_resource.meta.client

    def initialize(self, config):
        if self._registered:
            raise JobStoreExistsException(self.locator)
        self._registered = None
        try:
            self._bind(create=True)
        except:
            with panic(logger):
                self.destroy()
        else:
            super(AWSJobStore, self).initialize(config)
            # Only register after the job store has been fully initialized
            self._registered = True

    @property
    def sseKeyPath(self):
        return self.config.sseKey

    def resume(self):
        if not self._registered:
            raise NoSuchJobStoreException(self.locator)
        self._bind(create=False)
        super(AWSJobStore, self).resume()

    def _bind(self, create=False, block=True, check_versioning_consistency=True):
        def qualify(name):
            assert len(name) <= self.maxNameLen
            return self.namePrefix + self.nameSeparator + name

        # The order in which this sequence of events happens is important. We can easily handle the
        # inability to bind a domain, but it is a little harder to handle some cases of binding the
        # jobstore bucket. Maintaining this order allows for an easier `destroy` method.
        if self.jobsDomain is None:
            self.jobsDomain = self._bindDomain(qualify('jobs'), create=create, block=block)
        if self.filesDomain is None:
            self.filesDomain = self._bindDomain(qualify('files'), create=create, block=block)
        if self.filesBucket is None:
            self.filesBucket = self._bindBucket(qualify('files'),
                                                create=create,
                                                block=block,
                                                versioning=True,
                                                check_versioning_consistency=check_versioning_consistency)

    @property
    def _registered(self):
        """
        An optional boolean property indicating whether this job store is registered. The
        registry is the authority on deciding if a job store exists or not. If True, this job
        store exists; if None, the job store is transitioning from True to False or vice versa;
        if False, the job store doesn't exist.

        :type: bool|None
        """
        # The weird mapping of the SDB item attribute value to the property value is due to
        # backwards compatibility. 'True' becomes True, that's easy. Toil < 3.3.0 writes this at
        # the end of job store creation. Absence of either the registry, the item or the
        # attribute becomes False, representing a truly absent, non-existing job store. An
        # attribute value of 'False', which is what Toil < 3.3.0 writes at the *beginning* of job
        # store destruction, indicates a job store in transition, reflecting the fact that 3.3.0
        # may leak buckets or domains even though the registry reports 'False' for them. We
        # can't handle job stores that were partially created by 3.3.0, though.
        registry_domain = self._bindDomain(domain_name='toil-registry',
                                           create=False,
                                           block=False)
        if registry_domain is None:
            return False
        else:
            for attempt in retry_sdb():
                with attempt:
                    attributes = registry_domain.get_attributes(item_name=self.namePrefix,
                                                                attribute_name='exists',
                                                                consistent_read=True)
                    try:
                        exists = attributes['exists']
                    except KeyError:
                        return False
                    else:
                        if exists == 'True':
                            return True
                        elif exists == 'False':
                            return None
                        else:
                            assert False

    @_registered.setter
    def _registered(self, value):

        registry_domain = self._bindDomain(domain_name='toil-registry',
                                           # Only create registry domain when registering or
                                           # transitioning a store
                                           create=value is not False,
                                           block=False)
        if registry_domain is None and value is False:
            pass
        else:
            for attempt in retry_sdb():
                with attempt:
                    if value is False:
                        registry_domain.delete_attributes(item_name=self.namePrefix)
                    else:
                        if value is True:
                            attributes = dict(exists='True')
                        elif value is None:
                            attributes = dict(exists='False')
                        else:
                            assert False
                        registry_domain.put_attributes(item_name=self.namePrefix,
                                                       attributes=attributes)

    def _checkItem(self, item, enforce: bool = True):
        """
        Make sure that the given SimpleDB item actually has the attributes we think it should.

        Throw otherwise.

        If enforce is false, log but don't throw.
260 """ 261 262 if "overlargeID" not in item: 263 logger.error("overlargeID attribute isn't present: either SimpleDB entry is " 264 "corrupt or jobstore is from an extremely old Toil: %s", item) 265 if enforce: 266 raise RuntimeError("encountered SimpleDB entry missing required attribute " 267 "'overlargeID'; is your job store ancient?") 268 269 def _awsJobFromItem(self, item): 270 self._checkItem(item) 271 if item.get("overlargeID", None): 272 assert self.fileExists(item["overlargeID"]) 273 # This is an overlarge job, download the actual attributes 274 # from the file store 275 logger.debug("Loading overlarge job from S3.") 276 with self.readFileStream(item["overlargeID"]) as fh: 277 binary = fh.read() 278 else: 279 binary, _ = SDBHelper.attributesToBinary(item) 280 assert binary is not None 281 job = pickle.loads(binary) 282 if job is not None: 283 job.assignConfig(self.config) 284 return job 285 286 def _awsJobToItem(self, job): 287 binary = pickle.dumps(job, protocol=pickle.HIGHEST_PROTOCOL) 288 if len(binary) > SDBHelper.maxBinarySize(extraReservedChunks=1): 289 # Store as an overlarge job in S3 290 with self.writeFileStream() as (writable, fileID): 291 writable.write(binary) 292 item = SDBHelper.binaryToAttributes(None) 293 item["overlargeID"] = fileID 294 else: 295 item = SDBHelper.binaryToAttributes(binary) 296 item["overlargeID"] = "" 297 return item 298 299 jobsPerBatchInsert = 25 300 301 @contextmanager 302 def batch(self): 303 self._batchedUpdates = [] 304 yield 305 batches = [self._batchedUpdates[i:i + self.jobsPerBatchInsert] for i in 306 range(0, len(self._batchedUpdates), self.jobsPerBatchInsert)] 307 308 for batch in batches: 309 items = {compat_bytes(jobDescription.jobStoreID): self._awsJobToItem(jobDescription) for jobDescription in batch} 310 for attempt in retry_sdb(): 311 with attempt: 312 assert self.jobsDomain.batch_put_attributes(items) 313 self._batchedUpdates = None 314 315 def assignID(self, jobDescription): 316 jobStoreID = self._newJobID() 317 logger.debug("Assigning ID to job %s for '%s'", 318 jobStoreID, '<no command>' if jobDescription.command is None else jobDescription.command) 319 jobDescription.jobStoreID = jobStoreID 320 321 def create(self, jobDescription): 322 if hasattr(self, "_batchedUpdates") and self._batchedUpdates is not None: 323 self._batchedUpdates.append(jobDescription) 324 else: 325 self.update(jobDescription) 326 return jobDescription 327 328 def exists(self, jobStoreID): 329 for attempt in retry_sdb(): 330 with attempt: 331 return bool(self.jobsDomain.get_attributes( 332 item_name=compat_bytes(jobStoreID), 333 attribute_name=[SDBHelper.presenceIndicator()], 334 consistent_read=True)) 335 336 def jobs(self): 337 result = None 338 for attempt in retry_sdb(): 339 with attempt: 340 result = list(self.jobsDomain.select( 341 consistent_read=True, 342 query="select * from `%s`" % self.jobsDomain.name)) 343 assert result is not None 344 for jobItem in result: 345 yield self._awsJobFromItem(jobItem) 346 347 def load(self, jobStoreID): 348 item = None 349 for attempt in retry_sdb(): 350 with attempt: 351 item = self.jobsDomain.get_attributes(compat_bytes(jobStoreID), consistent_read=True) 352 if not item: 353 raise NoSuchJobException(jobStoreID) 354 job = self._awsJobFromItem(item) 355 if job is None: 356 raise NoSuchJobException(jobStoreID) 357 logger.debug("Loaded job %s", jobStoreID) 358 return job 359 360 def update(self, jobDescription): 361 logger.debug("Updating job %s", jobDescription.jobStoreID) 362 item = 
self._awsJobToItem(jobDescription) 363 for attempt in retry_sdb(): 364 with attempt: 365 assert self.jobsDomain.put_attributes(compat_bytes(jobDescription.jobStoreID), item) 366 367 itemsPerBatchDelete = 25 368 369 def delete(self, jobStoreID): 370 # remove job and replace with jobStoreId. 371 logger.debug("Deleting job %s", jobStoreID) 372 373 # If the job is overlarge, delete its file from the filestore 374 item = None 375 for attempt in retry_sdb(): 376 with attempt: 377 item = self.jobsDomain.get_attributes(compat_bytes(jobStoreID), consistent_read=True) 378 # If the overlargeID has fallen off, maybe we partially deleted the 379 # attributes of the item? Or raced on it? Or hit SimpleDB being merely 380 # eventually consistent? We should still be able to get rid of it. 381 self._checkItem(item, enforce = False) 382 if item.get("overlargeID", None): 383 logger.debug("Deleting job from filestore") 384 self.deleteFile(item["overlargeID"]) 385 for attempt in retry_sdb(): 386 with attempt: 387 self.jobsDomain.delete_attributes(item_name=compat_bytes(jobStoreID)) 388 items = None 389 for attempt in retry_sdb(): 390 with attempt: 391 items = list(self.filesDomain.select( 392 consistent_read=True, 393 query="select version from `%s` where ownerID='%s'" % ( 394 self.filesDomain.name, jobStoreID))) 395 assert items is not None 396 if items: 397 logger.debug("Deleting %d file(s) associated with job %s", len(items), jobStoreID) 398 n = self.itemsPerBatchDelete 399 batches = [items[i:i + n] for i in range(0, len(items), n)] 400 for batch in batches: 401 itemsDict = {item.name: None for item in batch} 402 for attempt in retry_sdb(): 403 with attempt: 404 self.filesDomain.batch_delete_attributes(itemsDict) 405 for item in items: 406 version = item.get('version') 407 for attempt in retry_s3(): 408 with attempt: 409 if version: 410 self.s3_client.delete_object(Bucket=self.filesBucket.name, 411 Key=compat_bytes(item.name), 412 VersionId=version) 413 else: 414 self.s3_client.delete_object(Bucket=self.filesBucket.name, 415 Key=compat_bytes(item.name)) 416 417 def getEmptyFileStoreID(self, jobStoreID=None, cleanup=False, basename=None): 418 info = self.FileInfo.create(jobStoreID if cleanup else None) 419 with info.uploadStream() as _: 420 # Empty 421 pass 422 info.save() 423 logger.debug("Created %r.", info) 424 return info.fileID 425 426 def _importFile(self, otherCls, url, sharedFileName=None, hardlink=False, symlink=False): 427 if issubclass(otherCls, AWSJobStore): 428 srcObj = self._getObjectForUrl(url, existing=True) 429 size = srcObj.content_length 430 if sharedFileName is None: 431 info = self.FileInfo.create(srcObj.key) 432 else: 433 self._requireValidSharedFileName(sharedFileName) 434 jobStoreFileID = self._sharedFileID(sharedFileName) 435 info = self.FileInfo.loadOrCreate(jobStoreFileID=jobStoreFileID, 436 ownerID=str(self.sharedFileOwnerID), 437 encrypted=None) 438 info.copyFrom(srcObj) 439 info.save() 440 return FileID(info.fileID, size) if sharedFileName is None else None 441 else: 442 return super(AWSJobStore, self)._importFile(otherCls, url, 443 sharedFileName=sharedFileName) 444 445 def _exportFile(self, otherCls, jobStoreFileID, url): 446 if issubclass(otherCls, AWSJobStore): 447 dstObj = self._getObjectForUrl(url) 448 info = self.FileInfo.loadOrFail(jobStoreFileID) 449 info.copyTo(dstObj) 450 else: 451 super(AWSJobStore, self)._defaultExportFile(otherCls, jobStoreFileID, url) 452 453 @classmethod 454 def getSize(cls, url): 455 return cls._getObjectForUrl(url, existing=True).content_length 
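    # A minimal sketch of how the s3:// URLs handled below decompose, assuming standard
    # urllib.parse behaviour; the bucket and key names are made up for illustration:
    #
    #     url = urllib.parse.urlparse('s3://some-bucket/dir/file.txt')
    #     url.netloc    # 'some-bucket'   -> bucket name used by _getObjectForUrl()
    #     url.path[1:]  # 'dir/file.txt'  -> key name used by _getObjectForUrl()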

    @classmethod
    def _readFromUrl(cls, url, writable):
        srcObj = cls._getObjectForUrl(url, existing=True)
        srcObj.download_fileobj(writable)
        return (
            srcObj.content_length,
            False  # executable bit is always False
        )

    @classmethod
    def _writeToUrl(cls, readable, url, executable=False):
        dstObj = cls._getObjectForUrl(url)

        logger.debug("Uploading %s", dstObj.key)
        # uploadFile takes care of using a multipart upload if the file is larger than partSize (defaults to 5MB)
        uploadFile(readable=readable,
                   resource=s3_boto3_resource,
                   bucketName=dstObj.bucket_name,
                   fileID=dstObj.key,
                   partSize=5 * 1000 * 1000)

    @staticmethod
    def _getObjectForUrl(url, existing: Optional[bool] = None):
        """
        Extracts a key (object) from a given s3:// URL.

        :param bool existing: If True, the key is expected to exist. If False, the key is expected
               not to exist and it will be created. If None, the key will be created if it doesn't
               exist.

        :rtype: S3.Object
        """
        keyName = url.path[1:]
        bucketName = url.netloc

        botoargs = {}
        host = os.environ.get('TOIL_S3_HOST', None)
        port = os.environ.get('TOIL_S3_PORT', None)
        protocol = 'https'
        if os.environ.get('TOIL_S3_USE_SSL', True) == 'False':
            protocol = 'http'
        if host:
            # Parenthesize the conditional so a missing port doesn't discard the whole URL.
            botoargs['endpoint_url'] = f'{protocol}://{host}' + (f':{port}' if port else '')

        # TODO: OrdinaryCallingFormat equivalent in boto3?
        # if botoargs:
        #     botoargs['calling_format'] = boto.s3.connection.OrdinaryCallingFormat()

        # Get the bucket's region to avoid a redirect per request
        region = AWSJobStore.getBucketRegion(bucketName)
        s3 = boto3_session.resource('s3', region_name=region, **botoargs)
        obj = s3.Object(bucketName, keyName)
        objExists = True

        try:
            obj.load()
        except ClientError as e:
            if e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 404:
                objExists = False
            else:
                raise
        if existing is True and not objExists:
            raise RuntimeError(f"Key '{keyName}' does not exist in bucket '{bucketName}'.")
        elif existing is False and objExists:
            raise RuntimeError(f"Key '{keyName}' exists in bucket '{bucketName}'.")

        if not objExists:
            obj.put()  # write an empty file
        return obj

    @classmethod
    def _supportsUrl(cls, url, export=False):
        return url.scheme.lower() == 's3'

    def writeFile(self, localFilePath, jobStoreID=None, cleanup=False):
        info = self.FileInfo.create(jobStoreID if cleanup else None)
        info.upload(localFilePath, not self.config.disableJobStoreChecksumVerification)
        info.save()
        logger.debug("Wrote %r from %r", info, localFilePath)
        return info.fileID

    @contextmanager
    def writeFileStream(self, jobStoreID=None, cleanup=False, basename=None, encoding=None, errors=None):
        info = self.FileInfo.create(jobStoreID if cleanup else None)
        with info.uploadStream(encoding=encoding, errors=errors) as writable:
            yield writable, info.fileID
        info.save()
        logger.debug("Wrote %r.", info)

    @contextmanager
    def writeSharedFileStream(self, sharedFileName, isProtected=None, encoding=None, errors=None):
        self._requireValidSharedFileName(sharedFileName)
        info = self.FileInfo.loadOrCreate(jobStoreFileID=self._sharedFileID(sharedFileName),
                                          ownerID=str(self.sharedFileOwnerID),
                                          encrypted=isProtected)
        with info.uploadStream(encoding=encoding, errors=errors) as writable:
            yield writable
        info.save()
logger.debug("Wrote %r for shared file %r.", info, sharedFileName) 555 556 def updateFile(self, jobStoreFileID, localFilePath): 557 info = self.FileInfo.loadOrFail(jobStoreFileID) 558 info.upload(localFilePath, not self.config.disableJobStoreChecksumVerification) 559 info.save() 560 logger.debug("Wrote %r from path %r.", info, localFilePath) 561 562 @contextmanager 563 def updateFileStream(self, jobStoreFileID, encoding=None, errors=None): 564 info = self.FileInfo.loadOrFail(jobStoreFileID) 565 with info.uploadStream(encoding=encoding, errors=errors) as writable: 566 yield writable 567 info.save() 568 logger.debug("Wrote %r from stream.", info) 569 570 def fileExists(self, jobStoreFileID): 571 return self.FileInfo.exists(jobStoreFileID) 572 573 def getFileSize(self, jobStoreFileID): 574 if not self.fileExists(jobStoreFileID): 575 return 0 576 info = self.FileInfo.loadOrFail(jobStoreFileID) 577 return info.getSize() 578 579 def readFile(self, jobStoreFileID, localFilePath, symlink=False): 580 info = self.FileInfo.loadOrFail(jobStoreFileID) 581 logger.debug("Reading %r into %r.", info, localFilePath) 582 info.download(localFilePath, not self.config.disableJobStoreChecksumVerification) 583 if getattr(jobStoreFileID, 'executable', False): 584 os.chmod(localFilePath, os.stat(localFilePath).st_mode | stat.S_IXUSR) 585 586 @contextmanager 587 def readFileStream(self, jobStoreFileID, encoding=None, errors=None): 588 info = self.FileInfo.loadOrFail(jobStoreFileID) 589 logger.debug("Reading %r into stream.", info) 590 with info.downloadStream(encoding=encoding, errors=errors) as readable: 591 yield readable 592 593 @contextmanager 594 def readSharedFileStream(self, sharedFileName, encoding=None, errors=None): 595 self._requireValidSharedFileName(sharedFileName) 596 jobStoreFileID = self._sharedFileID(sharedFileName) 597 info = self.FileInfo.loadOrFail(jobStoreFileID, customName=sharedFileName) 598 logger.debug("Reading %r for shared file %r into stream.", info, sharedFileName) 599 with info.downloadStream(encoding=encoding, errors=errors) as readable: 600 yield readable 601 602 def deleteFile(self, jobStoreFileID): 603 info = self.FileInfo.load(jobStoreFileID) 604 if info is None: 605 logger.debug("File %s does not exist, skipping deletion.", jobStoreFileID) 606 else: 607 info.delete() 608 609 def writeStatsAndLogging(self, statsAndLoggingString): 610 info = self.FileInfo.create(str(self.statsFileOwnerID)) 611 with info.uploadStream(multipart=False) as writeable: 612 if isinstance(statsAndLoggingString, str): 613 # This stream is for binary data, so encode any non-encoded things 614 statsAndLoggingString = statsAndLoggingString.encode('utf-8', errors='ignore') 615 writeable.write(statsAndLoggingString) 616 info.save() 617 618 def readStatsAndLogging(self, callback, readAll=False): 619 itemsProcessed = 0 620 621 for info in self._readStatsAndLogging(callback, self.statsFileOwnerID): 622 info._ownerID = self.readStatsFileOwnerID 623 info.save() 624 itemsProcessed += 1 625 626 if readAll: 627 for _ in self._readStatsAndLogging(callback, self.readStatsFileOwnerID): 628 itemsProcessed += 1 629 630 return itemsProcessed 631 632 def _readStatsAndLogging(self, callback, ownerId): 633 items = None 634 for attempt in retry_sdb(): 635 with attempt: 636 items = list(self.filesDomain.select( 637 consistent_read=True, 638 query="select * from `%s` where ownerID='%s'" % ( 639 self.filesDomain.name, str(ownerId)))) 640 assert items is not None 641 for item in items: 642 info = self.FileInfo.fromItem(item) 643 with 
info.downloadStream() as readable: 644 callback(readable) 645 yield info 646 647 # TODO: Make this retry more specific? 648 # example: https://github.com/DataBiosphere/toil/issues/3378 649 @retry() 650 def getPublicUrl(self, jobStoreFileID): 651 info = self.FileInfo.loadOrFail(jobStoreFileID) 652 if info.content is not None: 653 with info.uploadStream(allowInlining=False) as f: 654 f.write(info.content) 655 656 self.filesBucket.Object(compat_bytes(jobStoreFileID)).Acl().put(ACL='public-read') 657 658 url = self.s3_client.generate_presigned_url('get_object', 659 Params={'Bucket': self.filesBucket.name, 660 'Key': compat_bytes(jobStoreFileID), 661 'VersionId': info.version}, 662 ExpiresIn=self.publicUrlExpiration.total_seconds()) 663 664 # boto doesn't properly remove the x-amz-security-token parameter when 665 # query_auth is False when using an IAM role (see issue #2043). Including the 666 # x-amz-security-token parameter without the access key results in a 403, 667 # even if the resource is public, so we need to remove it. 668 scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url) 669 params = urllib.parse.parse_qs(query) 670 if 'x-amz-security-token' in params: 671 del params['x-amz-security-token'] 672 if 'AWSAccessKeyId' in params: 673 del params['AWSAccessKeyId'] 674 if 'Signature' in params: 675 del params['Signature'] 676 query = urllib.parse.urlencode(params, doseq=True) 677 url = urllib.parse.urlunsplit((scheme, netloc, path, query, fragment)) 678 return url 679 680 def getSharedPublicUrl(self, sharedFileName): 681 self._requireValidSharedFileName(sharedFileName) 682 return self.getPublicUrl(self._sharedFileID(sharedFileName)) 683 684 def _connectSimpleDB(self): 685 """ 686 :rtype: SDBConnection 687 """ 688 db = boto.sdb.connect_to_region(self.region) 689 if db is None: 690 raise ValueError("Could not connect to SimpleDB. Make sure '%s' is a valid SimpleDB region." % self.region) 691 monkeyPatchSdbConnection(db) 692 return db 693 694 def _bindBucket(self, 695 bucket_name: str, 696 create: bool = False, 697 block: bool = True, 698 versioning: bool = False, 699 check_versioning_consistency: bool = True): 700 """ 701 Return the Boto Bucket object representing the S3 bucket with the given name. If the 702 bucket does not exist and `create` is True, it will be created. 703 704 :param str bucket_name: the name of the bucket to bind to 705 706 :param bool create: Whether to create bucket the if it doesn't exist 707 708 :param bool block: If False, return None if the bucket doesn't exist. If True, wait until 709 bucket appears. Ignored if `create` is True. 710 711 :rtype: Bucket|None 712 :raises botocore.exceptions.ClientError: If `block` is True and the bucket still doesn't exist after the 713 retry timeout expires. 
714 """ 715 assert self.minBucketNameLen <= len(bucket_name) <= self.maxBucketNameLen 716 assert self.bucketNameRe.match(bucket_name) 717 logger.debug("Binding to job store bucket '%s'.", bucket_name) 718 719 def bucket_creation_pending(error): 720 # https://github.com/BD2KGenomics/toil/issues/955 721 # https://github.com/BD2KGenomics/toil/issues/995 722 # https://github.com/BD2KGenomics/toil/issues/1093 723 724 # BucketAlreadyOwnedByYou == 409 725 # OperationAborted == 409 726 # NoSuchBucket == 404 727 return (isinstance(error, ClientError) and 728 error.response.get('ResponseMetadata', {}).get('HTTPStatusCode') in (404, 409)) 729 730 bucketExisted = True 731 for attempt in retry_s3(predicate=bucket_creation_pending): 732 with attempt: 733 try: 734 # the head_bucket() call makes sure that the bucket exists and the user can access it 735 self.s3_client.head_bucket(Bucket=bucket_name) 736 737 bucket = self.s3_resource.Bucket(bucket_name) 738 except ClientError as e: 739 error_code = e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') 740 if error_code == 404: 741 bucketExisted = False 742 logger.debug("Bucket '%s' does not exist.", bucket_name) 743 if create: 744 logger.debug("Creating bucket '%s'.", bucket_name) 745 location = region_to_bucket_location(self.region) 746 bucket = self.s3_resource.create_bucket( 747 Bucket=bucket_name, 748 CreateBucketConfiguration={'LocationConstraint': location}) 749 # Wait until the bucket exists before checking the region and adding tags 750 bucket.wait_until_exists() 751 752 # It is possible for create_bucket to return but 753 # for an immediate request for the bucket region to 754 # produce an S3ResponseError with code 755 # NoSuchBucket. We let that kick us back up to the 756 # main retry loop. 757 assert self.getBucketRegion(bucket_name) == self.region 758 759 owner_tag = os.environ.get('TOIL_OWNER_TAG') 760 if owner_tag: 761 bucket_tagging = self.s3_resource.BucketTagging(bucket_name) 762 bucket_tagging.put(Tagging={'TagSet': [{'Key': 'Owner', 'Value': owner_tag}]}) 763 elif block: 764 raise 765 else: 766 return None 767 elif error_code == 301: 768 # This is raised if the user attempts to get a bucket in a region outside 769 # the specified one, if the specified one is not `us-east-1`. The us-east-1 770 # server allows a user to use buckets from any region. 771 raise BucketLocationConflictException(self.getBucketRegion(bucket_name)) 772 else: 773 raise 774 else: 775 bucketRegion = self.getBucketRegion(bucket_name) 776 if bucketRegion != self.region: 777 raise BucketLocationConflictException(bucketRegion) 778 779 if versioning and not bucketExisted: 780 # only call this method on bucket creation 781 bucket.Versioning().enable() 782 # Now wait until versioning is actually on. Some uploads 783 # would come back with no versions; maybe they were 784 # happening too fast and this setting isn't sufficiently 785 # consistent? 
786 time.sleep(1) 787 while not self._getBucketVersioning(bucket_name): 788 logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...") 789 time.sleep(1) 790 elif check_versioning_consistency: 791 # now test for versioning consistency 792 # we should never see any of these errors since 'versioning' should always be true 793 bucket_versioning = self._getBucketVersioning(bucket_name) 794 if bucket_versioning != versioning: 795 assert False, 'Cannot modify versioning on existing bucket' 796 elif bucket_versioning is None: 797 assert False, 'Cannot use a bucket with versioning suspended' 798 if bucketExisted: 799 logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.") 800 else: 801 logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.") 802 803 return bucket 804 805 def _bindDomain(self, domain_name, create=False, block=True): 806 """ 807 Return the Boto Domain object representing the SDB domain of the given name. If the 808 domain does not exist and `create` is True, it will be created. 809 810 :param str domain_name: the name of the domain to bind to 811 812 :param bool create: True if domain should be created if it doesn't exist 813 814 :param bool block: If False, return None if the domain doesn't exist. If True, wait until 815 domain appears. This parameter is ignored if create is True. 816 817 :rtype: Domain|None 818 :raises SDBResponseError: If `block` is True and the domain still doesn't exist after the 819 retry timeout expires. 820 """ 821 logger.debug("Binding to job store domain '%s'.", domain_name) 822 retryargs = dict(predicate=lambda e: no_such_sdb_domain(e) or sdb_unavailable(e)) 823 if not block: 824 retryargs['timeout'] = 15 825 for attempt in retry_sdb(**retryargs): 826 with attempt: 827 try: 828 return self.db.get_domain(domain_name) 829 except SDBResponseError as e: 830 if no_such_sdb_domain(e): 831 if create: 832 return self.db.create_domain(domain_name) 833 elif block: 834 raise 835 else: 836 return None 837 else: 838 raise 839 840 def _newJobID(self): 841 return str(uuid.uuid4()) 842 843 # A dummy job ID under which all shared files are stored 844 sharedFileOwnerID = uuid.UUID('891f7db6-e4d9-4221-a58e-ab6cc4395f94') 845 846 # A dummy job ID under which all unread stats files are stored 847 statsFileOwnerID = uuid.UUID('bfcf5286-4bc7-41ef-a85d-9ab415b69d53') 848 849 # A dummy job ID under which all read stats files are stored 850 readStatsFileOwnerID = uuid.UUID('e77fc3aa-d232-4255-ae04-f64ee8eb0bfa') 851 852 def _sharedFileID(self, sharedFileName): 853 return str(uuid.uuid5(self.sharedFileOwnerID, sharedFileName)) 854 855 @InnerClass 856 class FileInfo(SDBHelper): 857 """ 858 Represents a file in this job store. 859 """ 860 outer = None 861 """ 862 :type: AWSJobStore 863 """ 864 865 def __init__(self, fileID, ownerID, encrypted, 866 version=None, content=None, numContentChunks=0, checksum=None): 867 """ 868 :type fileID: str 869 :param fileID: the file's ID 870 871 :type ownerID: str 872 :param ownerID: ID of the entity owning this file, typically a job ID aka jobStoreID 873 874 :type encrypted: bool 875 :param encrypted: whether the file is stored in encrypted form 876 877 :type version: str|None 878 :param version: a non-empty string containing the most recent version of the S3 879 object storing this file's content, None if the file is new, or empty string if the 880 file is inlined. 

            :type content: str|None
            :param content: this file's inlined content

            :type numContentChunks: int
            :param numContentChunks: the number of SDB domain attributes occupied by this file's
                   inlined content. Note that an inlined empty string still occupies one chunk.

            :type checksum: str|None
            :param checksum: the checksum of the file, if available. Formatted
                   as <algorithm>$<lowercase hex hash>.
            """
            super(AWSJobStore.FileInfo, self).__init__()
            self._fileID = fileID
            self._ownerID = ownerID
            self.encrypted = encrypted
            self._version = version
            self._previousVersion = version
            self._content = content
            self._checksum = checksum
            self._numContentChunks = numContentChunks

        @property
        def fileID(self):
            return self._fileID

        @property
        def ownerID(self):
            return self._ownerID

        @property
        def version(self):
            return self._version

        @version.setter
        def version(self, version):
            # Version should only change once
            assert self._previousVersion == self._version
            self._version = version
            if version:
                self.content = None

        @property
        def previousVersion(self):
            return self._previousVersion

        @property
        def content(self):
            return self._content

        @property
        def checksum(self):
            return self._checksum

        @checksum.setter
        def checksum(self, checksum):
            self._checksum = checksum

        @content.setter
        def content(self, content):
            assert content is None or isinstance(content, bytes)
            self._content = content
            if content is not None:
                self.version = ''

        @classmethod
        def create(cls, ownerID):
            return cls(str(uuid.uuid4()), ownerID, encrypted=cls.outer.sseKeyPath is not None)

        @classmethod
        def presenceIndicator(cls):
            return 'encrypted'

        @classmethod
        def exists(cls, jobStoreFileID):
            for attempt in retry_sdb():
                with attempt:
                    return bool(cls.outer.filesDomain.get_attributes(
                        item_name=compat_bytes(jobStoreFileID),
                        attribute_name=[cls.presenceIndicator()],
                        consistent_read=True))

        @classmethod
        def load(cls, jobStoreFileID):
            for attempt in retry_sdb():
                with attempt:
                    self = cls.fromItem(
                        cls.outer.filesDomain.get_attributes(item_name=compat_bytes(jobStoreFileID),
                                                             consistent_read=True))
                    return self

        @classmethod
        def loadOrCreate(cls, jobStoreFileID, ownerID, encrypted):
            self = cls.load(jobStoreFileID)
            if encrypted is None:
                encrypted = cls.outer.sseKeyPath is not None
            if self is None:
                self = cls(jobStoreFileID, ownerID, encrypted=encrypted)
            else:
                assert self.fileID == jobStoreFileID
                assert self.ownerID == ownerID
                self.encrypted = encrypted
            return self

        @classmethod
        def loadOrFail(cls, jobStoreFileID, customName=None):
            """
            :rtype: AWSJobStore.FileInfo
            :return: an instance of this class representing the file with the given ID
            :raises NoSuchFileException: if given file does not exist
            """
            self = cls.load(jobStoreFileID)
            if self is None:
                raise NoSuchFileException(jobStoreFileID, customName=customName)
            else:
                return self

        @classmethod
        def fromItem(cls, item):
            """
            Convert an SDB item to an instance of this class.

            :type item: Item
            """
            assert item is not None

            # Strings come back from SDB as unicode
            def strOrNone(s):
                return s if s is None else str(s)

            # ownerID and encrypted are the only mandatory attributes
            ownerID = strOrNone(item.get('ownerID'))
            encrypted = item.get('encrypted')
            if ownerID is None:
                assert encrypted is None
                return None
            else:
                version = strOrNone(item['version'])
                checksum = strOrNone(item.get('checksum'))
                encrypted = strict_bool(encrypted)
                content, numContentChunks = cls.attributesToBinary(item)
                if encrypted:
                    sseKeyPath = cls.outer.sseKeyPath
                    if sseKeyPath is None:
                        raise AssertionError('Content is encrypted but no key was provided.')
                    if content is not None:
                        content = encryption.decrypt(content, sseKeyPath)
                self = cls(fileID=item.name, ownerID=ownerID, encrypted=encrypted, version=version,
                           content=content, numContentChunks=numContentChunks, checksum=checksum)
                return self

        def toItem(self):
            """
            Convert this instance to an attribute dictionary suitable for SDB put_attributes().

            :rtype: (dict,int)

            :return: the attributes dict and an integer specifying the number of chunk
                     attributes in the dictionary that are used for storing inlined content.
            """
            content = self.content
            assert content is None or isinstance(content, bytes)
            if self.encrypted and content is not None:
                sseKeyPath = self.outer.sseKeyPath
                if sseKeyPath is None:
                    raise AssertionError('Encryption requested but no key was provided.')
                content = encryption.encrypt(content, sseKeyPath)
            assert content is None or isinstance(content, bytes)
            attributes = self.binaryToAttributes(content)
            numChunks = attributes['numChunks']
            attributes.update(dict(ownerID=self.ownerID,
                                   encrypted=self.encrypted,
                                   version=self.version or '',
                                   checksum=self.checksum or ''))
            return attributes, numChunks

        @classmethod
        def _reservedAttributes(cls):
            return 3 + super(AWSJobStore.FileInfo, cls)._reservedAttributes()

        @staticmethod
        def maxInlinedSize():
            return 256

        def save(self):
            attributes, numNewContentChunks = self.toItem()
            # False stands for absence
            expected = ['version', False if self.previousVersion is None else self.previousVersion]
            try:
                for attempt in retry_sdb():
                    with attempt:
                        assert self.outer.filesDomain.put_attributes(item_name=compat_bytes(self.fileID),
                                                                     attributes=attributes,
                                                                     expected_value=expected)
                # clean up the old version of the file if necessary and safe
                if self.previousVersion and (self.previousVersion != self.version):
                    for attempt in retry_s3():
                        with attempt:
                            self.outer.s3_client.delete_object(Bucket=self.outer.filesBucket.name,
                                                               Key=compat_bytes(self.fileID),
                                                               VersionId=self.previousVersion)
                self._previousVersion = self._version
                if numNewContentChunks < self._numContentChunks:
                    residualChunks = range(numNewContentChunks, self._numContentChunks)
                    attributes = [self._chunkName(i) for i in residualChunks]
                    for attempt in retry_sdb():
                        with attempt:
                            self.outer.filesDomain.delete_attributes(compat_bytes(self.fileID),
                                                                     attributes=attributes)
                self._numContentChunks = numNewContentChunks
            except SDBResponseError as e:
                if e.error_code == 'ConditionalCheckFailed':
                    raise ConcurrentFileModificationException(self.fileID)
                else:
                    raise

        def upload(self, localFilePath, calculateChecksum=True):
            file_size, file_time = fileSizeAndTime(localFilePath)
            if file_size <= self.maxInlinedSize():
                with open(localFilePath, 'rb') as f:
                    self.content = f.read()
                # Clear out any old checksum in case of overwrite
                self.checksum = ''
            else:
                headerArgs = self._s3EncryptionArgs()
                # Create a new Resource in case it needs to be on its own thread
                resource = boto3_session.resource('s3', region_name=self.outer.region)

                self.checksum = self._get_file_checksum(localFilePath) if calculateChecksum else None
                self.version = uploadFromPath(localFilePath,
                                              resource=resource,
                                              bucketName=self.outer.filesBucket.name,
                                              fileID=compat_bytes(self.fileID),
                                              headerArgs=headerArgs,
                                              partSize=self.outer.partSize)

        def _start_checksum(self, to_match=None, algorithm='sha1'):
            """
            Get a hasher that can be used with _update_checksum and
            _finish_checksum.

            If to_match is set, it is a precomputed checksum which we expect
            the result to match.

            The right way to compare checksums is to feed in the checksum to be
            matched, so we can see its algorithm, instead of getting a new one
            and comparing. If a checksum to match is fed in, _finish_checksum()
            will raise a ChecksumError if it isn't matched.
            """

            # If we have an expected result it will go here.
            expected = None

            if to_match is not None:
                parts = to_match.split('$')
                algorithm = parts[0]
                expected = parts[1]

            wrapped = getattr(hashlib, algorithm)()
            logger.debug(f'Starting {algorithm} checksum to match {expected}')
            return algorithm, wrapped, expected

        def _update_checksum(self, checksum_in_progress, data):
            """
            Update a checksum in progress from _start_checksum with new data.
            """
            checksum_in_progress[1].update(data)

        def _finish_checksum(self, checksum_in_progress):
            """
            Complete a checksum in progress from _start_checksum and return the
            checksum result string.
            """

            result_hash = checksum_in_progress[1].hexdigest()

            logger.debug(f'Completed checksum with hash {result_hash} vs. expected {checksum_in_progress[2]}')
            if checksum_in_progress[2] is not None:
                # We expected a particular hash
                if result_hash != checksum_in_progress[2]:
                    raise ChecksumError('Checksum mismatch. Expected: %s Actual: %s' %
                                        (checksum_in_progress[2], result_hash))

            return '$'.join([checksum_in_progress[0], result_hash])

        def _get_file_checksum(self, localFilePath, to_match=None):
            with open(localFilePath, 'rb') as f:
                hasher = self._start_checksum(to_match=to_match)
                contents = f.read(1024 * 1024)
                while contents != b'':
                    self._update_checksum(hasher, contents)
                    contents = f.read(1024 * 1024)
            return self._finish_checksum(hasher)

        @contextmanager
        def uploadStream(self, multipart=True, allowInlining=True, encoding=None, errors=None):
            """
            Context manager that gives out a binary or text mode upload stream to upload data.
            """

            # Note that we have to handle already having a content or a version
            # if we are overwriting something.

            # But make sure we don't have both.
1186 assert not (bool(self.version) and self.content is not None) 1187 1188 info = self 1189 store = self.outer 1190 1191 class MultiPartPipe(WritablePipe): 1192 def readFrom(self, readable): 1193 # Get the first block of data we want to put 1194 buf = readable.read(store.partSize) 1195 assert isinstance(buf, bytes) 1196 1197 if allowInlining and len(buf) <= info.maxInlinedSize(): 1198 logger.debug('Inlining content of %d bytes', len(buf)) 1199 info.content = buf 1200 # There will be no checksum 1201 info.checksum = '' 1202 else: 1203 # We will compute a checksum 1204 hasher = info._start_checksum() 1205 logger.debug('Updating checksum with %d bytes', len(buf)) 1206 info._update_checksum(hasher, buf) 1207 1208 client = store.s3_client 1209 bucket_name = store.filesBucket.name 1210 headerArgs = info._s3EncryptionArgs() 1211 1212 for attempt in retry_s3(): 1213 with attempt: 1214 logger.debug('Starting multipart upload') 1215 # low-level clients are thread safe 1216 upload = client.create_multipart_upload(Bucket=bucket_name, 1217 Key=compat_bytes(info.fileID), 1218 **headerArgs) 1219 uploadId = upload['UploadId'] 1220 parts = [] 1221 logger.debug('Multipart upload started as %s', uploadId) 1222 1223 for i in range(CONSISTENCY_TICKS): 1224 # Sometimes we can create a multipart upload and not see it. Wait around for it. 1225 response = client.list_multipart_uploads(Bucket=bucket_name, 1226 MaxUploads=1, 1227 Prefix=compat_bytes(info.fileID)) 1228 if len(response['Uploads']) != 0 and response['Uploads'][0]['UploadId'] == uploadId: 1229 logger.debug('Multipart upload visible as %s', uploadId) 1230 break 1231 else: 1232 logger.debug('Multipart upload %s is not visible; we see %s', uploadId, response['Uploads']) 1233 time.sleep(CONSISTENCY_TIME * 2 ** i) 1234 1235 try: 1236 for part_num in itertools.count(): 1237 for attempt in retry_s3(): 1238 with attempt: 1239 logger.debug('Uploading part %d of %d bytes to %s', part_num + 1, len(buf), uploadId) 1240 # TODO: include the Content-MD5 header: 1241 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.complete_multipart_upload 1242 part = client.upload_part(Bucket=bucket_name, 1243 Key=compat_bytes(info.fileID), 1244 PartNumber=part_num + 1, 1245 UploadId=uploadId, 1246 Body=BytesIO(buf), 1247 **headerArgs) 1248 1249 parts.append({"PartNumber": part_num + 1, "ETag": part["ETag"]}) 1250 1251 # Get the next block of data we want to put 1252 buf = readable.read(info.outer.partSize) 1253 assert isinstance(buf, bytes) 1254 if len(buf) == 0: 1255 # Don't allow any part other than the very first to be empty. 1256 break 1257 info._update_checksum(hasher, buf) 1258 except: 1259 with panic(log=logger): 1260 for attempt in retry_s3(): 1261 with attempt: 1262 client.abort_multipart_upload(Bucket=bucket_name, 1263 Key=compat_bytes(info.fileID), 1264 UploadId=uploadId) 1265 1266 else: 1267 1268 while not store._getBucketVersioning(store.filesBucket.name): 1269 logger.warning('Versioning does not appear to be enabled yet. 
Deferring multipart ' 1270 'upload completion...') 1271 time.sleep(1) 1272 1273 # Save the checksum 1274 info.checksum = info._finish_checksum(hasher) 1275 1276 for attempt in retry_s3(): 1277 with attempt: 1278 logger.debug('Attempting to complete upload...') 1279 completed = client.complete_multipart_upload( 1280 Bucket=bucket_name, 1281 Key=compat_bytes(info.fileID), 1282 UploadId=uploadId, 1283 MultipartUpload={"Parts": parts}) 1284 1285 logger.debug('Completed upload object of type %s: %s', str(type(completed)), 1286 repr(completed)) 1287 info.version = completed['VersionId'] 1288 logger.debug('Completed upload with version %s', str(info.version)) 1289 1290 if info.version is None: 1291 # Somehow we don't know the version. Try and get it. 1292 for attempt in retry_s3(predicate=lambda e: retryable_s3_errors(e) or isinstance(e, AssertionError)): 1293 with attempt: 1294 version = client.head_object(Bucket=bucket_name, 1295 Key=compat_bytes(info.fileID), 1296 **headerArgs).get('VersionId', None) 1297 logger.warning('Loaded key for upload with no version and got version %s', 1298 str(version)) 1299 info.version = version 1300 assert info.version is not None 1301 1302 # Make sure we actually wrote something, even if an empty file 1303 assert (bool(info.version) or info.content is not None) 1304 1305 class SinglePartPipe(WritablePipe): 1306 def readFrom(self, readable): 1307 buf = readable.read() 1308 assert isinstance(buf, bytes) 1309 dataLength = len(buf) 1310 if allowInlining and dataLength <= info.maxInlinedSize(): 1311 logger.debug('Inlining content of %d bytes', len(buf)) 1312 info.content = buf 1313 # There will be no checksum 1314 info.checksum = '' 1315 else: 1316 # We will compute a checksum 1317 hasher = info._start_checksum() 1318 info._update_checksum(hasher, buf) 1319 info.checksum = info._finish_checksum(hasher) 1320 1321 bucket_name = store.filesBucket.name 1322 headerArgs = info._s3EncryptionArgs() 1323 client = store.s3_client 1324 1325 buf = BytesIO(buf) 1326 1327 while not store._getBucketVersioning(bucket_name): 1328 logger.warning('Versioning does not appear to be enabled yet. 
Deferring single part ' 1329 'upload...') 1330 time.sleep(1) 1331 1332 for attempt in retry_s3(): 1333 with attempt: 1334 logger.debug('Uploading single part of %d bytes', dataLength) 1335 client.upload_fileobj(Bucket=bucket_name, 1336 Key=compat_bytes(info.fileID), 1337 Fileobj=buf, 1338 ExtraArgs=headerArgs) 1339 1340 # use head_object with the SSE headers to access versionId and content_length attributes 1341 headObj = client.head_object(Bucket=bucket_name, 1342 Key=compat_bytes(info.fileID), 1343 **headerArgs) 1344 assert dataLength == headObj.get('ContentLength', None) 1345 info.version = headObj.get('VersionId', None) 1346 logger.debug('Upload received version %s', str(info.version)) 1347 1348 if info.version is None: 1349 # Somehow we don't know the version 1350 for attempt in retry_s3(predicate=lambda e: retryable_s3_errors(e) or isinstance(e, AssertionError)): 1351 with attempt: 1352 headObj = client.head_object(Bucket=bucket_name, 1353 Key=compat_bytes(info.fileID), 1354 **headerArgs) 1355 info.version = headObj.get('VersionId', None) 1356 logger.warning('Reloaded key with no version and got version %s', str(info.version)) 1357 assert info.version is not None 1358 1359 # Make sure we actually wrote something, even if an empty file 1360 assert (bool(info.version) or info.content is not None) 1361 1362 if multipart: 1363 pipe = MultiPartPipe(encoding=encoding, errors=errors) 1364 else: 1365 pipe = SinglePartPipe(encoding=encoding, errors=errors) 1366 1367 with pipe as writable: 1368 yield writable 1369 1370 if not pipe.reader_done: 1371 logger.debug('Version: {} Content: {}'.format(self.version, self.content)) 1372 raise RuntimeError('Escaped context manager without written data being read!') 1373 1374 # We check our work to make sure we have exactly one of embedded 1375 # content or a real object version. 1376 1377 if self.content is None: 1378 if not bool(self.version): 1379 logger.debug('Version: {} Content: {}'.format(self.version, self.content)) 1380 raise RuntimeError('No content added and no version created') 1381 else: 1382 if bool(self.version): 1383 logger.debug('Version: {} Content: {}'.format(self.version, self.content)) 1384 raise RuntimeError('Content added and version created') 1385 1386 def copyFrom(self, srcObj): 1387 """ 1388 Copies contents of source key into this file. 1389 1390 :param S3.Object srcObj: The key (object) that will be copied from 1391 """ 1392 assert srcObj.content_length is not None 1393 if srcObj.content_length <= self.maxInlinedSize(): 1394 self.content = srcObj.get().get('Body').read() 1395 else: 1396 # Create a new Resource in case it needs to be on its own thread 1397 resource = boto3_session.resource('s3', region_name=self.outer.region) 1398 self.version = copyKeyMultipart(resource, 1399 srcBucketName=compat_bytes(srcObj.bucket_name), 1400 srcKeyName=compat_bytes(srcObj.key), 1401 srcKeyVersion=compat_bytes(srcObj.version_id), 1402 dstBucketName=compat_bytes(self.outer.filesBucket.name), 1403 dstKeyName=compat_bytes(self._fileID), 1404 sseAlgorithm='AES256', 1405 sseKey=self._getSSEKey()) 1406 1407 def copyTo(self, dstObj): 1408 """ 1409 Copies contents of this file to the given key. 
1410 1411 :param S3.Object dstObj: The key (object) to copy this file's content to 1412 """ 1413 if self.content is not None: 1414 for attempt in retry_s3(): 1415 with attempt: 1416 dstObj.put(Body=self.content) 1417 elif self.version: 1418 # Create a new Resource in case it needs to be on its own thread 1419 resource = boto3_session.resource('s3', region_name=self.outer.region) 1420 1421 for attempt in retry_s3(): 1422 # encrypted = True if self.outer.sseKeyPath else False 1423 with attempt: 1424 copyKeyMultipart(resource, 1425 srcBucketName=compat_bytes(self.outer.filesBucket.name), 1426 srcKeyName=compat_bytes(self.fileID), 1427 srcKeyVersion=compat_bytes(self.version), 1428 dstBucketName=compat_bytes(dstObj.bucket_name), 1429 dstKeyName=compat_bytes(dstObj.key), 1430 copySourceSseAlgorithm='AES256', 1431 copySourceSseKey=self._getSSEKey()) 1432 else: 1433 assert False 1434 1435 def download(self, localFilePath, verifyChecksum=True): 1436 if self.content is not None: 1437 with AtomicFileCreate(localFilePath) as tmpPath: 1438 with open(tmpPath, 'wb') as f: 1439 f.write(self.content) 1440 elif self.version: 1441 headerArgs = self._s3EncryptionArgs() 1442 obj = self.outer.filesBucket.Object(compat_bytes(self.fileID)) 1443 1444 for attempt in retry_s3(predicate=lambda e: retryable_s3_errors(e) or isinstance(e, ChecksumError)): 1445 with attempt: 1446 with AtomicFileCreate(localFilePath) as tmpPath: 1447 obj.download_file(Filename=tmpPath, ExtraArgs={'VersionId': self.version, **headerArgs}) 1448 1449 if verifyChecksum and self.checksum: 1450 try: 1451 # This automatically compares the result and matches the algorithm. 1452 self._get_file_checksum(localFilePath, self.checksum) 1453 except ChecksumError as e: 1454 # Annotate checksum mismatches with file name 1455 raise ChecksumError('Checksums do not match for file %s.' % localFilePath) from e 1456 # The error will get caught and result in a retry of the download until we run out of retries. 1457 # TODO: handle obviously truncated downloads by resuming instead. 1458 else: 1459 assert False 1460 1461 @contextmanager 1462 def downloadStream(self, verifyChecksum=True, encoding=None, errors=None): 1463 """ 1464 Context manager that gives out a download stream to download data. 1465 """ 1466 info = self 1467 1468 class DownloadPipe(ReadablePipe): 1469 def writeTo(self, writable): 1470 if info.content is not None: 1471 writable.write(info.content) 1472 elif info.version: 1473 headerArgs = info._s3EncryptionArgs() 1474 obj = info.outer.filesBucket.Object(compat_bytes(info.fileID)) 1475 for attempt in retry_s3(): 1476 with attempt: 1477 obj.download_fileobj(writable, ExtraArgs={'VersionId': info.version, **headerArgs}) 1478 else: 1479 assert False 1480 1481 class HashingPipe(ReadableTransformingPipe): 1482 """ 1483 Class which checksums all the data read through it. If it 1484 reaches EOF and the checksum isn't correct, raises 1485 ChecksumError. 1486 1487 Assumes info actually has a checksum. 1488 """ 1489 1490 def transform(self, readable, writable): 1491 hasher = info._start_checksum(to_match=info.checksum) 1492 contents = readable.read(1024 * 1024) 1493 while contents != b'': 1494 info._update_checksum(hasher, contents) 1495 try: 1496 writable.write(contents) 1497 except BrokenPipeError: 1498 # Read was stopped early by user code. 1499 # Can't check the checksum. 1500 return 1501 contents = readable.read(1024 * 1024) 1502 # We reached EOF in the input. 1503 # Finish checksumming and verify. 
1504 info._finish_checksum(hasher) 1505 # Now stop so EOF happens in the output. 1506 1507 if verifyChecksum and self.checksum: 1508 with DownloadPipe() as readable: 1509 # Interpose a pipe to check the hash 1510 with HashingPipe(readable, encoding=encoding, errors=errors) as verified: 1511 yield verified 1512 else: 1513 # Readable end of pipe produces text mode output if encoding specified 1514 with DownloadPipe(encoding=encoding, errors=errors) as readable: 1515 # No true checksum available, so don't hash 1516 yield readable 1517 1518 def delete(self): 1519 store = self.outer 1520 if self.previousVersion is not None: 1521 for attempt in retry_sdb(): 1522 with attempt: 1523 store.filesDomain.delete_attributes( 1524 compat_bytes(self.fileID), 1525 expected_values=['version', self.previousVersion]) 1526 if self.previousVersion: 1527 for attempt in retry_s3(): 1528 with attempt: 1529 store.s3_client.delete_object(Bucket=store.filesBucket.name, 1530 Key=compat_bytes(self.fileID), 1531 VersionId=self.previousVersion) 1532 1533 def getSize(self): 1534 """ 1535 Return the size of the referenced item in bytes. 1536 """ 1537 if self.content is not None: 1538 return len(self.content) 1539 elif self.version: 1540 for attempt in retry_s3(): 1541 with attempt: 1542 obj = self.outer.filesBucket.Object(compat_bytes(self.fileID)) 1543 return obj.content_length 1544 else: 1545 return 0 1546 1547 def _getSSEKey(self) -> Optional[bytes]: 1548 sseKeyPath = self.outer.sseKeyPath 1549 if sseKeyPath: 1550 with open(sseKeyPath, 'rb') as f: 1551 sseKey = f.read() 1552 return sseKey 1553 1554 def _s3EncryptionArgs(self): 1555 # the keys of the returned dictionary are unpacked to the corresponding boto3 optional 1556 # parameters and will be used to set the http headers 1557 if self.encrypted: 1558 sseKey = self._getSSEKey() 1559 assert sseKey is not None, 'Content is encrypted but no key was provided.' 1560 assert len(sseKey) == 32 1561 # boto3 encodes the key and calculates the MD5 for us 1562 return {'SSECustomerAlgorithm': 'AES256', 'SSECustomerKey': sseKey} 1563 else: 1564 return {} 1565 1566 def __repr__(self): 1567 r = custom_repr 1568 d = (('fileID', r(self.fileID)), 1569 ('ownerID', r(self.ownerID)), 1570 ('encrypted', r(self.encrypted)), 1571 ('version', r(self.version)), 1572 ('previousVersion', r(self.previousVersion)), 1573 ('content', r(self.content)), 1574 ('checksum', r(self.checksum)), 1575 ('_numContentChunks', r(self._numContentChunks))) 1576 return "{}({})".format(type(self).__name__, 1577 ', '.join('%s=%s' % (k, v) for k, v in d)) 1578 1579 versionings = dict(Enabled=True, Disabled=False, Suspended=None) 1580 1581 def _getBucketVersioning(self, bucket_name): 1582 """ 1583 The status attribute of BucketVersioning can be 'Enabled', 'Suspended' or None (Disabled) 1584 which we map to True, None and False respectively. Note that we've never seen a versioning 1585 status of 'Disabled', only the None return value. Calling BucketVersioning.suspend() will 1586 cause BucketVersioning.status to then return 'Suspended' even on a new bucket that never 1587 had versioning enabled. 
1588 1589 :param bucket_name: str 1590 """ 1591 for attempt in retry_s3(): 1592 with attempt: 1593 status = self.s3_resource.BucketVersioning(bucket_name).status 1594 return self.versionings.get(status) if status else False 1595 1596 @staticmethod 1597 def getBucketRegion(bucket_name: str): 1598 for attempt in retry_s3(): 1599 with attempt: 1600 loc = s3_boto3_client.get_bucket_location(Bucket=bucket_name) 1601 return bucket_location_to_region(loc.get('LocationConstraint', None)) 1602 1603 # TODO: Make this retry more specific? 1604 # example: https://github.com/DataBiosphere/toil/issues/3378 1605 @retry() 1606 def destroy(self): 1607 # FIXME: Destruction of encrypted stores only works after initialize() or .resume() 1608 # See https://github.com/BD2KGenomics/toil/issues/1041 1609 try: 1610 self._bind(create=False, block=False, check_versioning_consistency=False) 1611 except BucketLocationConflictException: 1612 # If the unique jobstore bucket name existed, _bind would have raised a 1613 # BucketLocationConflictException before calling destroy. Calling _bind here again 1614 # would reraise the same exception so we need to catch and ignore that exception. 1615 pass 1616 # TODO: Add other failure cases to be ignored here. 1617 self._registered = None 1618 if self.filesBucket is not None: 1619 self._delete_bucket(self.filesBucket) 1620 self.filesBucket = None 1621 for name in 'filesDomain', 'jobsDomain': 1622 domain = getattr(self, name) 1623 if domain is not None: 1624 self._delete_domain(domain) 1625 setattr(self, name, None) 1626 self._registered = False 1627 1628 def _delete_domain(self, domain): 1629 for attempt in retry_sdb(): 1630 with attempt: 1631 try: 1632 domain.delete() 1633 except SDBResponseError as e: 1634 if not no_such_sdb_domain(e): 1635 raise 1636 1637 def _delete_bucket(self, bucket): 1638 """ 1639 :param bucket: S3.Bucket 1640 """ 1641 for attempt in retry_s3(): 1642 with attempt: 1643 try: 1644 uploads = s3_boto3_client.list_multipart_uploads(Bucket=bucket.name).get('Uploads') 1645 if uploads: 1646 for u in uploads: 1647 s3_boto3_client.abort_multipart_upload(Bucket=bucket.name, 1648 Key=u["Key"], 1649 UploadId=u["UploadId"]) 1650 1651 bucket.objects.all().delete() 1652 bucket.object_versions.delete() 1653 bucket.delete() 1654 except s3_boto3_client.exceptions.NoSuchBucket: 1655 pass 1656 except ClientError as e: 1657 if e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') != 404: 1658 raise 1659 1660 1661aRepr = reprlib.Repr() 1662aRepr.maxstring = 38 # so UUIDs don't get truncated (36 for UUID plus 2 for quotes) 1663custom_repr = aRepr.repr 1664 1665 1666class BucketLocationConflictException(Exception): 1667 def __init__(self, bucketRegion): 1668 super(BucketLocationConflictException, self).__init__( 1669 'A bucket with the same name as the jobstore was found in another region (%s). ' 1670 'Cannot proceed as the unique bucket name is already in use.' % bucketRegion) 1671
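
# A minimal sketch, for illustration only, of the checksum format FileInfo records
# ('<algorithm>$<lowercase hex hash>', as produced by _start_checksum/_finish_checksum).
# The file name is hypothetical and this snippet is not used by the module:
#
#     import hashlib
#     hasher = hashlib.sha1()
#     with open('some-local-file', 'rb') as f:
#         for block in iter(lambda: f.read(1024 * 1024), b''):
#             hasher.update(block)
#     checksum = 'sha1$' + hasher.hexdigest()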