"""Base provider module for all Lexicon providers"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union

from lexicon.config import ConfigResolver, legacy_config_resolver


class Provider(ABC):
    """
    This is the base class for all lexicon Providers.
    It provides common functionality and ensures that all implemented
    Providers follow a standard ducktype.
    All standardized options will be provided here as defaults, but can be overwritten
    by environmental variables and cli arguments.

    Common options are:

    action
    domain
    type
    name
    content
    ttl
    priority
    identifier

    The provider_env_cli_options will also contain any Provider specific options:

    auth_username
    auth_token
    auth_password
    ...

    :param config: is a ConfigResolver object that contains all the options
    for this provider, merged from CLI and Env variables.
    """

    def __init__(self, config: Union[ConfigResolver, Dict]):
        """
        Store the configuration and read the provider name and domain from it.

        :param config: either a ConfigResolver, used as-is, or a plain legacy dict,
        which is wrapped with ``legacy_config_resolver``.
        """
        if not isinstance(config, ConfigResolver):
            # If config is a plain dict, we are in a legacy situation.
            # To protect the Provider API, the legacy dict is handled in a
            # correctly defined ConfigResolver.
            # Work on a shallow copy so that the caller's dict is never modified
            # when the provider name is injected below.
            config = dict(config)
            # Also, there may be some situation where `provider` key is not set in the config.
            # It should not happen when Lexicon is called from Client, as it will set itself
            # this key. However there were no automated logic if the Provider is used directly.
            # So we provide this logic here.
            if not config.get("provider_name") and not config.get("provider"):
                # __name__ is the name of the module in which this base class is defined.
                config["provider_name"] = __name__
            self.config = legacy_config_resolver(config)
        else:
            self.config = config

        # Default ttl
        # (presumably registered as a fallback so explicitly configured values win;
        # confirm against ConfigResolver.with_dict)
        self.config.with_dict({"ttl": 3600})

        self.provider_name = self.config.resolve(
            "lexicon:provider_name"
        ) or self.config.resolve("lexicon:provider")
        # NOTE(review): str() turns a None result into the literal string "None";
        # confirm what resolve() returns when no domain is configured.
        self.domain = str(self.config.resolve("lexicon:domain"))
        # Not set by this base class; left for the concrete provider to fill in.
        self.domain_id = None

    # Provider API
    def authenticate(self) -> None:
        """
        Authenticate against provider,
        Make any requests required to get the domain's id for this provider,
        so it can be used in subsequent calls.
        Should throw AuthenticationError or requests.HTTPError if authentication fails for any reason,
        or if the domain does not exist.
        """
        self._authenticate()

    def create_record(self, rtype: str, name: str, content: str) -> bool:
        """
        Create record. If record already exists with the same content, do nothing.
        """
        return self._create_record(rtype, name, content)

    def list_records(
        self,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> List[Dict]:
        """
        List all records. Return an empty list if no records found
        type, name and content are used to filter records.
        If possible filter during the query, otherwise filter after response is received.
        """
        return self._list_records(rtype=rtype, name=name, content=content)

    def update_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """
        Update a record. Identifier must be specified.
        """
        return self._update_record(identifier, rtype=rtype, name=name, content=content)

    def delete_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """
        Delete an existing record.
        If record does not exist, do nothing.
        If an identifier is specified, use it, otherwise do a lookup using type, name and content.
        """
        return self._delete_record(
            identifier=identifier, rtype=rtype, name=name, content=content
        )

    # Internal abstract implementations
    @abstractmethod
    def _authenticate(self) -> None:
        """Provider-specific implementation backing ``authenticate``."""
        ...

    @abstractmethod
    def _create_record(self, rtype: str, name: str, content: str) -> bool:
        """Provider-specific implementation backing ``create_record``."""
        ...

    @abstractmethod
    def _list_records(
        self,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> List[Dict]:
        """Provider-specific implementation backing ``list_records``."""
        ...

    @abstractmethod
    def _update_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """Provider-specific implementation backing ``update_record``."""
        ...

    @abstractmethod
    def _delete_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """Provider-specific implementation backing ``delete_record``."""
        ...

    # Helpers
    @abstractmethod
    def _request(
        self,
        action: str = "GET",
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """
        Provider-specific hook that the ``_get``/``_post``/``_put``/``_patch``/``_delete``
        helpers delegate to. ``action`` receives the verb string those helpers pass
        ("GET", "POST", "PUT", "PATCH" or "DELETE").
        """
        ...

    # Helpers
    def _get(self, url: str = "/", query_params: Optional[Dict] = None) -> Any:
        """Call ``_request`` with the "GET" action."""
        return self._request("GET", url, query_params=query_params)

    def _post(
        self,
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """Call ``_request`` with the "POST" action."""
        return self._request("POST", url, data=data, query_params=query_params)

    def _put(
        self,
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """Call ``_request`` with the "PUT" action."""
        return self._request("PUT", url, data=data, query_params=query_params)

    def _patch(
        self,
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """Call ``_request`` with the "PATCH" action."""
        return self._request("PATCH", url, data=data, query_params=query_params)

    def _delete(self, url: str = "/", query_params: Optional[Dict] = None) -> Any:
        """Call ``_request`` with the "DELETE" action."""
        return self._request("DELETE", url, query_params=query_params)

    def _is_within_domain(self, record_name: str) -> bool:
        """
        Tell whether record_name (without trailing period) is the configured domain
        itself or a name below it.

        The comparison is done on a label boundary: with domain "example.com",
        "www.example.com" matches but "myexample.com" does not.
        """
        return record_name == self.domain or record_name.endswith(f".{self.domain}")

    def _fqdn_name(self, record_name: str) -> str:
        """Return record_name qualified with the domain and ending with a period."""
        # strip trailing period from fqdn if present
        record_name = record_name.rstrip(".")
        # check if the record_name is fully specified
        if not self._is_within_domain(record_name):
            record_name = f"{record_name}.{self.domain}"
        return f"{record_name}."  # return the fqdn name

    def _full_name(self, record_name: str) -> str:
        """Return record_name qualified with the domain, without a trailing period."""
        # strip trailing period from fqdn if present
        record_name = record_name.rstrip(".")
        # check if the record_name is fully specified
        if not self._is_within_domain(record_name):
            record_name = f"{record_name}.{self.domain}"
        return record_name

    def _relative_name(self, record_name: str) -> str:
        """
        Return record_name with the domain suffix removed.
        The domain itself yields an empty string.
        """
        # strip trailing period from fqdn if present
        record_name = record_name.rstrip(".")
        # check if the record_name is fully specified
        if self._is_within_domain(record_name):
            record_name = record_name[: -len(self.domain)]
            # drop the separator that preceded the domain
            record_name = record_name.rstrip(".")
        return record_name

    def _clean_TXT_record(self, record: Dict) -> Dict:
        """
        Remove one pair of surrounding quotes from the content of a TXT record.

        The record dict is modified in place and returned. Records of another type,
        and TXT contents that are not wrapped in a matching pair of quotes,
        are left untouched.
        """
        if record["type"] == "TXT":
            content = record["content"]
            # Some providers have quotes around the TXT records,
            # so we're going to remove those extra quotes.
            # Only strip when the quotes are really there, so that an unquoted
            # value does not lose its first and last characters.
            if len(content) >= 2 and content[0] == content[-1] and content[0] in "\"'":
                record["content"] = content[1:-1]
        return record

    def _get_lexicon_option(self, option: str) -> Any:
        """Resolve a generic option, looked up as ``lexicon:<option>``."""
        return self.config.resolve(f"lexicon:{option}")

    def _get_provider_option(self, option: str) -> Any:
        """Resolve a provider option, looked up as ``lexicon:<provider_name>:<option>``."""
        return self.config.resolve(f"lexicon:{self.provider_name}:{option}")