1"""Utilities for all Certbot."""
2import argparse
3import atexit
4import collections
5import errno
6import logging
7import platform
8import re
9import socket
10import subprocess
11import sys
12from typing import Any
13from typing import Callable
14from typing import Dict
15from typing import IO
16from typing import List
17from typing import Optional
18from typing import Set
19from typing import Tuple
20from typing import TYPE_CHECKING
21from typing import Union
22import warnings
23
24import configargparse
25
26from certbot import errors
27from certbot._internal import constants
28from certbot._internal import lock
29from certbot.compat import filesystem
30from certbot.compat import os
31
# The ``distro`` module is only imported on Linux. The functions below that
# call into it check this flag first.
_USE_DISTRO = sys.platform.startswith('linux')
if _USE_DISTRO:
    import distro

# Imported for type checkers only: it is referenced by the string return
# annotation of get_strict_version(), which imports the module itself at
# call time.
if TYPE_CHECKING:
    import distutils.version

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
40
41
# Record with the fields ``file`` and ``pem``.
Key = collections.namedtuple("Key", "file pem")
# Record with the fields ``file``, ``data`` and ``form``.
# Note: form is the type of data, "pem" or "der"
CSR = collections.namedtuple("CSR", "file data form")


# ANSI SGR escape codes
# Formats text as bold or with increased intensity
ANSI_SGR_BOLD = '\033[1m'
# Colors text red
ANSI_SGR_RED = "\033[31m"
# Resets output format
ANSI_SGR_RESET = "\033[0m"


# Multi-line error message template; ``{0}`` is replaced with the underlying
# error. set_up_core_dir() uses it when an OSError prevents a directory from
# being created or locked.
PERM_ERR_FMT = os.linesep.join((
    "The following error was encountered:", "{0}",
    "Either run as root, or set --config-dir, "
    "--work-dir, and --logs-dir to writeable paths."))
60
61
# PID of the process that imported this module. _atexit_call() compares it
# with the current PID so that functions registered through atexit_register()
# only run in that process.
_INITIAL_PID = os.getpid()
# Maps paths to locked directories to their lock object. All locks in
# the dict are attempted to be cleaned up at program exit. If the
# program exits before the lock is cleaned up, it is automatically
# released, but the file isn't deleted.
_LOCKS: Dict[str, lock.LockFile] = {}
# Tokenizer used by parse_loose_version(): matches a run of digits, a run of
# lowercase ASCII letters, or a single dot. re.VERBOSE makes the spaces inside
# the pattern insignificant.
_VERSION_COMPONENT_RE = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)
70
def env_no_snap_for_external_calls() -> Dict[str, str]:
    """Build an environment for external programs without Snap paths.

    When Certbot is run inside a Snap, certain environment variables
    are modified. Certbot uses classic confinement and sometimes calls
    out to external programs (for example apachectl, Nginx, or anything
    run from inside a hook). Those programs must use the system's
    libraries, which may be incompatible with the versions included in
    the Snap, so the Snap-specific entries have to be removed again.
    Pass the result as the ``env`` argument of ``subprocess.Popen``.

    The environment is only rewritten when both ``SNAP`` and
    ``CERTBOT_SNAPPED`` are set. In that case every ``:``-separated
    entry of ``PATH`` and ``LD_LIBRARY_PATH`` that contains the value of
    ``SNAP`` is dropped.

    :returns: A modified copy of os.environ ready to pass to Popen
    :rtype: dict

    """
    # Work on a copy so the environment of this process is never modified.
    environment = os.environ.copy()
    inside_snap = 'SNAP' in environment and 'CERTBOT_SNAPPED' in environment
    if inside_snap:
        snap_root = environment['SNAP']
        for variable in ('PATH', 'LD_LIBRARY_PATH'):
            if variable not in environment:
                continue
            kept_entries = [entry for entry in environment[variable].split(':')
                            if snap_root not in entry]
            environment[variable] = ':'.join(kept_entries)
    return environment
