1"""Utilities for all Certbot.""" 2import argparse 3import atexit 4import collections 5import errno 6import logging 7import platform 8import re 9import socket 10import subprocess 11import sys 12from typing import Any 13from typing import Callable 14from typing import Dict 15from typing import IO 16from typing import List 17from typing import Optional 18from typing import Set 19from typing import Tuple 20from typing import TYPE_CHECKING 21from typing import Union 22import warnings 23 24import configargparse 25 26from certbot import errors 27from certbot._internal import constants 28from certbot._internal import lock 29from certbot.compat import filesystem 30from certbot.compat import os 31 32_USE_DISTRO = sys.platform.startswith('linux') 33if _USE_DISTRO: 34 import distro 35 36if TYPE_CHECKING: 37 import distutils.version 38 39logger = logging.getLogger(__name__) 40 41 42Key = collections.namedtuple("Key", "file pem") 43# Note: form is the type of data, "pem" or "der" 44CSR = collections.namedtuple("CSR", "file data form") 45 46 47# ANSI SGR escape codes 48# Formats text as bold or with increased intensity 49ANSI_SGR_BOLD = '\033[1m' 50# Colors text red 51ANSI_SGR_RED = "\033[31m" 52# Resets output format 53ANSI_SGR_RESET = "\033[0m" 54 55 56PERM_ERR_FMT = os.linesep.join(( 57 "The following error was encountered:", "{0}", 58 "Either run as root, or set --config-dir, " 59 "--work-dir, and --logs-dir to writeable paths.")) 60 61 62# Stores importing process ID to be used by atexit_register() 63_INITIAL_PID = os.getpid() 64# Maps paths to locked directories to their lock object. All locks in 65# the dict are attempted to be cleaned up at program exit. If the 66# program exits before the lock is cleaned up, it is automatically 67# released, but the file isn't deleted. 68_LOCKS: Dict[str, lock.LockFile] = {} 69_VERSION_COMPONENT_RE = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE) 70 71def env_no_snap_for_external_calls() -> Dict[str, str]: 72 """ 73 When Certbot is run inside a Snap, certain environment variables 74 are modified. But Certbot sometimes calls out to external programs, 75 since it uses classic confinement. When we do that, we must modify 76 the env to remove our modifications so it will use the system's 77 libraries, since they may be incompatible with the versions of 78 libraries included in the Snap. For example, apachectl, Nginx, and 79 anything run from inside a hook should call this function and pass 80 the results into the ``env`` argument of ``subprocess.Popen``. 81 82 :returns: A modified copy of os.environ ready to pass to Popen 83 :rtype: dict 84 85 """ 86 env = os.environ.copy() 87 # Avoid accidentally modifying env 88 if 'SNAP' not in env or 'CERTBOT_SNAPPED' not in env: 89 return env 90 for path_name in ('PATH', 'LD_LIBRARY_PATH'): 91 if path_name in env: 92 env[path_name] = ':'.join(x for x in env[path_name].split(':') if env['SNAP'] not in x) 93 return env 94 95 96def run_script(params: List[str], log: Callable[[str], None]=logger.error) -> Tuple[str, str]: 97 """Run the script with the given params. 98 99 :param list params: List of parameters to pass to subprocess.run 100 :param callable log: Logger method to use for errors 101 102 """ 103 try: 104 proc = subprocess.run(params, 105 check=False, 106 stdout=subprocess.PIPE, 107 stderr=subprocess.PIPE, 108 universal_newlines=True, 109 env=env_no_snap_for_external_calls()) 110 111 except (OSError, ValueError): 112 msg = "Unable to run the command: %s" % " ".join(params) 113 log(msg) 114 raise errors.SubprocessError(msg) 115 116 if proc.returncode != 0: 117 msg = "Error while running %s.\n%s\n%s" % ( 118 " ".join(params), proc.stdout, proc.stderr) 119 # Enter recovery routine... 120 log(msg) 121 raise errors.SubprocessError(msg) 122 123 return proc.stdout, proc.stderr 124 125 126def exe_exists(exe: str) -> bool: 127 """Determine whether path/name refers to an executable. 128 129 :param str exe: Executable path or name 130 131 :returns: If exe is a valid executable 132 :rtype: bool 133 134 """ 135 path, _ = os.path.split(exe) 136 if path: 137 return filesystem.is_executable(exe) 138 for path in os.environ["PATH"].split(os.pathsep): 139 if filesystem.is_executable(os.path.join(path, exe)): 140 return True 141 142 return False 143 144 145def lock_dir_until_exit(dir_path: str) -> None: 146 """Lock the directory at dir_path until program exit. 147 148 :param str dir_path: path to directory 149 150 :raises errors.LockError: if the lock is held by another process 151 152 """ 153 if not _LOCKS: # this is the first lock to be released at exit 154 atexit_register(_release_locks) 155 156 if dir_path not in _LOCKS: 157 _LOCKS[dir_path] = lock.lock_dir(dir_path) 158 159 160def _release_locks() -> None: 161 for dir_lock in _LOCKS.values(): 162 try: 163 dir_lock.release() 164 except: # pylint: disable=bare-except 165 msg = 'Exception occurred releasing lock: {0!r}'.format(dir_lock) 166 logger.debug(msg, exc_info=True) 167 _LOCKS.clear() 168 169 170def set_up_core_dir(directory: str, mode: int, strict: bool) -> None: 171 """Ensure directory exists with proper permissions and is locked. 172 173 :param str directory: Path to a directory. 174 :param int mode: Directory mode. 175 :param bool strict: require directory to be owned by current user 176 177 :raises .errors.LockError: if the directory cannot be locked 178 :raises .errors.Error: if the directory cannot be made or verified 179 180 """ 181 try: 182 make_or_verify_dir(directory, mode, strict) 183 lock_dir_until_exit(directory) 184 except OSError as error: 185 logger.debug("Exception was:", exc_info=True) 186 raise errors.Error(PERM_ERR_FMT.format(error)) 187 188 189def make_or_verify_dir(directory: str, mode: int = 0o755, strict: bool = False) -> None: 190 """Make sure directory exists with proper permissions. 191 192 :param str directory: Path to a directory. 193 :param int mode: Directory mode. 194 :param bool strict: require directory to be owned by current user 195 196 :raises .errors.Error: if a directory already exists, 197 but has wrong permissions or owner 198 199 :raises OSError: if invalid or inaccessible file names and 200 paths, or other arguments that have the correct type, 201 but are not accepted by the operating system. 202 203 """ 204 try: 205 filesystem.makedirs(directory, mode) 206 except OSError as exception: 207 if exception.errno == errno.EEXIST: 208 if strict and not filesystem.check_permissions(directory, mode): 209 raise errors.Error( 210 "%s exists, but it should be owned by current user with" 211 " permissions %s" % (directory, oct(mode))) 212 else: 213 raise 214 215 216def safe_open(path: str, mode: str = "w", chmod: Optional[int] = None) -> IO: 217 """Safely open a file. 218 219 :param str path: Path to a file. 220 :param str mode: Same os `mode` for `open`. 221 :param int chmod: Same as `mode` for `filesystem.open`, uses Python defaults 222 if ``None``. 223 224 """ 225 open_args: Union[Tuple[()], Tuple[int]] = () 226 if chmod is not None: 227 open_args = (chmod,) 228 fdopen_args: Union[Tuple[()], Tuple[int]] = () 229 fd = filesystem.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args) 230 return os.fdopen(fd, mode, *fdopen_args) 231 232 233def _unique_file(path: str, filename_pat: Callable[[int], str], count: int, 234 chmod: int, mode: str) -> Tuple[IO, str]: 235 while True: 236 current_path = os.path.join(path, filename_pat(count)) 237 try: 238 return safe_open(current_path, chmod=chmod, mode=mode), os.path.abspath(current_path) 239 except OSError as err: 240 # "File exists," is okay, try a different name. 241 if err.errno != errno.EEXIST: 242 raise 243 count += 1 244 245 246def unique_file(path: str, chmod: int = 0o777, mode: str = "w") -> Tuple[IO, str]: 247 """Safely finds a unique file. 248 249 :param str path: path/filename.ext 250 :param int chmod: File mode 251 :param str mode: Open mode 252 253 :returns: tuple of file object and file name 254 255 """ 256 path, tail = os.path.split(path) 257 return _unique_file( 258 path, filename_pat=(lambda count: "%04d_%s" % (count, tail)), 259 count=0, chmod=chmod, mode=mode) 260 261 262def unique_lineage_name(path: str, filename: str, chmod: int = 0o644, 263 mode: str = "w") -> Tuple[IO, str]: 264 """Safely finds a unique file using lineage convention. 265 266 :param str path: directory path 267 :param str filename: proposed filename 268 :param int chmod: file mode 269 :param str mode: open mode 270 271 :returns: tuple of file object and file name (which may be modified 272 from the requested one by appending digits to ensure uniqueness) 273 274 :raises OSError: if writing files fails for an unanticipated reason, 275 such as a full disk or a lack of permission to write to 276 specified location. 277 278 """ 279 preferred_path = os.path.join(path, "%s.conf" % (filename)) 280 try: 281 return safe_open(preferred_path, chmod=chmod), preferred_path 282 except OSError as err: 283 if err.errno != errno.EEXIST: 284 raise 285 return _unique_file( 286 path, filename_pat=(lambda count: "%s-%04d.conf" % (filename, count)), 287 count=1, chmod=chmod, mode=mode) 288 289 290def safely_remove(path: str) -> None: 291 """Remove a file that may not exist.""" 292 try: 293 os.remove(path) 294 except OSError as err: 295 if err.errno != errno.ENOENT: 296 raise 297 298 299def get_filtered_names(all_names: Set[str]) -> Set[str]: 300 """Removes names that aren't considered valid by Let's Encrypt. 301 302 :param set all_names: all names found in the configuration 303 304 :returns: all found names that are considered valid by LE 305 :rtype: set 306 307 """ 308 filtered_names = set() 309 for name in all_names: 310 try: 311 filtered_names.add(enforce_le_validity(name)) 312 except errors.ConfigurationError: 313 logger.debug('Not suggesting name "%s"', name, exc_info=True) 314 return filtered_names 315 316def get_os_info() -> Tuple[str, str]: 317 """ 318 Get OS name and version 319 320 :returns: (os_name, os_version) 321 :rtype: `tuple` of `str` 322 """ 323 324 return get_python_os_info(pretty=False) 325 326def get_os_info_ua() -> str: 327 """ 328 Get OS name and version string for User Agent 329 330 :returns: os_ua 331 :rtype: `str` 332 """ 333 if _USE_DISTRO: 334 os_info = distro.name(pretty=True) 335 336 if not _USE_DISTRO or not os_info: 337 return " ".join(get_python_os_info(pretty=True)) 338 return os_info 339 340def get_systemd_os_like() -> List[str]: 341 """ 342 Get a list of strings that indicate the distribution likeness to 343 other distributions. 344 345 :returns: List of distribution acronyms 346 :rtype: `list` of `str` 347 """ 348 349 if _USE_DISTRO: 350 return distro.like().split(" ") 351 return [] 352 353def get_var_from_file(varname: str, filepath: str = "/etc/os-release") -> str: 354 """ 355 Get single value from a file formatted like systemd /etc/os-release 356 357 :param str varname: Name of variable to fetch 358 :param str filepath: File path of os-release file 359 :returns: requested value 360 :rtype: `str` 361 """ 362 363 var_string = varname+"=" 364 if not os.path.isfile(filepath): 365 return "" 366 with open(filepath, 'r') as fh: 367 contents = fh.readlines() 368 369 for line in contents: 370 if line.strip().startswith(var_string): 371 # Return the value of var, normalized 372 return _normalize_string(line.strip()[len(var_string):]) 373 return "" 374 375def _normalize_string(orig: str) -> str: 376 """ 377 Helper function for get_var_from_file() to remove quotes 378 and whitespaces 379 """ 380 return orig.replace('"', '').replace("'", "").strip() 381 382def get_python_os_info(pretty: bool = False) -> Tuple[str, str]: 383 """ 384 Get Operating System type/distribution and major version 385 using python platform module 386 387 :param bool pretty: If the returned OS name should be in longer (pretty) form 388 389 :returns: (os_name, os_version) 390 :rtype: `tuple` of `str` 391 """ 392 info = platform.system_alias( 393 platform.system(), 394 platform.release(), 395 platform.version() 396 ) 397 os_type, os_ver, _ = info 398 os_type = os_type.lower() 399 if os_type.startswith('linux') and _USE_DISTRO: 400 distro_name, distro_version = distro.name() if pretty else distro.id(), distro.version() 401 # On arch, these values are reportedly empty strings so handle it 402 # defensively 403 # so handle it defensively 404 if distro_name: 405 os_type = distro_name 406 if distro_version: 407 os_ver = distro_version 408 elif os_type.startswith('darwin'): 409 try: 410 proc = subprocess.run( 411 ["/usr/bin/sw_vers", "-productVersion"], 412 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 413 check=False, universal_newlines=True, 414 env=env_no_snap_for_external_calls(), 415 ) 416 except OSError: 417 proc = subprocess.run( 418 ["sw_vers", "-productVersion"], 419 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 420 check=False, universal_newlines=True, 421 env=env_no_snap_for_external_calls(), 422 ) 423 os_ver = proc.stdout.rstrip('\n') 424 elif os_type.startswith('freebsd'): 425 # eg "9.3-RC3-p1" 426 os_ver = os_ver.partition("-")[0] 427 os_ver = os_ver.partition(".")[0] 428 elif platform.win32_ver()[1]: 429 os_ver = platform.win32_ver()[1] 430 else: 431 # Cases known to fall here: Cygwin python 432 os_ver = '' 433 return os_type, os_ver 434 435# Just make sure we don't get pwned... Make sure that it also doesn't 436# start with a period or have two consecutive periods <- this needs to 437# be done in addition to the regex 438EMAIL_REGEX = re.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$") 439 440 441def safe_email(email: str) -> bool: 442 """Scrub email address before using it.""" 443 if EMAIL_REGEX.match(email) is not None: 444 return not email.startswith(".") and ".." not in email 445 logger.error("Invalid email address: %s.", email) 446 return False 447 448 449class DeprecatedArgumentAction(argparse.Action): 450 """Action to log a warning when an argument is used.""" 451 def __call__(self, unused1: Any, unused2: Any, unused3: Any, 452 option_string: Optional[str] = None) -> None: 453 warnings.warn("Use of %s is deprecated." % option_string, DeprecationWarning) 454 455 456def add_deprecated_argument(add_argument: Callable[..., None], argument_name: str, 457 nargs: Union[str, int]) -> None: 458 """Adds a deprecated argument with the name argument_name. 459 460 Deprecated arguments are not shown in the help. If they are used on 461 the command line, a warning is shown stating that the argument is 462 deprecated and no other action is taken. 463 464 :param callable add_argument: Function that adds arguments to an 465 argument parser/group. 466 :param str argument_name: Name of deprecated argument. 467 :param nargs: Value for nargs when adding the argument to argparse. 468 469 """ 470 if DeprecatedArgumentAction not in configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE: 471 # In version 0.12.0 ACTION_TYPES_THAT_DONT_NEED_A_VALUE was 472 # changed from a set to a tuple. 473 if isinstance(configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE, set): 474 configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE.add( 475 DeprecatedArgumentAction) 476 else: 477 configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE += ( 478 DeprecatedArgumentAction,) 479 add_argument(argument_name, action=DeprecatedArgumentAction, 480 help=argparse.SUPPRESS, nargs=nargs) 481 482 483def enforce_le_validity(domain: str) -> str: 484 """Checks that Let's Encrypt will consider domain to be valid. 485 486 :param str domain: FQDN to check 487 :type domain: `str` 488 :returns: The domain cast to `str`, with ASCII-only contents 489 :rtype: str 490 :raises ConfigurationError: for invalid domains and cases where Let's 491 Encrypt currently will not issue certificates 492 493 """ 494 495 domain = enforce_domain_sanity(domain) 496 if not re.match("^[A-Za-z0-9.-]*$", domain): 497 raise errors.ConfigurationError( 498 "{0} contains an invalid character. " 499 "Valid characters are A-Z, a-z, 0-9, ., and -.".format(domain)) 500 501 labels = domain.split(".") 502 if len(labels) < 2: 503 raise errors.ConfigurationError( 504 "{0} needs at least two labels".format(domain)) 505 for label in labels: 506 if label.startswith("-"): 507 raise errors.ConfigurationError( 508 'label "{0}" in domain "{1}" cannot start with "-"'.format( 509 label, domain)) 510 if label.endswith("-"): 511 raise errors.ConfigurationError( 512 'label "{0}" in domain "{1}" cannot end with "-"'.format( 513 label, domain)) 514 return domain 515 516 517def enforce_domain_sanity(domain: Union[str, bytes]) -> str: 518 """Method which validates domain value and errors out if 519 the requirements are not met. 520 521 :param domain: Domain to check 522 :type domain: `str` or `bytes` 523 :raises ConfigurationError: for invalid domains and cases where Let's 524 Encrypt currently will not issue certificates 525 526 :returns: The domain cast to `str`, with ASCII-only contents 527 :rtype: str 528 """ 529 # Unicode 530 try: 531 if isinstance(domain, bytes): 532 domain = domain.decode('utf-8') 533 domain.encode('ascii') 534 except UnicodeError: 535 raise errors.ConfigurationError("Non-ASCII domain names not supported. " 536 "To issue for an Internationalized Domain Name, use Punycode.") 537 538 domain = domain.lower() 539 540 # Remove trailing dot 541 domain = domain[:-1] if domain.endswith('.') else domain 542 543 # Separately check for odd "domains" like "http://example.com" to fail 544 # fast and provide a clear error message 545 for scheme in ["http", "https"]: # Other schemes seem unlikely 546 if domain.startswith("{0}://".format(scheme)): 547 raise errors.ConfigurationError( 548 "Requested name {0} appears to be a URL, not a FQDN. " 549 "Try again without the leading \"{1}://\".".format( 550 domain, scheme 551 ) 552 ) 553 554 if is_ipaddress(domain): 555 raise errors.ConfigurationError( 556 "Requested name {0} is an IP address. The Let's Encrypt " 557 "certificate authority will not issue certificates for a " 558 "bare IP address.".format(domain)) 559 560 # FQDN checks according to RFC 2181: domain name should be less than 255 561 # octets (inclusive). And each label is 1 - 63 octets (inclusive). 562 # https://tools.ietf.org/html/rfc2181#section-11 563 msg = "Requested domain {0} is not a FQDN because".format(domain) 564 if len(domain) > 255: 565 raise errors.ConfigurationError("{0} it is too long.".format(msg)) 566 labels = domain.split('.') 567 for l in labels: 568 if not l: 569 raise errors.ConfigurationError("{0} it contains an empty label.".format(msg)) 570 if len(l) > 63: 571 raise errors.ConfigurationError("{0} label {1} is too long.".format(msg, l)) 572 573 return domain 574 575 576def is_ipaddress(address: str) -> bool: 577 """Is given address string form of IP(v4 or v6) address? 578 579 :param address: address to check 580 :type address: `str` 581 582 :returns: True if address is valid IP address, otherwise return False. 583 :rtype: bool 584 585 """ 586 try: 587 socket.inet_pton(socket.AF_INET, address) 588 # If this line runs it was ip address (ipv4) 589 return True 590 except socket.error: 591 # It wasn't an IPv4 address, so try ipv6 592 try: 593 socket.inet_pton(socket.AF_INET6, address) 594 return True 595 except socket.error: 596 return False 597 598 599def is_wildcard_domain(domain: Union[str, bytes]) -> bool: 600 """"Is domain a wildcard domain? 601 602 :param domain: domain to check 603 :type domain: `bytes` or `str` 604 605 :returns: True if domain is a wildcard, otherwise, False 606 :rtype: bool 607 608 """ 609 if isinstance(domain, str): 610 return domain.startswith("*.") 611 return domain.startswith(b"*.") 612 613 614def get_strict_version(normalized: str) -> "distutils.version.StrictVersion": 615 """Converts a normalized version to a strict version. 616 617 :param str normalized: normalized version string 618 619 :returns: An equivalent strict version 620 :rtype: distutils.version.StrictVersion 621 622 """ 623 warnings.warn("certbot.util.get_strict_version is deprecated and will be " 624 "removed in a future release.", DeprecationWarning) 625 with warnings.catch_warnings(): 626 warnings.simplefilter("ignore", DeprecationWarning) 627 import distutils.version 628 # strict version ending with "a" and a number designates a pre-release 629 return distutils.version.StrictVersion(normalized.replace(".dev", "a")) 630 631 632def is_staging(srv: str) -> bool: 633 """ 634 Determine whether a given ACME server is a known test / staging server. 635 636 :param str srv: the URI for the ACME server 637 :returns: True iff srv is a known test / staging server 638 :rtype bool: 639 """ 640 return srv == constants.STAGING_URI or "staging" in srv 641 642 643def atexit_register(func: Callable, *args: Any, **kwargs: Any) -> None: 644 """Sets func to be called before the program exits. 645 646 Special care is taken to ensure func is only called when the process 647 that first imports this module exits rather than any child processes. 648 649 :param function func: function to be called in case of an error 650 651 """ 652 atexit.register(_atexit_call, func, *args, **kwargs) 653 654 655def parse_loose_version(version_string: str) -> List[Union[int, str]]: 656 """Parses a version string into its components. 657 658 This code and the returned tuple is based on the now deprecated 659 distutils.version.LooseVersion class from the Python standard library. 660 Two LooseVersion classes and two lists as returned by this function should 661 compare in the same way. See 662 https://github.com/python/cpython/blob/v3.10.0/Lib/distutils/version.py#L205-L347. 663 664 :param str version_string: version string 665 666 :returns: list of parsed version string components 667 :rtype: list 668 669 """ 670 components: List[Union[int, str]] 671 components = [x for x in _VERSION_COMPONENT_RE.split(version_string) 672 if x and x != '.'] 673 for i, obj in enumerate(components): 674 try: 675 components[i] = int(obj) 676 except ValueError: 677 pass 678 return components 679 680 681def _atexit_call(func: Callable, *args: Any, **kwargs: Any) -> None: 682 if _INITIAL_PID == os.getpid(): 683 func(*args, **kwargs) 684