1"""Plugin common functions."""
2from abc import ABCMeta
3from abc import abstractmethod
4import argparse
5import logging
6import re
7import shutil
8import tempfile
9from typing import Any
10from typing import Callable
11from typing import Iterable
12from typing import List
13from typing import Optional
14from typing import Set
15from typing import Tuple
16
17import pkg_resources
18
19from certbot import achallenges
20from certbot import configuration
21from certbot import crypto_util
22from certbot import interfaces
23from certbot import errors
24from certbot import reverter
25from certbot._internal import constants
26from certbot.compat import filesystem
27from certbot.compat import os
28from certbot.interfaces import Installer as AbstractInstaller
29from certbot.interfaces import Plugin as AbstractPlugin
30from certbot.plugins.storage import PluginStorage
31
32logger = logging.getLogger(__name__)
33
34
def option_namespace(name: str) -> str:
    """Return the prefix shared by all ArgumentParser options of a plugin.

    :param str name: plugin name

    :returns: ``name`` followed by a single dash
    :rtype: str

    """
    separator = "-"
    return name + separator
38
39
def dest_namespace(name: str) -> str:
    """Return the prefix shared by all ArgumentParser destinations of a plugin.

    :param str name: plugin name

    :returns: ``name`` with every dash turned into an underscore,
        followed by one more underscore
    :rtype: str

    """
    underscored = name.replace("-", "_")
    return underscored + "_"
43
44
# Matches strings that start with 127.0.0.1 or with one of the prefixes
# "10.", "172.16." through "172.31.", or "192.168.". Only the start of the
# string is anchored, so anything may follow the matched prefix.
private_ips_regex = re.compile(
    r"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|"
    r"(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)")
