1"""Facilities for implementing hooks that call shell commands."""
2
3import logging
4from typing import List
5from typing import Optional
6from typing import Set
7
8from certbot import configuration
9from certbot import errors
10from certbot import util
11from certbot.compat import filesystem
12from certbot.compat import misc
13from certbot.compat import os
14from certbot.display import ops as display_ops
15from certbot.plugins import util as plug_util
16
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
18
19
def validate_hooks(config: configuration.NamespaceConfig) -> None:
    """Validate each hook command configured on ``config``.

    The pre, post, deploy and renew hooks are handed to
    :func:`validate_hook` one at a time, in that order.

    :param configuration.NamespaceConfig config: Certbot settings

    """
    # Each attribute is read immediately before it is validated, so a failure
    # on an earlier hook stops before later attributes are touched.
    for hook_name in ("pre", "post", "deploy", "renew"):
        validate_hook(getattr(config, f"{hook_name}_hook"), hook_name)
26
27
def _prog(shell_cmd: str) -> Optional[str]:
    """Return the basename of the program a shell command would run.

    If the executable is not found on the first attempt,
    ``plug_util.path_surgery`` is called and the lookup is retried once.

    :param str shell_cmd: command to be executed

    :returns: basename of command or None if the command isn't found
    :rtype: str or None

    """
    found = util.exe_exists(shell_cmd)
    if not found:
        # Second chance after path_surgery has had an opportunity to act.
        plug_util.path_surgery(shell_cmd)
        found = util.exe_exists(shell_cmd)

    return os.path.basename(shell_cmd) if found else None
43
44
def validate_hook(shell_cmd: Optional[str], hook_name: str) -> None:
    """Check that a command provided as a hook is plausibly executable.

    Only the first whitespace-delimited word of ``shell_cmd`` (the
    program to run) is checked. A hook that is ``None``, empty, or made
    up solely of whitespace names no program and is accepted without
    further checks.

    :param shell_cmd: full hook command line, or a falsy value if the
        hook is not set
    :type shell_cmd: str or None
    :param str hook_name: short hook name (e.g. "pre") used in the
        error message

    :raises .errors.HookCommandNotFound: if the command is not found
    """
    # A whitespace-only string is truthy but splits to an empty list, so the
    # word list is checked rather than the raw string to avoid an IndexError.
    words = shell_cmd.split(None, 1) if shell_cmd else []
    if not words:
        return

    cmd = words[0]
    if _prog(cmd):
        return

    if os.path.exists(cmd):
        msg = "{1}-hook command {0} exists, but is not executable.".format(cmd, hook_name)
    else:
        # PATH is only needed for this message; tolerate it being unset.
        path = os.environ.get("PATH", "")
        msg = "Unable to find {2}-hook command {0} in the PATH.\n(PATH is {1})".format(
            cmd, path, hook_name)

    raise errors.HookCommandNotFound(msg)
61
62
def pre_hook(config: configuration.NamespaceConfig) -> None:
    """Run whichever pre-hooks apply and have not been run yet.

    With the renew subcommand and directory hooks enabled, every hook
    listed for config.renewal_pre_hooks_dir is run first (unless it was
    already run). The pre-hook from the config, if set, is run
    afterwards. A config pre-hook that is identical to one of the
    directory hooks just run is therefore not run a second time.

    :param configuration.NamespaceConfig config: Certbot settings

    """
    use_directory_hooks = config.verb == "renew" and config.directory_hooks
    if use_directory_hooks:
        for directory_hook in list_hooks(config.renewal_pre_hooks_dir):
            _run_pre_hook_if_necessary(directory_hook)

    configured_hook = config.pre_hook
    if not configured_hook:
        return
    _run_pre_hook_if_necessary(configured_hook)
82
83
# Pre-hook commands that have already been run; consulted by
# _run_pre_hook_if_necessary so the same command string is not run twice.
executed_pre_hooks: Set[str] = set()


def _run_pre_hook_if_necessary(command: str) -> None:
    """Run a pre-hook unless the identical command was run before.

    When the exact command string is already recorded in
    ``executed_pre_hooks``, nothing is run and an informational message
    is logged instead.

    :param str command: pre-hook to be run

    """
    already_ran = command in executed_pre_hooks
    if already_ran:
        logger.info("Pre-hook command already run, skipping: %s", command)
        return

    _run_hook("pre-hook", command)
    # Recorded only after _run_hook returns, so a call that raises is not
    # marked as executed.
    executed_pre_hooks.add(command)
101
102
def post_hook(config: configuration.NamespaceConfig) -> None:
    """Run the post-hook now, or queue post-hooks when renewing.

    For any verb other than renew, the post-hook from the config (if
    set) is run immediately.

    For the renew verb nothing is run here. Instead, hooks are
    registered to be run later by :func:`run_saved_post_hooks`: first
    every hook listed for config.renewal_post_hooks_dir (when directory
    hooks are enabled), then the post-hook from the config. A config
    post-hook identical to an already registered command is not
    scheduled twice.

    :param configuration.NamespaceConfig config: Certbot settings

    """
    configured_hook = config.post_hook

    # certonly / run
    if config.verb != "renew":
        if configured_hook:
            _run_hook("post-hook", configured_hook)
        return

    # In the "renew" case, we save these up to run at the end
    deferred: List[str] = []
    if config.directory_hooks:
        deferred.extend(list_hooks(config.renewal_post_hooks_dir))
    if configured_hook:
        deferred.append(configured_hook)

    for hook in deferred:
        _run_eventually(hook)
