"""Plugin common functions."""
from abc import ABCMeta
from abc import abstractmethod
import argparse
import logging
import re
import shutil
import tempfile
from typing import Any
from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple

import pkg_resources

from certbot import achallenges
from certbot import configuration
from certbot import crypto_util
from certbot import interfaces
from certbot import errors
from certbot import reverter
from certbot._internal import constants
from certbot.compat import filesystem
from certbot.compat import os
from certbot.interfaces import Installer as AbstractInstaller
from certbot.interfaces import Plugin as AbstractPlugin
from certbot.plugins.storage import PluginStorage

logger = logging.getLogger(__name__)


def option_namespace(name: str) -> str:
    """ArgumentParser options namespace (prefix of all options)."""
    return name + "-"


def dest_namespace(name: str) -> str:
    """ArgumentParser dest namespace (prefix of all destinations)."""
    return name.replace("-", "_") + "_"


private_ips_regex = re.compile(
    r"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|"
    r"(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)")
hostname_regex = re.compile(
    r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$", re.IGNORECASE)


class Plugin(AbstractPlugin, metaclass=ABCMeta):
    """Generic plugin."""

    def __init__(self, config: configuration.NamespaceConfig, name: str) -> None:
        super().__init__(config, name)
        self.config = config
        self.name = name

    @classmethod
    @abstractmethod
    def add_parser_arguments(cls, add: Callable[..., None]) -> None:
        """Add plugin arguments to the CLI argument parser.

        :param callable add: Function that proxies calls to
            `argparse.ArgumentParser.add_argument` prepending options
            with unique plugin name prefix.

        """

    @classmethod
    def inject_parser_options(cls, parser: argparse.ArgumentParser, name: str) -> None:
        """Inject parser options.

        See `~.certbot.interfaces.Plugin.inject_parser_options` for docs.

        """
        # dummy function, doesn't check if dest.startswith(self.dest_namespace)
        def add(arg_name_no_prefix: str, *args: Any, **kwargs: Any) -> None:
            parser.add_argument(
                "--{0}{1}".format(option_namespace(name), arg_name_no_prefix),
                *args, **kwargs)
        return cls.add_parser_arguments(add)

    @property
    def option_namespace(self) -> str:
        """ArgumentParser options namespace (prefix of all options)."""
        return option_namespace(self.name)

    def option_name(self, name: str) -> str:
        """Option name (include plugin namespace)."""
        return self.option_namespace + name

    @property
    def dest_namespace(self) -> str:
        """ArgumentParser dest namespace (prefix of all destinations)."""
        return dest_namespace(self.name)

    def dest(self, var: str) -> str:
        """Find a destination for given variable ``var``."""
        # this should do exactly the same as what ArgumentParser(arg)
        # does to "arg" to compute "dest"
        return self.dest_namespace + var.replace("-", "_")

    def conf(self, var: str) -> Any:
        """Find a configuration value for variable ``var``."""
        return getattr(self.config, self.dest(var))

    def auth_hint(self, failed_achalls: List[achallenges.AnnotatedChallenge]) -> str:
        """Human-readable string to help the user troubleshoot the authenticator.

        Shown to the user if one or more of the attempted challenges were not a success.

        Should describe, in simple language, what the authenticator tried to do, what went
        wrong and what the user should try as their "next steps".

        TODO: auth_hint belongs in Authenticator but can't be added until the next major
        version of Certbot. For now, it lives in .Plugin and auth_handler will only call it
        on authenticators that subclass .Plugin. For now, inherit from `.Plugin` to implement
        and/or override the method.

        :param list failed_achalls: List of one or more failed challenges
                                    (:class:`achallenges.AnnotatedChallenge` subclasses).

        :rtype: str
        """
        # This is a fallback hint. Authenticators should implement their own auth_hint that
        # addresses the specific mechanics of that authenticator.
        challs = " and ".join(sorted({achall.typ for achall in failed_achalls}))
        return ("The Certificate Authority couldn't externally verify that the {name} plugin "
                "completed the required {challs} challenges. Ensure the plugin is configured "
                "correctly and that the changes it makes are accessible from the internet."
                .format(name=self.name, challs=challs))


class Installer(AbstractInstaller, Plugin, metaclass=ABCMeta):
    """An installer base class with reverter and ssl_dhparam methods defined.

    Installer plugins do not have to inherit from this class.

    """
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.storage = PluginStorage(self.config, self.name)
        self.reverter = reverter.Reverter(self.config)

    def add_to_checkpoint(self, save_files: Set[str], save_notes: str,
                          temporary: bool = False) -> None:
        """Add files to a checkpoint.

        :param set save_files: set of filepaths to save
        :param str save_notes: notes about changes during the save
        :param bool temporary: True if the files should be added to a
            temporary checkpoint rather than a permanent one. This is
            usually used for changes that will soon be reverted.

        :raises .errors.PluginError: when unable to add to checkpoint

        """
        if temporary:
            checkpoint_func = self.reverter.add_to_temp_checkpoint
        else:
            checkpoint_func = self.reverter.add_to_checkpoint

        try:
            checkpoint_func(save_files, save_notes)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def finalize_checkpoint(self, title: str) -> None:
        """Timestamp and save changes made through the reverter.

        :param str title: Title describing checkpoint

        :raises .errors.PluginError: when an error occurs

        """
        try:
            self.reverter.finalize_checkpoint(title)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def recovery_routine(self) -> None:
        """Revert all previously modified files.

        Reverts all modified files that have not been saved as a checkpoint

        :raises .errors.PluginError: If unable to recover the configuration

        """
        try:
            self.reverter.recovery_routine()
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def revert_temporary_config(self) -> None:
        """Rollback temporary checkpoint.

        :raises .errors.PluginError: when unable to revert config

        """
        try:
            self.reverter.revert_temporary_config()
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def rollback_checkpoints(self, rollback: int = 1) -> None:
        """Rollback saved checkpoints.

        :param int rollback: Number of checkpoints to revert

        :raises .errors.PluginError: If there is a problem with the input or
            the function is unable to correctly revert the configuration

        """
        try:
            self.reverter.rollback_checkpoints(rollback)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    @property
    def ssl_dhparams(self) -> str:
        """Full absolute path to ssl_dhparams file."""
        return os.path.join(self.config.config_dir, constants.SSL_DHPARAMS_DEST)

    @property
    def updated_ssl_dhparams_digest(self) -> str:
        """Full absolute path to digest of updated ssl_dhparams file."""
        return os.path.join(self.config.config_dir, constants.UPDATED_SSL_DHPARAMS_DIGEST)

    def install_ssl_dhparams(self) -> None:
        """Copy Certbot's ssl_dhparams file into the system's config dir if required."""
        install_version_controlled_file(
            self.ssl_dhparams,
            self.updated_ssl_dhparams_digest,
            constants.SSL_DHPARAMS_SRC,
            constants.ALL_SSL_DHPARAMS_HASHES)


class Configurator(Installer, interfaces.Authenticator, metaclass=ABCMeta):
    """
    A plugin that extends certbot.plugins.common.Installer
    and implements certbot.interfaces.Authenticator
    """


class Addr:
    r"""Represents a virtual host address.

    :param str addr: addr part of vhost address
    :param str port: port number or \*, or ""

    """
    def __init__(self, tup: Tuple[str, str], ipv6: bool = False):
        self.tup = tup
        self.ipv6 = ipv6

    @classmethod
    def fromstring(cls, str_addr: str) -> 'Addr':
        """Initialize Addr from string."""
        if str_addr.startswith('['):
            # IPv6 addresses start with [
            endIndex = str_addr.rfind(']')
            host = str_addr[:endIndex + 1]
            port = ''
            if len(str_addr) > endIndex + 2 and str_addr[endIndex + 1] == ':':
                port = str_addr[endIndex + 2:]
            return cls((host, port), ipv6=True)
        else:
            tup = str_addr.partition(':')
            return cls((tup[0], tup[2]))

    def __str__(self) -> str:
        if self.tup[1]:
            return "%s:%s" % self.tup
        return self.tup[0]

    def normalized_tuple(self) -> Tuple[str, str]:
        """Normalized representation of addr/port tuple
        """
        if self.ipv6:
            return self.get_ipv6_exploded(), self.tup[1]
        return self.tup

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, self.__class__):
            # compare normalized to take different
            # styles of representation into account
            return self.normalized_tuple() == other.normalized_tuple()

        return False

    def __hash__(self) -> int:
        return hash(self.tup)

    def get_addr(self) -> str:
        """Return addr part of Addr object."""
        return self.tup[0]

    def get_port(self) -> str:
        """Return port."""
        return self.tup[1]

    def get_addr_obj(self, port: str) -> 'Addr':
        """Return new address object with same addr and new port."""
        return self.__class__((self.tup[0], port), self.ipv6)

    def _normalize_ipv6(self, addr: str) -> List[str]:
        """Return IPv6 address in normalized form, helper function"""
        addr = addr.lstrip("[")
        addr = addr.rstrip("]")
        return self._explode_ipv6(addr)

    def get_ipv6_exploded(self) -> str:
        """Return IPv6 in normalized form"""
        if self.ipv6:
            return ":".join(self._normalize_ipv6(self.tup[0]))
        return ""

    def _explode_ipv6(self, addr: str) -> List[str]:
        """Explode IPv6 address for comparison"""
        result = ['0', '0', '0', '0', '0', '0', '0', '0']
        addr_list = addr.split(":")
        if len(addr_list) > len(result):
            # too long, truncate
            addr_list = addr_list[0:len(result)]
        append_to_end = False
        for i, block in enumerate(addr_list):
            if not block:
                # encountered ::, so rest of the blocks should be
                # appended to the end
                append_to_end = True
                continue
            if len(block) > 1:
                # remove leading zeros
                block = block.lstrip("0")
            if not append_to_end:
                result[i] = str(block)
            else:
                # count the location from the end using negative indices
                result[i-len(addr_list)] = str(block)
        return result


