1"""
2Lexicon UltraDNS Provider
3
4Derived from PowerDNS provider
5
6API Docs: https://doc.ultradns.com/md/httpapi/api_spec/
7
8Implementation notes:
9* The UltraDNS API does not assign a unique identifier to each record in the way
10that Lexicon expects. We work around this by creating an ID based on the record
11name, type and content, which when taken together are always unique
12* The UltraDNS API has no notion of 'create a single record' or 'delete a single
13record'. All operations are either 'replace the RRSet with this new set of records'
14or 'delete all records for this name and type. Similarly, there is no notion of
15'change the content of this record', because records are identified by their name,
16type and content.
17* The API is very picky about the format of values used when creating records:
18** CNAMEs must be fully qualified
19This is why the _clean_content and _unclean_content methods exist, to convert
20back and forth between the format UltraDNS expects, and the format Lexicon uses
21"""
22import hashlib
23import json
24import logging
25
26import requests
27
28from lexicon.providers.base import Provider as BaseProvider
29
30LOGGER = logging.getLogger(__name__)
31
32NAMESERVER_DOMAINS = ["ultradns.net"]
33
34
35def provider_parser(subparser):
36    """Configure provider parser for ultradns"""
37    subparser.add_argument(
38        "--auth-token",
39        help="specify token for authentication;"
40        + " if not set --auth-token, --auth-password are used",
41    )
42    subparser.add_argument(
43        "--auth-username", help="specify username for authentication"
44    )
45    subparser.add_argument(
46        "--auth-password", help="specify password for authentication"
47    )
48
49
50class Provider(BaseProvider):
51    """Provider class for UltraDNS"""
52
53    def __init__(self, config):
54        super(Provider, self).__init__(config)
55
56        self.api_key = self._get_provider_option("auth_token")
57
58        assert (self.api_key is not None) or (
59            self._get_provider_option("auth_username") is not None
60            and self._get_provider_option("auth_password") is not None
61        )
62
63    def zone_data(self):
64        """Get zone data"""
65        data = self._get("/zones/" + self._ensure_dot(self.domain) + "/rrsets").json()
66        for i, rrset in enumerate(data["rrSets"]):
67            data["rrSets"][i]["rrtype"] = self._clean_rrtype(rrset["rrtype"])
68        return data
69
70    def _authenticate(self):
71        if self.api_key is None:
72            username = self._get_provider_option("auth_username")
73            password = self._get_provider_option("auth_password")
74            url = "https://restapi.ultradns.com/v2/authorization/token"
75            data = {
76                "grant_type": "password",
77                "username": username,
78                "password": password,
79            }
80
81            result = requests.post(url, data=data)
82            result.raise_for_status()
83            self.api_key = result.json()["accessToken"]
84
85        assert self.api_key is not None
86        self.domain_id = self.domain
87        # Test auth works
88        self.zone_data()
89
90    def _make_identifier(self, rtype, name, content):
91        sha256 = hashlib.sha256()
92        sha256.update(("type=" + rtype + ",").encode("utf-8"))
93        sha256.update(("name=" + name + ",").encode("utf-8"))
94        sha256.update(("data=" + content + ",").encode("utf-8"))
95        return sha256.hexdigest()[0:7]
96
97    def _list_records(self, rtype=None, name=None, content=None):
98        records = []
99        for rrset in self.zone_data()["rrSets"]:
100            if (
101                name is None
102                or self._fqdn_name(rrset["ownerName"]) == self._fqdn_name(name)
103            ) and (rtype is None or rrset["rrtype"] == rtype):
104                for record in rrset["rdata"]:
105                    if content is None or record == self._clean_content(rtype, content):
106                        records.append(
107                            {
108                                "type": rrset["rrtype"],
109                                "name": self._full_name(rrset["ownerName"]),
110                                "ttl": rrset["ttl"],
111                                "content": self._unclean_content(
112                                    rrset["rrtype"], record
113                                ),
114                                "id": self._make_identifier(
115                                    rrset["rrtype"], rrset["ownerName"], record
116                                ),
117                            }
118                        )
119        LOGGER.debug("list_records: %s", records)
120        return records
121
122    def _clean_rrtype(self, rtype):
123        """ UltraDNS returns records with types like 'MX (15)' """
124        return rtype.split()[0]
125
126    def _clean_content(self, rtype, content):
127        if rtype == "CNAME" and not content.endswith("."):
128            # Regularise non-FQDN CNAMEs - do not affect FQDNs or we break out of zone
129            content = self._fqdn_name(content)
130        return content
131
132    def _unclean_content(self, rtype, content):
133        if rtype == "CNAME" and not content.endswith("."):
134            # Regularise non-FQDN CNAMEs - do not affect FQDNs or we break out of zone
135            content = self._full_name(content)
136        return content
137
138    def _create_record(self, rtype, name, content):
139        rname = self._fqdn_name(name)
140        newcontent = self._clean_content(rtype, content)
141
142        updated_data = {
143            "ownerName": rname,
144            "rrtype": rtype,
145            "rdata": [],
146            "ttl": self._get_lexicon_option("ttl") or 600,
147        }
148
149        updated_data["rdata"].append(newcontent)
150
151        found = False
152        for rrset in self.zone_data()["rrSets"]:
153            if rrset["ownerName"] == rname and rrset["rrtype"] == rtype:
154                updated_data["ttl"] = rrset["ttl"]
155                found = True
156
157                for record in rrset["rdata"]:
158                    if record == newcontent:
159                        return True  # Exactly the same record exists, just return
160                    updated_data["rdata"].append(record)
161                break
162
163        if found:
164            self._put(
165                f"/zones/{self._ensure_dot(self.domain)}/rrsets/{rtype}/{rname}",
166                data=updated_data,
167            )
168        else:
169            self._post(
170                f"/zones/{self._ensure_dot(self.domain)}/rrsets/{rtype}/{rname}",
171                data=updated_data,
172            )
173
174        return True
175
176    def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
177
178        data = self.zone_data()
179
180        if identifier is not None:
181            found = False
182            for rrset in data["rrSets"]:
183                for record in rrset["rdata"]:
184                    ident = self._make_identifier(
185                        rrset["rrtype"], rrset["ownerName"], record
186                    )
187                    if identifier == ident:
188                        rtype = rrset["rrtype"]
189                        name = self._full_name(rrset["ownerName"])
190                        content = self._unclean_content(rrset["rrtype"], record)
191                        found = True
192                        break
193                else:
194                    continue
195                break  # break out of the outer loop too
196            if not found:
197                return True  # No match means nothing to do
198
199        LOGGER.debug("delete %s %s %s", rtype, name, content)
200        if rtype is None or name is None:
201            raise Exception("Must specify at least both rtype and name")
202
203        rname = self._fqdn_name(name)
204
205        found = False
206        for rrset in data["rrSets"]:
207            if (
208                rrset["rrtype"] == rtype
209                and self._fqdn_name(rrset["ownerName"]) == rname
210            ):
211                update_data = rrset
212                found = True
213
214                if content is None:
215                    update_data["rdata"] = []
216                else:
217                    new_record_list = []
218                    for record in update_data["rdata"]:
219                        if self._clean_content(rrset["rrtype"], content) != record:
220                            new_record_list.append(record)
221
222                    update_data["rdata"] = new_record_list
223                break
224
225        if not found:
226            return True  # Do nothing if the record did not exist
227
228        request = {"rrSets": [update_data]}
229        LOGGER.debug("request: %s", request)
230
231        if update_data["rdata"]:
232            self._put(
233                f"/zones/{self._ensure_dot(self.domain)}/rrsets/{rtype}/{rname}",
234                data=update_data,
235            )
236        else:
237            self._delete(
238                f"/zones/{self._ensure_dot(self.domain)}/rrsets/{rtype}/{rname}"
239            )
240
241        return True
242
243    def _update_record(self, identifier, rtype=None, name=None, content=None):
244        self._delete_record(identifier, rtype, name, content)
245        return self._create_record(rtype, name, content)
246
247    def _patch(self, url="/", data=None, query_params=None):
248        return self._request("PATCH", url, data=data, query_params=query_params)
249
250    def _request(self, action="GET", url="/", data=None, query_params=None):
251        if data is None:
252            data = {}
253        if query_params is None:
254            query_params = {}
255        response = requests.request(
256            action,
257            "https://restapi.ultradns.com/v2" + url,
258            params=query_params,
259            data=json.dumps(data),
260            headers={
261                "Authorization": "Bearer " + self.api_key,
262                "Content-Type": "application/json",
263                "Accept": "application/json",
264            },
265        )
266        LOGGER.debug("response: %s", response.text)
267        response.raise_for_status()
268        return response
269
270    @classmethod
271    def _ensure_dot(cls, text):
272        """
273        This function makes sure a string contains a dot at the end
274        """
275        if text.endswith("."):
276            return text
277        return text + "."
278