1# -*- coding: utf-8 -*-
2# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
3#
4# This file is part of Ansible
5#
6# Ansible is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# Ansible is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
18
19# Make coding more python3-ish
20from __future__ import (absolute_import, division, print_function)
21__metaclass__ = type
22
23try:
24    import passlib
25    from passlib.handlers import pbkdf2
26except ImportError:
27    passlib = None
28    pbkdf2 = None
29
30import pytest
31
32from units.mock.loader import DictDataLoader
33
34from units.compat import unittest
35from units.compat.mock import mock_open, patch
36from ansible.errors import AnsibleError
37from ansible.module_utils.six import text_type
38from ansible.module_utils.six.moves import builtins
39from ansible.module_utils._text import to_bytes
40from ansible.plugins.loader import PluginLoader
41from ansible.plugins.lookup import password
42
43
# Default ``chars`` specification.  It is stored sorted because the parameter
# tests sort the parsed spec before comparing it with this value.
DEFAULT_CHARS = sorted((u'ascii_letters', u'digits', u'.,:-_'))
# Candidate characters the default specification is expected to expand to.
DEFAULT_CANDIDATE_CHARS = (
    u'.,:-_'
    u'abcdefghijklmnopqrstuvwxyz'
    u'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    u'0123456789'
)
46
# Currently there isn't a new-style parameter format, so every case below uses an old-style term
# Each entry describes one lookup term and what the plugin helpers should make of it:
#   term            - raw term handed to password._parse_parameters()
#   filename        - filename expected back from the parser
#   params          - params dict expected back from the parser (chars spec sorted)
#   candidate_chars - expected result of password._gen_candidate_chars(params['chars'])
old_style_params_data = (
    # Simple case
    {
        'term': u'/path/to/file',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },

    # Special characters in path
    {
        'term': u'/path/with/embedded spaces and/file',
        'filename': u'/path/with/embedded spaces and/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },
    {
        'term': u'/path/with/equals/cn=com.ansible',
        'filename': u'/path/with/equals/cn=com.ansible',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },
    {
        'term': u'/path/with/unicode/くらとみ/file',
        'filename': u'/path/with/unicode/くらとみ/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },
    # Mix several special chars
    {
        'term': u'/path/with/utf 8 and spaces/くらとみ/file',
        'filename': u'/path/with/utf 8 and spaces/くらとみ/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },
    {
        'term': u'/path/with/encoding=unicode/くらとみ/file',
        'filename': u'/path/with/encoding=unicode/くらとみ/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },
    {
        'term': u'/path/with/encoding=unicode/くらとみ/and spaces file',
        'filename': u'/path/with/encoding=unicode/くらとみ/and spaces file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },

    # Simple parameters
    {
        'term': u'/path/to/file length=42',
        'filename': u'/path/to/file',
        'params': {'length': 42, 'encrypt': None, 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },
    {
        'term': u'/path/to/file encrypt=pbkdf2_sha256',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': 'pbkdf2_sha256', 'chars': DEFAULT_CHARS},
        'candidate_chars': DEFAULT_CANDIDATE_CHARS,
    },
    {
        'term': u'/path/to/file chars=abcdefghijklmnop',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': [u'abcdefghijklmnop']},
        'candidate_chars': u'abcdefghijklmnop',
    },
    {
        'term': u'/path/to/file chars=digits,abc,def',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': sorted([u'digits', u'abc', u'def'])},
        'candidate_chars': u'abcdef0123456789',
    },

    # Including comma in chars
    {
        'term': u'/path/to/file chars=abcdefghijklmnop,,digits',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': sorted([u'abcdefghijklmnop', u',', u'digits'])},
        'candidate_chars': u',abcdefghijklmnop0123456789',
    },
    {
        'term': u'/path/to/file chars=,,',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': [u',']},
        'candidate_chars': u',',
    },

    # Including = in chars
    {
        'term': u'/path/to/file chars=digits,=,,',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': sorted([u'digits', u'=', u','])},
        'candidate_chars': u',=0123456789',
    },
    {
        'term': u'/path/to/file chars=digits,abc=def',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': sorted([u'digits', u'abc=def'])},
        'candidate_chars': u'abc=def0123456789',
    },

    # Including unicode in chars
    {
        'term': u'/path/to/file chars=digits,くらとみ,,',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': sorted([u'digits', u'くらとみ', u','])},
        'candidate_chars': u',0123456789くらとみ',
    },
    # Including only unicode in chars
    {
        'term': u'/path/to/file chars=くらとみ',
        'filename': u'/path/to/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': sorted([u'くらとみ'])},
        'candidate_chars': u'くらとみ',
    },

    # Include ':' in path
    {
        'term': u'/path/to/file_with:colon chars=ascii_letters,digits',
        'filename': u'/path/to/file_with:colon',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': sorted([u'ascii_letters', u'digits'])},
        'candidate_chars': u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
    },

    # Including special chars in both path and chars
    # Special characters in path
    {
        'term': u'/path/with/embedded spaces and/file chars=abc=def',
        'filename': u'/path/with/embedded spaces and/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': [u'abc=def']},
        'candidate_chars': u'abc=def',
    },
    {
        'term': u'/path/with/equals/cn=com.ansible chars=abc=def',
        'filename': u'/path/with/equals/cn=com.ansible',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': [u'abc=def']},
        'candidate_chars': u'abc=def',
    },
    {
        'term': u'/path/with/unicode/くらとみ/file chars=くらとみ',
        'filename': u'/path/with/unicode/くらとみ/file',
        'params': {'length': password.DEFAULT_LENGTH, 'encrypt': None, 'chars': [u'くらとみ']},
        'candidate_chars': u'くらとみ',
    },
)
194
195
class TestParseParameters(unittest.TestCase):
    """Checks for ``password._parse_parameters`` on old-style lookup terms."""

    def test(self):
        # Walk the shared table and compare the parser output with each expectation
        for case in old_style_params_data:
            parsed_filename, parsed_params = password._parse_parameters(case['term'])
            # Expected chars specs in the table are sorted, so sort the parsed one too
            parsed_params['chars'].sort()
            self.assertEqual(parsed_filename, case['filename'])
            self.assertEqual(parsed_params, case['params'])

    def test_unrecognized_value(self):
        # The term ends in a bare word that is not a key=value pair
        term = u'/path/to/file chars=くらとみi  sdfsdf'
        with self.assertRaises(AnsibleError):
            password._parse_parameters(term)

    def test_invalid_params(self):
        # The term carries a key=value pair whose key is not a known parameter
        term = u'/path/to/file chars=くらとみi  somethign_invalid=123'
        with self.assertRaises(AnsibleError):
            password._parse_parameters(term)
217
218
class TestReadPasswordFile(unittest.TestCase):
    """Checks for ``password._read_password_file`` with ``os.path.exists`` stubbed."""

    def setUp(self):
        # Remember the real function; each test swaps in its own stub
        self.os_path_exists = password.os.path.exists

    def tearDown(self):
        # Undo whatever stub the test installed
        password.os.path.exists = self.os_path_exists

    def test_no_password_file(self):
        password.os.path.exists = lambda x: False
        content = password._read_password_file(b'/nonexistent')
        self.assertEqual(content, None)

    def test_with_password_file(self):
        password.os.path.exists = lambda x: True
        fake_open = mock_open(read_data=b'Testing\n')
        with patch.object(builtins, 'open', fake_open):
            content = password._read_password_file(b'/etc/motd')
            # The expected value has no trailing newline and is a text string
            self.assertEqual(content, u'Testing')
234
235
class TestGenCandidateChars(unittest.TestCase):
    """Checks that ``password._gen_candidate_chars`` expands each chars spec as expected."""

    def _assert_gen_candidate_chars(self, testcase):
        # Expand the case's chars spec and compare with its expected candidate string
        generated = password._gen_candidate_chars(testcase['params']['chars'])
        self.assertEqual(generated, testcase['candidate_chars'])

    def test_gen_candidate_chars(self):
        for case in old_style_params_data:
            self._assert_gen_candidate_chars(case)
247
248
class TestRandomPassword(unittest.TestCase):
    """Checks for ``password.random_password``: length, type and character set."""

    def _assert_valid_chars(self, res, chars):
        # Every generated character must come from the allowed set
        for generated_char in res:
            self.assertIn(generated_char, chars)

    def test_default(self):
        generated = password.random_password()
        self.assertEqual(len(generated), password.DEFAULT_LENGTH)
        self.assertIsInstance(generated, text_type)
        self._assert_valid_chars(generated, DEFAULT_CANDIDATE_CHARS)

    def test_zero_length(self):
        generated = password.random_password(length=0)
        self.assertEqual(len(generated), 0)
        self.assertIsInstance(generated, text_type)
        self._assert_valid_chars(generated, u',')

    def test_just_a_common(self):
        generated = password.random_password(length=1, chars=u',')
        self.assertEqual(len(generated), 1)
        self.assertEqual(generated, u',')

    def test_free_will(self):
        # A Rush and Spinal Tap reference twofer
        generated = password.random_password(length=11, chars=u'a')
        self.assertEqual(len(generated), 11)
        self.assertEqual(generated, 'aaaaaaaaaaa')
        self._assert_valid_chars(generated, u'a')

    def test_unicode(self):
        generated = password.random_password(length=11, chars=u'くらとみ')
        self._assert_valid_chars(generated, u'くらとみ')
        self.assertEqual(len(generated), 11)

    def test_gen_password(self):
        # Generate one password per table entry and check its length and alphabet
        for case in old_style_params_data:
            spec = case['params']
            allowed = case['candidate_chars']
            expanded_chars = password._gen_candidate_chars(spec['chars'])
            generated = password.random_password(length=spec['length'], chars=expanded_chars)

            length_msg = 'generated password=%s has length (%s) instead of expected length (%s)' % (
                generated, len(generated), spec['length'])
            self.assertEqual(len(generated), spec['length'], msg=length_msg)

            for char in generated:
                self.assertIn(char, allowed,
                              msg='%s not found in %s from chars spect %s' %
                              (char, allowed, spec['chars']))
299
300
class TestParseContent(unittest.TestCase):
    """Checks that ``password._parse_content`` splits file content into password and salt."""

    def test_empty_password_file(self):
        plaintext, salt = password._parse_content(u'')
        self.assertEqual(plaintext, u'')
        self.assertEqual(salt, None)

    def test(self):
        # Content without a salt marker comes back unchanged, with no salt
        content = u'12345678'
        plaintext, salt = password._parse_content(content)
        self.assertEqual(plaintext, content)
        self.assertEqual(salt, None)

    def test_with_salt(self):
        # Content of the form "<password> salt=<salt>" is split into its two parts
        content = u'12345678 salt=87654321'
        plaintext, salt = password._parse_content(content)
        self.assertEqual(plaintext, u'12345678')
        self.assertEqual(salt, u'87654321')
320
321
class TestFormatContent(unittest.TestCase):
    """Checks for ``password._format_content`` with and without salt and encrypt."""

    def test_no_encrypt(self):
        formatted = password._format_content(password=u'hunter42',
                                             salt=u'87654321',
                                             encrypt=False)
        self.assertEqual(formatted, u'hunter42 salt=87654321')

    def test_no_encrypt_no_salt(self):
        formatted = password._format_content(password=u'hunter42',
                                             salt=None,
                                             encrypt=None)
        self.assertEqual(formatted, u'hunter42')

    def test_encrypt(self):
        formatted = password._format_content(password=u'hunter42',
                                             salt=u'87654321',
                                             encrypt='pbkdf2_sha256')
        self.assertEqual(formatted, u'hunter42 salt=87654321')

    def test_encrypt_no_salt(self):
        # Asking for encryption without providing a salt must trip an assertion
        with self.assertRaises(AssertionError):
            password._format_content(u'hunter42', None, 'pbkdf2_sha256')
346
347
class TestWritePasswordFile(unittest.TestCase):
    """Checks what ``password._write_password_file`` opens and writes."""

    def setUp(self):
        # Save the real helpers, then replace them with no-ops for the test
        self.makedirs_safe = password.makedirs_safe
        self.os_chmod = password.os.chmod
        password.makedirs_safe = lambda path, mode: None
        password.os.chmod = lambda path, mode: None

    def tearDown(self):
        password.os.chmod = self.os_chmod
        password.makedirs_safe = self.makedirs_safe

    def test_content_written(self):
        b_path = b'/this/is/a/test/caf\xc3\xa9'
        expected_bytes = u'Testing Café\n'.encode('utf-8')

        with patch.object(builtins, 'open', mock_open()) as fake_open:
            password._write_password_file(b_path, u'Testing Café')

            # Opened once in binary write mode, then the content plus a newline is written
            fake_open.assert_called_once_with(b_path, 'wb')
            fake_open.return_value.write.assert_called_once_with(expected_bytes)
366
367
class BaseTestLookupModule(unittest.TestCase):
    """Shared fixture: builds a password lookup and stubs out filesystem helpers."""

    def setUp(self):
        self.fake_loader = DictDataLoader({'/path/to/somewhere': 'sdfsdf'})
        self.password_lookup = password.LookupModule(loader=self.fake_loader)

        # Remember the real callables so tearDown can restore them.
        # os.path.exists is saved but not stubbed here; individual tests replace it.
        self.os_path_exists = password.os.path.exists
        self.os_open = password.os.open
        self.os_close = password.os.close
        self.os_remove = password.os.remove
        self.makedirs_safe = password.makedirs_safe

        # Install no-op replacements
        password.os.open = lambda path, flag: None
        password.os.close = lambda fd: None
        password.os.remove = lambda path: None
        password.makedirs_safe = lambda path, mode: None

    def tearDown(self):
        password.makedirs_safe = self.makedirs_safe
        password.os.remove = self.os_remove
        password.os.close = self.os_close
        password.os.open = self.os_open
        password.os.path.exists = self.os_path_exists
388
389
class TestLookupModuleWithoutPasslib(BaseTestLookupModule):
    """Lookup tests that do not request encryption, so passlib is not required.

    Stacked ``patch`` decorators are applied bottom-up, so the mock created by
    the decorator closest to the function is passed first: ``mock_write_file``
    (for ``_write_password_file``) comes before ``mock_get_paths`` (for
    ``PluginLoader._get_paths``).
    """

    @patch.object(PluginLoader, '_get_paths')
    @patch('ansible.plugins.lookup.password._write_password_file')
    def test_no_encrypt(self, mock_write_file, mock_get_paths):
        mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']

        results = self.password_lookup.run([u'/path/to/somewhere'], None)

        # FIXME: assert something useful
        for result in results:
            self.assertEqual(len(result), password.DEFAULT_LENGTH)
            self.assertIsInstance(result, text_type)

    @patch.object(PluginLoader, '_get_paths')
    @patch('ansible.plugins.lookup.password._write_password_file')
    def test_password_already_created_no_encrypt(self, mock_write_file, mock_get_paths):
        mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
        # Only the password file itself is reported as existing
        password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere')

        with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')):
            results = self.password_lookup.run([u'/path/to/somewhere chars=anything'], None)

        for result in results:
            self.assertEqual(result, u'hunter42')

    @patch.object(PluginLoader, '_get_paths')
    @patch('ansible.plugins.lookup.password._write_password_file')
    def test_only_a(self, mock_write_file, mock_get_paths):
        mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']

        results = self.password_lookup.run([u'/path/to/somewhere chars=a'], None)
        for result in results:
            self.assertEqual(result, u'a' * password.DEFAULT_LENGTH)

    @patch('time.sleep')
    def test_lock_been_held(self, mock_sleep):
        # pretend the lock file is here
        password.os.path.exists = lambda x: True
        with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')):
            try:
                # should timeout here
                self.password_lookup.run([u'/path/to/somewhere chars=anything'], None)
            except AnsibleError:
                # Expected outcome: the lookup gave up
                pass
            else:
                self.fail("Lookup didn't timeout when lock already been held")

    def test_lock_not_been_held(self):
        # pretend now there is password file but no lock
        password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere')
        try:
            with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')):
                # should not timeout here
                results = self.password_lookup.run([u'/path/to/somewhere chars=anything'], None)
        except AnsibleError:
            self.fail('Lookup timeouts when lock is free')

        for result in results:
            self.assertEqual(result, u'hunter42')
448
449
@pytest.mark.skipif(passlib is None, reason='passlib must be installed to run these tests')
class TestLookupModuleWithPasslib(BaseTestLookupModule):
    """Lookup tests that request ``encrypt=pbkdf2_sha256`` and therefore need passlib.

    Stacked ``patch`` decorators are applied bottom-up, so the mock created by
    the decorator closest to the function is passed first: ``mock_write_file``
    (for ``_write_password_file``) comes before ``mock_get_paths`` (for
    ``PluginLoader._get_paths``).
    """

    def setUp(self):
        super(TestLookupModuleWithPasslib, self).setUp()

        # Different releases of passlib default to a different number of rounds,
        # so register a handler with a fixed count; the original handler is kept
        # for tearDown to restore.
        self.sha256 = passlib.registry.get_crypt_handler('pbkdf2_sha256')
        sha256_for_tests = pbkdf2.create_pbkdf2_hash("sha256", 32, 20000)
        passlib.registry.register_crypt_handler(sha256_for_tests, force=True)

    def tearDown(self):
        super(TestLookupModuleWithPasslib, self).tearDown()

        passlib.registry.register_crypt_handler(self.sha256, force=True)

    @patch.object(PluginLoader, '_get_paths')
    @patch('ansible.plugins.lookup.password._write_password_file')
    def test_encrypt(self, mock_write_file, mock_get_paths):
        mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']

        results = self.password_lookup.run([u'/path/to/somewhere encrypt=pbkdf2_sha256'], None)

        # pbkdf2 format plus hash
        expected_password_length = 76

        for result in results:
            self.assertEqual(len(result), expected_password_length)
            # result should have 5 parts split by '$'; check that before indexing
            str_parts = result.split('$', 5)
            self.assertEqual(len(str_parts), 5)

            # verify the result is parseable by the passlib
            crypt_parts = passlib.hash.pbkdf2_sha256.parsehash(result)

            # verify it used the right algo type
            self.assertEqual(str_parts[1], 'pbkdf2-sha256')

            # verify the string and parsehash agree on the number of rounds
            self.assertEqual(int(str_parts[2]), crypt_parts['rounds'])
            self.assertIsInstance(result, text_type)

    @patch.object(PluginLoader, '_get_paths')
    @patch('ansible.plugins.lookup.password._write_password_file')
    def test_password_already_created_encrypt(self, mock_write_file, mock_get_paths):
        mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
        # Only the password file itself is reported as existing
        password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere')

        with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')):
            results = self.password_lookup.run([u'/path/to/somewhere chars=anything encrypt=pbkdf2_sha256'], None)
        for result in results:
            self.assertEqual(result, u'$pbkdf2-sha256$20000$ODc2NTQzMjE$Uikde0cv0BKaRaAXMrUQB.zvG4GmnjClwjghwIRf2gU')
502