94
95
def run_script(params: List[str], log: Callable[[str], None]=logger.error) -> Tuple[str, str]:
    """Run an external command and return what it printed.

    The command runs with the environment returned by
    ``env_no_snap_for_external_calls()`` and with stdout and stderr
    captured as text.

    :param list params: List of parameters to pass to subprocess.run
    :param callable log: Logger method to use for errors

    :returns: ``(stdout, stderr)`` of the command
    :rtype: tuple

    :raises errors.SubprocessError: if the command cannot be started or
        exits with a non-zero return code. The message is passed to
        ``log`` before the exception is raised.

    """
    try:
        completed = subprocess.run(params,
                                   check=False,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True,
                                   env=env_no_snap_for_external_calls())
    except (OSError, ValueError):
        failure = "Unable to run the command: %s" % " ".join(params)
        log(failure)
        raise errors.SubprocessError(failure)

    if completed.returncode == 0:
        return completed.stdout, completed.stderr

    # Non-zero exit status: report the command together with its output.
    failure = "Error while running %s.\n%s\n%s" % (
        " ".join(params), completed.stdout, completed.stderr)
    log(failure)
    raise errors.SubprocessError(failure)
124
125
def exe_exists(exe: str) -> bool:
    """Determine whether path/name refers to an executable.

    If ``exe`` contains a directory component it is checked as given.
    Otherwise it is looked up in every directory listed in the ``PATH``
    environment variable. An unset ``PATH`` is treated as "nothing to
    search" and yields ``False`` instead of raising ``KeyError``.

    :param str exe: Executable path or name

    :returns: If exe is a valid executable
    :rtype: bool

    """
    directory, _ = os.path.split(exe)
    if directory:
        # An explicit path is checked directly; PATH is not consulted.
        return filesystem.is_executable(exe)

    search_path = os.environ.get("PATH")
    if search_path is None:
        return False

    return any(filesystem.is_executable(os.path.join(entry, exe))
               for entry in search_path.split(os.pathsep))
143
144
def lock_dir_until_exit(dir_path: str) -> None:
    """Lock the directory at dir_path until program exit.

    The lock is stored in the module-level ``_LOCKS`` mapping. Calling
    this again for a directory already in the mapping does nothing.

    :param str dir_path: path to directory

    :raises errors.LockError: if the lock is held by another process

    """
    # The exit-time cleanup is registered whenever the mapping is empty,
    # i.e. before the first lock is recorded.
    if not _LOCKS:
        atexit_register(_release_locks)

    if dir_path in _LOCKS:
        return
    _LOCKS[dir_path] = lock.lock_dir(dir_path)
158
159
def _release_locks() -> None:
    """Release every lock stored in ``_LOCKS`` and empty the mapping.

    lock_dir_until_exit() registers this function to run at exit. A
    failure to release one lock is logged at debug level and does not
    stop the remaining locks from being released.

    """
    for held_lock in _LOCKS.values():
        try:
            held_lock.release()
        except:  # pylint: disable=bare-except
            logger.debug(
                'Exception occurred releasing lock: {0!r}'.format(held_lock),
                exc_info=True)
    _LOCKS.clear()
168
169
def set_up_core_dir(directory: str, mode: int, strict: bool) -> None:
    """Ensure directory exists with proper permissions and is locked.

    Any ``OSError`` raised while creating, verifying or locking the
    directory is logged at debug level and re-raised as ``errors.Error``
    with a message built from ``PERM_ERR_FMT``.

    :param str directory: Path to a directory.
    :param int mode: Directory mode.
    :param bool strict: require directory to be owned by current user

    :raises .errors.LockError: if the directory cannot be locked
    :raises .errors.Error: if the directory cannot be made or verified

    """
    try:
        make_or_verify_dir(directory, mode, strict)
        lock_dir_until_exit(directory)
    except OSError as os_error:
        logger.debug("Exception was:", exc_info=True)
        user_message = PERM_ERR_FMT.format(os_error)
        raise errors.Error(user_message)
