"""
Lexicon UltraDNS Provider

Derived from PowerDNS provider

API Docs: https://doc.ultradns.com/md/httpapi/api_spec/

Implementation notes:
* The UltraDNS API does not assign a unique identifier to each record in the way
that Lexicon expects. We work around this by creating an ID based on the record
name, type and content, which when taken together are always unique
* The UltraDNS API has no notion of 'create a single record' or 'delete a single
record'. All operations are either 'replace the RRSet with this new set of records'
or 'delete all records for this name and type'. Similarly, there is no notion of
'change the content of this record', because records are identified by their name,
type and content.
* The API is very picky about the format of values used when creating records:
** CNAMEs must be fully qualified
This is why the _clean_content and _unclean_content methods exist, to convert
back and forth between the format UltraDNS expects, and the format Lexicon uses
"""
import hashlib
import json
import logging

import requests

from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = ["ultradns.net"]

# Root of the UltraDNS REST API; every request path is appended to this.
_API_BASE_URL = "https://restapi.ultradns.com/v2"


def provider_parser(subparser):
    """Configure provider parser for ultradns"""
    subparser.add_argument(
        "--auth-token",
        help="specify token for authentication;"
        + " if not set, --auth-username and --auth-password are used",
    )
    subparser.add_argument(
        "--auth-username", help="specify username for authentication"
    )
    subparser.add_argument(
        "--auth-password", help="specify password for authentication"
    )


