1"""Implement models for EFS resources.
2
3See AWS docs for details:
4https://docs.aws.amazon.com/efs/latest/ug/whatisefs.html
5"""
6
7import json
8import time
9from copy import deepcopy
10from hashlib import md5
11
12from boto3 import Session
13
14from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
15from moto.core.utils import (
16    camelcase_to_underscores,
17    get_random_hex,
18    underscores_to_camelcase,
19)
20from moto.ec2 import ec2_backends
21from moto.ec2.exceptions import InvalidSubnetIdError
22from moto.efs.exceptions import (
23    BadRequest,
24    FileSystemAlreadyExists,
25    FileSystemInUse,
26    FileSystemNotFound,
27    MountTargetConflict,
28    MountTargetNotFound,
29    PolicyNotFound,
30    SubnetNotFound,
31    SecurityGroupNotFound,
32    SecurityGroupLimitExceeded,
33)
34
35
36def _lookup_az_id(az_name):
37    """Find the Availability zone ID given the AZ name."""
38    ec2 = ec2_backends[az_name[:-1]]
39    for zone in ec2.describe_availability_zones():
40        if zone.name == az_name:
41            return zone.zone_id
42
43
44class FileSystem(CloudFormationModel):
45    """A model for an EFS File System Volume."""
46
47    def __init__(
48        self,
49        region_name,
50        creation_token,
51        file_system_id,
52        performance_mode="generalPurpose",
53        encrypted=False,
54        kms_key_id=None,
55        throughput_mode="bursting",
56        provisioned_throughput_in_mibps=None,
57        availability_zone_name=None,
58        backup=False,
59        lifecycle_policies=None,
60        file_system_policy=None,
61        tags=None,
62    ):
63        if availability_zone_name:
64            backup = True
65        if kms_key_id and not encrypted:
66            raise BadRequest('If kms_key_id given, "encrypted" must be True.')
67
68        # Save given parameters
69        self.creation_token = creation_token
70        self.performance_mode = performance_mode
71        self.encrypted = encrypted
72        self.kms_key_id = kms_key_id
73        self.throughput_mode = throughput_mode
74        self.provisioned_throughput_in_mibps = provisioned_throughput_in_mibps
75        self.availability_zone_name = availability_zone_name
76        self.availability_zone_id = None
77        if self.availability_zone_name:
78            self.availability_zone_id = _lookup_az_id(self.availability_zone_name)
79        self._backup = backup
80        self.lifecycle_policies = lifecycle_policies
81        self.file_system_policy = file_system_policy
82
83        # Validate tag structure.
84        if tags is None:
85            self.tags = []
86        else:
87            if (
88                not isinstance(tags, list)
89                or not all(isinstance(tag, dict) for tag in tags)
90                or not all(set(tag.keys()) == {"Key", "Value"} for tag in tags)
91            ):
92                raise ValueError("Invalid tags: {}".format(tags))
93            else:
94                self.tags = tags
95
96        # Generate AWS-assigned parameters
97        self.file_system_id = file_system_id
98        self.file_system_arn = "arn:aws:elasticfilesystem:{region}:{user_id}:file-system/{file_system_id}".format(
99            region=region_name, user_id=ACCOUNT_ID, file_system_id=self.file_system_id
100        )
101        self.creation_time = time.time()
102        self.owner_id = ACCOUNT_ID
103
104        # Initialize some state parameters
105        self.life_cycle_state = "available"
106        self._mount_targets = {}
107        self._size_value = 0
108
109    @property
110    def size_in_bytes(self):
111        return {
112            "Value": self._size_value,
113            "ValueInIA": 0,
114            "ValueInStandard": self._size_value,
115            "Timestamp": time.time(),
116        }
117
118    @property
119    def physical_resource_id(self):
120        return self.file_system_id
121
122    @property
123    def number_of_mount_targets(self):
124        return len(self._mount_targets)
125
126    @property
127    def backup_policy(self):
128        if self._backup:
129            return {"Status": "ENABLED"}
130        else:
131            return
132
133    def info_json(self):
134        ret = {
135            underscores_to_camelcase(k.capitalize()): v
136            for k, v in self.__dict__.items()
137            if not k.startswith("_")
138        }
139        ret["SizeInBytes"] = self.size_in_bytes
140        ret["NumberOfMountTargets"] = self.number_of_mount_targets
141        return ret
142
143    def add_mount_target(self, subnet, mount_target):
144        # Check that the mount target doesn't violate constraints.
145        for other_mount_target in self._mount_targets.values():
146            if other_mount_target.subnet_vpc_id != subnet.vpc_id:
147                raise MountTargetConflict(
148                    "requested subnet for new mount target is not in the same VPC as existing mount targets"
149                )
150
151        if subnet.availability_zone in self._mount_targets:
152            raise MountTargetConflict("mount target already exists in this AZ")
153
154        self._mount_targets[subnet.availability_zone] = mount_target
155
156    def has_mount_target(self, subnet):
157        return subnet.availability_zone in self._mount_targets
158
159    def iter_mount_targets(self):
160        for mt in self._mount_targets.values():
161            yield mt
162
163    def remove_mount_target(self, subnet):
164        del self._mount_targets[subnet.availability_zone]
165
166    @staticmethod
167    def cloudformation_name_type():
168        return
169
170    @staticmethod
171    def cloudformation_type():
172        return "AWS::EFS::FileSystem"
173
174    @classmethod
175    def create_from_cloudformation_json(
176        cls, resource_name, cloudformation_json, region_name, **kwargs
177    ):
178        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-filesystem.html
179        props = deepcopy(cloudformation_json["Properties"])
180        props = {camelcase_to_underscores(k): v for k, v in props.items()}
181        if "file_system_tags" in props:
182            props["tags"] = props.pop("file_system_tags")
183        if "backup_policy" in props:
184            if "status" not in props["backup_policy"]:
185                raise ValueError("BackupPolicy must be of type BackupPolicy.")
186            status = props.pop("backup_policy")["status"]
187            if status not in ["ENABLED", "DISABLED"]:
188                raise ValueError('Invalid status: "{}".'.format(status))
189            props["backup"] = status == "ENABLED"
190        if "bypass_policy_lockout_safety_check" in props:
191            raise ValueError(
192                "BypassPolicyLockoutSafetyCheck not currently "
193                "supported by AWS Cloudformation."
194            )
195
196        return efs_backends[region_name].create_file_system(resource_name, **props)
197
198    @classmethod
199    def update_from_cloudformation_json(
200        cls, original_resource, new_resource_name, cloudformation_json, region_name
201    ):
202        raise NotImplementedError(
203            "Update of EFS File System via cloudformation is not yet implemented."
204        )
205
206    @classmethod
207    def delete_from_cloudformation_json(
208        cls, resource_name, cloudformation_json, region_name
209    ):
210        return efs_backends[region_name].delete_file_system(resource_name)
211
212
213class MountTarget(CloudFormationModel):
214    """A model for an EFS Mount Target."""
215
216    def __init__(self, file_system, subnet, ip_address, security_groups):
217        # Set the simple given parameters.
218        self.file_system_id = file_system.file_system_id
219        self._file_system = file_system
220        self._file_system.add_mount_target(subnet, self)
221        self.subnet_id = subnet.id
222        self._subnet = subnet
223        self.vpc_id = subnet.vpc_id
224        self.security_groups = security_groups
225
226        # Check the number of security groups.
227        if self.security_groups is not None and len(self.security_groups) > 5:
228            raise SecurityGroupLimitExceeded(
229                "The maximum number of security groups per interface has been reached."
230            )
231
232        # Get an IP address if needed, otherwise validate the one we're given.
233        if ip_address is None:
234            ip_address = subnet.get_available_subnet_ip(self)
235        else:
236            try:
237                subnet.request_ip(ip_address, self)
238            except Exception as e:
239                if "IP" in str(e) and "CIDR" in str(e):
240                    raise BadRequest(
241                        "Address does not fall within the subnet's address range"
242                    )
243                else:
244                    raise e
245        self.ip_address = ip_address
246
247        # Init non-user-assigned values.
248        self.owner_id = ACCOUNT_ID
249        self.mount_target_id = "fsmt-{}".format(get_random_hex())
250        self.life_cycle_state = "available"
251        self.network_interface_id = None
252        self.availability_zone_id = subnet.availability_zone_id
253        self.availability_zone_name = subnet.availability_zone
254
255    def clean_up(self):
256        self._file_system.remove_mount_target(self._subnet)
257        self._subnet.del_subnet_ip(self.ip_address)
258
259    def set_network_interface(self, network_interface):
260        self.network_interface_id = network_interface.id
261
262    def info_json(self):
263        ret = {
264            underscores_to_camelcase(k.capitalize()): v
265            for k, v in self.__dict__.items()
266            if not k.startswith("_")
267        }
268        return ret
269
270    @property
271    def physical_resource_id(self):
272        return self.mounted_target_id
273
274    @property
275    def subnet_vpc_id(self):
276        return self._subnet.vpc_id
277
278    @staticmethod
279    def cloudformation_name_type():
280        pass
281
282    @staticmethod
283    def cloudformation_type():
284        return "AWS::EFS::MountTarget"
285
286    @classmethod
287    def create_from_cloudformation_json(
288        cls, resource_name, cloudformation_json, region_name, **kwargs
289    ):
290        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-efs-mounttarget.html
291        props = deepcopy(cloudformation_json["Properties"])
292        props = {camelcase_to_underscores(k): v for k, v in props.items()}
293        return efs_backends[region_name].create_mount_target(**props)
294
295    @classmethod
296    def update_from_cloudformation_json(
297        cls, original_resource, new_resource_name, cloudformation_json, region_name
298    ):
299        raise NotImplementedError(
300            "Updates of EFS Mount Target via cloudformation are not yet implemented."
301        )
302
303    @classmethod
304    def delete_from_cloudformation_json(
305        cls, resource_name, cloudformation_json, region_name
306    ):
307        return efs_backends[region_name].delete_mount_target(resource_name)
308
309
310class EFSBackend(BaseBackend):
311    """The backend manager of EFS resources.
312
313    This is the state-machine for each region, tracking the file systems, mount targets,
314    and eventually access points that are deployed. Creating, updating, and destroying
315    such resources should always go through this class.
316    """
317
318    def __init__(self, region_name=None):
319        super(EFSBackend, self).__init__()
320        self.region_name = region_name
321        self.creation_tokens = set()
322        self.file_systems_by_id = {}
323        self.mount_targets_by_id = {}
324        self.next_markers = {}
325
326    def reset(self):
327        # preserve region
328        region_name = self.region_name
329        self.__dict__ = {}
330        self.__init__(region_name)
331
332    def _mark_description(self, corpus, max_items):
333        if max_items < len(corpus):
334            new_corpus = corpus[max_items:]
335            new_hash = md5(json.dumps(new_corpus).encode("utf-8"))
336            next_marker = new_hash.hexdigest()
337            self.next_markers[next_marker] = new_corpus
338        else:
339            next_marker = None
340        return next_marker
341
342    @property
343    def ec2_backend(self):
344        return ec2_backends[self.region_name]
345
346    def create_file_system(self, creation_token, **params):
347        """Create a new EFS File System Volume.
348
349        https://docs.aws.amazon.com/efs/latest/ug/API_CreateFileSystem.html
350        """
351        if not creation_token:
352            raise ValueError("No creation token given.")
353        if creation_token in self.creation_tokens:
354            raise FileSystemAlreadyExists(creation_token)
355
356        # Create a new file system ID:
357        def make_id():
358            return "fs-{}".format(get_random_hex())
359
360        fsid = make_id()
361        while fsid in self.file_systems_by_id:
362            fsid = make_id()
363        self.file_systems_by_id[fsid] = FileSystem(
364            self.region_name,
365            creation_token,
366            fsid,
367            **{k: v for k, v in params.items() if v is not None}
368        )
369        self.creation_tokens.add(creation_token)
370        return self.file_systems_by_id[fsid]
371
372    def describe_file_systems(self, marker, max_items, creation_token, file_system_id):
373        """Describe all the EFS File Systems, or specific File Systems.
374
375        https://docs.aws.amazon.com/efs/latest/ug/API_DescribeFileSystems.html
376        """
377        # Restrict the possible corpus of resules based on inputs.
378        if creation_token and file_system_id:
379            raise BadRequest(
380                "Request cannot contain both a file system ID and a creation token."
381            )
382        elif creation_token:
383            # Handle the creation token case.
384            corpus = []
385            for fs in self.file_systems_by_id.values():
386                if fs.creation_token == creation_token:
387                    corpus.append(fs.info_json())
388        elif file_system_id:
389            # Handle the case that a file_system_id is given.
390            if file_system_id not in self.file_systems_by_id:
391                raise FileSystemNotFound(file_system_id)
392            corpus = [self.file_systems_by_id[file_system_id]]
393        elif marker is not None:
394            # Handle the case that a marker is given.
395            if marker not in self.next_markers:
396                raise BadRequest("Invalid Marker")
397            corpus = self.next_markers[marker]
398        else:
399            # Handle the vanilla case.
400            corpus = [fs.info_json() for fs in self.file_systems_by_id.values()]
401
402        # Handle the max_items parameter.
403        file_systems = corpus[:max_items]
404        next_marker = self._mark_description(corpus, max_items)
405        return next_marker, file_systems
406
407    def create_mount_target(
408        self, file_system_id, subnet_id, ip_address=None, security_groups=None
409    ):
410        """Create a new EFS Mount Target for a given File System to a given subnet.
411
412        Note that you can only create one mount target for each availability zone
413        (which is implied by the subnet ID).
414
415        https://docs.aws.amazon.com/efs/latest/ug/API_CreateMountTarget.html
416        """
417        # Get the relevant existing resources
418        try:
419            subnet = self.ec2_backend.get_subnet(subnet_id)
420        except InvalidSubnetIdError:
421            raise SubnetNotFound(subnet_id)
422        if file_system_id not in self.file_systems_by_id:
423            raise FileSystemNotFound(file_system_id)
424        file_system = self.file_systems_by_id[file_system_id]
425
426        # Validate the security groups.
427        if security_groups:
428            sg_lookup = {sg.id for sg in self.ec2_backend.describe_security_groups()}
429            for sg_id in security_groups:
430                if sg_id not in sg_lookup:
431                    raise SecurityGroupNotFound(sg_id)
432
433        # Create the new mount target
434        mount_target = MountTarget(file_system, subnet, ip_address, security_groups)
435
436        # Establish the network interface.
437        network_interface = self.ec2_backend.create_network_interface(
438            subnet, [mount_target.ip_address], group_ids=security_groups
439        )
440        mount_target.set_network_interface(network_interface)
441
442        # Record the new mount target
443        self.mount_targets_by_id[mount_target.mount_target_id] = mount_target
444        return mount_target
445
446    def describe_mount_targets(
447        self, max_items, file_system_id, mount_target_id, access_point_id, marker
448    ):
449        """Describe the mount targets given a mount target ID or a file system ID.
450
451        Note that as of this writing access points, and thus access point IDs are not
452        supported.
453
454        https://docs.aws.amazon.com/efs/latest/ug/API_DescribeMountTargets.html
455        """
456        # Restrict the possible corpus of results based on inputs.
457        if not (bool(file_system_id) ^ bool(mount_target_id) ^ bool(access_point_id)):
458            raise BadRequest("Must specify exactly one mutually exclusive parameter.")
459        elif file_system_id:
460            # Handle the case that a file_system_id is given.
461            if file_system_id not in self.file_systems_by_id:
462                raise FileSystemNotFound(file_system_id)
463            corpus = [
464                mt.info_json()
465                for mt in self.file_systems_by_id[file_system_id].iter_mount_targets()
466            ]
467        elif mount_target_id:
468            if mount_target_id not in self.mount_targets_by_id:
469                raise MountTargetNotFound(mount_target_id)
470            # Handle mount target specification case.
471            corpus = [self.mount_targets_by_id[mount_target_id].info_json()]
472        else:
473            # We don't handle access_point_id's yet.
474            assert False, "Moto does not yet support EFS access points."
475
476        # Handle the case that a marker is given. Note that the handling is quite
477        # different from that in describe_file_systems.
478        if marker is not None:
479            if marker not in self.next_markers:
480                raise BadRequest("Invalid Marker")
481            corpus_mtids = {m["MountTargetId"] for m in corpus}
482            marked_mtids = {m["MountTargetId"] for m in self.next_markers[marker]}
483            mt_ids = corpus_mtids & marked_mtids
484            corpus = [self.mount_targets_by_id[mt_id].info_json() for mt_id in mt_ids]
485
486        # Handle the max_items parameter.
487        mount_targets = corpus[:max_items]
488        next_marker = self._mark_description(corpus, max_items)
489        return next_marker, mount_targets
490
491    def delete_file_system(self, file_system_id):
492        """Delete the file system specified by the given file_system_id.
493
494        Note that mount targets must be deleted first.
495
496        https://docs.aws.amazon.com/efs/latest/ug/API_DeleteFileSystem.html
497        """
498        if file_system_id not in self.file_systems_by_id:
499            raise FileSystemNotFound(file_system_id)
500
501        file_system = self.file_systems_by_id[file_system_id]
502        if file_system.number_of_mount_targets > 0:
503            raise FileSystemInUse(
504                "Must delete all mount targets before deleting file system."
505            )
506
507        del self.file_systems_by_id[file_system_id]
508        self.creation_tokens.remove(file_system.creation_token)
509        return
510
511    def delete_mount_target(self, mount_target_id):
512        """Delete a mount target specified by the given mount_target_id.
513
514        Note that this will also delete a network interface.
515
516        https://docs.aws.amazon.com/efs/latest/ug/API_DeleteMountTarget.html
517        """
518        if mount_target_id not in self.mount_targets_by_id:
519            raise MountTargetNotFound(mount_target_id)
520
521        mount_target = self.mount_targets_by_id[mount_target_id]
522        self.ec2_backend.delete_network_interface(mount_target.network_interface_id)
523        del self.mount_targets_by_id[mount_target_id]
524        mount_target.clean_up()
525        return
526
527    def describe_backup_policy(self, file_system_id):
528        backup_policy = self.file_systems_by_id[file_system_id].backup_policy
529        if not backup_policy:
530            raise PolicyNotFound("None")
531        return backup_policy
532
533
534efs_backends = {}
535for region in Session().get_available_regions("efs"):
536    efs_backends[region] = EFSBackend(region)
537for region in Session().get_available_regions("efs", partition_name="aws-us-gov"):
538    efs_backends[region] = EFSBackend(region)
539for region in Session().get_available_regions("efs", partition_name="aws-cn"):
540    efs_backends[region] = EFSBackend(region)
541