1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import os
16import re
17
18import mock
19from OpenSSL import crypto
20import pytest
21
22from google.auth import exceptions
23from google.auth.transport import _mtls_helper
24
25CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]}
26
27CONTEXT_AWARE_METADATA_NO_CERT_PROVIDER_COMMAND = {}
28
29ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
30MIHkME8GCSqGSIb3DQEFDTBCMCkGCSqGSIb3DQEFDDAcBAgl2/yVgs1h3QICCAAw
31DAYIKoZIhvcNAgkFADAVBgkrBgEEAZdVAQIECJk2GRrvxOaJBIGQXIBnMU4wmciT
32uA6yD8q0FxuIzjG7E2S6tc5VRgSbhRB00eBO3jWmO2pBybeQW+zVioDcn50zp2ts
33wYErWC+LCm1Zg3r+EGnT1E1GgNoODbVQ3AEHlKh1CGCYhEovxtn3G+Fjh7xOBrNB
34saVVeDb4tHD4tMkiVVUBrUcTZPndP73CtgyGHYEphasYPzEz3+AU
35-----END ENCRYPTED PRIVATE KEY-----"""
36
37EC_PUBLIC_KEY = b"""-----BEGIN PUBLIC KEY-----
38MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvCNi1NoDY1oMqPHIgXI8RBbTYGi/
39brEjbre1nSiQW11xRTJbVeETdsuP0EAu2tG3PcRhhwDfeJ8zXREgTBurNw==
40-----END PUBLIC KEY-----"""
41
42PASSPHRASE = b"""-----BEGIN PASSPHRASE-----
43password
44-----END PASSPHRASE-----"""
45PASSPHRASE_VALUE = b"password"
46
47
48def check_cert_and_key(content, expected_cert, expected_key):
49    success = True
50
51    cert_match = re.findall(_mtls_helper._CERT_REGEX, content)
52    success = success and len(cert_match) == 1 and cert_match[0] == expected_cert
53
54    key_match = re.findall(_mtls_helper._KEY_REGEX, content)
55    success = success and len(key_match) == 1 and key_match[0] == expected_key
56
57    return success
58
59
60class TestCertAndKeyRegex(object):
61    def test_cert_and_key(self):
62        # Test single cert and single key
63        check_cert_and_key(
64            pytest.public_cert_bytes + pytest.private_key_bytes,
65            pytest.public_cert_bytes,
66            pytest.private_key_bytes,
67        )
68        check_cert_and_key(
69            pytest.private_key_bytes + pytest.public_cert_bytes,
70            pytest.public_cert_bytes,
71            pytest.private_key_bytes,
72        )
73
74        # Test cert chain and single key
75        check_cert_and_key(
76            pytest.public_cert_bytes
77            + pytest.public_cert_bytes
78            + pytest.private_key_bytes,
79            pytest.public_cert_bytes + pytest.public_cert_bytes,
80            pytest.private_key_bytes,
81        )
82        check_cert_and_key(
83            pytest.private_key_bytes
84            + pytest.public_cert_bytes
85            + pytest.public_cert_bytes,
86            pytest.public_cert_bytes + pytest.public_cert_bytes,
87            pytest.private_key_bytes,
88        )
89
90    def test_key(self):
91        # Create some fake keys for regex check.
92        KEY = b"""-----BEGIN PRIVATE KEY-----
93        MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
94        /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
95        -----END PRIVATE KEY-----"""
96        RSA_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
97        MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
98        /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
99        -----END RSA PRIVATE KEY-----"""
100        EC_KEY = b"""-----BEGIN EC PRIVATE KEY-----
101        MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
102        /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
103        -----END EC PRIVATE KEY-----"""
104
105        check_cert_and_key(
106            pytest.public_cert_bytes + KEY, pytest.public_cert_bytes, KEY
107        )
108        check_cert_and_key(
109            pytest.public_cert_bytes + RSA_KEY, pytest.public_cert_bytes, RSA_KEY
110        )
111        check_cert_and_key(
112            pytest.public_cert_bytes + EC_KEY, pytest.public_cert_bytes, EC_KEY
113        )
114
115
116class TestCheckaMetadataPath(object):
117    def test_success(self):
118        metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
119        returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
120        assert returned_path is not None
121
122    def test_failure(self):
123        metadata_path = os.path.join(pytest.data_dir, "not_exists.json")
124        returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
125        assert returned_path is None
126
127
128class TestReadMetadataFile(object):
129    def test_success(self):
130        metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
131        metadata = _mtls_helper._read_dca_metadata_file(metadata_path)
132
133        assert "cert_provider_command" in metadata
134
135    def test_file_not_json(self):
136        # read a file which is not json format.
137        metadata_path = os.path.join(pytest.data_dir, "privatekey.pem")
138        with pytest.raises(exceptions.ClientCertError):
139            _mtls_helper._read_dca_metadata_file(metadata_path)
140
141
142class TestRunCertProviderCommand(object):
143    def create_mock_process(self, output, error):
144        # There are two steps to execute a script with subprocess.Popen.
145        # (1) process = subprocess.Popen([comannds])
146        # (2) stdout, stderr = process.communicate()
147        # This function creates a mock process which can be returned by a mock
148        # subprocess.Popen. The mock process returns the given output and error
149        # when mock_process.communicate() is called.
150        mock_process = mock.Mock()
151        attrs = {"communicate.return_value": (output, error), "returncode": 0}
152        mock_process.configure_mock(**attrs)
153        return mock_process
154
155    @mock.patch("subprocess.Popen", autospec=True)
156    def test_success(self, mock_popen):
157        mock_popen.return_value = self.create_mock_process(
158            pytest.public_cert_bytes + pytest.private_key_bytes, b""
159        )
160        cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
161        assert cert == pytest.public_cert_bytes
162        assert key == pytest.private_key_bytes
163        assert passphrase is None
164
165        mock_popen.return_value = self.create_mock_process(
166            pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
167        )
168        cert, key, passphrase = _mtls_helper._run_cert_provider_command(
169            ["command"], expect_encrypted_key=True
170        )
171        assert cert == pytest.public_cert_bytes
172        assert key == ENCRYPTED_EC_PRIVATE_KEY
173        assert passphrase == PASSPHRASE_VALUE
174
175    @mock.patch("subprocess.Popen", autospec=True)
176    def test_success_with_cert_chain(self, mock_popen):
177        PUBLIC_CERT_CHAIN_BYTES = pytest.public_cert_bytes + pytest.public_cert_bytes
178        mock_popen.return_value = self.create_mock_process(
179            PUBLIC_CERT_CHAIN_BYTES + pytest.private_key_bytes, b""
180        )
181        cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
182        assert cert == PUBLIC_CERT_CHAIN_BYTES
183        assert key == pytest.private_key_bytes
184        assert passphrase is None
185
186        mock_popen.return_value = self.create_mock_process(
187            PUBLIC_CERT_CHAIN_BYTES + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
188        )
189        cert, key, passphrase = _mtls_helper._run_cert_provider_command(
190            ["command"], expect_encrypted_key=True
191        )
192        assert cert == PUBLIC_CERT_CHAIN_BYTES
193        assert key == ENCRYPTED_EC_PRIVATE_KEY
194        assert passphrase == PASSPHRASE_VALUE
195
196    @mock.patch("subprocess.Popen", autospec=True)
197    def test_missing_cert(self, mock_popen):
198        mock_popen.return_value = self.create_mock_process(
199            pytest.private_key_bytes, b""
200        )
201        with pytest.raises(exceptions.ClientCertError):
202            _mtls_helper._run_cert_provider_command(["command"])
203
204        mock_popen.return_value = self.create_mock_process(
205            ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
206        )
207        with pytest.raises(exceptions.ClientCertError):
208            _mtls_helper._run_cert_provider_command(
209                ["command"], expect_encrypted_key=True
210            )
211
212    @mock.patch("subprocess.Popen", autospec=True)
213    def test_missing_key(self, mock_popen):
214        mock_popen.return_value = self.create_mock_process(
215            pytest.public_cert_bytes, b""
216        )
217        with pytest.raises(exceptions.ClientCertError):
218            _mtls_helper._run_cert_provider_command(["command"])
219
220        mock_popen.return_value = self.create_mock_process(
221            pytest.public_cert_bytes + PASSPHRASE, b""
222        )
223        with pytest.raises(exceptions.ClientCertError):
224            _mtls_helper._run_cert_provider_command(
225                ["command"], expect_encrypted_key=True
226            )
227
228    @mock.patch("subprocess.Popen", autospec=True)
229    def test_missing_passphrase(self, mock_popen):
230        mock_popen.return_value = self.create_mock_process(
231            pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
232        )
233        with pytest.raises(exceptions.ClientCertError):
234            _mtls_helper._run_cert_provider_command(
235                ["command"], expect_encrypted_key=True
236            )
237
238    @mock.patch("subprocess.Popen", autospec=True)
239    def test_passphrase_not_expected(self, mock_popen):
240        mock_popen.return_value = self.create_mock_process(
241            pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
242        )
243        with pytest.raises(exceptions.ClientCertError):
244            _mtls_helper._run_cert_provider_command(["command"])
245
246    @mock.patch("subprocess.Popen", autospec=True)
247    def test_encrypted_key_expected(self, mock_popen):
248        mock_popen.return_value = self.create_mock_process(
249            pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
250        )
251        with pytest.raises(exceptions.ClientCertError):
252            _mtls_helper._run_cert_provider_command(
253                ["command"], expect_encrypted_key=True
254            )
255
256    @mock.patch("subprocess.Popen", autospec=True)
257    def test_unencrypted_key_expected(self, mock_popen):
258        mock_popen.return_value = self.create_mock_process(
259            pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
260        )
261        with pytest.raises(exceptions.ClientCertError):
262            _mtls_helper._run_cert_provider_command(["command"])
263
264    @mock.patch("subprocess.Popen", autospec=True)
265    def test_cert_provider_returns_error(self, mock_popen):
266        mock_popen.return_value = self.create_mock_process(b"", b"some error")
267        mock_popen.return_value.returncode = 1
268        with pytest.raises(exceptions.ClientCertError):
269            _mtls_helper._run_cert_provider_command(["command"])
270
271    @mock.patch("subprocess.Popen", autospec=True)
272    def test_popen_raise_exception(self, mock_popen):
273        mock_popen.side_effect = OSError()
274        with pytest.raises(exceptions.ClientCertError):
275            _mtls_helper._run_cert_provider_command(["command"])
276
277
278class TestGetClientSslCredentials(object):
279    @mock.patch(
280        "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
281    )
282    @mock.patch(
283        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
284    )
285    @mock.patch(
286        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
287    )
288    def test_success(
289        self,
290        mock_check_dca_metadata_path,
291        mock_read_dca_metadata_file,
292        mock_run_cert_provider_command,
293    ):
294        mock_check_dca_metadata_path.return_value = True
295        mock_read_dca_metadata_file.return_value = {
296            "cert_provider_command": ["command"]
297        }
298        mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
299        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
300        assert has_cert
301        assert cert == b"cert"
302        assert key == b"key"
303        assert passphrase is None
304
305    @mock.patch(
306        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
307    )
308    def test_success_without_metadata(self, mock_check_dca_metadata_path):
309        mock_check_dca_metadata_path.return_value = False
310        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
311        assert not has_cert
312        assert cert is None
313        assert key is None
314        assert passphrase is None
315
316    @mock.patch(
317        "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
318    )
319    @mock.patch(
320        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
321    )
322    @mock.patch(
323        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
324    )
325    def test_success_with_encrypted_key(
326        self,
327        mock_check_dca_metadata_path,
328        mock_read_dca_metadata_file,
329        mock_run_cert_provider_command,
330    ):
331        mock_check_dca_metadata_path.return_value = True
332        mock_read_dca_metadata_file.return_value = {
333            "cert_provider_command": ["command"]
334        }
335        mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
336        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
337            generate_encrypted_key=True
338        )
339        assert has_cert
340        assert cert == b"cert"
341        assert key == b"key"
342        assert passphrase == b"passphrase"
343        mock_run_cert_provider_command.assert_called_once_with(
344            ["command", "--with_passphrase"], expect_encrypted_key=True
345        )
346
347    @mock.patch(
348        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
349    )
350    @mock.patch(
351        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
352    )
353    def test_missing_cert_command(
354        self, mock_check_dca_metadata_path, mock_read_dca_metadata_file
355    ):
356        mock_check_dca_metadata_path.return_value = True
357        mock_read_dca_metadata_file.return_value = {}
358        with pytest.raises(exceptions.ClientCertError):
359            _mtls_helper.get_client_ssl_credentials()
360
361    @mock.patch(
362        "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
363    )
364    @mock.patch(
365        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
366    )
367    @mock.patch(
368        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
369    )
370    def test_customize_context_aware_metadata_path(
371        self,
372        mock_check_dca_metadata_path,
373        mock_read_dca_metadata_file,
374        mock_run_cert_provider_command,
375    ):
376        context_aware_metadata_path = "/path/to/metata/data"
377        mock_check_dca_metadata_path.return_value = context_aware_metadata_path
378        mock_read_dca_metadata_file.return_value = {
379            "cert_provider_command": ["command"]
380        }
381        mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
382
383        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
384            context_aware_metadata_path=context_aware_metadata_path
385        )
386
387        assert has_cert
388        assert cert == b"cert"
389        assert key == b"key"
390        assert passphrase is None
391        mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path)
392        mock_read_dca_metadata_file.assert_called_with(context_aware_metadata_path)
393
394
395class TestGetClientCertAndKey(object):
396    def test_callback_success(self):
397        callback = mock.Mock()
398        callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)
399
400        found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key(callback)
401        assert found_cert_key
402        assert cert == pytest.public_cert_bytes
403        assert key == pytest.private_key_bytes
404
405    @mock.patch(
406        "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True
407    )
408    def test_use_metadata(self, mock_get_client_ssl_credentials):
409        mock_get_client_ssl_credentials.return_value = (
410            True,
411            pytest.public_cert_bytes,
412            pytest.private_key_bytes,
413            None,
414        )
415
416        found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key()
417        assert found_cert_key
418        assert cert == pytest.public_cert_bytes
419        assert key == pytest.private_key_bytes
420
421
422class TestDecryptPrivateKey(object):
423    def test_success(self):
424        decrypted_key = _mtls_helper.decrypt_private_key(
425            ENCRYPTED_EC_PRIVATE_KEY, PASSPHRASE_VALUE
426        )
427        private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, decrypted_key)
428        public_key = crypto.load_publickey(crypto.FILETYPE_PEM, EC_PUBLIC_KEY)
429        x509 = crypto.X509()
430        x509.set_pubkey(public_key)
431
432        # Test the decrypted key works by signing and verification.
433        signature = crypto.sign(private_key, b"data", "sha256")
434        crypto.verify(x509, signature, b"data", "sha256")
435
436    def test_crypto_error(self):
437        with pytest.raises(crypto.Error):
438            _mtls_helper.decrypt_private_key(
439                ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password"
440            )
441