187
188
def make_or_verify_dir(directory: str, mode: int = 0o755, strict: bool = False) -> None:
    """Make sure directory exists with proper permissions.

    The directory is created with ``mode``. If creation fails with
    ``EEXIST``, the existing path is accepted. With ``strict`` set, it
    is accepted only if ``filesystem.check_permissions`` approves it for
    ``mode``.

    :param str directory: Path to a directory.
    :param int mode: Directory mode.
    :param bool strict: require directory to be owned by current user

    :raises .errors.Error: if a directory already exists,
        but has wrong permissions or owner

    :raises OSError: if invalid or inaccessible file names and
        paths, or other arguments that have the correct type,
        but are not accepted by the operating system.

    """
    try:
        filesystem.makedirs(directory, mode)
    except OSError as exc:
        # Anything other than "already exists" is a genuine failure.
        if exc.errno != errno.EEXIST:
            raise
        if not strict:
            return
        if filesystem.check_permissions(directory, mode):
            return
        raise errors.Error(
            "%s exists, but it should be owned by current user with"
            " permissions %s" % (directory, oct(mode)))
214
215
def safe_open(path: str, mode: str = "w", chmod: Optional[int] = None) -> IO:
    """Safely open a file.

    The file is opened through ``filesystem.open`` with the flags
    ``O_CREAT | O_EXCL | O_RDWR``, and the resulting descriptor is
    wrapped with ``os.fdopen``. Callers in this module treat an
    ``OSError`` with ``errno.EEXIST`` from this function as "the file
    already exists".

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `filesystem.open`, uses Python defaults
        if ``None``.

    :returns: the opened file object

    """
    flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
    # Only pass a permission argument when the caller supplied one, so that
    # filesystem.open falls back to its own default otherwise.
    if chmod is None:
        fd = filesystem.open(path, flags)
    else:
        fd = filesystem.open(path, flags, chmod)
    return os.fdopen(fd, mode)
231
232
def _unique_file(path: str, filename_pat: Callable[[int], str], count: int,
                 chmod: int, mode: str) -> Tuple[IO, str]:
    """Open the first file name from ``filename_pat`` that does not exist.

    Candidate names are produced by calling ``filename_pat`` with
    ``count``, ``count + 1``, ... and are opened with ``safe_open``
    inside ``path`` until one succeeds.

    :returns: tuple of file object and absolute path of the opened file

    :raises OSError: for any failure other than ``EEXIST``

    """
    attempt = count
    while True:
        candidate = os.path.join(path, filename_pat(attempt))
        try:
            handle = safe_open(candidate, chmod=chmod, mode=mode)
            return handle, os.path.abspath(candidate)
        except OSError as exc:
            # "File exists," is okay, try a different name.
            if exc.errno != errno.EEXIST:
                raise
        attempt += 1
244
245
def unique_file(path: str, chmod: int = 0o777, mode: str = "w") -> Tuple[IO, str]:
    """Safely finds a unique file.

    The file name is prefixed with a zero-padded four-digit counter
    (``0000_name``, ``0001_name``, ...) and the first name that does not
    exist yet is opened.

    :param str path: path/filename.ext
    :param int chmod: File mode
    :param str mode: Open mode

    :returns: tuple of file object and file name

    """
    directory, basename = os.path.split(path)

    def numbered_name(index: int) -> str:
        return "%04d_%s" % (index, basename)

    return _unique_file(
        directory, filename_pat=numbered_name, count=0, chmod=chmod, mode=mode)
260
261
def unique_lineage_name(path: str, filename: str, chmod: int = 0o644,
                        mode: str = "w") -> Tuple[IO, str]:
    """Safely finds a unique file using lineage convention.

    ``<filename>.conf`` is tried first. If it already exists, the names
    ``<filename>-0001.conf``, ``<filename>-0002.conf``, ... are tried in
    turn. ``mode`` is applied to whichever file ends up being opened.

    :param str path: directory path
    :param str filename: proposed filename
    :param int chmod: file mode
    :param str mode: open mode

    :returns: tuple of file object and file name (which may be modified
        from the requested one by appending digits to ensure uniqueness)

    :raises OSError: if writing files fails for an unanticipated reason,
        such as a full disk or a lack of permission to write to
        specified location.

    """
    preferred_path = os.path.join(path, "%s.conf" % (filename))
    try:
        # Pass ``mode`` here as well, so the requested open mode is honoured
        # for the preferred name and not only for the numbered fallback.
        return safe_open(preferred_path, mode=mode, chmod=chmod), preferred_path
    except OSError as err:
        if err.errno != errno.EEXIST:
            raise
    return _unique_file(
        path, filename_pat=(lambda count: "%s-%04d.conf" % (filename, count)),
        count=1, chmod=chmod, mode=mode)
