1# -*- coding: utf-8 -*- 2# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> 3# 4# This file is part of Ansible 5# 6# Ansible is free software: you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation, either version 3 of the License, or 9# (at your option) any later version. 10# 11# Ansible is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with Ansible. If not, see <http://www.gnu.org/licenses/>. 18 19# Make coding more python3-ish 20from __future__ import (absolute_import, division, print_function) 21__metaclass__ = type 22 23try: 24 import passlib 25 from passlib.handlers import pbkdf2 26except ImportError: 27 passlib = None 28 pbkdf2 = None 29 30import pytest 31 32from units.mock.loader import DictDataLoader 33 34from units.compat import unittest 35from units.compat.mock import mock_open, patch 36from ansible.errors import AnsibleError 37from ansible.module_utils.six import text_type 38from ansible.module_utils.six.moves import builtins 39from ansible.module_utils._text import to_bytes 40from ansible.plugins.loader import PluginLoader 41from ansible.plugins.lookup import password 42 43 44DEFAULT_CHARS = sorted([u'ascii_letters', u'digits', u".,:-_"]) 45DEFAULT_CANDIDATE_CHARS = u'.,:-_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' 46 47# Currently there isn't a new-style 48old_style_params_data = ( 49 # Simple case 50 dict( 51 term=u'/path/to/file', 52 filename=u'/path/to/file', 53 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), 54 candidate_chars=DEFAULT_CANDIDATE_CHARS, 55 ), 56 57 # Special characters in path 58 dict( 59 term=u'/path/with/embedded spaces and/file', 60 filename=u'/path/with/embedded spaces and/file', 61 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), 62 candidate_chars=DEFAULT_CANDIDATE_CHARS, 63 ), 64 dict( 65 term=u'/path/with/equals/cn=com.ansible', 66 filename=u'/path/with/equals/cn=com.ansible', 67 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), 68 candidate_chars=DEFAULT_CANDIDATE_CHARS, 69 ), 70 dict( 71 term=u'/path/with/unicode/くらとみ/file', 72 filename=u'/path/with/unicode/くらとみ/file', 73 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), 74 candidate_chars=DEFAULT_CANDIDATE_CHARS, 75 ), 76 # Mix several special chars 77 dict( 78 term=u'/path/with/utf 8 and spaces/くらとみ/file', 79 filename=u'/path/with/utf 8 and spaces/くらとみ/file', 80 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), 81 candidate_chars=DEFAULT_CANDIDATE_CHARS, 82 ), 83 dict( 84 term=u'/path/with/encoding=unicode/くらとみ/file', 85 filename=u'/path/with/encoding=unicode/くらとみ/file', 86 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), 87 candidate_chars=DEFAULT_CANDIDATE_CHARS, 88 ), 89 dict( 90 term=u'/path/with/encoding=unicode/くらとみ/and spaces file', 91 filename=u'/path/with/encoding=unicode/くらとみ/and spaces file', 92 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), 93 candidate_chars=DEFAULT_CANDIDATE_CHARS, 94 ), 95 96 # Simple parameters 97 dict( 98 term=u'/path/to/file length=42', 99 filename=u'/path/to/file', 100 params=dict(length=42, encrypt=None, chars=DEFAULT_CHARS), 101 candidate_chars=DEFAULT_CANDIDATE_CHARS, 102 ), 103 dict( 104 term=u'/path/to/file encrypt=pbkdf2_sha256', 105 filename=u'/path/to/file', 106 params=dict(length=password.DEFAULT_LENGTH, encrypt='pbkdf2_sha256', chars=DEFAULT_CHARS), 107 candidate_chars=DEFAULT_CANDIDATE_CHARS, 108 ), 109 dict( 110 term=u'/path/to/file chars=abcdefghijklmnop', 111 filename=u'/path/to/file', 112 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abcdefghijklmnop']), 113 candidate_chars=u'abcdefghijklmnop', 114 ), 115 dict( 116 term=u'/path/to/file chars=digits,abc,def', 117 filename=u'/path/to/file', 118 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc', u'def'])), 119 candidate_chars=u'abcdef0123456789', 120 ), 121 122 # Including comma in chars 123 dict( 124 term=u'/path/to/file chars=abcdefghijklmnop,,digits', 125 filename=u'/path/to/file', 126 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'abcdefghijklmnop', u',', u'digits'])), 127 candidate_chars=u',abcdefghijklmnop0123456789', 128 ), 129 dict( 130 term=u'/path/to/file chars=,,', 131 filename=u'/path/to/file', 132 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u',']), 133 candidate_chars=u',', 134 ), 135 136 # Including = in chars 137 dict( 138 term=u'/path/to/file chars=digits,=,,', 139 filename=u'/path/to/file', 140 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'=', u','])), 141 candidate_chars=u',=0123456789', 142 ), 143 dict( 144 term=u'/path/to/file chars=digits,abc=def', 145 filename=u'/path/to/file', 146 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc=def'])), 147 candidate_chars=u'abc=def0123456789', 148 ), 149 150 # Including unicode in chars 151 dict( 152 term=u'/path/to/file chars=digits,くらとみ,,', 153 filename=u'/path/to/file', 154 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'くらとみ', u','])), 155 candidate_chars=u',0123456789くらとみ', 156 ), 157 # Including only unicode in chars 158 dict( 159 term=u'/path/to/file chars=くらとみ', 160 filename=u'/path/to/file', 161 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'くらとみ'])), 162 candidate_chars=u'くらとみ', 163 ), 164 165 # Include ':' in path 166 dict( 167 term=u'/path/to/file_with:colon chars=ascii_letters,digits', 168 filename=u'/path/to/file_with:colon', 169 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'ascii_letters', u'digits'])), 170 candidate_chars=u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', 171 ), 172 173 # Including special chars in both path and chars 174 # Special characters in path 175 dict( 176 term=u'/path/with/embedded spaces and/file chars=abc=def', 177 filename=u'/path/with/embedded spaces and/file', 178 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']), 179 candidate_chars=u'abc=def', 180 ), 181 dict( 182 term=u'/path/with/equals/cn=com.ansible chars=abc=def', 183 filename=u'/path/with/equals/cn=com.ansible', 184 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']), 185 candidate_chars=u'abc=def', 186 ), 187 dict( 188 term=u'/path/with/unicode/くらとみ/file chars=くらとみ', 189 filename=u'/path/with/unicode/くらとみ/file', 190 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']), 191 candidate_chars=u'くらとみ', 192 ), 193) 194 195 196class TestParseParameters(unittest.TestCase): 197 def test(self): 198 for testcase in old_style_params_data: 199 filename, params = password._parse_parameters(testcase['term']) 200 params['chars'].sort() 201 self.assertEqual(filename, testcase['filename']) 202 self.assertEqual(params, testcase['params']) 203 204 def test_unrecognized_value(self): 205 testcase = dict(term=u'/path/to/file chars=くらとみi sdfsdf', 206 filename=u'/path/to/file', 207 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']), 208 candidate_chars=u'くらとみ') 209 self.assertRaises(AnsibleError, password._parse_parameters, testcase['term']) 210 211 def test_invalid_params(self): 212 testcase = dict(term=u'/path/to/file chars=くらとみi somethign_invalid=123', 213 filename=u'/path/to/file', 214 params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']), 215 candidate_chars=u'くらとみ') 216 self.assertRaises(AnsibleError, password._parse_parameters, testcase['term']) 217 218 219class TestReadPasswordFile(unittest.TestCase): 220 def setUp(self): 221 self.os_path_exists = password.os.path.exists 222 223 def tearDown(self): 224 password.os.path.exists = self.os_path_exists 225 226 def test_no_password_file(self): 227 password.os.path.exists = lambda x: False 228 self.assertEqual(password._read_password_file(b'/nonexistent'), None) 229 230 def test_with_password_file(self): 231 password.os.path.exists = lambda x: True 232 with patch.object(builtins, 'open', mock_open(read_data=b'Testing\n')) as m: 233 self.assertEqual(password._read_password_file(b'/etc/motd'), u'Testing') 234 235 236class TestGenCandidateChars(unittest.TestCase): 237 def _assert_gen_candidate_chars(self, testcase): 238 expected_candidate_chars = testcase['candidate_chars'] 239 params = testcase['params'] 240 chars_spec = params['chars'] 241 res = password._gen_candidate_chars(chars_spec) 242 self.assertEqual(res, expected_candidate_chars) 243 244 def test_gen_candidate_chars(self): 245 for testcase in old_style_params_data: 246 self._assert_gen_candidate_chars(testcase) 247 248 249class TestRandomPassword(unittest.TestCase): 250 def _assert_valid_chars(self, res, chars): 251 for res_char in res: 252 self.assertIn(res_char, chars) 253 254 def test_default(self): 255 res = password.random_password() 256 self.assertEqual(len(res), password.DEFAULT_LENGTH) 257 self.assertTrue(isinstance(res, text_type)) 258 self._assert_valid_chars(res, DEFAULT_CANDIDATE_CHARS) 259 260 def test_zero_length(self): 261 res = password.random_password(length=0) 262 self.assertEqual(len(res), 0) 263 self.assertTrue(isinstance(res, text_type)) 264 self._assert_valid_chars(res, u',') 265 266 def test_just_a_common(self): 267 res = password.random_password(length=1, chars=u',') 268 self.assertEqual(len(res), 1) 269 self.assertEqual(res, u',') 270 271 def test_free_will(self): 272 # A Rush and Spinal Tap reference twofer 273 res = password.random_password(length=11, chars=u'a') 274 self.assertEqual(len(res), 11) 275 self.assertEqual(res, 'aaaaaaaaaaa') 276 self._assert_valid_chars(res, u'a') 277 278 def test_unicode(self): 279 res = password.random_password(length=11, chars=u'くらとみ') 280 self._assert_valid_chars(res, u'くらとみ') 281 self.assertEqual(len(res), 11) 282 283 def test_gen_password(self): 284 for testcase in old_style_params_data: 285 params = testcase['params'] 286 candidate_chars = testcase['candidate_chars'] 287 params_chars_spec = password._gen_candidate_chars(params['chars']) 288 password_string = password.random_password(length=params['length'], 289 chars=params_chars_spec) 290 self.assertEqual(len(password_string), 291 params['length'], 292 msg='generated password=%s has length (%s) instead of expected length (%s)' % 293 (password_string, len(password_string), params['length'])) 294 295 for char in password_string: 296 self.assertIn(char, candidate_chars, 297 msg='%s not found in %s from chars spect %s' % 298 (char, candidate_chars, params['chars'])) 299 300 301class TestParseContent(unittest.TestCase): 302 def test_empty_password_file(self): 303 plaintext_password, salt = password._parse_content(u'') 304 self.assertEqual(plaintext_password, u'') 305 self.assertEqual(salt, None) 306 307 def test(self): 308 expected_content = u'12345678' 309 file_content = expected_content 310 plaintext_password, salt = password._parse_content(file_content) 311 self.assertEqual(plaintext_password, expected_content) 312 self.assertEqual(salt, None) 313 314 def test_with_salt(self): 315 expected_content = u'12345678 salt=87654321' 316 file_content = expected_content 317 plaintext_password, salt = password._parse_content(file_content) 318 self.assertEqual(plaintext_password, u'12345678') 319 self.assertEqual(salt, u'87654321') 320 321 322class TestFormatContent(unittest.TestCase): 323 def test_no_encrypt(self): 324 self.assertEqual( 325 password._format_content(password=u'hunter42', 326 salt=u'87654321', 327 encrypt=False), 328 u'hunter42 salt=87654321') 329 330 def test_no_encrypt_no_salt(self): 331 self.assertEqual( 332 password._format_content(password=u'hunter42', 333 salt=None, 334 encrypt=None), 335 u'hunter42') 336 337 def test_encrypt(self): 338 self.assertEqual( 339 password._format_content(password=u'hunter42', 340 salt=u'87654321', 341 encrypt='pbkdf2_sha256'), 342 u'hunter42 salt=87654321') 343 344 def test_encrypt_no_salt(self): 345 self.assertRaises(AssertionError, password._format_content, u'hunter42', None, 'pbkdf2_sha256') 346 347 348class TestWritePasswordFile(unittest.TestCase): 349 def setUp(self): 350 self.makedirs_safe = password.makedirs_safe 351 self.os_chmod = password.os.chmod 352 password.makedirs_safe = lambda path, mode: None 353 password.os.chmod = lambda path, mode: None 354 355 def tearDown(self): 356 password.makedirs_safe = self.makedirs_safe 357 password.os.chmod = self.os_chmod 358 359 def test_content_written(self): 360 361 with patch.object(builtins, 'open', mock_open()) as m: 362 password._write_password_file(b'/this/is/a/test/caf\xc3\xa9', u'Testing Café') 363 364 m.assert_called_once_with(b'/this/is/a/test/caf\xc3\xa9', 'wb') 365 m().write.assert_called_once_with(u'Testing Café\n'.encode('utf-8')) 366 367 368class BaseTestLookupModule(unittest.TestCase): 369 def setUp(self): 370 self.fake_loader = DictDataLoader({'/path/to/somewhere': 'sdfsdf'}) 371 self.password_lookup = password.LookupModule(loader=self.fake_loader) 372 self.os_path_exists = password.os.path.exists 373 self.os_open = password.os.open 374 password.os.open = lambda path, flag: None 375 self.os_close = password.os.close 376 password.os.close = lambda fd: None 377 self.os_remove = password.os.remove 378 password.os.remove = lambda path: None 379 self.makedirs_safe = password.makedirs_safe 380 password.makedirs_safe = lambda path, mode: None 381 382 def tearDown(self): 383 password.os.path.exists = self.os_path_exists 384 password.os.open = self.os_open 385 password.os.close = self.os_close 386 password.os.remove = self.os_remove 387 password.makedirs_safe = self.makedirs_safe 388 389 390class TestLookupModuleWithoutPasslib(BaseTestLookupModule): 391 @patch.object(PluginLoader, '_get_paths') 392 @patch('ansible.plugins.lookup.password._write_password_file') 393 def test_no_encrypt(self, mock_get_paths, mock_write_file): 394 mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] 395 396 results = self.password_lookup.run([u'/path/to/somewhere'], None) 397 398 # FIXME: assert something useful 399 for result in results: 400 assert len(result) == password.DEFAULT_LENGTH 401 assert isinstance(result, text_type) 402 403 @patch.object(PluginLoader, '_get_paths') 404 @patch('ansible.plugins.lookup.password._write_password_file') 405 def test_password_already_created_no_encrypt(self, mock_get_paths, mock_write_file): 406 mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] 407 password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere') 408 409 with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m: 410 results = self.password_lookup.run([u'/path/to/somewhere chars=anything'], None) 411 412 for result in results: 413 self.assertEqual(result, u'hunter42') 414 415 @patch.object(PluginLoader, '_get_paths') 416 @patch('ansible.plugins.lookup.password._write_password_file') 417 def test_only_a(self, mock_get_paths, mock_write_file): 418 mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] 419 420 results = self.password_lookup.run([u'/path/to/somewhere chars=a'], None) 421 for result in results: 422 self.assertEqual(result, u'a' * password.DEFAULT_LENGTH) 423 424 @patch('time.sleep') 425 def test_lock_been_held(self, mock_sleep): 426 # pretend the lock file is here 427 password.os.path.exists = lambda x: True 428 try: 429 with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m: 430 # should timeout here 431 results = self.password_lookup.run([u'/path/to/somewhere chars=anything'], None) 432 self.fail("Lookup didn't timeout when lock already been held") 433 except AnsibleError: 434 pass 435 436 def test_lock_not_been_held(self): 437 # pretend now there is password file but no lock 438 password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere') 439 try: 440 with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m: 441 # should not timeout here 442 results = self.password_lookup.run([u'/path/to/somewhere chars=anything'], None) 443 except AnsibleError: 444 self.fail('Lookup timeouts when lock is free') 445 446 for result in results: 447 self.assertEqual(result, u'hunter42') 448 449 450@pytest.mark.skipif(passlib is None, reason='passlib must be installed to run these tests') 451class TestLookupModuleWithPasslib(BaseTestLookupModule): 452 def setUp(self): 453 super(TestLookupModuleWithPasslib, self).setUp() 454 455 # Different releases of passlib default to a different number of rounds 456 self.sha256 = passlib.registry.get_crypt_handler('pbkdf2_sha256') 457 sha256_for_tests = pbkdf2.create_pbkdf2_hash("sha256", 32, 20000) 458 passlib.registry.register_crypt_handler(sha256_for_tests, force=True) 459 460 def tearDown(self): 461 super(TestLookupModuleWithPasslib, self).tearDown() 462 463 passlib.registry.register_crypt_handler(self.sha256, force=True) 464 465 @patch.object(PluginLoader, '_get_paths') 466 @patch('ansible.plugins.lookup.password._write_password_file') 467 def test_encrypt(self, mock_get_paths, mock_write_file): 468 mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] 469 470 results = self.password_lookup.run([u'/path/to/somewhere encrypt=pbkdf2_sha256'], None) 471 472 # pbkdf2 format plus hash 473 expected_password_length = 76 474 475 for result in results: 476 self.assertEqual(len(result), expected_password_length) 477 # result should have 5 parts split by '$' 478 str_parts = result.split('$', 5) 479 480 # verify the result is parseable by the passlib 481 crypt_parts = passlib.hash.pbkdf2_sha256.parsehash(result) 482 483 # verify it used the right algo type 484 self.assertEqual(str_parts[1], 'pbkdf2-sha256') 485 486 self.assertEqual(len(str_parts), 5) 487 488 # verify the string and parsehash agree on the number of rounds 489 self.assertEqual(int(str_parts[2]), crypt_parts['rounds']) 490 self.assertIsInstance(result, text_type) 491 492 @patch.object(PluginLoader, '_get_paths') 493 @patch('ansible.plugins.lookup.password._write_password_file') 494 def test_password_already_created_encrypt(self, mock_get_paths, mock_write_file): 495 mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] 496 password.os.path.exists = lambda x: x == to_bytes('/path/to/somewhere') 497 498 with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m: 499 results = self.password_lookup.run([u'/path/to/somewhere chars=anything encrypt=pbkdf2_sha256'], None) 500 for result in results: 501 self.assertEqual(result, u'$pbkdf2-sha256$20000$ODc2NTQzMjE$Uikde0cv0BKaRaAXMrUQB.zvG4GmnjClwjghwIRf2gU') 502