1import json
2import os
3import base64
4import datetime
5import hashlib
6import copy
7import itertools
8import codecs
9import random
10import string
11import tempfile
12import threading
13import pytz
14import sys
15import time
16import uuid
17
18from bisect import insort
19from importlib import reload
20from moto.core import (
21    ACCOUNT_ID,
22    BaseBackend,
23    BaseModel,
24    CloudFormationModel,
25    CloudWatchMetricProvider,
26)
27
28from moto.core.utils import (
29    iso_8601_datetime_without_milliseconds_s3,
30    rfc_1123_datetime,
31    unix_time_millis,
32)
33from moto.cloudwatch.models import MetricDatum
34from moto.utilities.tagging_service import TaggingService
35from moto.utilities.utils import LowercaseDict
36from moto.s3.exceptions import (
37    AccessDeniedByLock,
38    BucketAlreadyExists,
39    BucketNeedsToBeNew,
40    MissingBucket,
41    InvalidBucketName,
42    InvalidPart,
43    InvalidRequest,
44    EntityTooSmall,
45    MissingKey,
46    InvalidNotificationDestination,
47    MalformedXML,
48    InvalidStorageClass,
49    InvalidTargetBucketForLogging,
50    CrossLocationLoggingProhibitted,
51    NoSuchPublicAccessBlockConfiguration,
52    InvalidPublicAccessBlockConfiguration,
53    WrongPublicAccessBlockAccountIdError,
54    NoSuchUpload,
55    ObjectLockConfigurationNotFoundError,
56    InvalidTagError,
57)
58from .cloud_formation import cfn_to_api_encryption, is_replacement_update
59from .utils import clean_key_name, _VersionedKeyStore, undo_clean_key_name
60from ..settings import get_s3_default_key_buffer_size, S3_UPLOAD_PART_MIN_SIZE
61
62MAX_BUCKET_NAME_LENGTH = 63
63MIN_BUCKET_NAME_LENGTH = 3
64UPLOAD_ID_BYTES = 43
65STORAGE_CLASS = [
66    "STANDARD",
67    "REDUCED_REDUNDANCY",
68    "STANDARD_IA",
69    "ONEZONE_IA",
70    "INTELLIGENT_TIERING",
71    "GLACIER",
72    "DEEP_ARCHIVE",
73]
74DEFAULT_TEXT_ENCODING = sys.getdefaultencoding()
75OWNER = "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"
76
77
78def get_moto_s3_account_id():
79    """This makes it easy for mocking AWS Account IDs when using AWS Config
80    -- Simply mock.patch the ACCOUNT_ID here, and Config gets it for free.
81    """
82    return ACCOUNT_ID
83
84
85class FakeDeleteMarker(BaseModel):
86    def __init__(self, key):
87        self.key = key
88        self.name = key.name
89        self.last_modified = datetime.datetime.utcnow()
90        self._version_id = str(uuid.uuid4())
91
92    @property
93    def last_modified_ISO8601(self):
94        return iso_8601_datetime_without_milliseconds_s3(self.last_modified)
95
96    @property
97    def version_id(self):
98        return self._version_id
99
100
101class FakeKey(BaseModel):
102    def __init__(
103        self,
104        name,
105        value,
106        storage="STANDARD",
107        etag=None,
108        is_versioned=False,
109        version_id=0,
110        max_buffer_size=None,
111        multipart=None,
112        bucket_name=None,
113        encryption=None,
114        kms_key_id=None,
115        bucket_key_enabled=None,
116        lock_mode=None,
117        lock_legal_status=None,
118        lock_until=None,
119    ):
120        self.name = name
121        self.last_modified = datetime.datetime.utcnow()
122        self.acl = get_canned_acl("private")
123        self.website_redirect_location = None
124        self._storage_class = storage if storage else "STANDARD"
125        self._metadata = LowercaseDict()
126        self._expiry = None
127        self._etag = etag
128        self._version_id = version_id
129        self._is_versioned = is_versioned
130        self.multipart = multipart
131        self.bucket_name = bucket_name
132
133        self._max_buffer_size = (
134            max_buffer_size if max_buffer_size else get_s3_default_key_buffer_size()
135        )
136        self._value_buffer = tempfile.SpooledTemporaryFile(self._max_buffer_size)
137        self.value = value
138        self.lock = threading.Lock()
139
140        self.encryption = encryption
141        self.kms_key_id = kms_key_id
142        self.bucket_key_enabled = bucket_key_enabled
143
144        self.lock_mode = lock_mode
145        self.lock_legal_status = lock_legal_status
146        self.lock_until = lock_until
147
148        # Default metadata values
149        self._metadata["Content-Type"] = "binary/octet-stream"
150
151    @property
152    def version_id(self):
153        return self._version_id
154
155    @property
156    def value(self):
157        self.lock.acquire()
158        self._value_buffer.seek(0)
159        r = self._value_buffer.read()
160        r = copy.copy(r)
161        self.lock.release()
162        return r
163
164    @property
165    def arn(self):
166        # S3 Objects don't have an ARN, but we do need something unique when creating tags against this resource
167        return "arn:aws:s3:::{}/{}/{}".format(
168            self.bucket_name, self.name, self.version_id
169        )
170
171    @value.setter
172    def value(self, new_value):
173        self._value_buffer.seek(0)
174        self._value_buffer.truncate()
175
176        # Hack for working around moto's own unit tests; this probably won't
177        # actually get hit in normal use.
178        if isinstance(new_value, str):
179            new_value = new_value.encode(DEFAULT_TEXT_ENCODING)
180        self._value_buffer.write(new_value)
181        self.contentsize = len(new_value)
182
183    def set_metadata(self, metadata, replace=False):
184        if replace:
185            self._metadata = {}
186        self._metadata.update(metadata)
187
188    def set_storage_class(self, storage):
189        if storage is not None and storage not in STORAGE_CLASS:
190            raise InvalidStorageClass(storage=storage)
191        self._storage_class = storage
192
193    def set_expiry(self, expiry):
194        self._expiry = expiry
195
196    def set_acl(self, acl):
197        self.acl = acl
198
199    def append_to_value(self, value):
200        self.contentsize += len(value)
201        self._value_buffer.seek(0, os.SEEK_END)
202        self._value_buffer.write(value)
203
204        self.last_modified = datetime.datetime.utcnow()
205        self._etag = None  # must recalculate etag
206        if self._is_versioned:
207            self._version_id = str(uuid.uuid4())
208        else:
209            self._version_id = None
210
211    def restore(self, days):
212        self._expiry = datetime.datetime.utcnow() + datetime.timedelta(days)
213
214    @property
215    def etag(self):
216        if self._etag is None:
217            value_md5 = hashlib.md5()
218            self._value_buffer.seek(0)
219            while True:
220                block = self._value_buffer.read(16 * 1024 * 1024)  # read in 16MB chunks
221                if not block:
222                    break
223                value_md5.update(block)
224
225            self._etag = value_md5.hexdigest()
226        return '"{0}"'.format(self._etag)
227
228    @property
229    def last_modified_ISO8601(self):
230        return iso_8601_datetime_without_milliseconds_s3(self.last_modified)
231
232    @property
233    def last_modified_RFC1123(self):
234        # Different datetime formats depending on how the key is obtained
235        # https://github.com/boto/boto/issues/466
236        return rfc_1123_datetime(self.last_modified)
237
238    @property
239    def metadata(self):
240        return self._metadata
241
242    @property
243    def response_dict(self):
244        res = {
245            "ETag": self.etag,
246            "last-modified": self.last_modified_RFC1123,
247            "content-length": str(self.size),
248        }
249        if self.encryption is not None:
250            res["x-amz-server-side-encryption"] = self.encryption
251            if self.encryption == "aws:kms" and self.kms_key_id is not None:
252                res["x-amz-server-side-encryption-aws-kms-key-id"] = self.kms_key_id
253        if self.bucket_key_enabled is not None:
254            res[
255                "x-amz-server-side-encryption-bucket-key-enabled"
256            ] = self.bucket_key_enabled
257        if self._storage_class != "STANDARD":
258            res["x-amz-storage-class"] = self._storage_class
259        if self._expiry is not None:
260            rhdr = 'ongoing-request="false", expiry-date="{0}"'
261            res["x-amz-restore"] = rhdr.format(self.expiry_date)
262
263        if self._is_versioned:
264            res["x-amz-version-id"] = str(self.version_id)
265
266        if self.website_redirect_location:
267            res["x-amz-website-redirect-location"] = self.website_redirect_location
268        if self.lock_legal_status:
269            res["x-amz-object-lock-legal-hold"] = self.lock_legal_status
270        if self.lock_until:
271            res["x-amz-object-lock-retain-until-date"] = self.lock_until
272        if self.lock_mode:
273            res["x-amz-object-lock-mode"] = self.lock_mode
274
275        if self.lock_legal_status:
276            res["x-amz-object-lock-legal-hold"] = self.lock_legal_status
277        if self.lock_until:
278            res["x-amz-object-lock-retain-until-date"] = self.lock_until
279        if self.lock_mode:
280            res["x-amz-object-lock-mode"] = self.lock_mode
281
282        return res
283
284    @property
285    def size(self):
286        return self.contentsize
287
288    @property
289    def storage_class(self):
290        return self._storage_class
291
292    @property
293    def expiry_date(self):
294        if self._expiry is not None:
295            return self._expiry.strftime("%a, %d %b %Y %H:%M:%S GMT")
296
297    # Keys need to be pickleable due to some implementation details of boto3.
298    # Since file objects aren't pickleable, we need to override the default
299    # behavior. The following is adapted from the Python docs:
300    # https://docs.python.org/3/library/pickle.html#handling-stateful-objects
301    def __getstate__(self):
302        state = self.__dict__.copy()
303        state["value"] = self.value
304        del state["_value_buffer"]
305        del state["lock"]
306        return state
307
308    def __setstate__(self, state):
309        self.__dict__.update({k: v for k, v in state.items() if k != "value"})
310
311        self._value_buffer = tempfile.SpooledTemporaryFile(
312            max_size=self._max_buffer_size
313        )
314        self.value = state["value"]
315        self.lock = threading.Lock()
316
317    @property
318    def is_locked(self):
319        if self.lock_legal_status == "ON":
320            return True
321
322        if self.lock_mode == "COMPLIANCE":
323            now = datetime.datetime.utcnow()
324            try:
325                until = datetime.datetime.strptime(
326                    self.lock_until, "%Y-%m-%dT%H:%M:%SZ"
327                )
328            except ValueError:
329                until = datetime.datetime.strptime(
330                    self.lock_until, "%Y-%m-%dT%H:%M:%S.%fZ"
331                )
332
333            if until > now:
334                return True
335
336        return False
337
338
339class FakeMultipart(BaseModel):
340    def __init__(self, key_name, metadata, storage=None, tags=None):
341        self.key_name = key_name
342        self.metadata = metadata
343        self.storage = storage
344        self.tags = tags
345        self.parts = {}
346        self.partlist = []  # ordered list of part ID's
347        rand_b64 = base64.b64encode(os.urandom(UPLOAD_ID_BYTES))
348        self.id = (
349            rand_b64.decode("utf-8").replace("=", "").replace("+", "").replace("/", "")
350        )
351
352    def complete(self, body):
353        decode_hex = codecs.getdecoder("hex_codec")
354        total = bytearray()
355        md5s = bytearray()
356
357        last = None
358        count = 0
359        for pn, etag in body:
360            part = self.parts.get(pn)
361            part_etag = None
362            if part is not None:
363                part_etag = part.etag.replace('"', "")
364                etag = etag.replace('"', "")
365            if part is None or part_etag != etag:
366                raise InvalidPart()
367            if last is not None and last.contentsize < S3_UPLOAD_PART_MIN_SIZE:
368                raise EntityTooSmall()
369            md5s.extend(decode_hex(part_etag)[0])
370            total.extend(part.value)
371            last = part
372            count += 1
373
374        etag = hashlib.md5()
375        etag.update(bytes(md5s))
376        return total, "{0}-{1}".format(etag.hexdigest(), count)
377
378    def set_part(self, part_id, value):
379        if part_id < 1:
380            raise NoSuchUpload(upload_id=part_id)
381
382        key = FakeKey(part_id, value)
383        self.parts[part_id] = key
384        if part_id not in self.partlist:
385            insort(self.partlist, part_id)
386        return key
387
388    def list_parts(self, part_number_marker, max_parts):
389        for part_id in self.partlist:
390            part = self.parts[part_id]
391            if part_number_marker <= part.name < part_number_marker + max_parts:
392                yield part
393
394
395class FakeGrantee(BaseModel):
396    def __init__(self, id="", uri="", display_name=""):
397        self.id = id
398        self.uri = uri
399        self.display_name = display_name
400
401    def __eq__(self, other):
402        if not isinstance(other, FakeGrantee):
403            return False
404        return (
405            self.id == other.id
406            and self.uri == other.uri
407            and self.display_name == other.display_name
408        )
409
410    @property
411    def type(self):
412        return "Group" if self.uri else "CanonicalUser"
413
414    def __repr__(self):
415        return "FakeGrantee(display_name: '{}', id: '{}', uri: '{}')".format(
416            self.display_name, self.id, self.uri
417        )
418
419
420ALL_USERS_GRANTEE = FakeGrantee(uri="http://acs.amazonaws.com/groups/global/AllUsers")
421AUTHENTICATED_USERS_GRANTEE = FakeGrantee(
422    uri="http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
423)
424LOG_DELIVERY_GRANTEE = FakeGrantee(uri="http://acs.amazonaws.com/groups/s3/LogDelivery")
425
426PERMISSION_FULL_CONTROL = "FULL_CONTROL"
427PERMISSION_WRITE = "WRITE"
428PERMISSION_READ = "READ"
429PERMISSION_WRITE_ACP = "WRITE_ACP"
430PERMISSION_READ_ACP = "READ_ACP"
431
432CAMEL_CASED_PERMISSIONS = {
433    "FULL_CONTROL": "FullControl",
434    "WRITE": "Write",
435    "READ": "Read",
436    "WRITE_ACP": "WriteAcp",
437    "READ_ACP": "ReadAcp",
438}
439
440
441class FakeGrant(BaseModel):
442    def __init__(self, grantees, permissions):
443        self.grantees = grantees
444        self.permissions = permissions
445
446    def __repr__(self):
447        return "FakeGrant(grantees: {}, permissions: {})".format(
448            self.grantees, self.permissions
449        )
450
451
452class FakeAcl(BaseModel):
453    def __init__(self, grants=None):
454        grants = grants or []
455        self.grants = grants
456
457    @property
458    def public_read(self):
459        for grant in self.grants:
460            if ALL_USERS_GRANTEE in grant.grantees:
461                if PERMISSION_READ in grant.permissions:
462                    return True
463                if PERMISSION_FULL_CONTROL in grant.permissions:
464                    return True
465        return False
466
467    def __repr__(self):
468        return "FakeAcl(grants: {})".format(self.grants)
469
470    def to_config_dict(self):
471        """Returns the object into the format expected by AWS Config"""
472        data = {
473            "grantSet": None,  # Always setting this to None. Feel free to change.
474            "owner": {"displayName": None, "id": OWNER},
475        }
476
477        # Add details for each Grant:
478        grant_list = []
479        for grant in self.grants:
480            permissions = (
481                grant.permissions
482                if isinstance(grant.permissions, list)
483                else [grant.permissions]
484            )
485            for permission in permissions:
486                for grantee in grant.grantees:
487                    if grantee.uri:
488                        grant_list.append(
489                            {
490                                "grantee": grantee.uri.split(
491                                    "http://acs.amazonaws.com/groups/s3/"
492                                )[1],
493                                "permission": CAMEL_CASED_PERMISSIONS[permission],
494                            }
495                        )
496                    else:
497                        grant_list.append(
498                            {
499                                "grantee": {
500                                    "id": grantee.id,
501                                    "displayName": None
502                                    if not grantee.display_name
503                                    else grantee.display_name,
504                                },
505                                "permission": CAMEL_CASED_PERMISSIONS[permission],
506                            }
507                        )
508
509        if grant_list:
510            data["grantList"] = grant_list
511
512        return data
513
514
515def get_canned_acl(acl):
516    owner_grantee = FakeGrantee(id=OWNER)
517    grants = [FakeGrant([owner_grantee], [PERMISSION_FULL_CONTROL])]
518    if acl == "private":
519        pass  # no other permissions
520    elif acl == "public-read":
521        grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ]))
522    elif acl == "public-read-write":
523        grants.append(
524            FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ, PERMISSION_WRITE])
525        )
526    elif acl == "authenticated-read":
527        grants.append(FakeGrant([AUTHENTICATED_USERS_GRANTEE], [PERMISSION_READ]))
528    elif acl == "bucket-owner-read":
529        pass  # TODO: bucket owner ACL
530    elif acl == "bucket-owner-full-control":
531        pass  # TODO: bucket owner ACL
532    elif acl == "aws-exec-read":
533        pass  # TODO: bucket owner, EC2 Read
534    elif acl == "log-delivery-write":
535        grants.append(
536            FakeGrant([LOG_DELIVERY_GRANTEE], [PERMISSION_READ_ACP, PERMISSION_WRITE])
537        )
538    else:
539        assert False, "Unknown canned acl: %s" % (acl,)
540    return FakeAcl(grants=grants)
541
542
543class LifecycleFilter(BaseModel):
544    def __init__(self, prefix=None, tag=None, and_filter=None):
545        self.prefix = prefix
546        (self.tag_key, self.tag_value) = tag if tag else (None, None)
547        self.and_filter = and_filter
548
549    def to_config_dict(self):
550        if self.prefix is not None:
551            return {
552                "predicate": {"type": "LifecyclePrefixPredicate", "prefix": self.prefix}
553            }
554
555        elif self.tag_key:
556            return {
557                "predicate": {
558                    "type": "LifecycleTagPredicate",
559                    "tag": {"key": self.tag_key, "value": self.tag_value},
560                }
561            }
562
563        else:
564            return {
565                "predicate": {
566                    "type": "LifecycleAndOperator",
567                    "operands": self.and_filter.to_config_dict(),
568                }
569            }
570
571
572class LifecycleAndFilter(BaseModel):
573    def __init__(self, prefix=None, tags=None):
574        self.prefix = prefix
575        self.tags = tags
576
577    def to_config_dict(self):
578        data = []
579
580        if self.prefix is not None:
581            data.append({"type": "LifecyclePrefixPredicate", "prefix": self.prefix})
582
583        for key, value in self.tags.items():
584            data.append(
585                {"type": "LifecycleTagPredicate", "tag": {"key": key, "value": value},}
586            )
587
588        return data
589
590
591class LifecycleRule(BaseModel):
592    def __init__(
593        self,
594        id=None,
595        prefix=None,
596        lc_filter=None,
597        status=None,
598        expiration_days=None,
599        expiration_date=None,
600        transition_days=None,
601        transition_date=None,
602        storage_class=None,
603        expired_object_delete_marker=None,
604        nve_noncurrent_days=None,
605        nvt_noncurrent_days=None,
606        nvt_storage_class=None,
607        aimu_days=None,
608    ):
609        self.id = id
610        self.prefix = prefix
611        self.filter = lc_filter
612        self.status = status
613        self.expiration_days = expiration_days
614        self.expiration_date = expiration_date
615        self.transition_days = transition_days
616        self.transition_date = transition_date
617        self.storage_class = storage_class
618        self.expired_object_delete_marker = expired_object_delete_marker
619        self.nve_noncurrent_days = nve_noncurrent_days
620        self.nvt_noncurrent_days = nvt_noncurrent_days
621        self.nvt_storage_class = nvt_storage_class
622        self.aimu_days = aimu_days
623
624    def to_config_dict(self):
625        """Converts the object to the AWS Config data dict.
626
627        Note: The following are missing that should be added in the future:
628            - transitions (returns None for now)
629            - noncurrentVersionTransitions (returns None for now)
630
631        :param kwargs:
632        :return:
633        """
634
635        lifecycle_dict = {
636            "id": self.id,
637            "prefix": self.prefix,
638            "status": self.status,
639            "expirationInDays": int(self.expiration_days)
640            if self.expiration_days
641            else None,
642            "expiredObjectDeleteMarker": self.expired_object_delete_marker,
643            "noncurrentVersionExpirationInDays": -1 or int(self.nve_noncurrent_days),
644            "expirationDate": self.expiration_date,
645            "transitions": None,  # Replace me with logic to fill in
646            "noncurrentVersionTransitions": None,  # Replace me with logic to fill in
647        }
648
649        if self.aimu_days:
650            lifecycle_dict["abortIncompleteMultipartUpload"] = {
651                "daysAfterInitiation": self.aimu_days
652            }
653        else:
654            lifecycle_dict["abortIncompleteMultipartUpload"] = None
655
656        # Format the filter:
657        if self.prefix is None and self.filter is None:
658            lifecycle_dict["filter"] = {"predicate": None}
659
660        elif self.prefix:
661            lifecycle_dict["filter"] = None
662        else:
663            lifecycle_dict["filter"] = self.filter.to_config_dict()
664
665        return lifecycle_dict
666
667
668class CorsRule(BaseModel):
669    def __init__(
670        self,
671        allowed_methods,
672        allowed_origins,
673        allowed_headers=None,
674        expose_headers=None,
675        max_age_seconds=None,
676    ):
677        self.allowed_methods = (
678            [allowed_methods] if isinstance(allowed_methods, str) else allowed_methods
679        )
680        self.allowed_origins = (
681            [allowed_origins] if isinstance(allowed_origins, str) else allowed_origins
682        )
683        self.allowed_headers = (
684            [allowed_headers] if isinstance(allowed_headers, str) else allowed_headers
685        )
686        self.exposed_headers = (
687            [expose_headers] if isinstance(expose_headers, str) else expose_headers
688        )
689        self.max_age_seconds = max_age_seconds
690
691
692class Notification(BaseModel):
693    def __init__(self, arn, events, filters=None, id=None):
694        self.id = (
695            id
696            if id
697            else "".join(
698                random.choice(string.ascii_letters + string.digits) for _ in range(50)
699            )
700        )
701        self.arn = arn
702        self.events = events
703        self.filters = filters if filters else {}
704
705    def to_config_dict(self):
706        data = {}
707
708        # Type and ARN will be filled in by NotificationConfiguration's to_config_dict:
709        data["events"] = [event for event in self.events]
710
711        if self.filters:
712            data["filter"] = {
713                "s3KeyFilter": {
714                    "filterRules": [
715                        {"name": fr["Name"], "value": fr["Value"]}
716                        for fr in self.filters["S3Key"]["FilterRule"]
717                    ]
718                }
719            }
720        else:
721            data["filter"] = None
722
723        # Not sure why this is a thing since AWS just seems to return this as filters ¯\_(ツ)_/¯
724        data["objectPrefixes"] = []
725
726        return data
727
728
729class NotificationConfiguration(BaseModel):
730    def __init__(self, topic=None, queue=None, cloud_function=None):
731        self.topic = (
732            [
733                Notification(
734                    t["Topic"], t["Event"], filters=t.get("Filter"), id=t.get("Id")
735                )
736                for t in topic
737            ]
738            if topic
739            else []
740        )
741        self.queue = (
742            [
743                Notification(
744                    q["Queue"], q["Event"], filters=q.get("Filter"), id=q.get("Id")
745                )
746                for q in queue
747            ]
748            if queue
749            else []
750        )
751        self.cloud_function = (
752            [
753                Notification(
754                    c["CloudFunction"],
755                    c["Event"],
756                    filters=c.get("Filter"),
757                    id=c.get("Id"),
758                )
759                for c in cloud_function
760            ]
761            if cloud_function
762            else []
763        )
764
765    def to_config_dict(self):
766        data = {"configurations": {}}
767
768        for topic in self.topic:
769            topic_config = topic.to_config_dict()
770            topic_config["topicARN"] = topic.arn
771            topic_config["type"] = "TopicConfiguration"
772            data["configurations"][topic.id] = topic_config
773
774        for queue in self.queue:
775            queue_config = queue.to_config_dict()
776            queue_config["queueARN"] = queue.arn
777            queue_config["type"] = "QueueConfiguration"
778            data["configurations"][queue.id] = queue_config
779
780        for cloud_function in self.cloud_function:
781            cf_config = cloud_function.to_config_dict()
782            cf_config["queueARN"] = cloud_function.arn
783            cf_config["type"] = "LambdaConfiguration"
784            data["configurations"][cloud_function.id] = cf_config
785
786        return data
787
788
789def convert_str_to_bool(item):
790    """Converts a boolean string to a boolean value"""
791    if isinstance(item, str):
792        return item.lower() == "true"
793
794    return False
795
796
797class PublicAccessBlock(BaseModel):
798    def __init__(
799        self,
800        block_public_acls,
801        ignore_public_acls,
802        block_public_policy,
803        restrict_public_buckets,
804    ):
805        # The boto XML appears to expect these values to exist as lowercase strings...
806        self.block_public_acls = block_public_acls or "false"
807        self.ignore_public_acls = ignore_public_acls or "false"
808        self.block_public_policy = block_public_policy or "false"
809        self.restrict_public_buckets = restrict_public_buckets or "false"
810
811    def to_config_dict(self):
812        # Need to make the string values booleans for Config:
813        return {
814            "blockPublicAcls": convert_str_to_bool(self.block_public_acls),
815            "ignorePublicAcls": convert_str_to_bool(self.ignore_public_acls),
816            "blockPublicPolicy": convert_str_to_bool(self.block_public_policy),
817            "restrictPublicBuckets": convert_str_to_bool(self.restrict_public_buckets),
818        }
819
820
821class FakeBucket(CloudFormationModel):
822    def __init__(self, name, region_name):
823        self.name = name
824        self.region_name = region_name
825        self.keys = _VersionedKeyStore()
826        self.multiparts = {}
827        self.versioning_status = None
828        self.rules = []
829        self.policy = None
830        self.website_configuration = None
831        self.acl = get_canned_acl("private")
832        self.cors = []
833        self.logging = {}
834        self.notification_configuration = None
835        self.accelerate_configuration = None
836        self.payer = "BucketOwner"
837        self.creation_date = datetime.datetime.now(tz=pytz.utc)
838        self.public_access_block = None
839        self.encryption = None
840        self.object_lock_enabled = False
841        self.default_lock_mode = ""
842        self.default_lock_days = 0
843        self.default_lock_years = 0
844
845    @property
846    def location(self):
847        return self.region_name
848
849    @property
850    def creation_date_ISO8601(self):
851        return iso_8601_datetime_without_milliseconds_s3(self.creation_date)
852
853    @property
854    def is_versioned(self):
855        return self.versioning_status == "Enabled"
856
857    def set_lifecycle(self, rules):
858        self.rules = []
859        for rule in rules:
860            # Extract and validate actions from Lifecycle rule
861            expiration = rule.get("Expiration")
862            transition = rule.get("Transition")
863
864            try:
865                top_level_prefix = (
866                    rule["Prefix"] or ""
867                )  # If it's `None` the set to the empty string
868            except KeyError:
869                top_level_prefix = None
870
871            nve_noncurrent_days = None
872            if rule.get("NoncurrentVersionExpiration") is not None:
873                if rule["NoncurrentVersionExpiration"].get("NoncurrentDays") is None:
874                    raise MalformedXML()
875                nve_noncurrent_days = rule["NoncurrentVersionExpiration"][
876                    "NoncurrentDays"
877                ]
878
879            nvt_noncurrent_days = None
880            nvt_storage_class = None
881            if rule.get("NoncurrentVersionTransition") is not None:
882                if rule["NoncurrentVersionTransition"].get("NoncurrentDays") is None:
883                    raise MalformedXML()
884                if rule["NoncurrentVersionTransition"].get("StorageClass") is None:
885                    raise MalformedXML()
886                nvt_noncurrent_days = rule["NoncurrentVersionTransition"][
887                    "NoncurrentDays"
888                ]
889                nvt_storage_class = rule["NoncurrentVersionTransition"]["StorageClass"]
890
891            aimu_days = None
892            if rule.get("AbortIncompleteMultipartUpload") is not None:
893                if (
894                    rule["AbortIncompleteMultipartUpload"].get("DaysAfterInitiation")
895                    is None
896                ):
897                    raise MalformedXML()
898                aimu_days = rule["AbortIncompleteMultipartUpload"][
899                    "DaysAfterInitiation"
900                ]
901
902            eodm = None
903            if expiration and expiration.get("ExpiredObjectDeleteMarker") is not None:
904                # This cannot be set if Date or Days is set:
905                if expiration.get("Days") or expiration.get("Date"):
906                    raise MalformedXML()
907                eodm = expiration["ExpiredObjectDeleteMarker"]
908
909            # Pull out the filter:
910            lc_filter = None
911            if rule.get("Filter"):
912                # Can't have both `Filter` and `Prefix` (need to check for the presence of the key):
913                try:
914                    # 'Prefix' cannot be outside of a Filter:
915                    if rule["Prefix"] or not rule["Prefix"]:
916                        raise MalformedXML()
917                except KeyError:
918                    pass
919
920                filters = 0
921                try:
922                    prefix_filter = (
923                        rule["Filter"]["Prefix"] or ""
924                    )  # If it's `None` the set to the empty string
925                    filters += 1
926                except KeyError:
927                    prefix_filter = None
928
929                and_filter = None
930                if rule["Filter"].get("And"):
931                    filters += 1
932                    and_tags = {}
933                    if rule["Filter"]["And"].get("Tag"):
934                        if not isinstance(rule["Filter"]["And"]["Tag"], list):
935                            rule["Filter"]["And"]["Tag"] = [
936                                rule["Filter"]["And"]["Tag"]
937                            ]
938
939                        for t in rule["Filter"]["And"]["Tag"]:
940                            and_tags[t["Key"]] = t.get("Value", "")
941
942                    try:
943                        and_prefix = (
944                            rule["Filter"]["And"]["Prefix"] or ""
945                        )  # If it's `None` then set to the empty string
946                    except KeyError:
947                        and_prefix = None
948
949                    and_filter = LifecycleAndFilter(prefix=and_prefix, tags=and_tags)
950
951                filter_tag = None
952                if rule["Filter"].get("Tag"):
953                    filters += 1
954                    filter_tag = (
955                        rule["Filter"]["Tag"]["Key"],
956                        rule["Filter"]["Tag"].get("Value", ""),
957                    )
958
959                # Can't have more than 1 filter:
960                if filters > 1:
961                    raise MalformedXML()
962
963                lc_filter = LifecycleFilter(
964                    prefix=prefix_filter, tag=filter_tag, and_filter=and_filter
965                )
966
967            # If no top level prefix and no filter is present, then this is invalid:
968            if top_level_prefix is None:
969                try:
970                    rule["Filter"]
971                except KeyError:
972                    raise MalformedXML()
973
974            self.rules.append(
975                LifecycleRule(
976                    id=rule.get("ID"),
977                    prefix=top_level_prefix,
978                    lc_filter=lc_filter,
979                    status=rule["Status"],
980                    expiration_days=expiration.get("Days") if expiration else None,
981                    expiration_date=expiration.get("Date") if expiration else None,
982                    transition_days=transition.get("Days") if transition else None,
983                    transition_date=transition.get("Date") if transition else None,
984                    storage_class=transition.get("StorageClass")
985                    if transition
986                    else None,
987                    expired_object_delete_marker=eodm,
988                    nve_noncurrent_days=nve_noncurrent_days,
989                    nvt_noncurrent_days=nvt_noncurrent_days,
990                    nvt_storage_class=nvt_storage_class,
991                    aimu_days=aimu_days,
992                )
993            )
994
995    def delete_lifecycle(self):
996        self.rules = []
997
998    def set_cors(self, rules):
999        self.cors = []
1000
1001        if len(rules) > 100:
1002            raise MalformedXML()
1003
1004        for rule in rules:
1005            assert isinstance(rule["AllowedMethod"], list) or isinstance(
1006                rule["AllowedMethod"], str
1007            )
1008            assert isinstance(rule["AllowedOrigin"], list) or isinstance(
1009                rule["AllowedOrigin"], str
1010            )
1011            assert isinstance(rule.get("AllowedHeader", []), list) or isinstance(
1012                rule.get("AllowedHeader", ""), str
1013            )
1014            assert isinstance(rule.get("ExposeHeader", []), list) or isinstance(
1015                rule.get("ExposeHeader", ""), str
1016            )
1017            assert isinstance(rule.get("MaxAgeSeconds", "0"), str)
1018
1019            if isinstance(rule["AllowedMethod"], str):
1020                methods = [rule["AllowedMethod"]]
1021            else:
1022                methods = rule["AllowedMethod"]
1023
1024            for method in methods:
1025                if method not in ["GET", "PUT", "HEAD", "POST", "DELETE"]:
1026                    raise InvalidRequest(method)
1027
1028            self.cors.append(
1029                CorsRule(
1030                    rule["AllowedMethod"],
1031                    rule["AllowedOrigin"],
1032                    rule.get("AllowedHeader"),
1033                    rule.get("ExposeHeader"),
1034                    rule.get("MaxAgeSeconds"),
1035                )
1036            )
1037
1038    def delete_cors(self):
1039        self.cors = []
1040
1041    def set_logging(self, logging_config, bucket_backend):
1042        if not logging_config:
1043            self.logging = {}
1044            return
1045
1046        # Target bucket must exist in the same account (assuming all moto buckets are in the same account):
1047        if not bucket_backend.buckets.get(logging_config["TargetBucket"]):
1048            raise InvalidTargetBucketForLogging(
1049                "The target bucket for logging does not exist."
1050            )
1051
1052        # Does the target bucket have the log-delivery WRITE and READ_ACP permissions?
1053        write = read_acp = False
1054        for grant in bucket_backend.buckets[logging_config["TargetBucket"]].acl.grants:
1055            # Must be granted to: http://acs.amazonaws.com/groups/s3/LogDelivery
1056            for grantee in grant.grantees:
1057                if grantee.uri == "http://acs.amazonaws.com/groups/s3/LogDelivery":
1058                    if (
1059                        "WRITE" in grant.permissions
1060                        or "FULL_CONTROL" in grant.permissions
1061                    ):
1062                        write = True
1063
1064                    if (
1065                        "READ_ACP" in grant.permissions
1066                        or "FULL_CONTROL" in grant.permissions
1067                    ):
1068                        read_acp = True
1069
1070                    break
1071
1072        if not write or not read_acp:
1073            raise InvalidTargetBucketForLogging(
1074                "You must give the log-delivery group WRITE and READ_ACP"
1075                " permissions to the target bucket"
1076            )
1077
1078        # Buckets must also exist within the same region:
1079        if (
1080            bucket_backend.buckets[logging_config["TargetBucket"]].region_name
1081            != self.region_name
1082        ):
1083            raise CrossLocationLoggingProhibitted()
1084
1085        # Checks pass -- set the logging config:
1086        self.logging = logging_config
1087
1088    def set_notification_configuration(self, notification_config):
1089        if not notification_config:
1090            self.notification_configuration = None
1091            return
1092
1093        self.notification_configuration = NotificationConfiguration(
1094            topic=notification_config.get("TopicConfiguration"),
1095            queue=notification_config.get("QueueConfiguration"),
1096            cloud_function=notification_config.get("CloudFunctionConfiguration"),
1097        )
1098
1099        # Validate that the region is correct:
1100        for thing in ["topic", "queue", "cloud_function"]:
1101            for t in getattr(self.notification_configuration, thing):
1102                region = t.arn.split(":")[3]
1103                if region != self.region_name:
1104                    raise InvalidNotificationDestination()
1105
1106    def set_accelerate_configuration(self, accelerate_config):
1107        if self.accelerate_configuration is None and accelerate_config == "Suspended":
1108            # Cannot "suspend" a not active acceleration. Leaves it undefined
1109            return
1110
1111        self.accelerate_configuration = accelerate_config
1112
1113    @classmethod
1114    def has_cfn_attr(cls, attribute):
1115        return attribute in [
1116            "Arn",
1117            "DomainName",
1118            "DualStackDomainName",
1119            "RegionalDomainName",
1120            "WebsiteURL",
1121        ]
1122
1123    def get_cfn_attribute(self, attribute_name):
1124        from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
1125
1126        if attribute_name == "Arn":
1127            return self.arn
1128        elif attribute_name == "DomainName":
1129            return self.domain_name
1130        elif attribute_name == "DualStackDomainName":
1131            return self.dual_stack_domain_name
1132        elif attribute_name == "RegionalDomainName":
1133            return self.regional_domain_name
1134        elif attribute_name == "WebsiteURL":
1135            return self.website_url
1136        raise UnformattedGetAttTemplateException()
1137
1138    def set_acl(self, acl):
1139        self.acl = acl
1140
1141    @property
1142    def arn(self):
1143        return "arn:aws:s3:::{}".format(self.name)
1144
1145    @property
1146    def domain_name(self):
1147        return "{}.s3.amazonaws.com".format(self.name)
1148
1149    @property
1150    def dual_stack_domain_name(self):
1151        return "{}.s3.dualstack.{}.amazonaws.com".format(self.name, self.region_name)
1152
1153    @property
1154    def regional_domain_name(self):
1155        return "{}.s3.{}.amazonaws.com".format(self.name, self.region_name)
1156
1157    @property
1158    def website_url(self):
1159        return "http://{}.s3-website.{}.amazonaws.com".format(
1160            self.name, self.region_name
1161        )
1162
1163    @property
1164    def physical_resource_id(self):
1165        return self.name
1166
1167    @staticmethod
1168    def cloudformation_name_type():
1169        return "BucketName"
1170
1171    @staticmethod
1172    def cloudformation_type():
1173        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-s3-bucket.html
1174        return "AWS::S3::Bucket"
1175
1176    @classmethod
1177    def create_from_cloudformation_json(
1178        cls, resource_name, cloudformation_json, region_name, **kwargs
1179    ):
1180        bucket = s3_backend.create_bucket(resource_name, region_name)
1181
1182        properties = cloudformation_json.get("Properties", {})
1183
1184        if "BucketEncryption" in properties:
1185            bucket_encryption = cfn_to_api_encryption(properties["BucketEncryption"])
1186            s3_backend.put_bucket_encryption(
1187                bucket_name=resource_name, encryption=bucket_encryption
1188            )
1189
1190        return bucket
1191
1192    @classmethod
1193    def update_from_cloudformation_json(
1194        cls, original_resource, new_resource_name, cloudformation_json, region_name,
1195    ):
1196        properties = cloudformation_json["Properties"]
1197
1198        if is_replacement_update(properties):
1199            resource_name_property = cls.cloudformation_name_type()
1200            if resource_name_property not in properties:
1201                properties[resource_name_property] = new_resource_name
1202            new_resource = cls.create_from_cloudformation_json(
1203                properties[resource_name_property], cloudformation_json, region_name
1204            )
1205            properties[resource_name_property] = original_resource.name
1206            cls.delete_from_cloudformation_json(
1207                original_resource.name, cloudformation_json, region_name
1208            )
1209            return new_resource
1210
1211        else:  # No Interruption
1212            if "BucketEncryption" in properties:
1213                bucket_encryption = cfn_to_api_encryption(
1214                    properties["BucketEncryption"]
1215                )
1216                s3_backend.put_bucket_encryption(
1217                    bucket_name=original_resource.name, encryption=bucket_encryption
1218                )
1219            return original_resource
1220
1221    @classmethod
1222    def delete_from_cloudformation_json(
1223        cls, resource_name, cloudformation_json, region_name
1224    ):
1225        s3_backend.delete_bucket(resource_name)
1226
1227    def to_config_dict(self):
1228        """Return the AWS Config JSON format of this S3 bucket.
1229
1230        Note: The following features are not implemented and will need to be if you care about them:
1231        - Bucket Accelerate Configuration
1232        """
1233        config_dict = {
1234            "version": "1.3",
1235            "configurationItemCaptureTime": str(self.creation_date),
1236            "configurationItemStatus": "ResourceDiscovered",
1237            "configurationStateId": str(
1238                int(time.mktime(self.creation_date.timetuple()))
1239            ),  # PY2 and 3 compatible
1240            "configurationItemMD5Hash": "",
1241            "arn": self.arn,
1242            "resourceType": "AWS::S3::Bucket",
1243            "resourceId": self.name,
1244            "resourceName": self.name,
1245            "awsRegion": self.region_name,
1246            "availabilityZone": "Regional",
1247            "resourceCreationTime": str(self.creation_date),
1248            "relatedEvents": [],
1249            "relationships": [],
1250            "tags": s3_backend.tagger.get_tag_dict_for_resource(self.arn),
1251            "configuration": {
1252                "name": self.name,
1253                "owner": {"id": OWNER},
1254                "creationDate": self.creation_date.isoformat(),
1255            },
1256        }
1257
1258        # Make the supplementary configuration:
1259        # This is a dobule-wrapped JSON for some reason...
1260        s_config = {
1261            "AccessControlList": json.dumps(json.dumps(self.acl.to_config_dict()))
1262        }
1263
1264        if self.public_access_block:
1265            s_config["PublicAccessBlockConfiguration"] = json.dumps(
1266                self.public_access_block.to_config_dict()
1267            )
1268
1269        # Tagging is special:
1270        if config_dict["tags"]:
1271            s_config["BucketTaggingConfiguration"] = json.dumps(
1272                {"tagSets": [{"tags": config_dict["tags"]}]}
1273            )
1274
1275        # TODO implement Accelerate Configuration:
1276        s_config["BucketAccelerateConfiguration"] = {"status": None}
1277
1278        if self.rules:
1279            s_config["BucketLifecycleConfiguration"] = {
1280                "rules": [rule.to_config_dict() for rule in self.rules]
1281            }
1282
1283        s_config["BucketLoggingConfiguration"] = {
1284            "destinationBucketName": self.logging.get("TargetBucket", None),
1285            "logFilePrefix": self.logging.get("TargetPrefix", None),
1286        }
1287
1288        s_config["BucketPolicy"] = {
1289            "policyText": self.policy.decode("utf-8") if self.policy else None
1290        }
1291
1292        s_config["IsRequesterPaysEnabled"] = (
1293            "false" if self.payer == "BucketOwner" else "true"
1294        )
1295
1296        if self.notification_configuration:
1297            s_config[
1298                "BucketNotificationConfiguration"
1299            ] = self.notification_configuration.to_config_dict()
1300        else:
1301            s_config["BucketNotificationConfiguration"] = {"configurations": {}}
1302
1303        config_dict["supplementaryConfiguration"] = s_config
1304
1305        return config_dict
1306
1307    @property
1308    def has_default_lock(self):
1309        if not self.object_lock_enabled:
1310            return False
1311
1312        if self.default_lock_mode:
1313            return True
1314
1315        return False
1316
1317    def default_retention(self):
1318        now = datetime.datetime.utcnow()
1319        now += datetime.timedelta(self.default_lock_days)
1320        now += datetime.timedelta(self.default_lock_years * 365)
1321        return now.strftime("%Y-%m-%dT%H:%M:%SZ")
1322
1323
1324class S3Backend(BaseBackend, CloudWatchMetricProvider):
1325    """
1326    Moto implementation for S3.
1327
1328    Custom S3 endpoints are supported, if you are using a S3-compatible storage solution like Ceph.
1329    Example usage:
1330
1331    .. sourcecode:: python
1332
1333        os.environ["MOTO_S3_CUSTOM_ENDPOINTS"] = "http://custom.internal.endpoint,http://custom.other.endpoint"
1334        @mock_s3
1335        def test_my_custom_endpoint():
1336            boto3.client("s3", endpoint_url="http://custom.internal.endpoint")
1337            ...
1338
1339    Note that this only works if the environment variable is set **before** the mock is initialized.
1340    """
1341
1342    def __init__(self):
1343        self.buckets = {}
1344        self.account_public_access_block = None
1345        self.tagger = TaggingService()
1346
1347    @property
1348    def _url_module(self):
1349        # The urls-property can be different depending on env variables
1350        # Force a reload, to retrieve the correct set of URLs
1351        import moto.s3.urls as backend_urls_module
1352
1353        reload(backend_urls_module)
1354        return backend_urls_module
1355
1356    @staticmethod
1357    def default_vpc_endpoint_service(service_region, zones):
1358        """List of dicts representing default VPC endpoints for this service."""
1359        accesspoint = {
1360            "AcceptanceRequired": False,
1361            "AvailabilityZones": zones,
1362            "BaseEndpointDnsNames": [
1363                f"accesspoint.s3-global.{service_region}.vpce.amazonaws.com",
1364            ],
1365            "ManagesVpcEndpoints": False,
1366            "Owner": "amazon",
1367            "PrivateDnsName": "*.accesspoint.s3-global.amazonaws.com",
1368            "PrivateDnsNameVerificationState": "verified",
1369            "PrivateDnsNames": [
1370                {"PrivateDnsName": "*.accesspoint.s3-global.amazonaws.com"}
1371            ],
1372            "ServiceId": f"vpce-svc-{BaseBackend.vpce_random_number()}",
1373            "ServiceName": "com.amazonaws.s3-global.accesspoint",
1374            "ServiceType": [{"ServiceType": "Interface"}],
1375            "Tags": [],
1376            "VpcEndpointPolicySupported": True,
1377        }
1378        return (
1379            BaseBackend.default_vpc_endpoint_service_factory(
1380                service_region, zones, "s3", "Interface"
1381            )
1382            + BaseBackend.default_vpc_endpoint_service_factory(
1383                service_region, zones, "s3", "Gateway"
1384            )
1385            + [accesspoint]
1386        )
1387
1388        # TODO: This is broken! DO NOT IMPORT MUTABLE DATA TYPES FROM OTHER AREAS -- THIS BREAKS UNMOCKING!
1389        # WRAP WITH A GETTER/SETTER FUNCTION
1390        # Register this class as a CloudWatch Metric Provider
1391        # Must provide a method 'get_cloudwatch_metrics' that will return a list of metrics, based on the data available
1392        # metric_providers["S3"] = self
1393
1394    @classmethod
1395    def get_cloudwatch_metrics(cls):
1396        metrics = []
1397        for name, bucket in s3_backend.buckets.items():
1398            metrics.append(
1399                MetricDatum(
1400                    namespace="AWS/S3",
1401                    name="BucketSizeBytes",
1402                    value=bucket.keys.item_size(),
1403                    dimensions=[
1404                        {"Name": "StorageType", "Value": "StandardStorage"},
1405                        {"Name": "BucketName", "Value": name},
1406                    ],
1407                    timestamp=datetime.datetime.now(tz=pytz.utc).replace(
1408                        hour=0, minute=0, second=0, microsecond=0
1409                    ),
1410                    unit="Bytes",
1411                )
1412            )
1413            metrics.append(
1414                MetricDatum(
1415                    namespace="AWS/S3",
1416                    name="NumberOfObjects",
1417                    value=len(bucket.keys),
1418                    dimensions=[
1419                        {"Name": "StorageType", "Value": "AllStorageTypes"},
1420                        {"Name": "BucketName", "Value": name},
1421                    ],
1422                    timestamp=datetime.datetime.now(tz=pytz.utc).replace(
1423                        hour=0, minute=0, second=0, microsecond=0
1424                    ),
1425                    unit="Count",
1426                )
1427            )
1428        return metrics
1429
1430    def create_bucket(self, bucket_name, region_name):
1431        if bucket_name in self.buckets:
1432            raise BucketAlreadyExists(bucket=bucket_name)
1433        if not MIN_BUCKET_NAME_LENGTH <= len(bucket_name) <= MAX_BUCKET_NAME_LENGTH:
1434            raise InvalidBucketName()
1435        new_bucket = FakeBucket(name=bucket_name, region_name=region_name)
1436
1437        self.buckets[bucket_name] = new_bucket
1438        return new_bucket
1439
1440    def list_buckets(self):
1441        return self.buckets.values()
1442
1443    def get_bucket(self, bucket_name):
1444        try:
1445            return self.buckets[bucket_name]
1446        except KeyError:
1447            raise MissingBucket(bucket=bucket_name)
1448
1449    def head_bucket(self, bucket_name):
1450        return self.get_bucket(bucket_name)
1451
1452    def delete_bucket(self, bucket_name):
1453        bucket = self.get_bucket(bucket_name)
1454        if bucket.keys:
1455            # Can't delete a bucket with keys
1456            return False
1457        else:
1458            return self.buckets.pop(bucket_name)
1459
1460    def set_bucket_versioning(self, bucket_name, status):
1461        self.get_bucket(bucket_name).versioning_status = status
1462
1463    def get_bucket_versioning(self, bucket_name):
1464        return self.get_bucket(bucket_name).versioning_status
1465
1466    def get_bucket_encryption(self, bucket_name):
1467        return self.get_bucket(bucket_name).encryption
1468
1469    def list_object_versions(
1470        self,
1471        bucket_name,
1472        delimiter=None,
1473        encoding_type=None,
1474        key_marker=None,
1475        max_keys=None,
1476        version_id_marker=None,
1477        prefix="",
1478    ):
1479        bucket = self.get_bucket(bucket_name)
1480
1481        common_prefixes = []
1482        requested_versions = []
1483        delete_markers = []
1484        all_versions = itertools.chain(
1485            *(copy.deepcopy(l) for key, l in bucket.keys.iterlists())
1486        )
1487        all_versions = list(all_versions)
1488        # sort by name, revert last-modified-date
1489        all_versions.sort(key=lambda r: (r.name, -unix_time_millis(r.last_modified)))
1490        last_name = None
1491        for version in all_versions:
1492            name = version.name
1493            # guaranteed to be sorted - so the first key with this name will be the latest
1494            version.is_latest = name != last_name
1495            if version.is_latest:
1496                last_name = name
1497            # Differentiate between FakeKey and FakeDeleteMarkers
1498            if not isinstance(version, FakeKey):
1499                delete_markers.append(version)
1500                continue
1501            # skip all keys that alphabetically come before keymarker
1502            if key_marker and name < key_marker:
1503                continue
1504            # Filter for keys that start with prefix
1505            if not name.startswith(prefix):
1506                continue
1507            # separate out all keys that contain delimiter
1508            if delimiter and delimiter in name:
1509                index = name.index(delimiter) + len(delimiter)
1510                prefix_including_delimiter = name[0:index]
1511                common_prefixes.append(prefix_including_delimiter)
1512                continue
1513
1514            requested_versions.append(version)
1515
1516        common_prefixes = sorted(set(common_prefixes))
1517
1518        return requested_versions, common_prefixes, delete_markers
1519
1520    def get_bucket_policy(self, bucket_name):
1521        return self.get_bucket(bucket_name).policy
1522
1523    def put_bucket_policy(self, bucket_name, policy):
1524        self.get_bucket(bucket_name).policy = policy
1525
1526    def delete_bucket_policy(self, bucket_name, body):
1527        bucket = self.get_bucket(bucket_name)
1528        bucket.policy = None
1529
1530    def put_bucket_encryption(self, bucket_name, encryption):
1531        self.get_bucket(bucket_name).encryption = encryption
1532
1533    def delete_bucket_encryption(self, bucket_name):
1534        self.get_bucket(bucket_name).encryption = None
1535
1536    def get_bucket_replication(self, bucket_name):
1537        bucket = self.get_bucket(bucket_name)
1538        return getattr(bucket, "replication", None)
1539
1540    def put_bucket_replication(self, bucket_name, replication):
1541        if isinstance(replication["Rule"], dict):
1542            replication["Rule"] = [replication["Rule"]]
1543        for rule in replication["Rule"]:
1544            if "Priority" not in rule:
1545                rule["Priority"] = 1
1546            if "ID" not in rule:
1547                rule["ID"] = "".join(
1548                    random.choice(string.ascii_letters + string.digits)
1549                    for _ in range(30)
1550                )
1551        bucket = self.get_bucket(bucket_name)
1552        bucket.replication = replication
1553
1554    def delete_bucket_replication(self, bucket_name):
1555        bucket = self.get_bucket(bucket_name)
1556        bucket.replication = None
1557
1558    def put_bucket_lifecycle(self, bucket_name, rules):
1559        bucket = self.get_bucket(bucket_name)
1560        bucket.set_lifecycle(rules)
1561
1562    def delete_bucket_lifecycle(self, bucket_name):
1563        bucket = self.get_bucket(bucket_name)
1564        bucket.delete_lifecycle()
1565
1566    def set_bucket_website_configuration(self, bucket_name, website_configuration):
1567        bucket = self.get_bucket(bucket_name)
1568        bucket.website_configuration = website_configuration
1569
1570    def get_bucket_website_configuration(self, bucket_name):
1571        bucket = self.get_bucket(bucket_name)
1572        return bucket.website_configuration
1573
1574    def delete_bucket_website(self, bucket_name):
1575        bucket = self.get_bucket(bucket_name)
1576        bucket.website_configuration = None
1577
1578    def get_public_access_block(self, bucket_name):
1579        bucket = self.get_bucket(bucket_name)
1580
1581        if not bucket.public_access_block:
1582            raise NoSuchPublicAccessBlockConfiguration()
1583
1584        return bucket.public_access_block
1585
1586    def get_account_public_access_block(self, account_id):
1587        # The account ID should equal the account id that is set for Moto:
1588        if account_id != ACCOUNT_ID:
1589            raise WrongPublicAccessBlockAccountIdError()
1590
1591        if not self.account_public_access_block:
1592            raise NoSuchPublicAccessBlockConfiguration()
1593
1594        return self.account_public_access_block
1595
1596    def put_object(
1597        self,
1598        bucket_name,
1599        key_name,
1600        value,
1601        storage=None,
1602        etag=None,
1603        multipart=None,
1604        encryption=None,
1605        kms_key_id=None,
1606        bucket_key_enabled=None,
1607        lock_mode=None,
1608        lock_legal_status=None,
1609        lock_until=None,
1610    ):
1611        key_name = clean_key_name(key_name)
1612        if storage is not None and storage not in STORAGE_CLASS:
1613            raise InvalidStorageClass(storage=storage)
1614
1615        bucket = self.get_bucket(bucket_name)
1616
1617        # getting default config from bucket if not included in put request
1618        if bucket.encryption:
1619            bucket_key_enabled = bucket_key_enabled or bucket.encryption["Rule"].get(
1620                "BucketKeyEnabled", False
1621            )
1622            kms_key_id = kms_key_id or bucket.encryption["Rule"][
1623                "ApplyServerSideEncryptionByDefault"
1624            ].get("KMSMasterKeyID")
1625            encryption = (
1626                encryption
1627                or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][
1628                    "SSEAlgorithm"
1629                ]
1630            )
1631
1632        new_key = FakeKey(
1633            name=key_name,
1634            value=value,
1635            storage=storage,
1636            etag=etag,
1637            is_versioned=bucket.is_versioned,
1638            version_id=str(uuid.uuid4()) if bucket.is_versioned else "null",
1639            multipart=multipart,
1640            encryption=encryption,
1641            kms_key_id=kms_key_id,
1642            bucket_key_enabled=bucket_key_enabled,
1643            lock_mode=lock_mode,
1644            lock_legal_status=lock_legal_status,
1645            lock_until=lock_until,
1646        )
1647
1648        keys = [
1649            key
1650            for key in bucket.keys.getlist(key_name, [])
1651            if key.version_id != new_key.version_id
1652        ] + [new_key]
1653        bucket.keys.setlist(key_name, keys)
1654
1655        return new_key
1656
1657    def put_object_acl(self, bucket_name, key_name, acl):
1658        key = self.get_object(bucket_name, key_name)
1659        # TODO: Support the XML-based ACL format
1660        if key is not None:
1661            key.set_acl(acl)
1662        else:
1663            raise MissingKey(key=key_name)
1664
1665    def put_object_legal_hold(
1666        self, bucket_name, key_name, version_id, legal_hold_status
1667    ):
1668        key = self.get_object(bucket_name, key_name, version_id=version_id)
1669        key.lock_legal_status = legal_hold_status
1670
1671    def put_object_retention(self, bucket_name, key_name, version_id, retention):
1672        key = self.get_object(bucket_name, key_name, version_id=version_id)
1673        key.lock_mode = retention[0]
1674        key.lock_until = retention[1]
1675
1676    def append_to_key(self, bucket_name, key_name, value):
1677        key = self.get_object(bucket_name, key_name)
1678        key.append_to_value(value)
1679        return key
1680
1681    def get_object(self, bucket_name, key_name, version_id=None, part_number=None):
1682        key_name = clean_key_name(key_name)
1683        bucket = self.get_bucket(bucket_name)
1684        key = None
1685
1686        if bucket:
1687            if version_id is None:
1688                if key_name in bucket.keys:
1689                    key = bucket.keys[key_name]
1690            else:
1691                for key_version in bucket.keys.getlist(key_name, default=[]):
1692                    if str(key_version.version_id) == str(version_id):
1693                        key = key_version
1694                        break
1695
1696            if part_number and key and key.multipart:
1697                key = key.multipart.parts[part_number]
1698
1699        if isinstance(key, FakeKey):
1700            return key
1701        else:
1702            return None
1703
1704    def head_object(self, bucket_name, key_name, version_id=None, part_number=None):
1705        return self.get_object(bucket_name, key_name, version_id, part_number)
1706
1707    def get_object_acl(self, key):
1708        return key.acl
1709
1710    def get_object_legal_hold(self, key):
1711        return key.lock_legal_status
1712
1713    def get_object_lock_configuration(self, bucket_name):
1714        bucket = self.get_bucket(bucket_name)
1715        if not bucket.object_lock_enabled:
1716            raise ObjectLockConfigurationNotFoundError
1717        return (
1718            bucket.object_lock_enabled,
1719            bucket.default_lock_mode,
1720            bucket.default_lock_days,
1721            bucket.default_lock_years,
1722        )
1723
1724    def get_object_tagging(self, key):
1725        return self.tagger.list_tags_for_resource(key.arn)
1726
1727    def set_key_tags(self, key, tags, key_name=None):
1728        if key is None:
1729            raise MissingKey(key=key_name)
1730        boto_tags_dict = self.tagger.convert_dict_to_tags_input(tags)
1731        errmsg = self.tagger.validate_tags(boto_tags_dict)
1732        if errmsg:
1733            raise InvalidTagError(errmsg)
1734        self.tagger.delete_all_tags_for_resource(key.arn)
1735        self.tagger.tag_resource(
1736            key.arn, boto_tags_dict,
1737        )
1738        return key
1739
1740    def get_bucket_tagging(self, bucket_name):
1741        bucket = self.get_bucket(bucket_name)
1742        return self.tagger.list_tags_for_resource(bucket.arn)
1743
1744    def put_bucket_tagging(self, bucket_name, tags):
1745        bucket = self.get_bucket(bucket_name)
1746        self.tagger.delete_all_tags_for_resource(bucket.arn)
1747        self.tagger.tag_resource(
1748            bucket.arn, [{"Key": key, "Value": value} for key, value in tags.items()],
1749        )
1750
1751    def put_object_lock_configuration(
1752        self, bucket_name, lock_enabled, mode=None, days=None, years=None
1753    ):
1754        bucket = self.get_bucket(bucket_name)
1755
1756        if bucket.keys.item_size() > 0:
1757            raise BucketNeedsToBeNew
1758
1759        if lock_enabled:
1760            bucket.object_lock_enabled = True
1761            bucket.versioning_status = "Enabled"
1762
1763        bucket.default_lock_mode = mode
1764        bucket.default_lock_days = days
1765        bucket.default_lock_years = years
1766
1767    def delete_bucket_tagging(self, bucket_name):
1768        bucket = self.get_bucket(bucket_name)
1769        self.tagger.delete_all_tags_for_resource(bucket.arn)
1770
1771    def put_bucket_cors(self, bucket_name, cors_rules):
1772        bucket = self.get_bucket(bucket_name)
1773        bucket.set_cors(cors_rules)
1774
1775    def put_bucket_logging(self, bucket_name, logging_config):
1776        bucket = self.get_bucket(bucket_name)
1777        bucket.set_logging(logging_config, self)
1778
1779    def delete_bucket_cors(self, bucket_name):
1780        bucket = self.get_bucket(bucket_name)
1781        bucket.delete_cors()
1782
1783    def delete_public_access_block(self, bucket_name):
1784        bucket = self.get_bucket(bucket_name)
1785        bucket.public_access_block = None
1786
1787    def delete_account_public_access_block(self, account_id):
1788        # The account ID should equal the account id that is set for Moto:
1789        if account_id != ACCOUNT_ID:
1790            raise WrongPublicAccessBlockAccountIdError()
1791
1792        self.account_public_access_block = None
1793
1794    def put_bucket_notification_configuration(self, bucket_name, notification_config):
1795        bucket = self.get_bucket(bucket_name)
1796        bucket.set_notification_configuration(notification_config)
1797
1798    def put_bucket_accelerate_configuration(
1799        self, bucket_name, accelerate_configuration
1800    ):
1801        if accelerate_configuration not in ["Enabled", "Suspended"]:
1802            raise MalformedXML()
1803
1804        bucket = self.get_bucket(bucket_name)
1805        if bucket.name.find(".") != -1:
1806            raise InvalidRequest("PutBucketAccelerateConfiguration")
1807        bucket.set_accelerate_configuration(accelerate_configuration)
1808
1809    def put_bucket_public_access_block(self, bucket_name, pub_block_config):
1810        bucket = self.get_bucket(bucket_name)
1811
1812        if not pub_block_config:
1813            raise InvalidPublicAccessBlockConfiguration()
1814
1815        bucket.public_access_block = PublicAccessBlock(
1816            pub_block_config.get("BlockPublicAcls"),
1817            pub_block_config.get("IgnorePublicAcls"),
1818            pub_block_config.get("BlockPublicPolicy"),
1819            pub_block_config.get("RestrictPublicBuckets"),
1820        )
1821
1822    def put_account_public_access_block(self, account_id, pub_block_config):
1823        # The account ID should equal the account id that is set for Moto:
1824        if account_id != ACCOUNT_ID:
1825            raise WrongPublicAccessBlockAccountIdError()
1826
1827        if not pub_block_config:
1828            raise InvalidPublicAccessBlockConfiguration()
1829
1830        self.account_public_access_block = PublicAccessBlock(
1831            pub_block_config.get("BlockPublicAcls"),
1832            pub_block_config.get("IgnorePublicAcls"),
1833            pub_block_config.get("BlockPublicPolicy"),
1834            pub_block_config.get("RestrictPublicBuckets"),
1835        )
1836
1837    def initiate_multipart(self, bucket_name, key_name, metadata):
1838        bucket = self.get_bucket(bucket_name)
1839        new_multipart = FakeMultipart(key_name, metadata)
1840        bucket.multiparts[new_multipart.id] = new_multipart
1841
1842        return new_multipart
1843
1844    def complete_multipart(self, bucket_name, multipart_id, body):
1845        bucket = self.get_bucket(bucket_name)
1846        multipart = bucket.multiparts[multipart_id]
1847        value, etag = multipart.complete(body)
1848        if value is None:
1849            return
1850        del bucket.multiparts[multipart_id]
1851
1852        key = self.put_object(
1853            bucket_name, multipart.key_name, value, etag=etag, multipart=multipart
1854        )
1855        key.set_metadata(multipart.metadata)
1856        return key
1857
1858    def abort_multipart_upload(self, bucket_name, multipart_id):
1859        bucket = self.get_bucket(bucket_name)
1860        multipart_data = bucket.multiparts.get(multipart_id, None)
1861        if not multipart_data:
1862            raise NoSuchUpload(upload_id=multipart_id)
1863        del bucket.multiparts[multipart_id]
1864
1865    def list_parts(
1866        self, bucket_name, multipart_id, part_number_marker=0, max_parts=1000
1867    ):
1868        bucket = self.get_bucket(bucket_name)
1869        if multipart_id not in bucket.multiparts:
1870            raise NoSuchUpload(upload_id=multipart_id)
1871        return list(
1872            bucket.multiparts[multipart_id].list_parts(part_number_marker, max_parts)
1873        )
1874
1875    def is_truncated(self, bucket_name, multipart_id, next_part_number_marker):
1876        bucket = self.get_bucket(bucket_name)
1877        return len(bucket.multiparts[multipart_id].parts) >= next_part_number_marker
1878
1879    def create_multipart_upload(
1880        self, bucket_name, key_name, metadata, storage_type, tags
1881    ):
1882        multipart = FakeMultipart(key_name, metadata, storage=storage_type, tags=tags)
1883
1884        bucket = self.get_bucket(bucket_name)
1885        bucket.multiparts[multipart.id] = multipart
1886        return multipart.id
1887
1888    def complete_multipart_upload(self, bucket_name, multipart_id, body):
1889        bucket = self.get_bucket(bucket_name)
1890        multipart = bucket.multiparts[multipart_id]
1891        value, etag = multipart.complete(body)
1892        if value is not None:
1893            del bucket.multiparts[multipart_id]
1894        return multipart, value, etag
1895
1896    def get_all_multiparts(self, bucket_name):
1897        bucket = self.get_bucket(bucket_name)
1898        return bucket.multiparts
1899
1900    def upload_part(self, bucket_name, multipart_id, part_id, value):
1901        bucket = self.get_bucket(bucket_name)
1902        multipart = bucket.multiparts[multipart_id]
1903        return multipart.set_part(part_id, value)
1904
1905    def copy_part(
1906        self,
1907        dest_bucket_name,
1908        multipart_id,
1909        part_id,
1910        src_bucket_name,
1911        src_key_name,
1912        src_version_id,
1913        start_byte,
1914        end_byte,
1915    ):
1916        dest_bucket = self.get_bucket(dest_bucket_name)
1917        multipart = dest_bucket.multiparts[multipart_id]
1918
1919        src_value = self.get_object(
1920            src_bucket_name, src_key_name, version_id=src_version_id
1921        ).value
1922        if start_byte is not None:
1923            src_value = src_value[start_byte : end_byte + 1]
1924        return multipart.set_part(part_id, src_value)
1925
1926    def list_objects(self, bucket, prefix, delimiter):
1927        key_results = set()
1928        folder_results = set()
1929        if prefix:
1930            for key_name, key in bucket.keys.items():
1931                if key_name.startswith(prefix):
1932                    key_without_prefix = key_name.replace(prefix, "", 1)
1933                    if delimiter and delimiter in key_without_prefix:
1934                        # If delimiter, we need to split out folder_results
1935                        key_without_delimiter = key_without_prefix.split(delimiter)[0]
1936                        folder_results.add(
1937                            "{0}{1}{2}".format(prefix, key_without_delimiter, delimiter)
1938                        )
1939                    else:
1940                        key_results.add(key)
1941        else:
1942            for key_name, key in bucket.keys.items():
1943                if delimiter and delimiter in key_name:
1944                    # If delimiter, we need to split out folder_results
1945                    folder_results.add(key_name.split(delimiter)[0] + delimiter)
1946                else:
1947                    key_results.add(key)
1948
1949        key_results = filter(
1950            lambda key: not isinstance(key, FakeDeleteMarker), key_results
1951        )
1952        key_results = sorted(key_results, key=lambda key: key.name)
1953        folder_results = [
1954            folder_name for folder_name in sorted(folder_results, key=lambda key: key)
1955        ]
1956
1957        return key_results, folder_results
1958
1959    def list_objects_v2(self, bucket, prefix, delimiter):
1960        result_keys, result_folders = self.list_objects(bucket, prefix, delimiter)
1961        # sort the combination of folders and keys into lexicographical order
1962        all_keys = result_keys + result_folders
1963        all_keys.sort(key=self._get_name)
1964        return all_keys
1965
1966    @staticmethod
1967    def _get_name(key):
1968        if isinstance(key, FakeKey):
1969            return key.name
1970        else:
1971            return key
1972
1973    def _set_delete_marker(self, bucket_name, key_name):
1974        bucket = self.get_bucket(bucket_name)
1975        delete_marker = FakeDeleteMarker(key=bucket.keys[key_name])
1976        bucket.keys[key_name] = delete_marker
1977        return delete_marker
1978
1979    def delete_object_tagging(self, bucket_name, key_name, version_id=None):
1980        key = self.get_object(bucket_name, key_name, version_id=version_id)
1981        self.tagger.delete_all_tags_for_resource(key.arn)
1982
1983    def delete_object(self, bucket_name, key_name, version_id=None, bypass=False):
1984        key_name = clean_key_name(key_name)
1985        bucket = self.get_bucket(bucket_name)
1986
1987        response_meta = {}
1988
1989        try:
1990            if not bucket.is_versioned:
1991                bucket.keys.pop(key_name)
1992            else:
1993                if version_id is None:
1994                    delete_marker = self._set_delete_marker(bucket_name, key_name)
1995                    response_meta["version-id"] = delete_marker.version_id
1996                else:
1997                    if key_name not in bucket.keys:
1998                        raise KeyError
1999
2000                    response_meta["delete-marker"] = "false"
2001                    for key in bucket.keys.getlist(key_name):
2002                        if str(key.version_id) == str(version_id):
2003
2004                            if (
2005                                hasattr(key, "is_locked")
2006                                and key.is_locked
2007                                and not bypass
2008                            ):
2009                                raise AccessDeniedByLock
2010
2011                            if type(key) is FakeDeleteMarker:
2012                                response_meta["delete-marker"] = "true"
2013                            break
2014
2015                    bucket.keys.setlist(
2016                        key_name,
2017                        [
2018                            key
2019                            for key in bucket.keys.getlist(key_name)
2020                            if str(key.version_id) != str(version_id)
2021                        ],
2022                    )
2023
2024                    if not bucket.keys.getlist(key_name):
2025                        bucket.keys.pop(key_name)
2026            return True, response_meta
2027        except KeyError:
2028            return False, None
2029
2030    def delete_objects(self, bucket_name, objects):
2031        deleted_objects = []
2032        for object_ in objects:
2033            key_name = object_["Key"]
2034            version_id = object_.get("VersionId", None)
2035
2036            self.delete_object(
2037                bucket_name, undo_clean_key_name(key_name), version_id=version_id
2038            )
2039            deleted_objects.append((key_name, version_id))
2040        return deleted_objects
2041
2042    def copy_object(
2043        self,
2044        src_bucket_name,
2045        src_key_name,
2046        dest_bucket_name,
2047        dest_key_name,
2048        storage=None,
2049        acl=None,
2050        src_version_id=None,
2051        encryption=None,
2052        kms_key_id=None,
2053    ):
2054        key = self.get_object(src_bucket_name, src_key_name, version_id=src_version_id)
2055
2056        new_key = self.put_object(
2057            bucket_name=dest_bucket_name,
2058            key_name=dest_key_name,
2059            value=key.value,
2060            storage=storage or key.storage_class,
2061            multipart=key.multipart,
2062            encryption=encryption or key.encryption,
2063            kms_key_id=kms_key_id or key.kms_key_id,
2064            bucket_key_enabled=key.bucket_key_enabled,
2065            lock_mode=key.lock_mode,
2066            lock_legal_status=key.lock_legal_status,
2067            lock_until=key.lock_until,
2068        )
2069        self.tagger.copy_tags(key.arn, new_key.arn)
2070
2071        if acl is not None:
2072            new_key.set_acl(acl)
2073        if key.storage_class in "GLACIER":
2074            # Object copied from Glacier object should not have expiry
2075            new_key.set_expiry(None)
2076
2077    def put_bucket_acl(self, bucket_name, acl):
2078        bucket = self.get_bucket(bucket_name)
2079        bucket.set_acl(acl)
2080
2081    def get_bucket_acl(self, bucket_name):
2082        bucket = self.get_bucket(bucket_name)
2083        return bucket.acl
2084
2085    def get_bucket_cors(self, bucket_name):
2086        bucket = self.get_bucket(bucket_name)
2087        return bucket.cors
2088
2089    def get_bucket_lifecycle(self, bucket_name):
2090        bucket = self.get_bucket(bucket_name)
2091        return bucket.rules
2092
2093    def get_bucket_location(self, bucket_name):
2094        bucket = self.get_bucket(bucket_name)
2095
2096        return bucket.location
2097
2098    def get_bucket_logging(self, bucket_name):
2099        bucket = self.get_bucket(bucket_name)
2100        return bucket.logging
2101
2102    def get_bucket_notification_configuration(self, bucket_name):
2103        bucket = self.get_bucket(bucket_name)
2104        return bucket.notification_configuration
2105
2106
2107s3_backend = S3Backend()
2108