"""Handles Directory Service requests, invokes methods, returns responses."""
import json

from moto.core.exceptions import InvalidToken
from moto.core.responses import BaseResponse
from moto.ds.exceptions import InvalidNextTokenException
from moto.ds.models import ds_backends


class DirectoryServiceResponse(BaseResponse):
    """Handler for DirectoryService requests and responses.

    Each public method reads its arguments from the request with
    ``self._get_param`` / ``self._get_int_param``, forwards them to the
    region-specific backend, and returns the response body as a string.
    """

    @property
    def ds_backend(self):
        """Return backend instance specific for this region."""
        return ds_backends[self.region]

    def connect_directory(self) -> str:
        """Create an AD Connector to connect to a self-managed directory.

        Returns a JSON document with the new ``DirectoryId``.
        """
        name = self._get_param("Name")
        short_name = self._get_param("ShortName")
        password = self._get_param("Password")
        description = self._get_param("Description")
        size = self._get_param("Size")
        connect_settings = self._get_param("ConnectSettings")
        # Tags are optional in the request; default to an empty list.
        tags = self._get_param("Tags", [])
        directory_id = self.ds_backend.connect_directory(
            region=self.region,
            name=name,
            short_name=short_name,
            password=password,
            description=description,
            size=size,
            connect_settings=connect_settings,
            tags=tags,
        )
        return json.dumps({"DirectoryId": directory_id})

    def create_directory(self) -> str:
        """Create a Simple AD directory.

        Returns a JSON document with the new ``DirectoryId``.
        """
        name = self._get_param("Name")
        short_name = self._get_param("ShortName")
        password = self._get_param("Password")
        description = self._get_param("Description")
        size = self._get_param("Size")
        vpc_settings = self._get_param("VpcSettings")
        # Tags are optional in the request; default to an empty list.
        tags = self._get_param("Tags", [])
        directory_id = self.ds_backend.create_directory(
            region=self.region,
            name=name,
            short_name=short_name,
            password=password,
            description=description,
            size=size,
            vpc_settings=vpc_settings,
            tags=tags,
        )
        return json.dumps({"DirectoryId": directory_id})

    def create_alias(self) -> str:
        """Create an alias and assign the alias to the directory.

        Returns the backend's response serialized as JSON.
        """
        directory_id = self._get_param("DirectoryId")
        alias = self._get_param("Alias")
        response = self.ds_backend.create_alias(directory_id, alias)
        return json.dumps(response)

    def create_microsoft_ad(self) -> str:
        """Create a Microsoft AD directory.

        Returns a JSON document with the new ``DirectoryId``.
        """
        name = self._get_param("Name")
        short_name = self._get_param("ShortName")
        password = self._get_param("Password")
        description = self._get_param("Description")
        vpc_settings = self._get_param("VpcSettings")
        edition = self._get_param("Edition")
        # Tags are optional in the request; default to an empty list.
        tags = self._get_param("Tags", [])
        directory_id = self.ds_backend.create_microsoft_ad(
            region=self.region,
            name=name,
            short_name=short_name,
            password=password,
            description=description,
            vpc_settings=vpc_settings,
            edition=edition,
            tags=tags,
        )
        return json.dumps({"DirectoryId": directory_id})

    def delete_directory(self) -> str:
        """Delete a Directory Service directory.

        Returns a JSON document with the ``DirectoryId`` reported by the
        backend.
        """
        directory_id_arg = self._get_param("DirectoryId")
        directory_id = self.ds_backend.delete_directory(directory_id_arg)
        return json.dumps({"DirectoryId": directory_id})

    def describe_directories(self) -> str:
        """Return directory info for the given IDs or all IDs.

        The response holds ``DirectoryDescriptions`` and, when the backend
        returns a truthy continuation token, ``NextToken``.

        Raises:
            InvalidNextTokenException: if the backend raises ``InvalidToken``.
        """
        directory_ids = self._get_param("DirectoryIds")
        next_token = self._get_param("NextToken")
        limit = self._get_int_param("Limit")
        try:
            (descriptions, next_token) = self.ds_backend.describe_directories(
                directory_ids, next_token=next_token, limit=limit
            )
        except InvalidToken as exc:
            # Translate the generic token error into the DS-specific one.
            raise InvalidNextTokenException() from exc

        response = {"DirectoryDescriptions": [x.to_json() for x in descriptions]}
        if next_token:
            response["NextToken"] = next_token
        return json.dumps(response)

    def disable_sso(self) -> str:
        """Disable single-sign on for a directory.

        Returns an empty response body.
        """
        directory_id = self._get_param("DirectoryId")
        username = self._get_param("UserName")
        password = self._get_param("Password")
        self.ds_backend.disable_sso(directory_id, username, password)
        return ""

    def enable_sso(self) -> str:
        """Enable single-sign on for a directory.

        Returns an empty response body.
        """
        directory_id = self._get_param("DirectoryId")
        username = self._get_param("UserName")
        password = self._get_param("Password")
        self.ds_backend.enable_sso(directory_id, username, password)
        return ""

    def get_directory_limits(self) -> str:
        """Return directory limit information for the current region."""
        limits = self.ds_backend.get_directory_limits()
        return json.dumps({"DirectoryLimits": limits})

    def add_tags_to_resource(self) -> str:
        """Add or overwrite one or more tags for specified directory.

        Returns an empty response body.
        """
        resource_id = self._get_param("ResourceId")
        tags = self._get_param("Tags")
        self.ds_backend.add_tags_to_resource(resource_id=resource_id, tags=tags)
        return ""

    def remove_tags_from_resource(self) -> str:
        """Removes tags from a directory.

        Returns an empty response body.
        """
        resource_id = self._get_param("ResourceId")
        tag_keys = self._get_param("TagKeys")
        self.ds_backend.remove_tags_from_resource(
            resource_id=resource_id, tag_keys=tag_keys
        )
        return ""

    def list_tags_for_resource(self) -> str:
        """Lists all tags on a directory.

        The response holds ``Tags`` and, when the backend returns a truthy
        continuation token, ``NextToken``.

        Raises:
            InvalidNextTokenException: if the backend raises ``InvalidToken``.
        """
        resource_id = self._get_param("ResourceId")
        next_token = self._get_param("NextToken")
        # Read Limit the same way describe_directories does, so both
        # paginated calls hand the backend the same kind of value.
        limit = self._get_int_param("Limit")
        try:
            (tags, next_token) = self.ds_backend.list_tags_for_resource(
                resource_id=resource_id, next_token=next_token, limit=limit
            )
        except InvalidToken as exc:
            # Translate the generic token error into the DS-specific one.
            raise InvalidNextTokenException() from exc

        response = {"Tags": tags}
        if next_token:
            response["NextToken"] = next_token
        return json.dumps(response)