# Matches a whole dot-separated hostname, ignoring case: every label before
# the last one is alphanumeric with optional interior hyphens, and the last
# label consists of letters only.
hostname_regex = re.compile(
    r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$", re.IGNORECASE)
50
51
class Plugin(AbstractPlugin, metaclass=ABCMeta):
    """Generic plugin base class.

    Stores the configuration and plugin name and provides helpers for
    building namespaced CLI option names and destinations.

    """

    def __init__(self, config: configuration.NamespaceConfig, name: str) -> None:
        super().__init__(config, name)
        self.config = config
        self.name = name

    @classmethod
    @abstractmethod
    def add_parser_arguments(cls, add: Callable[..., None]) -> None:
        """Register the plugin's arguments with the CLI argument parser.

        :param callable add: Function that proxies calls to
            `argparse.ArgumentParser.add_argument`, prepending the
            unique plugin name prefix to each option.

        """

    @classmethod
    def inject_parser_options(cls, parser: argparse.ArgumentParser, name: str) -> None:
        """Inject parser options.

        See `~.certbot.interfaces.Plugin.inject_parser_options` for docs.

        """
        # Thin proxy: it only prefixes the option name and does not verify
        # that the resulting dest starts with the plugin's dest namespace.
        def add(arg_name_no_prefix: str, *args: Any, **kwargs: Any) -> None:
            flag = "--{0}{1}".format(option_namespace(name), arg_name_no_prefix)
            parser.add_argument(flag, *args, **kwargs)

        return cls.add_parser_arguments(add)

    @property
    def option_namespace(self) -> str:
        """ArgumentParser options namespace (prefix of all options)."""
        prefix = option_namespace(self.name)
        return prefix

    def option_name(self, name: str) -> str:
        """Return ``name`` prefixed with this plugin's option namespace."""
        namespace = self.option_namespace
        return namespace + name

    @property
    def dest_namespace(self) -> str:
        """ArgumentParser dest namespace (prefix of all destinations)."""
        prefix = dest_namespace(self.name)
        return prefix

    def dest(self, var: str) -> str:
        """Return the namespaced destination for variable ``var``."""
        # Mirrors the dash-to-underscore conversion ArgumentParser applies
        # to an option name when it computes "dest".
        suffix = var.replace("-", "_")
        return self.dest_namespace + suffix

    def conf(self, var: str) -> Any:
        """Look up the configuration value stored for variable ``var``."""
        attribute = self.dest(var)
        return getattr(self.config, attribute)

    def auth_hint(self, failed_achalls: List[achallenges.AnnotatedChallenge]) -> str:
        """Human-readable string to help the user troubleshoot the authenticator.

        Shown to the user if one or more of the attempted challenges were not a success.

        Should describe, in simple language, what the authenticator tried to do, what went
        wrong and what the user should try as their "next steps".

        TODO: auth_hint belongs in Authenticator but can't be added until the next major
        version of Certbot. For now, it lives in .Plugin and auth_handler will only call it
        on authenticators that subclass .Plugin. For now, inherit from `.Plugin` to implement
        and/or override the method.

        :param list failed_achalls: List of one or more failed challenges
                                    (:class:`achallenges.AnnotatedChallenge` subclasses).

        :returns: generic troubleshooting hint naming the plugin and the
            distinct failed challenge types
        :rtype: str
        """
        # Fallback wording only; authenticators are expected to override this
        # with a hint that addresses their own mechanics.
        failed_types = sorted({achall.typ for achall in failed_achalls})
        challs = " and ".join(failed_types)
        template = ("The Certificate Authority couldn't externally verify that the {name} plugin "
                    "completed the required {challs} challenges. Ensure the plugin is configured "
                    "correctly and that the changes it makes are accessible from the internet.")
        return template.format(name=self.name, challs=challs)
134
135
class Installer(AbstractInstaller, Plugin, metaclass=ABCMeta):
    """An installer base class with reverter and ssl_dhparam methods defined.

    Each reverter wrapper below translates `.errors.ReverterError` into
    `.errors.PluginError`, keeping the original error as the explicit cause.

    Installer plugins do not have to inherit from this class.

    """
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.storage = PluginStorage(self.config, self.name)
        self.reverter = reverter.Reverter(self.config)

    def add_to_checkpoint(self, save_files: Set[str], save_notes: str,
                          temporary: bool = False) -> None:
        """Add files to a checkpoint.

        :param set save_files: set of filepaths to save
        :param str save_notes: notes about changes during the save
        :param bool temporary: True if the files should be added to a
            temporary checkpoint rather than a permanent one. This is
            usually used for changes that will soon be reverted.

        :raises .errors.PluginError: when unable to add to checkpoint

        """
        if temporary:
            checkpoint_func = self.reverter.add_to_temp_checkpoint
        else:
            checkpoint_func = self.reverter.add_to_checkpoint

        try:
            checkpoint_func(save_files, save_notes)
        except errors.ReverterError as err:
            # Chain explicitly so the traceback shows the reverter failure
            # as the direct cause of the plugin error.
            raise errors.PluginError(str(err)) from err

    def finalize_checkpoint(self, title: str) -> None:
        """Timestamp and save changes made through the reverter.

        :param str title: Title describing checkpoint

        :raises .errors.PluginError: when an error occurs

        """
        try:
            self.reverter.finalize_checkpoint(title)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err)) from err

    def recovery_routine(self) -> None:
        """Revert all previously modified files.

        Reverts all modified files that have not been saved as a checkpoint

        :raises .errors.PluginError: If unable to recover the configuration

        """
        try:
            self.reverter.recovery_routine()
        except errors.ReverterError as err:
            raise errors.PluginError(str(err)) from err

    def revert_temporary_config(self) -> None:
        """Rollback temporary checkpoint.

        :raises .errors.PluginError: when unable to revert config

        """
        try:
            self.reverter.revert_temporary_config()
        except errors.ReverterError as err:
            raise errors.PluginError(str(err)) from err

    def rollback_checkpoints(self, rollback: int = 1) -> None:
        """Rollback saved checkpoints.

        :param int rollback: Number of checkpoints to revert

        :raises .errors.PluginError: If there is a problem with the input or
            the function is unable to correctly revert the configuration

        """
        try:
            self.reverter.rollback_checkpoints(rollback)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err)) from err

    @property
    def ssl_dhparams(self) -> str:
        """Path to the ssl_dhparams file inside the configuration directory."""
        return os.path.join(self.config.config_dir, constants.SSL_DHPARAMS_DEST)

    @property
    def updated_ssl_dhparams_digest(self) -> str:
        """Path to the digest of the updated ssl_dhparams file."""
        return os.path.join(self.config.config_dir, constants.UPDATED_SSL_DHPARAMS_DIGEST)

    def install_ssl_dhparams(self) -> None:
        """Copy Certbot's ssl_dhparams file into the system's config dir if required."""
        install_version_controlled_file(
            self.ssl_dhparams,
            self.updated_ssl_dhparams_digest,
            constants.SSL_DHPARAMS_SRC,
            constants.ALL_SSL_DHPARAMS_HASHES)
238
239
class Configurator(Installer, interfaces.Authenticator, metaclass=ABCMeta):
    """Base class for plugins that are both an installer and an authenticator.

    Combines certbot.plugins.common.Installer with
    certbot.interfaces.Authenticator and adds no members of its own.

    """