class ChallengePerformer:
    """Abstract base for challenge performers.

    :ivar configurator: Authenticator and installer plugin
    :ivar achalls: Annotated challenges
    :vartype achalls: `list` of `.KeyAuthorizationAnnotatedChallenge`
    :ivar indices: Holds the indices of challenges from a larger array
        so the user of the class doesn't have to.
    :vartype indices: `list` of `int`

    """

    def __init__(self, configurator: Configurator):
        self.configurator = configurator
        self.achalls: List[achallenges.KeyAuthorizationAnnotatedChallenge] = []
        self.indices: List[int] = []

    def add_chall(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge,
                  idx: Optional[int] = None) -> None:
        """Store challenge to be performed when perform() is called.

        :param .KeyAuthorizationAnnotatedChallenge achall: Annotated
            challenge.
        :param int idx: index to challenge in a larger array

        """
        self.achalls.append(achall)
        if idx is not None:
            self.indices.append(idx)

    def perform(self) -> List[achallenges.KeyAuthorizationAnnotatedChallenge]:
        """Perform all added challenges.

        :returns: challenge responses
        :rtype: `list` of `acme.challenges.KeyAuthorizationChallengeResponse`


        """
        raise NotImplementedError()


def install_version_controlled_file(dest_path: str, digest_path: str, src_path: str,
                                    all_hashes: Iterable[str]) -> None:
    """Copy a file into an active location (likely the system's config dir) if required.

    :param str dest_path: destination path for version controlled file
    :param str digest_path: path to save a digest of the file in
    :param str src_path: path to version controlled file found in distribution
    :param list all_hashes: hashes of every released version of the file
    """
    current_hash = crypto_util.sha256sum(src_path)

    def _write_current_hash() -> None:
        with open(digest_path, "w") as file_h:
            file_h.write(current_hash)

    def _install_current_file() -> None:
        shutil.copyfile(src_path, dest_path)
        _write_current_hash()

    # Check to make sure options-ssl.conf is installed
    if not os.path.isfile(dest_path):
        _install_current_file()
        return
    # there's already a file there. if it's up to date, do nothing. if it's not but
    # it matches a known file hash, we can update it.
    # otherwise, print a warning once per new version.
    active_file_digest = crypto_util.sha256sum(dest_path)
    if active_file_digest == current_hash:  # already up to date
        return
    if active_file_digest in all_hashes:  # safe to update
        _install_current_file()
    else:  # has been manually modified, not safe to update
        # did they modify the current version or an old version?
        if os.path.isfile(digest_path):
            with open(digest_path, "r") as f:
                saved_digest = f.read()
            # they modified it after we either installed or told them about this version, so return
            if saved_digest == current_hash:
                return
        # there's a new version but we couldn't update the file, or they deleted the digest.
        # save the current digest so we only print this once, and print a warning
        _write_current_hash()
        logger.warning("%s has been manually modified; updated file "
            "saved to %s. We recommend updating %s for security purposes.",
            dest_path, src_path, dest_path)