288
289
def safely_remove(path: str) -> None:
    """Remove a file that may not exist.

    :param str path: path of the file to remove

    :raises OSError: if removal fails for any reason other than the
        file being absent (``ENOENT``)

    """
    try:
        os.remove(path)
    except OSError as exc:
        if exc.errno == errno.ENOENT:
            return
        raise
297
298
def get_filtered_names(all_names: Set[str]) -> Set[str]:
    """Removes names that aren't considered valid by Let's Encrypt.

    Each name is passed through ``enforce_le_validity`` and the value it
    returns is collected. Names it rejects are logged at debug level and
    skipped.

    :param set all_names: all names found in the configuration

    :returns: all found names that are considered valid by LE
    :rtype: set

    """
    accepted = set()
    for candidate in all_names:
        try:
            checked = enforce_le_validity(candidate)
        except errors.ConfigurationError:
            logger.debug('Not suggesting name "%s"', candidate, exc_info=True)
        else:
            accepted.add(checked)
    return accepted
315
def get_os_info() -> Tuple[str, str]:
    """
    Get OS name and version

    Delegates to ``get_python_os_info`` with ``pretty=False``.

    :returns: (os_name, os_version)
    :rtype: `tuple` of `str`
    """
    name_and_version = get_python_os_info(pretty=False)
    return name_and_version
325
def get_os_info_ua() -> str:
    """
    Get OS name and version string for User Agent

    When ``distro`` is in use and reports a non-empty pretty name, that
    name is returned. Otherwise the name and version from
    ``get_python_os_info(pretty=True)`` are joined with a space.

    :returns: os_ua
    :rtype: `str`
    """
    if _USE_DISTRO:
        pretty_name = distro.name(pretty=True)
        if pretty_name:
            return pretty_name
    return " ".join(get_python_os_info(pretty=True))
339
def get_systemd_os_like() -> List[str]:
    """
    Get a list of strings that indicate the distribution likeness to
    other distributions.

    The value reported by ``distro.like()`` is split on whitespace. An
    empty value therefore yields an empty list rather than a list
    containing one empty string. When ``distro`` is not in use, an empty
    list is returned.

    :returns: List of distribution acronyms
    :rtype: `list` of `str`
    """
    if not _USE_DISTRO:
        return []
    # split() without a separator drops empty entries, so "" -> [].
    return distro.like().split()
352
def get_var_from_file(varname: str, filepath: str = "/etc/os-release") -> str:
    """
    Get single value from a file formatted like systemd /etc/os-release

    The first line that, after stripping surrounding whitespace, starts
    with ``varname=`` provides the value. An empty string is returned
    when the file does not exist or no line matches.

    :param str varname: Name of variable to fetch
    :param str filepath: File path of os-release file
    :returns: requested value
    :rtype: `str`
    """
    assignment_prefix = varname + "="
    if not os.path.isfile(filepath):
        return ""

    with open(filepath, 'r') as release_file:
        all_lines = list(release_file)

    for raw_line in all_lines:
        stripped = raw_line.strip()
        if stripped.startswith(assignment_prefix):
            # Return the value of var, normalized
            return _normalize_string(stripped[len(assignment_prefix):])
    return ""


def _normalize_string(orig: str) -> str:
    """
    Helper function for get_var_from_file() to remove quotes
    and whitespaces

    Every double and single quote character is removed, then leading
    and trailing whitespace is stripped.
    """
    unquoted = orig
    for quote_char in ('"', "'"):
        unquoted = unquoted.replace(quote_char, "")
    return unquoted.strip()
