import json
import os
import base64
import datetime
import hashlib
import copy
import itertools
import codecs
import random
import string
import tempfile
import threading
import pytz
import sys
import time
import uuid

from bisect import insort
from importlib import reload
from moto.core import (
    ACCOUNT_ID,
    BaseBackend,
    BaseModel,
    CloudFormationModel,
    CloudWatchMetricProvider,
)

from moto.core.utils import (
    iso_8601_datetime_without_milliseconds_s3,
    rfc_1123_datetime,
    unix_time_millis,
)
from moto.cloudwatch.models import MetricDatum
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import LowercaseDict
from moto.s3.exceptions import (
    AccessDeniedByLock,
    BucketAlreadyExists,
    BucketNeedsToBeNew,
    MissingBucket,
    InvalidBucketName,
    InvalidPart,
    InvalidRequest,
    EntityTooSmall,
    MissingKey,
    InvalidNotificationDestination,
    MalformedXML,
    InvalidStorageClass,
    InvalidTargetBucketForLogging,
    CrossLocationLoggingProhibitted,
    NoSuchPublicAccessBlockConfiguration,
    InvalidPublicAccessBlockConfiguration,
    WrongPublicAccessBlockAccountIdError,
    NoSuchUpload,
    ObjectLockConfigurationNotFoundError,
    InvalidTagError,
)
from .cloud_formation import cfn_to_api_encryption, is_replacement_update
from .utils import clean_key_name, _VersionedKeyStore, undo_clean_key_name
from ..settings import get_s3_default_key_buffer_size, S3_UPLOAD_PART_MIN_SIZE

MAX_BUCKET_NAME_LENGTH = 63
MIN_BUCKET_NAME_LENGTH = 3
UPLOAD_ID_BYTES = 43
STORAGE_CLASS = [
    "STANDARD",
    "REDUCED_REDUNDANCY",
    "STANDARD_IA",
    "ONEZONE_IA",
    "INTELLIGENT_TIERING",
    "GLACIER",
    "DEEP_ARCHIVE",
]
DEFAULT_TEXT_ENCODING = sys.getdefaultencoding()
OWNER = "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"


def get_moto_s3_account_id():
    """This makes it easy to mock AWS Account IDs when using AWS Config
    -- simply mock.patch the ACCOUNT_ID here, and Config gets it for free.
    """
    return ACCOUNT_ID
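
# Illustrative usage of the hook above (the account id value and test context
# here are assumptions, not part of this file): patching the module attribute
# is enough for this helper -- and therefore AWS Config -- to see the fake id.
#
#     from unittest import mock
#
#     with mock.patch("moto.s3.models.ACCOUNT_ID", "111111111111"):
#         assert get_moto_s3_account_id() == "111111111111"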
81 """ 82 return ACCOUNT_ID 83 84 85class FakeDeleteMarker(BaseModel): 86 def __init__(self, key): 87 self.key = key 88 self.name = key.name 89 self.last_modified = datetime.datetime.utcnow() 90 self._version_id = str(uuid.uuid4()) 91 92 @property 93 def last_modified_ISO8601(self): 94 return iso_8601_datetime_without_milliseconds_s3(self.last_modified) 95 96 @property 97 def version_id(self): 98 return self._version_id 99 100 101class FakeKey(BaseModel): 102 def __init__( 103 self, 104 name, 105 value, 106 storage="STANDARD", 107 etag=None, 108 is_versioned=False, 109 version_id=0, 110 max_buffer_size=None, 111 multipart=None, 112 bucket_name=None, 113 encryption=None, 114 kms_key_id=None, 115 bucket_key_enabled=None, 116 lock_mode=None, 117 lock_legal_status=None, 118 lock_until=None, 119 ): 120 self.name = name 121 self.last_modified = datetime.datetime.utcnow() 122 self.acl = get_canned_acl("private") 123 self.website_redirect_location = None 124 self._storage_class = storage if storage else "STANDARD" 125 self._metadata = LowercaseDict() 126 self._expiry = None 127 self._etag = etag 128 self._version_id = version_id 129 self._is_versioned = is_versioned 130 self.multipart = multipart 131 self.bucket_name = bucket_name 132 133 self._max_buffer_size = ( 134 max_buffer_size if max_buffer_size else get_s3_default_key_buffer_size() 135 ) 136 self._value_buffer = tempfile.SpooledTemporaryFile(self._max_buffer_size) 137 self.value = value 138 self.lock = threading.Lock() 139 140 self.encryption = encryption 141 self.kms_key_id = kms_key_id 142 self.bucket_key_enabled = bucket_key_enabled 143 144 self.lock_mode = lock_mode 145 self.lock_legal_status = lock_legal_status 146 self.lock_until = lock_until 147 148 # Default metadata values 149 self._metadata["Content-Type"] = "binary/octet-stream" 150 151 @property 152 def version_id(self): 153 return self._version_id 154 155 @property 156 def value(self): 157 self.lock.acquire() 158 self._value_buffer.seek(0) 159 r = self._value_buffer.read() 160 r = copy.copy(r) 161 self.lock.release() 162 return r 163 164 @property 165 def arn(self): 166 # S3 Objects don't have an ARN, but we do need something unique when creating tags against this resource 167 return "arn:aws:s3:::{}/{}/{}".format( 168 self.bucket_name, self.name, self.version_id 169 ) 170 171 @value.setter 172 def value(self, new_value): 173 self._value_buffer.seek(0) 174 self._value_buffer.truncate() 175 176 # Hack for working around moto's own unit tests; this probably won't 177 # actually get hit in normal use. 

    @value.setter
    def value(self, new_value):
        self._value_buffer.seek(0)
        self._value_buffer.truncate()

        # Hack for working around moto's own unit tests; this probably won't
        # actually get hit in normal use.
        if isinstance(new_value, str):
            new_value = new_value.encode(DEFAULT_TEXT_ENCODING)
        self._value_buffer.write(new_value)
        self.contentsize = len(new_value)

    def set_metadata(self, metadata, replace=False):
        if replace:
            self._metadata = {}
        self._metadata.update(metadata)

    def set_storage_class(self, storage):
        if storage is not None and storage not in STORAGE_CLASS:
            raise InvalidStorageClass(storage=storage)
        self._storage_class = storage

    def set_expiry(self, expiry):
        self._expiry = expiry

    def set_acl(self, acl):
        self.acl = acl

    def append_to_value(self, value):
        self.contentsize += len(value)
        self._value_buffer.seek(0, os.SEEK_END)
        self._value_buffer.write(value)

        self.last_modified = datetime.datetime.utcnow()
        self._etag = None  # must recalculate etag
        if self._is_versioned:
            self._version_id = str(uuid.uuid4())
        else:
            self._version_id = None

    def restore(self, days):
        self._expiry = datetime.datetime.utcnow() + datetime.timedelta(days)

    @property
    def etag(self):
        if self._etag is None:
            value_md5 = hashlib.md5()
            self._value_buffer.seek(0)
            while True:
                block = self._value_buffer.read(16 * 1024 * 1024)  # read in 16MB chunks
                if not block:
                    break
                value_md5.update(block)

            self._etag = value_md5.hexdigest()
        return '"{0}"'.format(self._etag)

    @property
    def last_modified_ISO8601(self):
        return iso_8601_datetime_without_milliseconds_s3(self.last_modified)

    @property
    def last_modified_RFC1123(self):
        # Different datetime formats depending on how the key is obtained
        # https://github.com/boto/boto/issues/466
        return rfc_1123_datetime(self.last_modified)

    @property
    def metadata(self):
        return self._metadata

    @property
    def response_dict(self):
        res = {
            "ETag": self.etag,
            "last-modified": self.last_modified_RFC1123,
            "content-length": str(self.size),
        }
        if self.encryption is not None:
            res["x-amz-server-side-encryption"] = self.encryption
            if self.encryption == "aws:kms" and self.kms_key_id is not None:
                res["x-amz-server-side-encryption-aws-kms-key-id"] = self.kms_key_id
        if self.bucket_key_enabled is not None:
            res[
                "x-amz-server-side-encryption-bucket-key-enabled"
            ] = self.bucket_key_enabled
        if self._storage_class != "STANDARD":
            res["x-amz-storage-class"] = self._storage_class
        if self._expiry is not None:
            rhdr = 'ongoing-request="false", expiry-date="{0}"'
            res["x-amz-restore"] = rhdr.format(self.expiry_date)

        if self._is_versioned:
            res["x-amz-version-id"] = str(self.version_id)

        if self.website_redirect_location:
            res["x-amz-website-redirect-location"] = self.website_redirect_location
        if self.lock_legal_status:
            res["x-amz-object-lock-legal-hold"] = self.lock_legal_status
        if self.lock_until:
            res["x-amz-object-lock-retain-until-date"] = self.lock_until
        if self.lock_mode:
            res["x-amz-object-lock-mode"] = self.lock_mode

        return res

    @property
    def size(self):
        return self.contentsize

    @property
    def storage_class(self):
        return self._storage_class
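
    # For orientation, a small versioned key stored with SSE-S3 would produce
    # a response_dict roughly like the following (all values are invented for
    # the example):
    #
    #     {
    #         "ETag": '"9a0364b9e99bb480dd25e1f0284c8555"',
    #         "last-modified": "Tue, 01 Feb 2022 00:00:00 GMT",
    #         "content-length": "7",
    #         "x-amz-server-side-encryption": "AES256",
    #         "x-amz-version-id": "6eb9...",
    #     }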

    @property
    def expiry_date(self):
        if self._expiry is not None:
            return self._expiry.strftime("%a, %d %b %Y %H:%M:%S GMT")

    # Keys need to be pickleable due to some implementation details of boto3.
    # Since file objects aren't pickleable, we need to override the default
    # behavior. The following is adapted from the Python docs:
    # https://docs.python.org/3/library/pickle.html#handling-stateful-objects
    def __getstate__(self):
        state = self.__dict__.copy()
        state["value"] = self.value
        del state["_value_buffer"]
        del state["lock"]
        return state

    def __setstate__(self, state):
        self.__dict__.update({k: v for k, v in state.items() if k != "value"})

        self._value_buffer = tempfile.SpooledTemporaryFile(
            max_size=self._max_buffer_size
        )
        self.value = state["value"]
        self.lock = threading.Lock()

    @property
    def is_locked(self):
        if self.lock_legal_status == "ON":
            return True

        if self.lock_mode == "COMPLIANCE":
            now = datetime.datetime.utcnow()
            try:
                until = datetime.datetime.strptime(
                    self.lock_until, "%Y-%m-%dT%H:%M:%SZ"
                )
            except ValueError:
                until = datetime.datetime.strptime(
                    self.lock_until, "%Y-%m-%dT%H:%M:%S.%fZ"
                )

            if until > now:
                return True

        return False


class FakeMultipart(BaseModel):
    def __init__(self, key_name, metadata, storage=None, tags=None):
        self.key_name = key_name
        self.metadata = metadata
        self.storage = storage
        self.tags = tags
        self.parts = {}
        self.partlist = []  # ordered list of part IDs
        rand_b64 = base64.b64encode(os.urandom(UPLOAD_ID_BYTES))
        self.id = (
            rand_b64.decode("utf-8").replace("=", "").replace("+", "").replace("/", "")
        )

    def complete(self, body):
        decode_hex = codecs.getdecoder("hex_codec")
        total = bytearray()
        md5s = bytearray()

        last = None
        count = 0
        for pn, etag in body:
            part = self.parts.get(pn)
            part_etag = None
            if part is not None:
                part_etag = part.etag.replace('"', "")
                etag = etag.replace('"', "")
            if part is None or part_etag != etag:
                raise InvalidPart()
            if last is not None and last.contentsize < S3_UPLOAD_PART_MIN_SIZE:
                raise EntityTooSmall()
            md5s.extend(decode_hex(part_etag)[0])
            total.extend(part.value)
            last = part
            count += 1

        etag = hashlib.md5()
        etag.update(bytes(md5s))
        return total, "{0}-{1}".format(etag.hexdigest(), count)

    def set_part(self, part_id, value):
        if part_id < 1:
            raise NoSuchUpload(upload_id=part_id)

        key = FakeKey(part_id, value)
        self.parts[part_id] = key
        if part_id not in self.partlist:
            insort(self.partlist, part_id)
        return key

    def list_parts(self, part_number_marker, max_parts):
        for part_id in self.partlist:
            part = self.parts[part_id]
            if part_number_marker <= part.name < part_number_marker + max_parts:
                yield part
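
# The ETag produced by FakeMultipart.complete() mirrors S3's multipart scheme:
# the md5 of the concatenated binary md5 digests of every part, suffixed with
# "-<part count>". A two-part upload would therefore end in an ETag shaped
# like "d9fdc292b1304e7eace0f0dc42bca6da-2" (digest invented for illustration).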


class FakeGrantee(BaseModel):
    def __init__(self, id="", uri="", display_name=""):
        self.id = id
        self.uri = uri
        self.display_name = display_name

    def __eq__(self, other):
        if not isinstance(other, FakeGrantee):
            return False
        return (
            self.id == other.id
            and self.uri == other.uri
            and self.display_name == other.display_name
        )

    @property
    def type(self):
        return "Group" if self.uri else "CanonicalUser"

    def __repr__(self):
        return "FakeGrantee(display_name: '{}', id: '{}', uri: '{}')".format(
            self.display_name, self.id, self.uri
        )


ALL_USERS_GRANTEE = FakeGrantee(uri="http://acs.amazonaws.com/groups/global/AllUsers")
AUTHENTICATED_USERS_GRANTEE = FakeGrantee(
    uri="http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
)
LOG_DELIVERY_GRANTEE = FakeGrantee(uri="http://acs.amazonaws.com/groups/s3/LogDelivery")

PERMISSION_FULL_CONTROL = "FULL_CONTROL"
PERMISSION_WRITE = "WRITE"
PERMISSION_READ = "READ"
PERMISSION_WRITE_ACP = "WRITE_ACP"
PERMISSION_READ_ACP = "READ_ACP"

CAMEL_CASED_PERMISSIONS = {
    "FULL_CONTROL": "FullControl",
    "WRITE": "Write",
    "READ": "Read",
    "WRITE_ACP": "WriteAcp",
    "READ_ACP": "ReadAcp",
}


class FakeGrant(BaseModel):
    def __init__(self, grantees, permissions):
        self.grantees = grantees
        self.permissions = permissions

    def __repr__(self):
        return "FakeGrant(grantees: {}, permissions: {})".format(
            self.grantees, self.permissions
        )


class FakeAcl(BaseModel):
    def __init__(self, grants=None):
        grants = grants or []
        self.grants = grants

    @property
    def public_read(self):
        for grant in self.grants:
            if ALL_USERS_GRANTEE in grant.grantees:
                if PERMISSION_READ in grant.permissions:
                    return True
                if PERMISSION_FULL_CONTROL in grant.permissions:
                    return True
        return False

    def __repr__(self):
        return "FakeAcl(grants: {})".format(self.grants)

    def to_config_dict(self):
        """Returns the object in the format expected by AWS Config."""
        data = {
            "grantSet": None,  # Always setting this to None. Feel free to change.
            "owner": {"displayName": None, "id": OWNER},
        }

        # Add details for each Grant:
        grant_list = []
        for grant in self.grants:
            permissions = (
                grant.permissions
                if isinstance(grant.permissions, list)
                else [grant.permissions]
            )
            for permission in permissions:
                for grantee in grant.grantees:
                    if grantee.uri:
                        grant_list.append(
                            {
                                "grantee": grantee.uri.split(
                                    "http://acs.amazonaws.com/groups/s3/"
                                )[1],
                                "permission": CAMEL_CASED_PERMISSIONS[permission],
                            }
                        )
                    else:
                        grant_list.append(
                            {
                                "grantee": {
                                    "id": grantee.id,
                                    "displayName": None
                                    if not grantee.display_name
                                    else grantee.display_name,
                                },
                                "permission": CAMEL_CASED_PERMISSIONS[permission],
                            }
                        )

        if grant_list:
            data["grantList"] = grant_list

        return data


def get_canned_acl(acl):
    owner_grantee = FakeGrantee(id=OWNER)
    grants = [FakeGrant([owner_grantee], [PERMISSION_FULL_CONTROL])]
    if acl == "private":
        pass  # no other permissions
    elif acl == "public-read":
        grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ]))
    elif acl == "public-read-write":
        grants.append(
            FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ, PERMISSION_WRITE])
        )
    elif acl == "authenticated-read":
        grants.append(FakeGrant([AUTHENTICATED_USERS_GRANTEE], [PERMISSION_READ]))
    elif acl == "bucket-owner-read":
        pass  # TODO: bucket owner ACL
    elif acl == "bucket-owner-full-control":
        pass  # TODO: bucket owner ACL
    elif acl == "aws-exec-read":
        pass  # TODO: bucket owner, EC2 Read
    elif acl == "log-delivery-write":
        grants.append(
            FakeGrant([LOG_DELIVERY_GRANTEE], [PERMISSION_READ_ACP, PERMISSION_WRITE])
        )
    else:
        assert False, "Unknown canned acl: %s" % (acl,)
    return FakeAcl(grants=grants)
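
# A quick sanity check of the helper above, doctest-style:
#
#     >>> get_canned_acl("public-read").public_read
#     True
#     >>> get_canned_acl("private").public_read
#     False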


class LifecycleFilter(BaseModel):
    def __init__(self, prefix=None, tag=None, and_filter=None):
        self.prefix = prefix
        (self.tag_key, self.tag_value) = tag if tag else (None, None)
        self.and_filter = and_filter

    def to_config_dict(self):
        if self.prefix is not None:
            return {
                "predicate": {"type": "LifecyclePrefixPredicate", "prefix": self.prefix}
            }

        elif self.tag_key:
            return {
                "predicate": {
                    "type": "LifecycleTagPredicate",
                    "tag": {"key": self.tag_key, "value": self.tag_value},
                }
            }

        else:
            return {
                "predicate": {
                    "type": "LifecycleAndOperator",
                    "operands": self.and_filter.to_config_dict(),
                }
            }


class LifecycleAndFilter(BaseModel):
    def __init__(self, prefix=None, tags=None):
        self.prefix = prefix
        self.tags = tags

    def to_config_dict(self):
        data = []

        if self.prefix is not None:
            data.append({"type": "LifecyclePrefixPredicate", "prefix": self.prefix})

        for key, value in self.tags.items():
            data.append(
                {"type": "LifecycleTagPredicate", "tag": {"key": key, "value": value}}
            )

        return data
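
# As an illustration of the Config serialization above: a LifecycleFilter with
# prefix="logs/" serializes to
# {"predicate": {"type": "LifecyclePrefixPredicate", "prefix": "logs/"}},
# while an And filter becomes a "LifecycleAndOperator" whose "operands" list
# carries one predicate dict per prefix and tag.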


class LifecycleRule(BaseModel):
    def __init__(
        self,
        id=None,
        prefix=None,
        lc_filter=None,
        status=None,
        expiration_days=None,
        expiration_date=None,
        transition_days=None,
        transition_date=None,
        storage_class=None,
        expired_object_delete_marker=None,
        nve_noncurrent_days=None,
        nvt_noncurrent_days=None,
        nvt_storage_class=None,
        aimu_days=None,
    ):
        self.id = id
        self.prefix = prefix
        self.filter = lc_filter
        self.status = status
        self.expiration_days = expiration_days
        self.expiration_date = expiration_date
        self.transition_days = transition_days
        self.transition_date = transition_date
        self.storage_class = storage_class
        self.expired_object_delete_marker = expired_object_delete_marker
        self.nve_noncurrent_days = nve_noncurrent_days
        self.nvt_noncurrent_days = nvt_noncurrent_days
        self.nvt_storage_class = nvt_storage_class
        self.aimu_days = aimu_days

    def to_config_dict(self):
        """Converts the object to the AWS Config data dict.

        Note: The following are missing that should be added in the future:
        - transitions (returns None for now)
        - noncurrentVersionTransitions (returns None for now)
        """

        lifecycle_dict = {
            "id": self.id,
            "prefix": self.prefix,
            "status": self.status,
            "expirationInDays": int(self.expiration_days)
            if self.expiration_days
            else None,
            "expiredObjectDeleteMarker": self.expired_object_delete_marker,
            "noncurrentVersionExpirationInDays": int(self.nve_noncurrent_days)
            if self.nve_noncurrent_days
            else -1,
            "expirationDate": self.expiration_date,
            "transitions": None,  # Replace me with logic to fill in
            "noncurrentVersionTransitions": None,  # Replace me with logic to fill in
        }

        if self.aimu_days:
            lifecycle_dict["abortIncompleteMultipartUpload"] = {
                "daysAfterInitiation": self.aimu_days
            }
        else:
            lifecycle_dict["abortIncompleteMultipartUpload"] = None

        # Format the filter:
        if self.prefix is None and self.filter is None:
            lifecycle_dict["filter"] = {"predicate": None}

        elif self.prefix:
            lifecycle_dict["filter"] = None
        else:
            lifecycle_dict["filter"] = self.filter.to_config_dict()

        return lifecycle_dict


class CorsRule(BaseModel):
    def __init__(
        self,
        allowed_methods,
        allowed_origins,
        allowed_headers=None,
        expose_headers=None,
        max_age_seconds=None,
    ):
        self.allowed_methods = (
            [allowed_methods] if isinstance(allowed_methods, str) else allowed_methods
        )
        self.allowed_origins = (
            [allowed_origins] if isinstance(allowed_origins, str) else allowed_origins
        )
        self.allowed_headers = (
            [allowed_headers] if isinstance(allowed_headers, str) else allowed_headers
        )
        self.exposed_headers = (
            [expose_headers] if isinstance(expose_headers, str) else expose_headers
        )
        self.max_age_seconds = max_age_seconds


class Notification(BaseModel):
    def __init__(self, arn, events, filters=None, id=None):
        self.id = (
            id
            if id
            else "".join(
                random.choice(string.ascii_letters + string.digits) for _ in range(50)
            )
        )
        self.arn = arn
        self.events = events
        self.filters = filters if filters else {}

    def to_config_dict(self):
        data = {}

        # Type and ARN will be filled in by NotificationConfiguration's to_config_dict:
        data["events"] = [event for event in self.events]

        if self.filters:
            data["filter"] = {
                "s3KeyFilter": {
                    "filterRules": [
                        {"name": fr["Name"], "value": fr["Value"]}
                        for fr in self.filters["S3Key"]["FilterRule"]
                    ]
                }
            }
        else:
            data["filter"] = None

        # Not sure why this is a thing since AWS just seems to return this as filters ¯\_(ツ)_/¯
        data["objectPrefixes"] = []

        return data
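
# The NotificationConfiguration constructor below consumes the dict form of a
# parsed put-bucket-notification body; a single topic entry is assumed to look
# roughly like this (ARN and values invented):
#
#     {
#         "Topic": "arn:aws:sns:us-east-1:123456789012:my-topic",
#         "Event": ["s3:ObjectCreated:*"],
#         "Filter": {"S3Key": {"FilterRule": [{"Name": "prefix", "Value": "img/"}]}},
#         "Id": "optional-id",
#     }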


class NotificationConfiguration(BaseModel):
    def __init__(self, topic=None, queue=None, cloud_function=None):
        self.topic = (
            [
                Notification(
                    t["Topic"], t["Event"], filters=t.get("Filter"), id=t.get("Id")
                )
                for t in topic
            ]
            if topic
            else []
        )
        self.queue = (
            [
                Notification(
                    q["Queue"], q["Event"], filters=q.get("Filter"), id=q.get("Id")
                )
                for q in queue
            ]
            if queue
            else []
        )
        self.cloud_function = (
            [
                Notification(
                    c["CloudFunction"],
                    c["Event"],
                    filters=c.get("Filter"),
                    id=c.get("Id"),
                )
                for c in cloud_function
            ]
            if cloud_function
            else []
        )

    def to_config_dict(self):
        data = {"configurations": {}}

        for topic in self.topic:
            topic_config = topic.to_config_dict()
            topic_config["topicARN"] = topic.arn
            topic_config["type"] = "TopicConfiguration"
            data["configurations"][topic.id] = topic_config

        for queue in self.queue:
            queue_config = queue.to_config_dict()
            queue_config["queueARN"] = queue.arn
            queue_config["type"] = "QueueConfiguration"
            data["configurations"][queue.id] = queue_config

        for cloud_function in self.cloud_function:
            cf_config = cloud_function.to_config_dict()
            cf_config["queueARN"] = cloud_function.arn
            cf_config["type"] = "LambdaConfiguration"
            data["configurations"][cloud_function.id] = cf_config

        return data


def convert_str_to_bool(item):
    """Converts a boolean string to a boolean value"""
    if isinstance(item, str):
        return item.lower() == "true"

    return False


class PublicAccessBlock(BaseModel):
    def __init__(
        self,
        block_public_acls,
        ignore_public_acls,
        block_public_policy,
        restrict_public_buckets,
    ):
        # The boto XML appears to expect these values to exist as lowercase strings...
        self.block_public_acls = block_public_acls or "false"
        self.ignore_public_acls = ignore_public_acls or "false"
        self.block_public_policy = block_public_policy or "false"
        self.restrict_public_buckets = restrict_public_buckets or "false"

    def to_config_dict(self):
        # Need to make the string values booleans for Config:
        return {
            "blockPublicAcls": convert_str_to_bool(self.block_public_acls),
            "ignorePublicAcls": convert_str_to_bool(self.ignore_public_acls),
            "blockPublicPolicy": convert_str_to_bool(self.block_public_policy),
            "restrictPublicBuckets": convert_str_to_bool(self.restrict_public_buckets),
        }


class FakeBucket(CloudFormationModel):
    def __init__(self, name, region_name):
        self.name = name
        self.region_name = region_name
        self.keys = _VersionedKeyStore()
        self.multiparts = {}
        self.versioning_status = None
        self.rules = []
        self.policy = None
        self.website_configuration = None
        self.acl = get_canned_acl("private")
        self.cors = []
        self.logging = {}
        self.notification_configuration = None
        self.accelerate_configuration = None
        self.payer = "BucketOwner"
        self.creation_date = datetime.datetime.now(tz=pytz.utc)
        self.public_access_block = None
        self.encryption = None
        self.object_lock_enabled = False
        self.default_lock_mode = ""
        self.default_lock_days = 0
        self.default_lock_years = 0

    @property
    def location(self):
        return self.region_name

    @property
    def creation_date_ISO8601(self):
        return iso_8601_datetime_without_milliseconds_s3(self.creation_date)

    @property
    def is_versioned(self):
        return self.versioning_status == "Enabled"
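
    # set_lifecycle() below consumes the dict form of a parsed lifecycle XML
    # body; a single parsed rule is assumed to look roughly like:
    #
    #     {
    #         "ID": "expire-logs",
    #         "Filter": {"Prefix": "logs/"},
    #         "Status": "Enabled",
    #         "Expiration": {"Days": 30},
    #     }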
rule["NoncurrentVersionExpiration"][ 876 "NoncurrentDays" 877 ] 878 879 nvt_noncurrent_days = None 880 nvt_storage_class = None 881 if rule.get("NoncurrentVersionTransition") is not None: 882 if rule["NoncurrentVersionTransition"].get("NoncurrentDays") is None: 883 raise MalformedXML() 884 if rule["NoncurrentVersionTransition"].get("StorageClass") is None: 885 raise MalformedXML() 886 nvt_noncurrent_days = rule["NoncurrentVersionTransition"][ 887 "NoncurrentDays" 888 ] 889 nvt_storage_class = rule["NoncurrentVersionTransition"]["StorageClass"] 890 891 aimu_days = None 892 if rule.get("AbortIncompleteMultipartUpload") is not None: 893 if ( 894 rule["AbortIncompleteMultipartUpload"].get("DaysAfterInitiation") 895 is None 896 ): 897 raise MalformedXML() 898 aimu_days = rule["AbortIncompleteMultipartUpload"][ 899 "DaysAfterInitiation" 900 ] 901 902 eodm = None 903 if expiration and expiration.get("ExpiredObjectDeleteMarker") is not None: 904 # This cannot be set if Date or Days is set: 905 if expiration.get("Days") or expiration.get("Date"): 906 raise MalformedXML() 907 eodm = expiration["ExpiredObjectDeleteMarker"] 908 909 # Pull out the filter: 910 lc_filter = None 911 if rule.get("Filter"): 912 # Can't have both `Filter` and `Prefix` (need to check for the presence of the key): 913 try: 914 # 'Prefix' cannot be outside of a Filter: 915 if rule["Prefix"] or not rule["Prefix"]: 916 raise MalformedXML() 917 except KeyError: 918 pass 919 920 filters = 0 921 try: 922 prefix_filter = ( 923 rule["Filter"]["Prefix"] or "" 924 ) # If it's `None` the set to the empty string 925 filters += 1 926 except KeyError: 927 prefix_filter = None 928 929 and_filter = None 930 if rule["Filter"].get("And"): 931 filters += 1 932 and_tags = {} 933 if rule["Filter"]["And"].get("Tag"): 934 if not isinstance(rule["Filter"]["And"]["Tag"], list): 935 rule["Filter"]["And"]["Tag"] = [ 936 rule["Filter"]["And"]["Tag"] 937 ] 938 939 for t in rule["Filter"]["And"]["Tag"]: 940 and_tags[t["Key"]] = t.get("Value", "") 941 942 try: 943 and_prefix = ( 944 rule["Filter"]["And"]["Prefix"] or "" 945 ) # If it's `None` then set to the empty string 946 except KeyError: 947 and_prefix = None 948 949 and_filter = LifecycleAndFilter(prefix=and_prefix, tags=and_tags) 950 951 filter_tag = None 952 if rule["Filter"].get("Tag"): 953 filters += 1 954 filter_tag = ( 955 rule["Filter"]["Tag"]["Key"], 956 rule["Filter"]["Tag"].get("Value", ""), 957 ) 958 959 # Can't have more than 1 filter: 960 if filters > 1: 961 raise MalformedXML() 962 963 lc_filter = LifecycleFilter( 964 prefix=prefix_filter, tag=filter_tag, and_filter=and_filter 965 ) 966 967 # If no top level prefix and no filter is present, then this is invalid: 968 if top_level_prefix is None: 969 try: 970 rule["Filter"] 971 except KeyError: 972 raise MalformedXML() 973 974 self.rules.append( 975 LifecycleRule( 976 id=rule.get("ID"), 977 prefix=top_level_prefix, 978 lc_filter=lc_filter, 979 status=rule["Status"], 980 expiration_days=expiration.get("Days") if expiration else None, 981 expiration_date=expiration.get("Date") if expiration else None, 982 transition_days=transition.get("Days") if transition else None, 983 transition_date=transition.get("Date") if transition else None, 984 storage_class=transition.get("StorageClass") 985 if transition 986 else None, 987 expired_object_delete_marker=eodm, 988 nve_noncurrent_days=nve_noncurrent_days, 989 nvt_noncurrent_days=nvt_noncurrent_days, 990 nvt_storage_class=nvt_storage_class, 991 aimu_days=aimu_days, 992 ) 993 ) 994 995 def 

    def delete_lifecycle(self):
        self.rules = []

    def set_cors(self, rules):
        self.cors = []

        if len(rules) > 100:
            raise MalformedXML()

        for rule in rules:
            assert isinstance(rule["AllowedMethod"], list) or isinstance(
                rule["AllowedMethod"], str
            )
            assert isinstance(rule["AllowedOrigin"], list) or isinstance(
                rule["AllowedOrigin"], str
            )
            assert isinstance(rule.get("AllowedHeader", []), list) or isinstance(
                rule.get("AllowedHeader", ""), str
            )
            assert isinstance(rule.get("ExposeHeader", []), list) or isinstance(
                rule.get("ExposeHeader", ""), str
            )
            assert isinstance(rule.get("MaxAgeSeconds", "0"), str)

            if isinstance(rule["AllowedMethod"], str):
                methods = [rule["AllowedMethod"]]
            else:
                methods = rule["AllowedMethod"]

            for method in methods:
                if method not in ["GET", "PUT", "HEAD", "POST", "DELETE"]:
                    raise InvalidRequest(method)

            self.cors.append(
                CorsRule(
                    rule["AllowedMethod"],
                    rule["AllowedOrigin"],
                    rule.get("AllowedHeader"),
                    rule.get("ExposeHeader"),
                    rule.get("MaxAgeSeconds"),
                )
            )

    def delete_cors(self):
        self.cors = []

    def set_logging(self, logging_config, bucket_backend):
        if not logging_config:
            self.logging = {}
            return

        # Target bucket must exist in the same account (assuming all moto buckets are in the same account):
        if not bucket_backend.buckets.get(logging_config["TargetBucket"]):
            raise InvalidTargetBucketForLogging(
                "The target bucket for logging does not exist."
            )

        # Does the target bucket have the log-delivery WRITE and READ_ACP permissions?
        write = read_acp = False
        for grant in bucket_backend.buckets[logging_config["TargetBucket"]].acl.grants:
            # Must be granted to: http://acs.amazonaws.com/groups/s3/LogDelivery
            for grantee in grant.grantees:
                if grantee.uri == "http://acs.amazonaws.com/groups/s3/LogDelivery":
                    if (
                        "WRITE" in grant.permissions
                        or "FULL_CONTROL" in grant.permissions
                    ):
                        write = True

                    if (
                        "READ_ACP" in grant.permissions
                        or "FULL_CONTROL" in grant.permissions
                    ):
                        read_acp = True

                    break

        if not write or not read_acp:
            raise InvalidTargetBucketForLogging(
                "You must give the log-delivery group WRITE and READ_ACP"
                " permissions to the target bucket"
            )

        # Buckets must also exist within the same region:
        if (
            bucket_backend.buckets[logging_config["TargetBucket"]].region_name
            != self.region_name
        ):
            raise CrossLocationLoggingProhibitted()

        # Checks pass -- set the logging config:
        self.logging = logging_config

    def set_notification_configuration(self, notification_config):
        if not notification_config:
            self.notification_configuration = None
            return

        self.notification_configuration = NotificationConfiguration(
            topic=notification_config.get("TopicConfiguration"),
            queue=notification_config.get("QueueConfiguration"),
            cloud_function=notification_config.get("CloudFunctionConfiguration"),
        )

        # Validate that the region is correct:
        for thing in ["topic", "queue", "cloud_function"]:
            for t in getattr(self.notification_configuration, thing):
                region = t.arn.split(":")[3]
                if region != self.region_name:
                    raise InvalidNotificationDestination()
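
    # For reference, the logging_config consumed by set_logging() above is the
    # parsed dict form of a put-bucket-logging body; a minimal example (bucket
    # and prefix values invented) would be:
    #
    #     {"TargetBucket": "my-log-bucket", "TargetPrefix": "access-logs/"}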

    def set_accelerate_configuration(self, accelerate_config):
        if self.accelerate_configuration is None and accelerate_config == "Suspended":
            # Cannot "suspend" acceleration that was never enabled; leave it undefined
            return

        self.accelerate_configuration = accelerate_config

    @classmethod
    def has_cfn_attr(cls, attribute):
        return attribute in [
            "Arn",
            "DomainName",
            "DualStackDomainName",
            "RegionalDomainName",
            "WebsiteURL",
        ]

    def get_cfn_attribute(self, attribute_name):
        from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

        if attribute_name == "Arn":
            return self.arn
        elif attribute_name == "DomainName":
            return self.domain_name
        elif attribute_name == "DualStackDomainName":
            return self.dual_stack_domain_name
        elif attribute_name == "RegionalDomainName":
            return self.regional_domain_name
        elif attribute_name == "WebsiteURL":
            return self.website_url
        raise UnformattedGetAttTemplateException()

    def set_acl(self, acl):
        self.acl = acl

    @property
    def arn(self):
        return "arn:aws:s3:::{}".format(self.name)

    @property
    def domain_name(self):
        return "{}.s3.amazonaws.com".format(self.name)

    @property
    def dual_stack_domain_name(self):
        return "{}.s3.dualstack.{}.amazonaws.com".format(self.name, self.region_name)

    @property
    def regional_domain_name(self):
        return "{}.s3.{}.amazonaws.com".format(self.name, self.region_name)

    @property
    def website_url(self):
        return "http://{}.s3-website.{}.amazonaws.com".format(
            self.name, self.region_name
        )

    @property
    def physical_resource_id(self):
        return self.name

    @staticmethod
    def cloudformation_name_type():
        return "BucketName"

    @staticmethod
    def cloudformation_type():
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-s3-bucket.html
        return "AWS::S3::Bucket"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        bucket = s3_backend.create_bucket(resource_name, region_name)

        properties = cloudformation_json.get("Properties", {})

        if "BucketEncryption" in properties:
            bucket_encryption = cfn_to_api_encryption(properties["BucketEncryption"])
            s3_backend.put_bucket_encryption(
                bucket_name=resource_name, encryption=bucket_encryption
            )

        return bucket
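
    # Illustratively, the cloudformation_json handled by the CloudFormation
    # hooks here corresponds to a template resource of roughly this shape
    # (property values invented):
    #
    #     {
    #         "Type": "AWS::S3::Bucket",
    #         "Properties": {
    #             "BucketName": "my-bucket",
    #             "BucketEncryption": {"ServerSideEncryptionConfiguration": [...]},
    #         },
    #     }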

    @classmethod
    def update_from_cloudformation_json(
        cls, original_resource, new_resource_name, cloudformation_json, region_name,
    ):
        properties = cloudformation_json["Properties"]

        if is_replacement_update(properties):
            resource_name_property = cls.cloudformation_name_type()
            if resource_name_property not in properties:
                properties[resource_name_property] = new_resource_name
            new_resource = cls.create_from_cloudformation_json(
                properties[resource_name_property], cloudformation_json, region_name
            )
            properties[resource_name_property] = original_resource.name
            cls.delete_from_cloudformation_json(
                original_resource.name, cloudformation_json, region_name
            )
            return new_resource

        else:  # No Interruption
            if "BucketEncryption" in properties:
                bucket_encryption = cfn_to_api_encryption(
                    properties["BucketEncryption"]
                )
                s3_backend.put_bucket_encryption(
                    bucket_name=original_resource.name, encryption=bucket_encryption
                )
            return original_resource

    @classmethod
    def delete_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        s3_backend.delete_bucket(resource_name)

    def to_config_dict(self):
        """Return the AWS Config JSON format of this S3 bucket.

        Note: The following features are not implemented and will need to be if you care about them:
        - Bucket Accelerate Configuration
        """
        config_dict = {
            "version": "1.3",
            "configurationItemCaptureTime": str(self.creation_date),
            "configurationItemStatus": "ResourceDiscovered",
            "configurationStateId": str(
                int(time.mktime(self.creation_date.timetuple()))
            ),  # PY2 and 3 compatible
            "configurationItemMD5Hash": "",
            "arn": self.arn,
            "resourceType": "AWS::S3::Bucket",
            "resourceId": self.name,
            "resourceName": self.name,
            "awsRegion": self.region_name,
            "availabilityZone": "Regional",
            "resourceCreationTime": str(self.creation_date),
            "relatedEvents": [],
            "relationships": [],
            "tags": s3_backend.tagger.get_tag_dict_for_resource(self.arn),
            "configuration": {
                "name": self.name,
                "owner": {"id": OWNER},
                "creationDate": self.creation_date.isoformat(),
            },
        }

        # Make the supplementary configuration:
        # This is a double-wrapped JSON for some reason...
        s_config = {
            "AccessControlList": json.dumps(json.dumps(self.acl.to_config_dict()))
        }

        if self.public_access_block:
            s_config["PublicAccessBlockConfiguration"] = json.dumps(
                self.public_access_block.to_config_dict()
            )

        # Tagging is special:
        if config_dict["tags"]:
            s_config["BucketTaggingConfiguration"] = json.dumps(
                {"tagSets": [{"tags": config_dict["tags"]}]}
            )

        # TODO implement Accelerate Configuration:
        s_config["BucketAccelerateConfiguration"] = {"status": None}

        if self.rules:
            s_config["BucketLifecycleConfiguration"] = {
                "rules": [rule.to_config_dict() for rule in self.rules]
            }

        s_config["BucketLoggingConfiguration"] = {
            "destinationBucketName": self.logging.get("TargetBucket", None),
            "logFilePrefix": self.logging.get("TargetPrefix", None),
        }

        s_config["BucketPolicy"] = {
            "policyText": self.policy.decode("utf-8") if self.policy else None
        }

        s_config["IsRequesterPaysEnabled"] = (
            "false" if self.payer == "BucketOwner" else "true"
        )

        if self.notification_configuration:
            s_config[
                "BucketNotificationConfiguration"
            ] = self.notification_configuration.to_config_dict()
        else:
            s_config["BucketNotificationConfiguration"] = {"configurations": {}}

        config_dict["supplementaryConfiguration"] = s_config

        return config_dict

    @property
    def has_default_lock(self):
        if not self.object_lock_enabled:
            return False

        if self.default_lock_mode:
            return True

        return False

    def default_retention(self):
        now = datetime.datetime.utcnow()
        now += datetime.timedelta(self.default_lock_days)
        now += datetime.timedelta(self.default_lock_years * 365)
        return now.strftime("%Y-%m-%dT%H:%M:%SZ")
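
# Note that FakeBucket.default_retention() approximates a year as 365 days, so
# a bucket with default_lock_years=1 yields a retain-until timestamp exactly
# 365 days out (leap years are ignored).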


class S3Backend(BaseBackend, CloudWatchMetricProvider):
    """
    Moto implementation for S3.

    Custom S3 endpoints are supported, if you are using an S3-compatible storage solution like Ceph.
    Example usage:

    .. sourcecode:: python

        os.environ["MOTO_S3_CUSTOM_ENDPOINTS"] = "http://custom.internal.endpoint,http://custom.other.endpoint"
        @mock_s3
        def test_my_custom_endpoint():
            boto3.client("s3", endpoint_url="http://custom.internal.endpoint")
            ...

    Note that this only works if the environment variable is set **before** the mock is initialized.
    """

    def __init__(self):
        self.buckets = {}
        self.account_public_access_block = None
        self.tagger = TaggingService()

    @property
    def _url_module(self):
        # The urls-property can be different depending on env variables
        # Force a reload, to retrieve the correct set of URLs
        import moto.s3.urls as backend_urls_module

        reload(backend_urls_module)
        return backend_urls_module

    @staticmethod
    def default_vpc_endpoint_service(service_region, zones):
        """List of dicts representing default VPC endpoints for this service."""
        accesspoint = {
            "AcceptanceRequired": False,
            "AvailabilityZones": zones,
            "BaseEndpointDnsNames": [
                f"accesspoint.s3-global.{service_region}.vpce.amazonaws.com",
            ],
            "ManagesVpcEndpoints": False,
            "Owner": "amazon",
            "PrivateDnsName": "*.accesspoint.s3-global.amazonaws.com",
            "PrivateDnsNameVerificationState": "verified",
            "PrivateDnsNames": [
                {"PrivateDnsName": "*.accesspoint.s3-global.amazonaws.com"}
            ],
            "ServiceId": f"vpce-svc-{BaseBackend.vpce_random_number()}",
            "ServiceName": "com.amazonaws.s3-global.accesspoint",
            "ServiceType": [{"ServiceType": "Interface"}],
            "Tags": [],
            "VpcEndpointPolicySupported": True,
        }
        return (
            BaseBackend.default_vpc_endpoint_service_factory(
                service_region, zones, "s3", "Interface"
            )
            + BaseBackend.default_vpc_endpoint_service_factory(
                service_region, zones, "s3", "Gateway"
            )
            + [accesspoint]
        )

    # TODO: This is broken! DO NOT IMPORT MUTABLE DATA TYPES FROM OTHER AREAS -- THIS BREAKS UNMOCKING!
    # WRAP WITH A GETTER/SETTER FUNCTION
    # Register this class as a CloudWatch Metric Provider
    # Must provide a method 'get_cloudwatch_metrics' that will return a list of metrics, based on the data available
    # metric_providers["S3"] = self

    @classmethod
    def get_cloudwatch_metrics(cls):
        metrics = []
        for name, bucket in s3_backend.buckets.items():
            metrics.append(
                MetricDatum(
                    namespace="AWS/S3",
                    name="BucketSizeBytes",
                    value=bucket.keys.item_size(),
                    dimensions=[
                        {"Name": "StorageType", "Value": "StandardStorage"},
                        {"Name": "BucketName", "Value": name},
                    ],
                    timestamp=datetime.datetime.now(tz=pytz.utc).replace(
                        hour=0, minute=0, second=0, microsecond=0
                    ),
                    unit="Bytes",
                )
            )
            metrics.append(
                MetricDatum(
                    namespace="AWS/S3",
                    name="NumberOfObjects",
                    value=len(bucket.keys),
                    dimensions=[
                        {"Name": "StorageType", "Value": "AllStorageTypes"},
                        {"Name": "BucketName", "Value": name},
                    ],
                    timestamp=datetime.datetime.now(tz=pytz.utc).replace(
                        hour=0, minute=0, second=0, microsecond=0
                    ),
                    unit="Count",
                )
            )
        return metrics

    def create_bucket(self, bucket_name, region_name):
        if bucket_name in self.buckets:
            raise BucketAlreadyExists(bucket=bucket_name)
        if not MIN_BUCKET_NAME_LENGTH <= len(bucket_name) <= MAX_BUCKET_NAME_LENGTH:
            raise InvalidBucketName()
        new_bucket = FakeBucket(name=bucket_name, region_name=region_name)

        self.buckets[bucket_name] = new_bucket
        return new_bucket

    def list_buckets(self):
        return self.buckets.values()

    def get_bucket(self, bucket_name):
        try:
            return self.buckets[bucket_name]
        except KeyError:
            raise MissingBucket(bucket=bucket_name)

    def head_bucket(self, bucket_name):
        return self.get_bucket(bucket_name)

    def delete_bucket(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        if bucket.keys:
            # Can't delete a bucket with keys
            return False
        else:
            return self.buckets.pop(bucket_name)

    def set_bucket_versioning(self, bucket_name, status):
        self.get_bucket(bucket_name).versioning_status = status

    def get_bucket_versioning(self, bucket_name):
        return self.get_bucket(bucket_name).versioning_status

    def get_bucket_encryption(self, bucket_name):
        return self.get_bucket(bucket_name).encryption

    def list_object_versions(
        self,
        bucket_name,
        delimiter=None,
        encoding_type=None,
        key_marker=None,
        max_keys=None,
        version_id_marker=None,
        prefix="",
    ):
        bucket = self.get_bucket(bucket_name)

        common_prefixes = []
        requested_versions = []
        delete_markers = []
        all_versions = itertools.chain(
            *(copy.deepcopy(l) for key, l in bucket.keys.iterlists())
        )
        all_versions = list(all_versions)
        # sort by name, reversed last-modified date
        all_versions.sort(key=lambda r: (r.name, -unix_time_millis(r.last_modified)))
        last_name = None
        for version in all_versions:
            name = version.name
            # guaranteed to be sorted - so the first key with this name will be the latest
            version.is_latest = name != last_name
            if version.is_latest:
                last_name = name
            # Differentiate between FakeKey and FakeDeleteMarkers
            if not isinstance(version, FakeKey):
                delete_markers.append(version)
                continue
            # skip all keys that alphabetically come before key_marker
            if key_marker and name < key_marker:
                continue
            # Filter for keys that start with prefix
            if not name.startswith(prefix):
                continue
            # separate out all keys that contain delimiter
            if delimiter and delimiter in name:
                index = name.index(delimiter) + len(delimiter)
                prefix_including_delimiter = name[0:index]
                common_prefixes.append(prefix_including_delimiter)
                continue

            requested_versions.append(version)

        common_prefixes = sorted(set(common_prefixes))

        return requested_versions, common_prefixes, delete_markers

    def get_bucket_policy(self, bucket_name):
        return self.get_bucket(bucket_name).policy

    def put_bucket_policy(self, bucket_name, policy):
        self.get_bucket(bucket_name).policy = policy

    def delete_bucket_policy(self, bucket_name, body):
        bucket = self.get_bucket(bucket_name)
        bucket.policy = None

    def put_bucket_encryption(self, bucket_name, encryption):
        self.get_bucket(bucket_name).encryption = encryption

    def delete_bucket_encryption(self, bucket_name):
        self.get_bucket(bucket_name).encryption = None

    def get_bucket_replication(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return getattr(bucket, "replication", None)

    def put_bucket_replication(self, bucket_name, replication):
        if isinstance(replication["Rule"], dict):
            replication["Rule"] = [replication["Rule"]]
        for rule in replication["Rule"]:
            if "Priority" not in rule:
                rule["Priority"] = 1
            if "ID" not in rule:
                rule["ID"] = "".join(
                    random.choice(string.ascii_letters + string.digits)
                    for _ in range(30)
                )
        bucket = self.get_bucket(bucket_name)
        bucket.replication = replication

    def delete_bucket_replication(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        bucket.replication = None

    def put_bucket_lifecycle(self, bucket_name, rules):
        bucket = self.get_bucket(bucket_name)
        bucket.set_lifecycle(rules)

    def delete_bucket_lifecycle(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        bucket.delete_lifecycle()

    def set_bucket_website_configuration(self, bucket_name, website_configuration):
        bucket = self.get_bucket(bucket_name)
        bucket.website_configuration = website_configuration

    def get_bucket_website_configuration(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return bucket.website_configuration

    def delete_bucket_website(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        bucket.website_configuration = None

    def get_public_access_block(self, bucket_name):
        bucket = self.get_bucket(bucket_name)

        if not bucket.public_access_block:
            raise NoSuchPublicAccessBlockConfiguration()

        return bucket.public_access_block

    def get_account_public_access_block(self, account_id):
        # The account ID should equal the account id that is set for Moto:
        if account_id != ACCOUNT_ID:
            raise WrongPublicAccessBlockAccountIdError()

        if not self.account_public_access_block:
            raise NoSuchPublicAccessBlockConfiguration()

        return self.account_public_access_block

    def put_object(
        self,
        bucket_name,
        key_name,
        value,
        storage=None,
        etag=None,
        multipart=None,
        encryption=None,
        kms_key_id=None,
        bucket_key_enabled=None,
        lock_mode=None,
        lock_legal_status=None,
        lock_until=None,
    ):
        key_name = clean_key_name(key_name)
        if storage is not None and storage not in STORAGE_CLASS:
            raise InvalidStorageClass(storage=storage)

        bucket = self.get_bucket(bucket_name)

        # getting default config from bucket if not included in put request
        if bucket.encryption:
            bucket_key_enabled = bucket_key_enabled or bucket.encryption["Rule"].get(
                "BucketKeyEnabled", False
            )
            kms_key_id = kms_key_id or bucket.encryption["Rule"][
                "ApplyServerSideEncryptionByDefault"
            ].get("KMSMasterKeyID")
            encryption = (
                encryption
                or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][
                    "SSEAlgorithm"
                ]
            )

        new_key = FakeKey(
            name=key_name,
            value=value,
            storage=storage,
            etag=etag,
            is_versioned=bucket.is_versioned,
            version_id=str(uuid.uuid4()) if bucket.is_versioned else "null",
            multipart=multipart,
            encryption=encryption,
            kms_key_id=kms_key_id,
            bucket_key_enabled=bucket_key_enabled,
            lock_mode=lock_mode,
            lock_legal_status=lock_legal_status,
            lock_until=lock_until,
        )

        keys = [
            key
            for key in bucket.keys.getlist(key_name, [])
            if key.version_id != new_key.version_id
        ] + [new_key]
        bucket.keys.setlist(key_name, keys)

        return new_key

    def put_object_acl(self, bucket_name, key_name, acl):
        key = self.get_object(bucket_name, key_name)
        # TODO: Support the XML-based ACL format
        if key is not None:
            key.set_acl(acl)
        else:
            raise MissingKey(key=key_name)

    def put_object_legal_hold(
        self, bucket_name, key_name, version_id, legal_hold_status
    ):
        key = self.get_object(bucket_name, key_name, version_id=version_id)
        key.lock_legal_status = legal_hold_status

    def put_object_retention(self, bucket_name, key_name, version_id, retention):
        key = self.get_object(bucket_name, key_name, version_id=version_id)
        key.lock_mode = retention[0]
        key.lock_until = retention[1]

    def append_to_key(self, bucket_name, key_name, value):
        key = self.get_object(bucket_name, key_name)
        key.append_to_value(value)
        return key

    def get_object(self, bucket_name, key_name, version_id=None, part_number=None):
        key_name = clean_key_name(key_name)
        bucket = self.get_bucket(bucket_name)
        key = None

        if bucket:
            if version_id is None:
                if key_name in bucket.keys:
                    key = bucket.keys[key_name]
            else:
                for key_version in bucket.keys.getlist(key_name, default=[]):
                    if str(key_version.version_id) == str(version_id):
                        key = key_version
                        break

            if part_number and key and key.multipart:
                key = key.multipart.parts[part_number]

        if isinstance(key, FakeKey):
            return key
        else:
            return None

    def head_object(self, bucket_name, key_name, version_id=None, part_number=None):
        return self.get_object(bucket_name, key_name, version_id, part_number)

    def get_object_acl(self, key):
        return key.acl

    def get_object_legal_hold(self, key):
        return key.lock_legal_status
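
    # For reference, the bucket.encryption dict read by put_object() above is
    # assumed to follow the parsed put-bucket-encryption shape, e.g.
    # (key id invented):
    #
    #     {
    #         "Rule": {
    #             "ApplyServerSideEncryptionByDefault": {
    #                 "SSEAlgorithm": "aws:kms",
    #                 "KMSMasterKeyID": "1234abcd-12ab-34cd-56ef-1234567890ab",
    #             },
    #             "BucketKeyEnabled": False,
    #         }
    #     }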

    def get_object_lock_configuration(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        if not bucket.object_lock_enabled:
            raise ObjectLockConfigurationNotFoundError
        return (
            bucket.object_lock_enabled,
            bucket.default_lock_mode,
            bucket.default_lock_days,
            bucket.default_lock_years,
        )

    def get_object_tagging(self, key):
        return self.tagger.list_tags_for_resource(key.arn)

    def set_key_tags(self, key, tags, key_name=None):
        if key is None:
            raise MissingKey(key=key_name)
        boto_tags_dict = self.tagger.convert_dict_to_tags_input(tags)
        errmsg = self.tagger.validate_tags(boto_tags_dict)
        if errmsg:
            raise InvalidTagError(errmsg)
        self.tagger.delete_all_tags_for_resource(key.arn)
        self.tagger.tag_resource(
            key.arn, boto_tags_dict,
        )
        return key

    def get_bucket_tagging(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return self.tagger.list_tags_for_resource(bucket.arn)

    def put_bucket_tagging(self, bucket_name, tags):
        bucket = self.get_bucket(bucket_name)
        self.tagger.delete_all_tags_for_resource(bucket.arn)
        self.tagger.tag_resource(
            bucket.arn, [{"Key": key, "Value": value} for key, value in tags.items()],
        )

    def put_object_lock_configuration(
        self, bucket_name, lock_enabled, mode=None, days=None, years=None
    ):
        bucket = self.get_bucket(bucket_name)

        if bucket.keys.item_size() > 0:
            raise BucketNeedsToBeNew

        if lock_enabled:
            bucket.object_lock_enabled = True
            bucket.versioning_status = "Enabled"

        bucket.default_lock_mode = mode
        bucket.default_lock_days = days
        bucket.default_lock_years = years

    def delete_bucket_tagging(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        self.tagger.delete_all_tags_for_resource(bucket.arn)

    def put_bucket_cors(self, bucket_name, cors_rules):
        bucket = self.get_bucket(bucket_name)
        bucket.set_cors(cors_rules)

    def put_bucket_logging(self, bucket_name, logging_config):
        bucket = self.get_bucket(bucket_name)
        bucket.set_logging(logging_config, self)

    def delete_bucket_cors(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        bucket.delete_cors()

    def delete_public_access_block(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        bucket.public_access_block = None

    def delete_account_public_access_block(self, account_id):
        # The account ID should equal the account id that is set for Moto:
        if account_id != ACCOUNT_ID:
            raise WrongPublicAccessBlockAccountIdError()

        self.account_public_access_block = None

    def put_bucket_notification_configuration(self, bucket_name, notification_config):
        bucket = self.get_bucket(bucket_name)
        bucket.set_notification_configuration(notification_config)

    def put_bucket_accelerate_configuration(
        self, bucket_name, accelerate_configuration
    ):
        if accelerate_configuration not in ["Enabled", "Suspended"]:
            raise MalformedXML()

        bucket = self.get_bucket(bucket_name)
        if bucket.name.find(".") != -1:
            raise InvalidRequest("PutBucketAccelerateConfiguration")
        bucket.set_accelerate_configuration(accelerate_configuration)
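
    # The pub_block_config consumed below mirrors the parsed XML body, with
    # all four flags as lowercase strings (see the note in PublicAccessBlock),
    # e.g.:
    #
    #     {
    #         "BlockPublicAcls": "true",
    #         "IgnorePublicAcls": "false",
    #         "BlockPublicPolicy": "false",
    #         "RestrictPublicBuckets": "true",
    #     }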
pub_block_config.get("RestrictPublicBuckets"), 1820 ) 1821 1822 def put_account_public_access_block(self, account_id, pub_block_config): 1823 # The account ID should equal the account id that is set for Moto: 1824 if account_id != ACCOUNT_ID: 1825 raise WrongPublicAccessBlockAccountIdError() 1826 1827 if not pub_block_config: 1828 raise InvalidPublicAccessBlockConfiguration() 1829 1830 self.account_public_access_block = PublicAccessBlock( 1831 pub_block_config.get("BlockPublicAcls"), 1832 pub_block_config.get("IgnorePublicAcls"), 1833 pub_block_config.get("BlockPublicPolicy"), 1834 pub_block_config.get("RestrictPublicBuckets"), 1835 ) 1836 1837 def initiate_multipart(self, bucket_name, key_name, metadata): 1838 bucket = self.get_bucket(bucket_name) 1839 new_multipart = FakeMultipart(key_name, metadata) 1840 bucket.multiparts[new_multipart.id] = new_multipart 1841 1842 return new_multipart 1843 1844 def complete_multipart(self, bucket_name, multipart_id, body): 1845 bucket = self.get_bucket(bucket_name) 1846 multipart = bucket.multiparts[multipart_id] 1847 value, etag = multipart.complete(body) 1848 if value is None: 1849 return 1850 del bucket.multiparts[multipart_id] 1851 1852 key = self.put_object( 1853 bucket_name, multipart.key_name, value, etag=etag, multipart=multipart 1854 ) 1855 key.set_metadata(multipart.metadata) 1856 return key 1857 1858 def abort_multipart_upload(self, bucket_name, multipart_id): 1859 bucket = self.get_bucket(bucket_name) 1860 multipart_data = bucket.multiparts.get(multipart_id, None) 1861 if not multipart_data: 1862 raise NoSuchUpload(upload_id=multipart_id) 1863 del bucket.multiparts[multipart_id] 1864 1865 def list_parts( 1866 self, bucket_name, multipart_id, part_number_marker=0, max_parts=1000 1867 ): 1868 bucket = self.get_bucket(bucket_name) 1869 if multipart_id not in bucket.multiparts: 1870 raise NoSuchUpload(upload_id=multipart_id) 1871 return list( 1872 bucket.multiparts[multipart_id].list_parts(part_number_marker, max_parts) 1873 ) 1874 1875 def is_truncated(self, bucket_name, multipart_id, next_part_number_marker): 1876 bucket = self.get_bucket(bucket_name) 1877 return len(bucket.multiparts[multipart_id].parts) >= next_part_number_marker 1878 1879 def create_multipart_upload( 1880 self, bucket_name, key_name, metadata, storage_type, tags 1881 ): 1882 multipart = FakeMultipart(key_name, metadata, storage=storage_type, tags=tags) 1883 1884 bucket = self.get_bucket(bucket_name) 1885 bucket.multiparts[multipart.id] = multipart 1886 return multipart.id 1887 1888 def complete_multipart_upload(self, bucket_name, multipart_id, body): 1889 bucket = self.get_bucket(bucket_name) 1890 multipart = bucket.multiparts[multipart_id] 1891 value, etag = multipart.complete(body) 1892 if value is not None: 1893 del bucket.multiparts[multipart_id] 1894 return multipart, value, etag 1895 1896 def get_all_multiparts(self, bucket_name): 1897 bucket = self.get_bucket(bucket_name) 1898 return bucket.multiparts 1899 1900 def upload_part(self, bucket_name, multipart_id, part_id, value): 1901 bucket = self.get_bucket(bucket_name) 1902 multipart = bucket.multiparts[multipart_id] 1903 return multipart.set_part(part_id, value) 1904 1905 def copy_part( 1906 self, 1907 dest_bucket_name, 1908 multipart_id, 1909 part_id, 1910 src_bucket_name, 1911 src_key_name, 1912 src_version_id, 1913 start_byte, 1914 end_byte, 1915 ): 1916 dest_bucket = self.get_bucket(dest_bucket_name) 1917 multipart = dest_bucket.multiparts[multipart_id] 1918 1919 src_value = self.get_object( 1920 

    def copy_part(
        self,
        dest_bucket_name,
        multipart_id,
        part_id,
        src_bucket_name,
        src_key_name,
        src_version_id,
        start_byte,
        end_byte,
    ):
        dest_bucket = self.get_bucket(dest_bucket_name)
        multipart = dest_bucket.multiparts[multipart_id]

        src_value = self.get_object(
            src_bucket_name, src_key_name, version_id=src_version_id
        ).value
        if start_byte is not None:
            # The copy-source range is inclusive on both ends
            src_value = src_value[start_byte : end_byte + 1]
        return multipart.set_part(part_id, src_value)

    def list_objects(self, bucket, prefix, delimiter):
        key_results = set()
        folder_results = set()
        if prefix:
            for key_name, key in bucket.keys.items():
                if key_name.startswith(prefix):
                    key_without_prefix = key_name.replace(prefix, "", 1)
                    if delimiter and delimiter in key_without_prefix:
                        # If there is a delimiter, we need to split out folder_results
                        key_without_delimiter = key_without_prefix.split(delimiter)[0]
                        folder_results.add(
                            "{0}{1}{2}".format(prefix, key_without_delimiter, delimiter)
                        )
                    else:
                        key_results.add(key)
        else:
            for key_name, key in bucket.keys.items():
                if delimiter and delimiter in key_name:
                    # If there is a delimiter, we need to split out folder_results
                    folder_results.add(key_name.split(delimiter)[0] + delimiter)
                else:
                    key_results.add(key)

        key_results = filter(
            lambda key: not isinstance(key, FakeDeleteMarker), key_results
        )
        key_results = sorted(key_results, key=lambda key: key.name)
        folder_results = sorted(folder_results)

        return key_results, folder_results

    def list_objects_v2(self, bucket, prefix, delimiter):
        result_keys, result_folders = self.list_objects(bucket, prefix, delimiter)
        # Sort the combination of folders and keys into lexicographical order
        all_keys = result_keys + result_folders
        all_keys.sort(key=self._get_name)
        return all_keys

    @staticmethod
    def _get_name(key):
        if isinstance(key, FakeKey):
            return key.name
        else:
            return key

    def _set_delete_marker(self, bucket_name, key_name):
        bucket = self.get_bucket(bucket_name)
        delete_marker = FakeDeleteMarker(key=bucket.keys[key_name])
        bucket.keys[key_name] = delete_marker
        return delete_marker

    def delete_object_tagging(self, bucket_name, key_name, version_id=None):
        key = self.get_object(bucket_name, key_name, version_id=version_id)
        self.tagger.delete_all_tags_for_resource(key.arn)

    def delete_object(self, bucket_name, key_name, version_id=None, bypass=False):
        key_name = clean_key_name(key_name)
        bucket = self.get_bucket(bucket_name)

        response_meta = {}

        try:
            if not bucket.is_versioned:
                bucket.keys.pop(key_name)
            else:
                if version_id is None:
                    # On a versioned bucket, a delete without a version id only
                    # inserts a delete marker; no data is removed
                    delete_marker = self._set_delete_marker(bucket_name, key_name)
                    response_meta["version-id"] = delete_marker.version_id
                else:
                    if key_name not in bucket.keys:
                        raise KeyError

                    response_meta["delete-marker"] = "false"
                    for key in bucket.keys.getlist(key_name):
                        if str(key.version_id) == str(version_id):
                            if (
                                hasattr(key, "is_locked")
                                and key.is_locked
                                and not bypass
                            ):
                                raise AccessDeniedByLock

                            if type(key) is FakeDeleteMarker:
                                response_meta["delete-marker"] = "true"
                            break

                    bucket.keys.setlist(
                        key_name,
                        [
                            key
                            for key in bucket.keys.getlist(key_name)
                            if str(key.version_id) != str(version_id)
                        ],
                    )

                    if not bucket.keys.getlist(key_name):
                        bucket.keys.pop(key_name)
            return True, response_meta
        except KeyError:
            return False, None
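
    # A minimal sketch (comments only, not executed) of the versioned-delete
    # semantics implemented by delete_object() above. It assumes a
    # versioning-enabled bucket "versioned-bucket" that already holds
    # "example-key"; both names are hypothetical:
    #
    #     ok, meta = s3_backend.delete_object("versioned-bucket", "example-key")
    #     # ok is True and meta["version-id"] is the id of the new delete marker
    #     ok, meta = s3_backend.delete_object(
    #         "versioned-bucket", "example-key", version_id=meta["version-id"]
    #     )
    #     # meta["delete-marker"] == "true": the marker itself has been removed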

    def delete_objects(self, bucket_name, objects):
        deleted_objects = []
        for object_ in objects:
            key_name = object_["Key"]
            version_id = object_.get("VersionId", None)

            self.delete_object(
                bucket_name, undo_clean_key_name(key_name), version_id=version_id
            )
            deleted_objects.append((key_name, version_id))
        return deleted_objects

    def copy_object(
        self,
        src_bucket_name,
        src_key_name,
        dest_bucket_name,
        dest_key_name,
        storage=None,
        acl=None,
        src_version_id=None,
        encryption=None,
        kms_key_id=None,
    ):
        key = self.get_object(src_bucket_name, src_key_name, version_id=src_version_id)

        new_key = self.put_object(
            bucket_name=dest_bucket_name,
            key_name=dest_key_name,
            value=key.value,
            storage=storage or key.storage_class,
            multipart=key.multipart,
            encryption=encryption or key.encryption,
            kms_key_id=kms_key_id or key.kms_key_id,
            bucket_key_enabled=key.bucket_key_enabled,
            lock_mode=key.lock_mode,
            lock_legal_status=key.lock_legal_status,
            lock_until=key.lock_until,
        )
        self.tagger.copy_tags(key.arn, new_key.arn)

        if acl is not None:
            new_key.set_acl(acl)
        if key.storage_class == "GLACIER":
            # An object copied out of Glacier should not retain an expiry
            new_key.set_expiry(None)

    def put_bucket_acl(self, bucket_name, acl):
        bucket = self.get_bucket(bucket_name)
        bucket.set_acl(acl)

    def get_bucket_acl(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return bucket.acl

    def get_bucket_cors(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return bucket.cors

    def get_bucket_lifecycle(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return bucket.rules

    def get_bucket_location(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return bucket.location

    def get_bucket_logging(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return bucket.logging

    def get_bucket_notification_configuration(self, bucket_name):
        bucket = self.get_bucket(bucket_name)
        return bucket.notification_configuration


s3_backend = S3Backend()
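
# A minimal usage sketch (comments only, not executed) for the singleton above,
# which is the backend instance moto's S3 response handlers dispatch to.
# `create_bucket(bucket_name, region_name)` is assumed to match the signature
# defined earlier in this module; bucket and key names are hypothetical:
#
#     s3_backend.create_bucket("example-bucket", "us-east-1")
#     s3_backend.put_object("example-bucket", "greeting.txt", b"hello")
#     s3_backend.get_object("example-bucket", "greeting.txt").value  # b"hello"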