245
246
class Addr:
    r"""Represents a virtual host address.

    Equality and hashing are both based on the normalized address, so two
    spellings of the same IPv6 address compare equal and hash alike.

    :param tuple tup: ``(addr, port)`` pair, where ``port`` is a port
        number, ``\*`` or an empty string
    :param bool ipv6: True if ``addr`` is a bracketed IPv6 address

    """
    def __init__(self, tup: Tuple[str, str], ipv6: bool = False):
        self.tup = tup
        self.ipv6 = ipv6

    @classmethod
    def fromstring(cls, str_addr: str) -> 'Addr':
        """Initialize Addr from string.

        :param str str_addr: ``addr``, ``addr:port``, ``[ipv6]`` or
            ``[ipv6]:port``

        :returns: the parsed address; the port is empty when none is given
        :rtype: Addr

        """
        if str_addr.startswith('['):
            # IPv6 addresses start with "["; the brackets stay in the host
            end_index = str_addr.rfind(']')
            host = str_addr[:end_index + 1]
            port = ''
            if len(str_addr) > end_index + 2 and str_addr[end_index + 1] == ':':
                port = str_addr[end_index + 2:]
            return cls((host, port), ipv6=True)

        host, _, port = str_addr.partition(':')
        return cls((host, port))

    def __str__(self) -> str:
        if self.tup[1]:
            return "%s:%s" % self.tup
        return self.tup[0]

    def normalized_tuple(self) -> Tuple[str, str]:
        """Normalized representation of addr/port tuple.

        For IPv6 the address part is replaced by its exploded form; other
        addresses are returned unchanged.

        """
        if self.ipv6:
            return self.get_ipv6_exploded(), self.tup[1]
        return self.tup

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, self.__class__):
            # compare normalized to take different
            # styles of representation into account
            return self.normalized_tuple() == other.normalized_tuple()

        return False

    def __hash__(self) -> int:
        # Hash the same normalized value __eq__ compares, so that objects
        # that compare equal always share a hash.
        return hash(self.normalized_tuple())

    def get_addr(self) -> str:
        """Return addr part of Addr object."""
        return self.tup[0]

    def get_port(self) -> str:
        """Return port."""
        return self.tup[1]

    def get_addr_obj(self, port: str) -> 'Addr':
        """Return new address object with same addr and new port."""
        return self.__class__((self.tup[0], port), self.ipv6)

    def _normalize_ipv6(self, addr: str) -> List[str]:
        """Return IPv6 address in normalized form, helper function.

        Strips the surrounding brackets and explodes the address into
        its eight blocks.

        """
        addr = addr.lstrip("[")
        addr = addr.rstrip("]")
        return self._explode_ipv6(addr)

    def get_ipv6_exploded(self) -> str:
        """Return IPv6 in normalized form, or "" for non-IPv6 addresses."""
        if self.ipv6:
            return ":".join(self._normalize_ipv6(self.tup[0]))
        return ""

    def _explode_ipv6(self, addr: str) -> List[str]:
        """Explode IPv6 address for comparison.

        :param str addr: IPv6 address without brackets

        :returns: eight blocks with leading zeros removed; blocks skipped
            by ``::`` and all-zero blocks are ``'0'``
        :rtype: list

        """
        result = ['0', '0', '0', '0', '0', '0', '0', '0']
        addr_list = addr.split(":")
        if len(addr_list) > len(result):
            # too long, truncate
            addr_list = addr_list[0:len(result)]
        append_to_end = False
        for i, block in enumerate(addr_list):
            if not block:
                # encountered ::, so rest of the blocks should be
                # appended to the end
                append_to_end = True
                continue
            # remove leading zeros, but keep a single "0" for an all-zero
            # block instead of collapsing it to an empty string
            block = block.lstrip("0") or "0"
            if not append_to_end:
                result[i] = block
            else:
                # count the location from the end using negative indices
                result[i - len(addr_list)] = block
        return result
343
344
class ChallengePerformer:
    """Abstract base for challenge performers.

    Collects annotated challenges, optionally with their indices, so that
    a subclass can carry them all out in :meth:`perform`.

    :ivar configurator: Authenticator and installer plugin
    :ivar achalls: Annotated challenges
    :vartype achalls: `list` of `.KeyAuthorizationAnnotatedChallenge`
    :ivar indices: Holds the indices of challenges from a larger array
        so the user of the class doesn't have to.
    :vartype indices: `list` of `int`

    """

    def __init__(self, configurator: Configurator):
        self.configurator = configurator
        self.achalls: List[achallenges.KeyAuthorizationAnnotatedChallenge] = []
        self.indices: List[int] = []

    def add_chall(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge,
                  idx: Optional[int] = None) -> None:
        """Queue a challenge to be performed when perform() is called.

        :param .KeyAuthorizationAnnotatedChallenge achall: Annotated
            challenge.
        :param int idx: index to challenge in a larger array; nothing is
            recorded in ``indices`` when it is omitted

        """
        self.achalls.append(achall)
        if idx is None:
            return
        self.indices.append(idx)

    def perform(self) -> List[achallenges.KeyAuthorizationAnnotatedChallenge]:
        """Perform all added challenges.

        This base implementation always raises; subclasses must override it.

        :returns: challenge responses
        :rtype: `list` of `acme.challenges.KeyAuthorizationChallengeResponse`

        :raises NotImplementedError: always, in this base class

        """
        raise NotImplementedError()