381
def get_python_os_info(pretty: bool = False) -> Tuple[str, str]:
    """
    Get Operating System type/distribution and major version
    using python platform module

    The lower-cased system name and the release reported by the
    ``platform`` module are the starting point. They are then refined
    per platform: on Linux with values from ``distro``, on macOS with
    the output of ``sw_vers -productVersion``, on FreeBSD by keeping
    only the part of the release before the first ``-`` and ``.``, and
    otherwise with ``platform.win32_ver()`` if it reports a version.

    :param bool pretty: If the returned OS name should be in longer (pretty) form

    :returns: (os_name, os_version)
    :rtype: `tuple` of `str`
    """
    system_name, os_ver, _ = platform.system_alias(
        platform.system(),
        platform.release(),
        platform.version()
    )
    os_type = system_name.lower()

    if os_type.startswith('linux') and _USE_DISTRO:
        distro_name = distro.name() if pretty else distro.id()
        distro_version = distro.version()
        # On arch, these values are reportedly empty strings, so handle it
        # defensively and keep the platform values in that case.
        os_type = distro_name or os_type
        os_ver = distro_version or os_ver
    elif os_type.startswith('darwin'):
        def run_sw_vers(executable):
            return subprocess.run(
                [executable, "-productVersion"],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                check=False, universal_newlines=True,
                env=env_no_snap_for_external_calls(),
            )

        # Try the absolute path first, then fall back to a PATH lookup.
        try:
            proc = run_sw_vers("/usr/bin/sw_vers")
        except OSError:
            proc = run_sw_vers("sw_vers")
        os_ver = proc.stdout.rstrip('\n')
    elif os_type.startswith('freebsd'):
        # eg "9.3-RC3-p1"
        os_ver = os_ver.partition("-")[0].partition(".")[0]
    elif platform.win32_ver()[1]:
        os_ver = platform.win32_ver()[1]
    else:
        # Cases known to fall here: Cygwin python
        os_ver = ''
    return os_type, os_ver
434
# Conservative whitelist of characters allowed in an email address. The
# pattern alone is not sufficient: safe_email() additionally rejects
# addresses that start with a period or contain two consecutive periods.
EMAIL_REGEX = re.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$")


def safe_email(email: str) -> bool:
    """Scrub email address before using it.

    The whole address has to match ``EMAIL_REGEX``. It must also not
    start with a period or contain two consecutive periods.

    :param str email: address to check

    :returns: True if the address is acceptable, otherwise False. An
        address that does not match ``EMAIL_REGEX`` is also logged as an
        error.
    :rtype: bool

    """
    # fullmatch() instead of match(): "$" also matches just before a trailing
    # newline, so match() would accept "user@example.com\n".
    if EMAIL_REGEX.fullmatch(email) is None:
        logger.error("Invalid email address: %s.", email)
        return False
    return not email.startswith(".") and ".." not in email
447
448
class DeprecatedArgumentAction(argparse.Action):
    """Action that issues a ``DeprecationWarning`` when an argument is used.

    The parser, namespace and values passed by argparse are ignored;
    only the option string appears in the warning message.
    """

    def __call__(self, unused1: Any, unused2: Any, unused3: Any,
                 option_string: Optional[str] = None) -> None:
        message = "Use of %s is deprecated." % option_string
        warnings.warn(message, DeprecationWarning)
454
455
def add_deprecated_argument(add_argument: Callable[..., None], argument_name: str,
                            nargs: Union[str, int]) -> None:
    """Adds a deprecated argument with the name argument_name.

    Deprecated arguments are not shown in the help. If they are used on
    the command line, a warning is shown stating that the argument is
    deprecated and no other action is taken.

    As a side effect, ``DeprecatedArgumentAction`` is added to
    ``configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE`` if it is not
    already present.

    :param callable add_argument: Function that adds arguments to an
        argument parser/group.
    :param str argument_name: Name of deprecated argument.
    :param nargs: Value for nargs when adding the argument to argparse.

    """
    valueless_actions = configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE
    if DeprecatedArgumentAction not in valueless_actions:
        # In version 0.12.0 ACTION_TYPES_THAT_DONT_NEED_A_VALUE was
        # changed from a set to a tuple.
        if isinstance(valueless_actions, set):
            valueless_actions.add(DeprecatedArgumentAction)
        else:
            configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE += (
                DeprecatedArgumentAction,)
    add_argument(argument_name, action=DeprecatedArgumentAction,
                 help=argparse.SUPPRESS, nargs=nargs)
