1"""Tests for certbot_nginx._internal.http_01"""
2import unittest
3
4import josepy as jose
5try:
6    import mock
7except ImportError: # pragma: no cover
8    from unittest import mock # type: ignore
9
10from acme import challenges
11from certbot import achallenges
12from certbot.tests import acme_util
13from certbot.tests import util as test_util
14from certbot_nginx._internal.obj import Addr
15import test_util as util
16
17AUTH_KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
18
19
20class HttpPerformTest(util.NginxTest):
21    """Test the NginxHttp01 challenge."""
22
23    account_key = AUTH_KEY
24    achalls = [
25        achallenges.KeyAuthorizationAnnotatedChallenge(
26            challb=acme_util.chall_to_challb(
27                challenges.HTTP01(token=b"kNdwjwOeX0I_A8DXt9Msmg"), "pending"),
28            domain="www.example.com", account_key=account_key),
29        achallenges.KeyAuthorizationAnnotatedChallenge(
30            challb=acme_util.chall_to_challb(
31                challenges.HTTP01(
32                    token=b"\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y"
33                          b"\x80\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945"
34                ), "pending"),
35            domain="ipv6.com", account_key=account_key),
36        achallenges.KeyAuthorizationAnnotatedChallenge(
37            challb=acme_util.chall_to_challb(
38                challenges.HTTP01(
39                    token=b"\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd"
40                          b"\xeb9\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4"
41                ), "pending"),
42            domain="www.example.org", account_key=account_key),
43        achallenges.KeyAuthorizationAnnotatedChallenge(
44            challb=acme_util.chall_to_challb(
45                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), "pending"),
46            domain="migration.com", account_key=account_key),
47        achallenges.KeyAuthorizationAnnotatedChallenge(
48            challb=acme_util.chall_to_challb(
49                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), "pending"),
50            domain="ipv6ssl.com", account_key=account_key),
51    ]
52
53    def setUp(self):
54        super().setUp()
55
56        config = self.get_nginx_configurator(
57            self.config_path, self.config_dir, self.work_dir, self.logs_dir)
58
59        from certbot_nginx._internal import http_01
60        self.http01 = http_01.NginxHttp01(config)
61
62    def test_perform0(self):
63        responses = self.http01.perform()
64        self.assertEqual([], responses)
65
66    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.save")
67    def test_perform1(self, mock_save):
68        self.http01.add_chall(self.achalls[0])
69        response = self.achalls[0].response(self.account_key)
70
71        responses = self.http01.perform()
72
73        self.assertEqual([response], responses)
74        self.assertEqual(mock_save.call_count, 1)
75
76    def test_perform2(self):
77        acme_responses = []
78        for achall in self.achalls:
79            self.http01.add_chall(achall)
80            acme_responses.append(achall.response(self.account_key))
81
82        http_responses = self.http01.perform()
83
84        self.assertEqual(len(http_responses), 5)
85        for i in range(5):
86            self.assertEqual(http_responses[i], acme_responses[i])
87
88    def test_mod_config(self):
89        self.http01.add_chall(self.achalls[0])
90        self.http01.add_chall(self.achalls[2])
91
92        self.http01._mod_config()  # pylint: disable=protected-access
93
94        self.http01.configurator.save()
95
96        self.http01.configurator.parser.load()
97
98        # vhosts = self.http01.configurator.parser.get_vhosts()
99
100        # for vhost in vhosts:
101        #     pass
102            # if the name matches
103            # check that the location block is in there and is correct
104
105            # if vhost.addrs == set(v_addr1):
106            #     response = self.achalls[0].response(self.account_key)
107            # else:
108            #     response = self.achalls[2].response(self.account_key)
109            #     self.assertEqual(vhost.addrs, set(v_addr2_print))
110            # self.assertEqual(vhost.names, set([response.z_domain.decode('ascii')]))
111
112    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
113    def test_mod_config_http_and_https(self, mock_add_server_directives):
114        """A server_name with both HTTP and HTTPS vhosts should get modded in both vhosts"""
115        self.configuration.https_port = 443
116        self.http01.add_chall(self.achalls[3]) # migration.com
117        self.http01._mod_config()  # pylint: disable=protected-access
118
119        # Domain has an HTTP and HTTPS vhost
120        # 2 * 'rewrite' + 2 * 'return 200 keyauthz' = 4
121        self.assertEqual(mock_add_server_directives.call_count, 4)
122
123    @mock.patch('certbot_nginx._internal.parser.nginxparser.dump')
124    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
125    def test_mod_config_only_https(self, mock_add_server_directives, mock_dump):
126        """A server_name with only an HTTPS vhost should get modded"""
127        self.http01.add_chall(self.achalls[4]) # ipv6ssl.com
128        self.http01._mod_config() # pylint: disable=protected-access
129
130        # It should modify the existing HTTPS vhost
131        self.assertEqual(mock_add_server_directives.call_count, 2)
132        # since there was no suitable HTTP vhost or default HTTP vhost, a non-empty one
133        # should have been created and written to the challenge conf file
134        self.assertNotEqual(mock_dump.call_args[0][0], [])
135
136    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
137    def test_mod_config_deduplicate(self, mock_add_server_directives):
138        """A vhost that appears in both HTTP and HTTPS vhosts only gets modded once"""
139        achall = achallenges.KeyAuthorizationAnnotatedChallenge(
140            challb=acme_util.chall_to_challb(
141                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), "pending"),
142            domain="ssl.both.com", account_key=AUTH_KEY)
143        self.http01.add_chall(achall)
144        self.http01._mod_config() # pylint: disable=protected-access
145
146        # Should only get called 5 times, rather than 6, because two vhosts are the same
147        self.assertEqual(mock_add_server_directives.call_count, 5*2)
148
149    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
150    def test_default_listen_addresses_no_memoization(self, ipv6_info):
151        # pylint: disable=protected-access
152        ipv6_info.return_value = (True, True)
153        self.http01._default_listen_addresses()
154        self.assertEqual(ipv6_info.call_count, 1)
155        ipv6_info.return_value = (False, False)
156        self.http01._default_listen_addresses()
157        self.assertEqual(ipv6_info.call_count, 2)
158
159    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
160    def test_default_listen_addresses_t_t(self, ipv6_info):
161        # pylint: disable=protected-access
162        ipv6_info.return_value = (True, True)
163        addrs = self.http01._default_listen_addresses()
164        http_addr = Addr.fromstring("80")
165        http_ipv6_addr = Addr.fromstring("[::]:80")
166        self.assertEqual(addrs, [http_addr, http_ipv6_addr])
167
168    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
169    def test_default_listen_addresses_t_f(self, ipv6_info):
170        # pylint: disable=protected-access
171        ipv6_info.return_value = (True, False)
172        addrs = self.http01._default_listen_addresses()
173        http_addr = Addr.fromstring("80")
174        http_ipv6_addr = Addr.fromstring("[::]:80 ipv6only=on")
175        self.assertEqual(addrs, [http_addr, http_ipv6_addr])
176
177    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
178    def test_default_listen_addresses_f_f(self, ipv6_info):
179        # pylint: disable=protected-access
180        ipv6_info.return_value = (False, False)
181        addrs = self.http01._default_listen_addresses()
182        http_addr = Addr.fromstring("80")
183        self.assertEqual(addrs, [http_addr])
184
185if __name__ == "__main__":
186    unittest.main()  # pragma: no cover
187