"""Module provider for Hover"""
import json
import logging

import requests

from lexicon.exceptions import AuthenticationError
from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = ["hover.com"]


def provider_parser(subparser):
    """Register the Hover authentication options on the given subparser."""
    subparser.add_argument(
        "--auth-username", help="specify username for authentication"
    )
    subparser.add_argument(
        "--auth-password", help="specify password for authentication"
    )


class Provider(BaseProvider):
    """Provider class for Hover"""

    def __init__(self, config):
        super().__init__(config)
        self.domain_id = None
        self.api_endpoint = "https://www.hover.com/api"
        # Session cookies collected by _authenticate() and sent with every API call.
        self.cookies = {}

    def _authenticate(self):
        """Sign in to Hover and resolve the id of the configured domain.

        Stores the "hover_session" and "hoverauth" cookies in ``self.cookies``
        and the matching domain id in ``self.domain_id``.

        Raises:
            requests.HTTPError: if the sign-in page or the login call returns
                an error status.
            AuthenticationError: if an expected cookie is missing from a
                response, or if ``self.domain`` is not among the account's
                domains.
        """
        # Getting required cookies "hover_session" and "hoverauth"
        response = requests.get("https://www.hover.com/signin")
        response.raise_for_status()
        if "hover_session" not in response.cookies:
            raise AuthenticationError(
                "Unexpected sign-in response: missing hover_session cookie"
            )
        self.cookies["hover_session"] = response.cookies["hover_session"]

        payload = {
            "username": self._get_provider_option("auth_username"),
            "password": self._get_provider_option("auth_password"),
        }
        response = requests.post(
            "https://www.hover.com/api/login/", json=payload, cookies=self.cookies
        )
        response.raise_for_status()

        if "hoverauth" not in response.cookies:
            raise AuthenticationError("Unexpected auth response")
        self.cookies["hoverauth"] = response.cookies["hoverauth"]

        # Make sure domain exists
        # domain is stored in self.domain from BaseProvider
        for domain in self._list_domains():
            if domain["name"] == self.domain:
                self.domain_id = domain["id"]
                break
        else:
            raise AuthenticationError(f"Domain {self.domain} not found")

    def _list_domains(self):
        """Return the account's domains as dicts with "name", "id" and "active" keys."""
        response = self._get("/domains")

        domains = [
            {
                "name": domain["domain_name"],
                "id": domain["id"],
                "active": (domain["status"] == "active"),
            }
            for domain in response["domains"]
        ]

        LOGGER.debug("list_domains: %s", domains)
        return domains

    def _list_records(self, rtype=None, name=None, content=None):
        """List the records of the authenticated domain.

        All records are fetched in one call and then filtered locally by
        ``rtype``, ``name`` and ``content``; a filter that is falsy is not
        applied. Returns an empty list if no record matches.

        Each returned record is a dict with the keys "type", "name" (fully
        qualified), "ttl", "content" and "id".

        Raises:
            Exception: if the response does not have the expected structure.
        """
        payload = self._get(f"/domains/{self.domain_id}/dns")

        # payload['domains'] should be a list of len 1
        try:
            raw_records = payload["domains"][0]["entries"]
        except (KeyError, IndexError) as err:
            raise Exception("Unexpected response") from err

        processed_records = [
            {
                "type": record["type"],
                "name": self._full_name(record["name"]),
                "ttl": record["ttl"],
                "content": record["content"],
                "id": record["id"],
            }
            for record in raw_records
        ]

        if rtype:
            processed_records = [
                record for record in processed_records if record["type"] == rtype
            ]
        if name:
            # Compare fully-qualified names exactly. A substring test would let
            # "www" also select "www2.<domain>", and the callers below rely on
            # this filter to decide what to skip, replace or delete.
            # NOTE(review): how Hover names apex records is not visible here;
            # confirm apex lookups still match after _full_name() normalization.
            full_name = self._full_name(name)
            processed_records = [
                record for record in processed_records if record["name"] == full_name
            ]
        if content:
            # Content is matched case-insensitively.
            wanted_content = content.lower()
            processed_records = [
                record
                for record in processed_records
                if record["content"].lower() == wanted_content
            ]

        LOGGER.debug("list_records: %s", processed_records)
        return processed_records

    def _create_record(self, rtype, name, content):
        """Create a record unless an identical one already exists.

        Returns True without calling the API when a record with the same type,
        name and content is already present; otherwise returns the
        "succeeded" field of the API response. The lexicon "ttl" option is
        sent only when it is set.
        """
        name = self._relative_name(name)
        records = self._list_records(rtype, name, content)
        if records:
            LOGGER.debug("not creating duplicate record: %s", records[0])
            return True

        record = {"name": name, "type": rtype, "content": content}
        ttl = self._get_lexicon_option("ttl")
        if ttl:
            record["ttl"] = ttl

        LOGGER.debug("create_record: %s", record)
        payload = self._post(f"/domains/{self.domain_id}/dns", record)
        return payload["succeeded"]

    def _update_record(self, identifier, rtype=None, name=None, content=None):
        """Update a record. Hover cannot update name so we delete and recreate.

        The record is selected by ``identifier`` when given, otherwise by
        ``rtype`` and ``name``. Any of ``rtype``, ``name`` and ``content``
        that is falsy keeps the value of the existing record.

        Raises:
            Exception: if no record, or more than one record, is selected.
        """
        if identifier:
            records = [r for r in self._list_records() if r["id"] == identifier]
        else:
            records = self._list_records(rtype, name, None)

        if not records:
            raise Exception("Record not found")
        if len(records) > 1:
            raise Exception("Record not unique")
        orig_record = records[0]

        new_rtype = rtype if rtype else orig_record["type"]
        new_name = name if name else orig_record["name"]
        new_content = content if content else orig_record["content"]

        # Not atomic: the old record is already gone if the create call fails.
        self._delete_record(orig_record["id"])
        return self._create_record(new_rtype, new_name, new_content)

    def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
        """Delete an existing record.

        With ``identifier`` only that record is deleted; otherwise every
        record matching ``rtype``, ``name`` and ``content`` is deleted.
        If record does not exist, do nothing. Always returns True.
        """
        if identifier:
            delete_record_ids = [identifier]
        else:
            records = self._list_records(rtype, name, content)
            delete_record_ids = [record["id"] for record in records]

        LOGGER.debug("delete_records: %s", delete_record_ids)

        for record_id in delete_record_ids:
            self._delete(f"/dns/{record_id}")
            LOGGER.debug("delete_record: %s", record_id)
        return True

    # Helpers
    def _request(self, action="GET", url="/", data=None, query_params=None):
        """Send a JSON request to the Hover API and return the decoded response.

        Args:
            action: HTTP method name.
            url: path appended to ``self.api_endpoint``.
            data: object serialized as the JSON request body; ``None`` sends ``{}``.
            query_params: mapping sent as the query string; ``None`` sends none.

        Raises:
            requests.HTTPError: if the response has an error status.
            Exception: if the response body is not valid JSON.
        """
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        response = requests.request(
            action,
            self.api_endpoint + url,
            params=query_params,
            data=json.dumps(data),
            cookies=self.cookies,
            headers={"Content-Type": "application/json"},
        )

        # if the request fails for any reason, throw an error.
        response.raise_for_status()
        try:
            return response.json()
        except ValueError as err:  # response is not json
            raise Exception("Did not get JSON response.") from err