481
482
def enforce_le_validity(domain: str) -> str:
    """Checks that Let's Encrypt will consider domain to be valid.

    The domain is first normalized by ``enforce_domain_sanity``. The
    result must then consist only of the characters ``A-Z``, ``a-z``,
    ``0-9``, ``.`` and ``-``, and have at least two labels. No label may
    start or end with ``-``.

    :param str domain: FQDN to check
    :returns: The domain cast to `str`, with ASCII-only contents
    :rtype: str
    :raises ConfigurationError: for invalid domains and cases where Let's
                                Encrypt currently will not issue certificates

    """
    domain = enforce_domain_sanity(domain)
    # fullmatch() instead of match() with "^...$": "$" also matches just
    # before a trailing newline, which would let "example.com\n" through.
    if not re.fullmatch("[A-Za-z0-9.-]*", domain):
        raise errors.ConfigurationError(
            "{0} contains an invalid character. "
            "Valid characters are A-Z, a-z, 0-9, ., and -.".format(domain))

    labels = domain.split(".")
    if len(labels) < 2:
        raise errors.ConfigurationError(
            "{0} needs at least two labels".format(domain))
    for label in labels:
        if label.startswith("-"):
            raise errors.ConfigurationError(
                'label "{0}" in domain "{1}" cannot start with "-"'.format(
                    label, domain))
        if label.endswith("-"):
            raise errors.ConfigurationError(
                'label "{0}" in domain "{1}" cannot end with "-"'.format(
                    label, domain))
    return domain
515
516
def enforce_domain_sanity(domain: Union[str, bytes]) -> str:
    """Validate a domain value and return it in normalized form.

    The domain is decoded if given as bytes, required to be pure ASCII,
    lower-cased and stripped of one trailing dot. It is rejected if it
    looks like an ``http``/``https`` URL, if it is an IP address, if it
    is longer than 255 characters, or if any label is empty or longer
    than 63 characters.

    :param domain: Domain to check
    :type domain: `str` or `bytes`
    :raises ConfigurationError: for invalid domains and cases where Let's
                                Encrypt currently will not issue certificates

    :returns: The domain cast to `str`, with ASCII-only contents
    :rtype: str
    """
    # Unicode
    try:
        text = domain.decode('utf-8') if isinstance(domain, bytes) else domain
        text.encode('ascii')
    except UnicodeError:
        raise errors.ConfigurationError("Non-ASCII domain names not supported. "
            "To issue for an Internationalized Domain Name, use Punycode.")

    text = text.lower()

    # Remove trailing dot
    if text.endswith('.'):
        text = text[:-1]

    # Separately check for odd "domains" like "http://example.com" to fail
    # fast and provide a clear error message
    for scheme in ("http", "https"):  # Other schemes seem unlikely
        if not text.startswith("{0}://".format(scheme)):
            continue
        raise errors.ConfigurationError(
            "Requested name {0} appears to be a URL, not a FQDN. "
            "Try again without the leading \"{1}://\".".format(
                text, scheme
            )
        )

    if is_ipaddress(text):
        raise errors.ConfigurationError(
            "Requested name {0} is an IP address. The Let's Encrypt "
            "certificate authority will not issue certificates for a "
            "bare IP address.".format(text))

    # FQDN checks according to RFC 2181: domain name should be less than 255
    # octets (inclusive). And each label is 1 - 63 octets (inclusive).
    # https://tools.ietf.org/html/rfc2181#section-11
    not_fqdn = "Requested domain {0} is not a FQDN because".format(text)
    if len(text) > 255:
        raise errors.ConfigurationError("{0} it is too long.".format(not_fqdn))
    for label in text.split('.'):
        if not label:
            raise errors.ConfigurationError(
                "{0} it contains an empty label.".format(not_fqdn))
        if len(label) > 63:
            raise errors.ConfigurationError(
                "{0} label {1} is too long.".format(not_fqdn, label))

    return text
