1"""A class that performs HTTP-01 challenges for Nginx""" 2 3import io 4import logging 5from typing import List 6from typing import Optional 7from typing import TYPE_CHECKING 8 9from acme import challenges 10from certbot import achallenges 11from certbot import errors 12from certbot.compat import os 13from certbot.plugins import common 14from certbot_nginx._internal import nginxparser 15from certbot_nginx._internal import obj 16 17if TYPE_CHECKING: 18 from certbot_nginx._internal.configurator import NginxConfigurator 19 20logger = logging.getLogger(__name__) 21 22 23class NginxHttp01(common.ChallengePerformer): 24 """HTTP-01 authenticator for Nginx 25 26 :ivar configurator: NginxConfigurator object 27 :type configurator: :class:`~nginx.configurator.NginxConfigurator` 28 29 :ivar list achalls: Annotated 30 class:`~certbot.achallenges.KeyAuthorizationAnnotatedChallenge` 31 challenges 32 33 :param list indices: Meant to hold indices of challenges in a 34 larger array. NginxHttp01 is capable of solving many challenges 35 at once which causes an indexing issue within NginxConfigurator 36 who must return all responses in order. Imagine 37 NginxConfigurator maintaining state about where all of the 38 challenges, possibly of different types, belong in the response 39 array. This is an optional utility. 40 41 """ 42 43 def __init__(self, configurator: "NginxConfigurator") -> None: 44 super().__init__(configurator) 45 self.configurator: "NginxConfigurator" 46 self.challenge_conf = os.path.join( 47 configurator.config.config_dir, "le_http_01_cert_challenge.conf") 48 49 def perform(self): 50 """Perform a challenge on Nginx. 51 52 :returns: list of :class:`certbot.acme.challenges.HTTP01Response` 53 :rtype: list 54 55 """ 56 if not self.achalls: 57 return [] 58 59 responses = [x.response(x.account_key) for x in self.achalls] 60 61 # Set up the configuration 62 self._mod_config() 63 64 # Save reversible changes 65 self.configurator.save("HTTP Challenge", True) 66 67 return responses 68 69 def _mod_config(self): 70 """Modifies Nginx config to include server_names_hash_bucket_size directive 71 and server challenge blocks. 72 73 :raises .MisconfigurationError: 74 Unable to find a suitable HTTP block in which to include 75 authenticator hosts. 76 """ 77 included = False 78 include_directive = ['\n', 'include', ' ', self.challenge_conf] 79 root = self.configurator.parser.config_root 80 81 bucket_directive = ['\n', 'server_names_hash_bucket_size', ' ', '128'] 82 83 main = self.configurator.parser.parsed[root] 84 for line in main: 85 if line[0] == ['http']: 86 body = line[1] 87 found_bucket = False 88 posn = 0 89 for inner_line in body: 90 if inner_line[0] == bucket_directive[1]: 91 if int(inner_line[1]) < int(bucket_directive[3]): 92 body[posn] = bucket_directive 93 found_bucket = True 94 posn += 1 95 if not found_bucket: 96 body.insert(0, bucket_directive) 97 if include_directive not in body: 98 body.insert(0, include_directive) 99 included = True 100 break 101 if not included: 102 raise errors.MisconfigurationError( 103 'Certbot could not find a block to include ' 104 'challenges in %s.' % root) 105 config = [self._make_or_mod_server_block(achall) for achall in self.achalls] 106 config = [x for x in config if x is not None] 107 config = nginxparser.UnspacedList(config) 108 logger.debug("Generated server block:\n%s", str(config)) 109 110 self.configurator.reverter.register_file_creation( 111 True, self.challenge_conf) 112 113 with io.open(self.challenge_conf, "w", encoding="utf-8") as new_conf: 114 nginxparser.dump(config, new_conf) 115 116 def _default_listen_addresses(self): 117 """Finds addresses for a challenge block to listen on. 118 :returns: list of :class:`certbot_nginx._internal.obj.Addr` to apply 119 :rtype: list 120 """ 121 addresses: List[obj.Addr] = [] 122 default_addr = "%s" % self.configurator.config.http01_port 123 ipv6_addr = "[::]:{0}".format( 124 self.configurator.config.http01_port) 125 port = self.configurator.config.http01_port 126 127 ipv6, ipv6only = self.configurator.ipv6_info(port) 128 129 if ipv6: 130 # If IPv6 is active in Nginx configuration 131 if not ipv6only: 132 # If ipv6only=on is not already present in the config 133 ipv6_addr = ipv6_addr + " ipv6only=on" 134 addresses = [obj.Addr.fromstring(default_addr), 135 obj.Addr.fromstring(ipv6_addr)] 136 logger.debug(("Using default addresses %s and %s for authentication."), 137 default_addr, 138 ipv6_addr) 139 else: 140 addresses = [obj.Addr.fromstring(default_addr)] 141 logger.debug("Using default address %s for authentication.", 142 default_addr) 143 return addresses 144 145 def _get_validation_path(self, achall): 146 return os.sep + os.path.join(challenges.HTTP01.URI_ROOT_PATH, achall.chall.encode("token")) 147 148 def _make_server_block(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge) -> List: 149 """Creates a server block for a challenge. 150 151 :param achall: Annotated HTTP-01 challenge 152 :type achall: :class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge` 153 154 :returns: server block for the challenge host 155 :rtype: list 156 """ 157 addrs = self._default_listen_addresses() 158 block = [['listen', ' ', addr.to_string(include_default=False)] for addr in addrs] 159 160 # Ensure we 404 on any other request by setting a root 161 document_root = os.path.join( 162 self.configurator.config.work_dir, "http_01_nonexistent") 163 164 block.extend([['server_name', ' ', achall.domain], 165 ['root', ' ', document_root], 166 self._location_directive_for_achall(achall) 167 ]) 168 # TODO: do we want to return something else if they otherwise access this block? 169 return [['server'], block] 170 171 def _location_directive_for_achall(self, achall): 172 validation = achall.validation(achall.account_key) 173 validation_path = self._get_validation_path(achall) 174 175 location_directive = [['location', ' ', '=', ' ', validation_path], 176 [['default_type', ' ', 'text/plain'], 177 ['return', ' ', '200', ' ', validation]]] 178 return location_directive 179 180 181 def _make_or_mod_server_block(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge 182 ) -> Optional[List]: 183 """Modifies server blocks to respond to a challenge. Returns a new HTTP server block 184 to add to the configuration if an existing one can't be found. 185 186 :param achall: Annotated HTTP-01 challenge 187 :type achall: :class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge` 188 189 :returns: new server block to be added, if any 190 :rtype: list 191 192 """ 193 http_vhosts, https_vhosts = self.configurator.choose_auth_vhosts(achall.domain) 194 195 new_vhost: Optional[list] = None 196 if not http_vhosts: 197 # Couldn't find either a matching name+port server block 198 # or a port+default_server block, so create a dummy block 199 new_vhost = self._make_server_block(achall) 200 201 # Modify any existing server blocks 202 for vhost in set(http_vhosts + https_vhosts): 203 location_directive = [self._location_directive_for_achall(achall)] 204 205 self.configurator.parser.add_server_directives(vhost, location_directive) 206 207 rewrite_directive = [['rewrite', ' ', '^(/.well-known/acme-challenge/.*)', 208 ' ', '$1', ' ', 'break']] 209 self.configurator.parser.add_server_directives(vhost, 210 rewrite_directive, insert_at_top=True) 211 212 return new_vhost 213