class Provider(BaseProvider):
    """Provider class for UltraDNS"""

    def __init__(self, config):
        """
        Read the credentials from the provider options.

        Either ``auth_token`` or both ``auth_username`` and ``auth_password``
        must be configured; otherwise the assertion below fails.
        """
        super().__init__(config)

        self.api_key = self._get_provider_option("auth_token")

        assert (self.api_key is not None) or (
            self._get_provider_option("auth_username") is not None
            and self._get_provider_option("auth_password") is not None
        )

    def zone_data(self):
        """
        Fetch all RRSets of the zone.

        Returns the decoded JSON response, with every RRSet's ``rrtype``
        reduced to the bare type name (see ``_clean_rrtype``).
        """
        data = self._get(self._zone_path() + "/rrsets").json()
        for rrset in data["rrSets"]:
            rrset["rrtype"] = self._clean_rrtype(rrset["rrtype"])
        return data

    def _authenticate(self):
        """
        Obtain an access token if none was configured, then verify access.

        When no token was supplied, the username and password are exchanged
        for one at the authorization endpoint. The zone is then fetched once,
        so bad credentials or an unknown zone surface here as an HTTP error.
        """
        if self.api_key is None:
            token_response = requests.post(
                _API_BASE_URL + "/authorization/token",
                data={
                    "grant_type": "password",
                    "username": self._get_provider_option("auth_username"),
                    "password": self._get_provider_option("auth_password"),
                },
            )
            token_response.raise_for_status()
            self.api_key = token_response.json()["accessToken"]

        assert self.api_key is not None
        self.domain_id = self.domain
        # Test auth works
        self.zone_data()

    def _make_identifier(self, rtype, name, content):
        """
        Build a synthetic record ID from type, name and content.

        UltraDNS has no per-record identifier, so the first seven hex digits
        of a SHA-256 over the three fields are used instead.
        """
        digest = hashlib.sha256()
        for label, value in (("type", rtype), ("name", name), ("data", content)):
            digest.update(f"{label}={value},".encode("utf-8"))
        return digest.hexdigest()[0:7]

    def _list_records(self, rtype=None, name=None, content=None):
        """
        List the zone's records, optionally filtered.

        Each of ``rtype``, ``name`` and ``content`` restricts the result when
        it is not None. Returns a list of dicts with the keys ``type``,
        ``name``, ``ttl``, ``content`` and ``id``.
        """
        wanted_name = None if name is None else self._fqdn_name(name)

        records = []
        for rrset in self.zone_data()["rrSets"]:
            if name is not None and self._fqdn_name(rrset["ownerName"]) != wanted_name:
                continue
            if rtype is not None and rrset["rrtype"] != rtype:
                continue

            # Normalise the filter with the RRSet's own type: the caller's
            # rtype may be None, which would skip the CNAME normalisation.
            wanted_content = (
                None
                if content is None
                else self._clean_content(rrset["rrtype"], content)
            )

            for record in rrset["rdata"]:
                if content is not None and record != wanted_content:
                    continue
                records.append(
                    {
                        "type": rrset["rrtype"],
                        "name": self._full_name(rrset["ownerName"]),
                        "ttl": rrset["ttl"],
                        "content": self._unclean_content(rrset["rrtype"], record),
                        "id": self._make_identifier(
                            rrset["rrtype"], rrset["ownerName"], record
                        ),
                    }
                )
        LOGGER.debug("list_records: %s", records)
        return records

    def _clean_rrtype(self, rtype):
        """UltraDNS returns records with types like 'MX (15)'; keep only 'MX'."""
        return rtype.split()[0]

    def _clean_content(self, rtype, content):
        """Convert record content from Lexicon's format to the UltraDNS one."""
        if rtype == "CNAME" and not content.endswith("."):
            # Regularise non-FQDN CNAMEs - do not affect FQDNs or we break out of zone
            content = self._fqdn_name(content)
        return content

    def _unclean_content(self, rtype, content):
        """Convert record content from the UltraDNS format to Lexicon's one."""
        if rtype == "CNAME" and not content.endswith("."):
            # Regularise non-FQDN CNAMEs - do not affect FQDNs or we break out of zone
            content = self._full_name(content)
        return content

    def _create_record(self, rtype, name, content):
        """
        Add one record to the RRSet for ``name`` and ``rtype``.

        UltraDNS only replaces whole RRSets. An existing RRSet is therefore
        re-sent with the new record added, keeping its current TTL. When no
        RRSet exists, a new one is created with the configured TTL, or 600
        if none is configured. Returns True, including when an identical
        record already exists.
        """
        rname = self._fqdn_name(name)
        newcontent = self._clean_content(rtype, content)

        updated_data = {
            "ownerName": rname,
            "rrtype": rtype,
            "rdata": [newcontent],
            "ttl": self._get_lexicon_option("ttl") or 600,
        }

        found = False
        for rrset in self.zone_data()["rrSets"]:
            # Normalise the owner name as _list_records and _delete_record do.
            if rrset["rrtype"] != rtype or self._fqdn_name(rrset["ownerName"]) != rname:
                continue

            if newcontent in rrset["rdata"]:
                return True  # Exactly the same record exists, just return

            found = True
            updated_data["ttl"] = rrset["ttl"]
            updated_data["rdata"].extend(rrset["rdata"])
            break

        # PUT replaces an existing RRSet, POST creates a new one.
        send = self._put if found else self._post
        send(self._rrset_path(rtype, rname), data=updated_data)

        return True

    def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
        """
        Delete one record, or every record of an RRSet.

        With ``identifier``, the record is looked up by its synthetic ID and
        its type, name and content replace the other arguments. Otherwise
        ``rtype`` and ``name`` are required; ``content`` of None removes the
        whole RRSet. Returns True, also when nothing matched.

        Raises:
            Exception: if ``rtype`` or ``name`` is missing and no identifier
                was given.
        """
        data = self.zone_data()

        if identifier is not None:
            match = self._find_by_identifier(data, identifier)
            if match is None:
                return True  # No match means nothing to do
            matched_rrset, matched_record = match
            rtype = matched_rrset["rrtype"]
            name = self._full_name(matched_rrset["ownerName"])
            content = self._unclean_content(matched_rrset["rrtype"], matched_record)

        LOGGER.debug("delete %s %s %s", rtype, name, content)
        if rtype is None or name is None:
            raise Exception("Must specify at least both rtype and name")

        rname = self._fqdn_name(name)

        update_data = None
        for rrset in data["rrSets"]:
            if rrset["rrtype"] == rtype and self._fqdn_name(rrset["ownerName"]) == rname:
                update_data = rrset
                break

        if update_data is None:
            return True  # Do nothing if the record did not exist

        if content is None:
            update_data["rdata"] = []
        else:
            unwanted = self._clean_content(update_data["rrtype"], content)
            update_data["rdata"] = [
                record for record in update_data["rdata"] if record != unwanted
            ]

        request = {"rrSets": [update_data]}
        LOGGER.debug("request: %s", request)

        # An RRSet cannot be replaced by an empty one, so it is deleted
        # outright once its last record is gone.
        if update_data["rdata"]:
            self._put(self._rrset_path(rtype, rname), data=update_data)
        else:
            self._delete(self._rrset_path(rtype, rname))

        return True

    def _find_by_identifier(self, data, identifier):
        """Return the (rrset, record) pair whose synthetic ID matches, or None."""
        for rrset in data["rrSets"]:
            for record in rrset["rdata"]:
                candidate = self._make_identifier(
                    rrset["rrtype"], rrset["ownerName"], record
                )
                if candidate == identifier:
                    return rrset, record
        return None

    def _update_record(self, identifier, rtype=None, name=None, content=None):
        """
        Update a record by deleting it and creating it again.

        NOTE(review): ``rtype`` and ``name`` are passed to ``_create_record``
        as given, so they look required even when ``identifier`` is supplied
        - confirm against callers.
        """
        self._delete_record(identifier, rtype, name, content)
        return self._create_record(rtype, name, content)

    def _patch(self, url="/", data=None, query_params=None):
        """Send a PATCH request to the API; see ``_request``."""
        return self._request("PATCH", url, data=data, query_params=query_params)

    def _request(self, action="GET", url="/", data=None, query_params=None):
        """
        Send an authenticated JSON request to the UltraDNS REST API.

        ``url`` is appended to the API base URL and ``data`` is JSON-encoded
        into the request body. Returns the ``requests`` response object;
        ``raise_for_status`` is called first, so HTTP error statuses raise.
        """
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        response = requests.request(
            action,
            _API_BASE_URL + url,
            params=query_params,
            data=json.dumps(data),
            headers={
                "Authorization": "Bearer " + self.api_key,
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
        )
        LOGGER.debug("response: %s", response.text)
        response.raise_for_status()
        return response

    def _zone_path(self):
        """Return the API path of the zone this provider manages."""
        return "/zones/" + self._ensure_dot(self.domain)

    def _rrset_path(self, rtype, rname):
        """Return the API path of the RRSet for the given type and owner name."""
        return f"{self._zone_path()}/rrsets/{rtype}/{rname}"

    @classmethod
    def _ensure_dot(cls, text):
        """
        This function makes sure a string contains a dot at the end
        """
        if text.endswith("."):
            return text
        return text + "."