574
575
def is_ipaddress(address: str) -> bool:
    """Is given address string form of IP(v4 or v6) address?

    The address is parsed with ``socket.inet_pton``, first as IPv4 and
    then as IPv6.

    :param address: address to check
    :type address: `str`

    :returns: True if address is valid IP address, otherwise return False.
    :rtype: bool

    """
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, address)
        except socket.error:
            # Not an address of this family, so try the next one.
            continue
        return True
    return False
597
598
def is_wildcard_domain(domain: Union[str, bytes]) -> bool:
    """Is domain a wildcard domain?

    A domain counts as a wildcard when it starts with ``*.``.

    :param domain: domain to check
    :type domain: `bytes` or `str`

    :returns: True if domain is a wildcard, otherwise, False
    :rtype: bool

    """
    # startswith() needs a prefix of the same type as the domain itself.
    wildcard_prefix = "*." if isinstance(domain, str) else b"*."
    return domain.startswith(wildcard_prefix)
612
613
def get_strict_version(normalized: str) -> "distutils.version.StrictVersion":
    """Converts a normalized version to a strict version.

    Calling this function issues a ``DeprecationWarning``. Any
    ``DeprecationWarning`` raised while importing and using
    ``distutils`` is suppressed.

    :param str normalized: normalized version string

    :returns: An equivalent strict version
    :rtype: distutils.version.StrictVersion

    """
    deprecation_notice = ("certbot.util.get_strict_version is deprecated and will be "
                          "removed in a future release.")
    warnings.warn(deprecation_notice, DeprecationWarning)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        import distutils.version
        # strict version ending with "a" and a number designates a pre-release
        strict_form = normalized.replace(".dev", "a")
        return distutils.version.StrictVersion(strict_form)
630
631
def is_staging(srv: str) -> bool:
    """
    Determine whether a given ACME server is a known test / staging server.

    A server qualifies when its URI equals ``constants.STAGING_URI`` or
    contains the substring ``staging``.

    :param str srv: the URI for the ACME server
    :returns: True iff srv is a known test / staging server
    :rtype: bool
    """
    if srv == constants.STAGING_URI:
        return True
    return "staging" in srv
641
642
def atexit_register(func: Callable, *args: Any, **kwargs: Any) -> None:
    """Sets func to be called before the program exits.

    Special care is taken to ensure func is only called when the process
    that first imports this module exits rather than any child processes.
    To that end, ``_atexit_call`` is what actually gets registered with
    ``atexit``, and it receives ``func`` and its arguments.

    :param function func: function to be called at program exit
    :param args: positional arguments passed on to ``func``
    :param kwargs: keyword arguments passed on to ``func``

    """
    forwarded = (func,) + args
    atexit.register(_atexit_call, *forwarded, **kwargs)
653
654
def parse_loose_version(version_string: str) -> List[Union[int, str]]:
    """Parses a version string into its components.

    This code and the returned list is based on the now deprecated
    distutils.version.LooseVersion class from the Python standard library.
    Two LooseVersion classes and two lists as returned by this function should
    compare in the same way. See
    https://github.com/python/cpython/blob/v3.10.0/Lib/distutils/version.py#L205-L347.

    The string is split with ``_VERSION_COMPONENT_RE``. Empty pieces and
    single dots are dropped, and every piece accepted by ``int()`` is
    converted to an integer.

    :param str version_string: version string

    :returns: list of parsed version string components
    :rtype: list

    """
    components: List[Union[int, str]] = []
    for piece in _VERSION_COMPONENT_RE.split(version_string):
        if not piece or piece == '.':
            continue
        try:
            components.append(int(piece))
        except ValueError:
            # Not numeric: keep the piece as a string.
            components.append(piece)
    return components
679
680
def _atexit_call(func: Callable, *args: Any, **kwargs: Any) -> None:
    """Call ``func`` only in the process that imported this module.

    The current PID is compared with ``_INITIAL_PID``. In any other
    process the call is skipped. The return value of ``func`` is
    discarded.

    """
    if os.getpid() != _INITIAL_PID:
        return
    func(*args, **kwargs)
684