384
385
def install_version_controlled_file(dest_path: str, digest_path: str, src_path: str,
                                    all_hashes: Iterable[str]) -> None:
    """Install or refresh a version controlled file in its active location.

    The file at ``src_path`` is copied to ``dest_path`` when nothing is
    installed there yet, or when the installed copy matches one of
    ``all_hashes``. An installed copy that matches neither the current
    file nor any known hash is left alone; a warning is logged unless
    the digest saved at ``digest_path`` already equals the current hash.

       :param str dest_path: destination path for version controlled file
       :param str digest_path: path to save a digest of the file in
       :param str src_path: path to version controlled file found in distribution
       :param list all_hashes: hashes of every released version of the file
    """
    shipped_digest = crypto_util.sha256sum(src_path)

    def _record_shipped_digest() -> None:
        with open(digest_path, "w") as digest_file:
            digest_file.write(shipped_digest)

    def _copy_and_record() -> None:
        shutil.copyfile(src_path, dest_path)
        _record_shipped_digest()

    # Nothing installed yet: put the current version in place.
    if not os.path.isfile(dest_path):
        _copy_and_record()
        return

    installed_digest = crypto_util.sha256sum(dest_path)
    if installed_digest == shipped_digest:
        # The installed file is already the current version.
        return
    if installed_digest in all_hashes:
        # An unmodified older release is installed, so replacing it is safe.
        _copy_and_record()
        return

    # The installed file was edited by hand and must not be overwritten.
    # If the saved digest already names the current version, the file was
    # installed or warned about for this version before: stay quiet.
    if os.path.isfile(digest_path):
        with open(digest_path, "r") as digest_file:
            if digest_file.read() == shipped_digest:
                return

    # New version that cannot be installed, or the digest file is gone:
    # remember this version so the warning appears only once.
    _record_shipped_digest()
    logger.warning("%s has been manually modified; updated file "
        "saved to %s. We recommend updating %s for security purposes.",
        dest_path, src_path, dest_path)
431
432
433# test utils used by certbot_apache/certbot_nginx (hence
434# "pragma: no cover") TODO: this might quickly lead to dead code (also
435# c.f. #383)
436
def dir_setup(test_dir: str, pkg: str) -> Tuple[str, str, str]:  # pragma: no cover
    """Setup the directories necessary for the configurator.

    Creates fresh temp, config and work directories, sets their mode to
    ``constants.CONFIG_DIRS_MODE`` and copies the package's
    ``testdata/<test_dir>`` tree into the temp directory.

    :param str test_dir: name of the directory under ``testdata`` to copy
    :param str pkg: package whose ``testdata`` resources are used

    :returns: ``(temp_dir, config_dir, work_dir)`` paths
    :rtype: tuple

    """
    def expanded_tempdir(prefix: str) -> str:
        """Return the real path of a temp directory with the specified prefix

        Some plugins rely on real paths of symlinks for working correctly. For
        example, certbot-apache uses real paths of configuration files to tell
        a virtual host from another. On systems where TMP itself is a symbolic
        link, (ex: OS X) such plugins will be confused. This function prevents
        such a case.
        """
        # mkdtemp's first positional parameter is the suffix, so the prefix
        # has to be passed by keyword to actually be used as one.
        return filesystem.realpath(tempfile.mkdtemp(prefix=prefix))

    temp_dir = expanded_tempdir("temp")
    config_dir = expanded_tempdir("config")
    work_dir = expanded_tempdir("work")

    for directory in (temp_dir, config_dir, work_dir):
        filesystem.chmod(directory, constants.CONFIG_DIRS_MODE)

    test_configs = pkg_resources.resource_filename(
        pkg, os.path.join("testdata", test_dir))

    # symlinks=True copies symbolic links as links instead of following them
    shutil.copytree(
        test_configs, os.path.join(temp_dir, test_dir), symlinks=True)

    return temp_dir, config_dir, work_dir
465