1"""A class that performs HTTP-01 challenges for Nginx"""
2
3import io
4import logging
5from typing import List
6from typing import Optional
7from typing import TYPE_CHECKING
8
9from acme import challenges
10from certbot import achallenges
11from certbot import errors
12from certbot.compat import os
13from certbot.plugins import common
14from certbot_nginx._internal import nginxparser
15from certbot_nginx._internal import obj
16
17if TYPE_CHECKING:
18    from certbot_nginx._internal.configurator import NginxConfigurator
19
20logger = logging.getLogger(__name__)
21
22
23class NginxHttp01(common.ChallengePerformer):
24    """HTTP-01 authenticator for Nginx
25
26    :ivar configurator: NginxConfigurator object
27    :type configurator: :class:`~nginx.configurator.NginxConfigurator`
28
29    :ivar list achalls: Annotated
30        class:`~certbot.achallenges.KeyAuthorizationAnnotatedChallenge`
31        challenges
32
33    :param list indices: Meant to hold indices of challenges in a
34        larger array. NginxHttp01 is capable of solving many challenges
35        at once which causes an indexing issue within NginxConfigurator
36        who must return all responses in order. Imagine
37        NginxConfigurator maintaining state about where all of the
38        challenges, possibly of different types, belong in the response
39        array. This is an optional utility.
40
41    """
42
43    def __init__(self, configurator: "NginxConfigurator") -> None:
44        super().__init__(configurator)
45        self.configurator: "NginxConfigurator"
46        self.challenge_conf = os.path.join(
47            configurator.config.config_dir, "le_http_01_cert_challenge.conf")
48
49    def perform(self):
50        """Perform a challenge on Nginx.
51
52        :returns: list of :class:`certbot.acme.challenges.HTTP01Response`
53        :rtype: list
54
55        """
56        if not self.achalls:
57            return []
58
59        responses = [x.response(x.account_key) for x in self.achalls]
60
61        # Set up the configuration
62        self._mod_config()
63
64        # Save reversible changes
65        self.configurator.save("HTTP Challenge", True)
66
67        return responses
68
69    def _mod_config(self):
70        """Modifies Nginx config to include server_names_hash_bucket_size directive
71           and server challenge blocks.
72
73        :raises .MisconfigurationError:
74            Unable to find a suitable HTTP block in which to include
75            authenticator hosts.
76        """
77        included = False
78        include_directive = ['\n', 'include', ' ', self.challenge_conf]
79        root = self.configurator.parser.config_root
80
81        bucket_directive = ['\n', 'server_names_hash_bucket_size', ' ', '128']
82
83        main = self.configurator.parser.parsed[root]
84        for line in main:
85            if line[0] == ['http']:
86                body = line[1]
87                found_bucket = False
88                posn = 0
89                for inner_line in body:
90                    if inner_line[0] == bucket_directive[1]:
91                        if int(inner_line[1]) < int(bucket_directive[3]):
92                            body[posn] = bucket_directive
93                        found_bucket = True
94                    posn += 1
95                if not found_bucket:
96                    body.insert(0, bucket_directive)
97                if include_directive not in body:
98                    body.insert(0, include_directive)
99                included = True
100                break
101        if not included:
102            raise errors.MisconfigurationError(
103                'Certbot could not find a block to include '
104                'challenges in %s.' % root)
105        config = [self._make_or_mod_server_block(achall) for achall in self.achalls]
106        config = [x for x in config if x is not None]
107        config = nginxparser.UnspacedList(config)
108        logger.debug("Generated server block:\n%s", str(config))
109
110        self.configurator.reverter.register_file_creation(
111            True, self.challenge_conf)
112
113        with io.open(self.challenge_conf, "w", encoding="utf-8") as new_conf:
114            nginxparser.dump(config, new_conf)
115
116    def _default_listen_addresses(self):
117        """Finds addresses for a challenge block to listen on.
118        :returns: list of :class:`certbot_nginx._internal.obj.Addr` to apply
119        :rtype: list
120        """
121        addresses: List[obj.Addr] = []
122        default_addr = "%s" % self.configurator.config.http01_port
123        ipv6_addr = "[::]:{0}".format(
124            self.configurator.config.http01_port)
125        port = self.configurator.config.http01_port
126
127        ipv6, ipv6only = self.configurator.ipv6_info(port)
128
129        if ipv6:
130            # If IPv6 is active in Nginx configuration
131            if not ipv6only:
132                # If ipv6only=on is not already present in the config
133                ipv6_addr = ipv6_addr + " ipv6only=on"
134            addresses = [obj.Addr.fromstring(default_addr),
135                         obj.Addr.fromstring(ipv6_addr)]
136            logger.debug(("Using default addresses %s and %s for authentication."),
137                        default_addr,
138                        ipv6_addr)
139        else:
140            addresses = [obj.Addr.fromstring(default_addr)]
141            logger.debug("Using default address %s for authentication.",
142                        default_addr)
143        return addresses
144
145    def _get_validation_path(self, achall):
146        return os.sep + os.path.join(challenges.HTTP01.URI_ROOT_PATH, achall.chall.encode("token"))
147
148    def _make_server_block(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge) -> List:
149        """Creates a server block for a challenge.
150
151        :param achall: Annotated HTTP-01 challenge
152        :type achall: :class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge`
153
154        :returns: server block for the challenge host
155        :rtype: list
156        """
157        addrs = self._default_listen_addresses()
158        block = [['listen', ' ', addr.to_string(include_default=False)] for addr in addrs]
159
160        # Ensure we 404 on any other request by setting a root
161        document_root = os.path.join(
162            self.configurator.config.work_dir, "http_01_nonexistent")
163
164        block.extend([['server_name', ' ', achall.domain],
165                      ['root', ' ', document_root],
166                      self._location_directive_for_achall(achall)
167                      ])
168        # TODO: do we want to return something else if they otherwise access this block?
169        return [['server'], block]
170
171    def _location_directive_for_achall(self, achall):
172        validation = achall.validation(achall.account_key)
173        validation_path = self._get_validation_path(achall)
174
175        location_directive = [['location', ' ', '=', ' ', validation_path],
176                              [['default_type', ' ', 'text/plain'],
177                               ['return', ' ', '200', ' ', validation]]]
178        return location_directive
179
180
181    def _make_or_mod_server_block(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge
182                                  ) -> Optional[List]:
183        """Modifies server blocks to respond to a challenge. Returns a new HTTP server block
184           to add to the configuration if an existing one can't be found.
185
186        :param achall: Annotated HTTP-01 challenge
187        :type achall: :class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge`
188
189        :returns: new server block to be added, if any
190        :rtype: list
191
192        """
193        http_vhosts, https_vhosts = self.configurator.choose_auth_vhosts(achall.domain)
194
195        new_vhost: Optional[list] = None
196        if not http_vhosts:
197            # Couldn't find either a matching name+port server block
198            # or a port+default_server block, so create a dummy block
199            new_vhost = self._make_server_block(achall)
200
201        # Modify any existing server blocks
202        for vhost in set(http_vhosts + https_vhosts):
203            location_directive = [self._location_directive_for_achall(achall)]
204
205            self.configurator.parser.add_server_directives(vhost, location_directive)
206
207            rewrite_directive = [['rewrite', ' ', '^(/.well-known/acme-challenge/.*)',
208                                    ' ', '$1', ' ', 'break']]
209            self.configurator.parser.add_server_directives(vhost,
210                rewrite_directive, insert_at_top=True)
211
212        return new_vhost
213