1"""Base provider module for all Lexicon providers"""
2from abc import ABC, abstractmethod
3from typing import Any, Dict, List, Optional, Union
4
5from lexicon.config import ConfigResolver, legacy_config_resolver
6
7
class Provider(ABC):
    """
    This is the base class for all lexicon Providers.
    It provides common functionality and ensures that all implemented
    Providers follow a standard duck type.
    All standardized options will be provided here as defaults, but can be overwritten
    by environment variables and cli arguments.

    Common options are:

    action
    domain
    type
    name
    content
    ttl
    priority
    identifier

    The provider_env_cli_options will also contain any Provider specific options:

    auth_username
    auth_token
    auth_password
    ...

    Concrete providers implement the abstract ``_authenticate``, ``_create_record``,
    ``_list_records``, ``_update_record``, ``_delete_record`` and ``_request`` methods;
    the public API methods of this class simply delegate to them.

    :param config: is a ConfigResolver object that contains all the options
    for this provider, merged from CLI and Env variables. A plain dict is also
    accepted for legacy callers and is wrapped with ``legacy_config_resolver``.
    """

    def __init__(self, config: Union[ConfigResolver, Dict]):
        if isinstance(config, ConfigResolver):
            self.config = config
        else:
            # If config is a plain dict, we are in a legacy situation.
            # To protect the Provider API, the legacy dict is handled in a
            # correctly defined ConfigResolver.
            # Work on a shallow copy so the caller's dict is never modified.
            legacy_config = dict(config)
            # There may be some situation where the `provider` key is not set in the
            # config. It should not happen when Lexicon is called from Client, as it
            # sets this key itself. However there is no automated logic when the
            # Provider is used directly, so we provide this logic here.
            if not legacy_config.get("provider_name") and not legacy_config.get(
                "provider"
            ):
                # NOTE: `__name__` is the name of the module defining this base class.
                legacy_config["provider_name"] = __name__
            self.config = legacy_config_resolver(legacy_config)

        # Default ttl
        self.config.with_dict({"ttl": 3600})

        self.provider_name = self.config.resolve(
            "lexicon:provider_name"
        ) or self.config.resolve("lexicon:provider")
        self.domain = str(self.config.resolve("lexicon:domain"))
        # Initialised empty here; the `authenticate` contract is that providers
        # look up the domain's id so it can be used in subsequent calls.
        self.domain_id = None

    # Provider API
    def authenticate(self) -> None:
        """
        Authenticate against provider,
        Make any requests required to get the domain's id for this provider,
        so it can be used in subsequent calls.
        Should throw AuthenticationError or requests.HTTPError if authentication fails for any reason,
        or if the domain does not exist.
        """
        self._authenticate()

    def create_record(self, rtype: str, name: str, content: str) -> bool:
        """
        Create record. If record already exists with the same content, do nothing.
        """
        return self._create_record(rtype, name, content)

    def list_records(
        self,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> List[Dict]:
        """
        List all records. Return an empty list if no records found
        type, name and content are used to filter records.
        If possible filter during the query, otherwise filter after response is received.
        """
        return self._list_records(rtype=rtype, name=name, content=content)

    def update_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """
        Update a record. Identifier must be specified.
        """
        return self._update_record(identifier, rtype=rtype, name=name, content=content)

    def delete_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """
        Delete an existing record.
        If record does not exist, do nothing.
        If an identifier is specified, use it, otherwise do a lookup using type, name and content.
        """
        return self._delete_record(
            identifier=identifier, rtype=rtype, name=name, content=content
        )

    # Internal abstract implementations
    @abstractmethod
    def _authenticate(self) -> None:
        """Provider-specific implementation behind `authenticate`."""
        ...

    @abstractmethod
    def _create_record(self, rtype: str, name: str, content: str) -> bool:
        """Provider-specific implementation behind `create_record`."""
        ...

    @abstractmethod
    def _list_records(
        self,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> List[Dict]:
        """Provider-specific implementation behind `list_records`."""
        ...

    @abstractmethod
    def _update_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """Provider-specific implementation behind `update_record`."""
        ...

    @abstractmethod
    def _delete_record(
        self,
        identifier: Optional[str] = None,
        rtype: Optional[str] = None,
        name: Optional[str] = None,
        content: Optional[str] = None,
    ) -> bool:
        """Provider-specific implementation behind `delete_record`."""
        ...

    # Helpers
    @abstractmethod
    def _request(
        self,
        action: str = "GET",
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """
        Provider-specific request primitive used by `_get`, `_post`, `_put`,
        `_patch` and `_delete`, which pass the HTTP verb as `action`.
        """
        ...

    def _get(self, url: str = "/", query_params: Optional[Dict] = None) -> Any:
        """Call `_request` with the "GET" action."""
        return self._request("GET", url, query_params=query_params)

    def _post(
        self,
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """Call `_request` with the "POST" action."""
        return self._request("POST", url, data=data, query_params=query_params)

    def _put(
        self,
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """Call `_request` with the "PUT" action."""
        return self._request("PUT", url, data=data, query_params=query_params)

    def _patch(
        self,
        url: str = "/",
        data: Optional[Dict] = None,
        query_params: Optional[Dict] = None,
    ) -> Any:
        """Call `_request` with the "PATCH" action."""
        return self._request("PATCH", url, data=data, query_params=query_params)

    def _delete(self, url: str = "/", query_params: Optional[Dict] = None) -> Any:
        """Call `_request` with the "DELETE" action."""
        return self._request("DELETE", url, query_params=query_params)

    def _is_in_domain(self, record_name: str) -> bool:
        """
        Tell whether `record_name` (without trailing period) is the domain itself
        or a name located under it.

        The comparison is done on a label boundary: with domain "example.com",
        "www.example.com" matches but "notexample.com" does not, even though it
        shares the same string suffix.
        """
        return record_name == self.domain or record_name.endswith(f".{self.domain}")

    def _fqdn_name(self, record_name: str) -> str:
        """
        Return `record_name` as a fully qualified domain name, i.e. including
        the domain and a trailing period.
        """
        # strip trailing period from fqdn if present
        record_name = record_name.rstrip(".")
        # append the domain unless the record_name is already fully specified
        if not self._is_in_domain(record_name):
            record_name = f"{record_name}.{self.domain}"
        return f"{record_name}."  # return the fqdn name

    def _full_name(self, record_name: str) -> str:
        """
        Return `record_name` including the domain, without trailing period.
        """
        # strip trailing period from fqdn if present
        record_name = record_name.rstrip(".")
        # append the domain unless the record_name is already fully specified
        if not self._is_in_domain(record_name):
            record_name = f"{record_name}.{self.domain}"
        return record_name

    def _relative_name(self, record_name: str) -> str:
        """
        Return `record_name` relative to the domain (the domain suffix is removed).
        The domain itself yields an empty string.
        """
        # strip trailing period from fqdn if present
        record_name = record_name.rstrip(".")
        # remove the domain part only if the record_name is fully specified
        if self._is_in_domain(record_name):
            record_name = record_name[: -len(self.domain)]
            record_name = record_name.rstrip(".")
        return record_name

    def _clean_TXT_record(self, record: Dict) -> Dict:
        """
        Remove the surrounding double quotes from the content of a TXT record.

        The record is modified in place and returned. Records of another type,
        and TXT contents that are not wrapped in double quotes, are left untouched.
        """
        if record["type"] == "TXT":
            content = record["content"]
            # Some providers have quotes around the TXT records,
            # so we're going to remove those extra quotes. Unquoted content is
            # kept as is, so that its first and last characters are not lost.
            if len(content) >= 2 and content.startswith('"') and content.endswith('"'):
                record["content"] = content[1:-1]
        return record

    def _get_lexicon_option(self, option: str) -> Any:
        """Resolve `option` in the "lexicon" namespace of the configuration."""
        return self.config.resolve(f"lexicon:{option}")

    def _get_provider_option(self, option: str) -> Any:
        """Resolve `option` in the namespace of the current provider."""
        return self.config.resolve(f"lexicon:{self.provider_name}:{option}")
239