# test utils used by certbot_apache/certbot_nginx (hence
# "pragma: no cover") TODO: this might quickly lead to dead code (also
# c.f. #383)

def dir_setup(test_dir: str, pkg: str) -> Tuple[str, str, str]:  # pragma: no cover
    """Setup the directories necessary for the configurator."""
    def expanded_tempdir(prefix: str) -> str:
        """Return the real path of a temp directory with the specified prefix

        Some plugins rely on real paths of symlinks for working correctly. For
        example, certbot-apache uses real paths of configuration files to tell
        a virtual host from another. On systems where TMP itself is a symbolic
        link, (ex: OS X) such plugins will be confused. This function prevents
        such a case.
        """
        # Pass by keyword: the first positional parameter of mkdtemp is the suffix.
        return filesystem.realpath(tempfile.mkdtemp(prefix=prefix))

    temp_dir = expanded_tempdir("temp")
    config_dir = expanded_tempdir("config")
    work_dir = expanded_tempdir("work")

    filesystem.chmod(temp_dir, constants.CONFIG_DIRS_MODE)
    filesystem.chmod(config_dir, constants.CONFIG_DIRS_MODE)
    filesystem.chmod(work_dir, constants.CONFIG_DIRS_MODE)

    test_configs = pkg_resources.resource_filename(
        pkg, os.path.join("testdata", test_dir))

    shutil.copytree(
        test_configs, os.path.join(temp_dir, test_dir), symlinks=True)

    return temp_dir, config_dir, work_dir