132
133
# Post-hook commands registered so far, in registration order and without
# duplicates; run_saved_post_hooks iterates over this list.
post_hooks: List[str] = []


def _run_eventually(command: str) -> None:
    """Queue a post-hook for later execution.

    A command is appended to ``post_hooks`` only the first time it is
    seen, so each registered command appears once, in the order it was
    first given. The queue is consumed by :func:`run_saved_post_hooks`.

    :param str command: post-hook to register to be run

    """
    if command in post_hooks:
        return
    post_hooks.append(command)
148
149
def run_saved_post_hooks() -> None:
    """Run the post-hooks queued while handling the 'renew' verb.

    Every command currently held in ``post_hooks`` is run as a
    "post-hook", in list order. The list itself is left unchanged.

    """
    for saved_command in post_hooks:
        _run_hook("post-hook", saved_command)
154
155
def deploy_hook(config: configuration.NamespaceConfig, domains: List[str],
                lineage_path: str) -> None:
    """Run the configured deploy-hook after issuance, if there is one.

    Nothing happens when config.deploy_hook is unset. Otherwise the
    command is passed on together with config.dry_run, which decides
    whether it is actually run.

    :param configuration.NamespaceConfig config: Certbot settings
    :param domains: domains in the obtained certificate
    :type domains: `list` of `str`
    :param str lineage_path: live directory path for the new cert

    """
    command = config.deploy_hook
    if not command:
        return
    _run_deploy_hook(command, domains, lineage_path, config.dry_run)
169
170
def renew_hook(config: configuration.NamespaceConfig, domains: List[str],
               lineage_path: str) -> None:
    """Run the hooks that follow a renewal.

    When directory hooks are enabled, each hook listed for
    config.renewal_deploy_hooks_dir is run first. The renew-hook from
    the config is run afterwards, unless it is identical to one of the
    directory hooks just handled, in which case it is skipped with a
    log message.

    Every hook goes through the deploy-hook runner with config.dry_run,
    so on a dry run no hook is run and a message is logged for each
    skipped command.

    :param configuration.NamespaceConfig config: Certbot settings
    :param domains: domains in the obtained certificate
    :type domains: `list` of `str`
    :param str lineage_path: live directory path for the new cert

    """
    handled_dir_hooks = set()
    if config.directory_hooks:
        for directory_hook in list_hooks(config.renewal_deploy_hooks_dir):
            _run_deploy_hook(directory_hook, domains, lineage_path, config.dry_run)
            handled_dir_hooks.add(directory_hook)

    configured_hook = config.renew_hook
    if not configured_hook:
        return

    if configured_hook in handled_dir_hooks:
        logger.info("Skipping deploy-hook '%s' as it was already run.",
                    configured_hook)
        return

    _run_deploy_hook(configured_hook, domains, lineage_path, config.dry_run)
202
203
def _run_deploy_hook(command: str, domains: List[str], lineage_path: str, dry_run: bool) -> None:
    """Run a deploy-hook command, or only log it on a dry run.

    On a dry run the command is not run; a message naming the skipped
    command is logged instead. Otherwise RENEWED_DOMAINS (the domains
    joined by single spaces) and RENEWED_LINEAGE are assigned in
    ``os.environ`` before the command is run as a "deploy-hook". This
    function does not remove those variables afterwards.

    :param str command: command to run as a deploy-hook
    :param domains: domains in the obtained certificate
    :type domains: `list` of `str`
    :param str lineage_path: live directory path for the new cert
    :param bool dry_run: True iff Certbot is doing a dry run

    """
    if not dry_run:
        os.environ["RENEWED_DOMAINS"] = " ".join(domains)
        os.environ["RENEWED_LINEAGE"] = lineage_path
        _run_hook("deploy-hook", command)
    else:
        logger.info("Dry run: skipping deploy hook command: %s",
                    command)
226
227
def _run_hook(cmd_name: str, shell_cmd: str) -> str:
    """Execute a hook command and report its outcome.

    The command is executed through ``misc.execute_command_status``
    with the environment returned by
    ``util.env_no_snap_for_external_calls``. Its return code and
    captured output are then passed to
    ``display_ops.report_executed_command``.

    :param str cmd_name: the user facing name of the hook being run
    :param shell_cmd: shell command to execute
    :type shell_cmd: `list` of `str` or `str`

    :returns: stderr if there was any"""
    hook_env = util.env_no_snap_for_external_calls()
    status = misc.execute_command_status(cmd_name, shell_cmd, env=hook_env)
    # The status tuple is unpacked as (return code, stderr, stdout).
    returncode, err, out = status
    display_ops.report_executed_command(f"Hook '{cmd_name}'", returncode, out, err)
    return err
240
241
def list_hooks(dir_path: str) -> List[str]:
    """List paths to all hooks found in dir_path in sorted order.

    Each entry of dir_path is joined onto dir_path. The resulting path
    is kept when ``filesystem.is_executable`` accepts it and it does not
    end with ``~``.

    :param str dir_path: directory to search

    :returns: sorted list of paths to executables in dir_path
    :rtype: `list` of `str`

    """
    hooks = []
    for entry in os.listdir(dir_path):
        candidate = os.path.join(dir_path, entry)
        if filesystem.is_executable(candidate) and not candidate.endswith('~'):
            hooks.append(candidate)
    hooks.sort()
    return hooks
254