#!/usr/bin/env python

# test_pynslcd_cache.py - tests for the pynslcd caching functionality
#
# Copyright (C) 2013-2019 Arthur de Jong
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

import os
import os.path
import sys
import unittest

# fix the Python path
sys.path.insert(1, os.path.abspath(os.path.join(sys.path[0], '..', 'pynslcd')))
sys.path.insert(2, os.path.abspath(os.path.join('..', 'pynslcd')))


# TODO: think about case-sensitivity of cache searches (have tests for that)


class CacheTestCase(unittest.TestCase):
    """Shared base class for the cache test cases in this file.

    Every test compares the rows returned by ``Cache.retrieve()`` with an
    expected list of rows while ignoring the order of the rows. The
    assertion used for that is spelled ``assertItemsEqual`` throughout the
    tests; when the running unittest does not provide that name it is
    aliased to ``assertCountEqual`` here, once, instead of in every
    ``setUp()``.
    """

    # Only evaluated when assertItemsEqual is missing, so assertCountEqual
    # is never looked up on a unittest that lacks it.
    if not hasattr(unittest.TestCase, 'assertItemsEqual'):
        assertItemsEqual = unittest.TestCase.assertCountEqual


class TestAlias(CacheTestCase):
    """Tests for alias.Cache: lookups by name, by member and unfiltered."""

    def setUp(self):
        # imported here so the path fix at the top of the file is in effect
        import alias
        cache = alias.Cache()
        cache.store('alias1', ['member1', 'member2'])
        cache.store('alias2', ['member1', 'member3'])
        cache.store('alias3', [])
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias1')),
            [
                ['alias1', ['member1', 'member2']],
            ])

    def test_by_member(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(rfc822MailMember='member1')),
            [
                ['alias1', ['member1', 'member2']],
                ['alias2', ['member1', 'member3']],
            ])

    def test_all(self):
        self.assertItemsEqual(
            self.cache.retrieve({}),
            [
                ['alias1', ['member1', 'member2']],
                ['alias2', ['member1', 'member3']],
                ['alias3', []],
            ])


class TestEther(CacheTestCase):
    """Tests for ether.Cache: lookups by name, by address and unfiltered."""

    def setUp(self):
        import ether
        cache = ether.Cache()
        cache.store('name1', '0:18:8a:54:1a:11')
        cache.store('name2', '0:18:8a:54:1a:22')
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='name1')),
            [
                ['name1', '0:18:8a:54:1a:11'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='name2')),
            [
                ['name2', '0:18:8a:54:1a:22'],
            ])

    def test_by_ether(self):
        # ideally we should also support alternate representations
        self.assertItemsEqual(
            self.cache.retrieve(dict(macAddress='0:18:8a:54:1a:22')),
            [
                ['name2', '0:18:8a:54:1a:22'],
            ])

    def test_all(self):
        self.assertItemsEqual(
            self.cache.retrieve({}),
            [
                ['name1', '0:18:8a:54:1a:11'],
                ['name2', '0:18:8a:54:1a:22'],
            ])


class TestGroup(CacheTestCase):
    """Tests for group.Cache: lookups by name, gid, member and unfiltered."""

    def setUp(self):
        import group
        cache = group.Cache()
        cache.store('group1', 'pass1', 10, ['user1', 'user2'])
        cache.store('group2', 'pass2', 20, ['user1', 'user2', 'user3'])
        cache.store('group3', 'pass3', 30, [])
        cache.store('group4', 'pass4', 40, ['user2'])
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='group1')),
            [
                ['group1', 'pass1', 10, ['user1', 'user2']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='group3')),
            [
                ['group3', 'pass3', 30, []],
            ])

    def test_by_gid(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(gidNumber=10)),
            [
                ['group1', 'pass1', 10, ['user1', 'user2']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(gidNumber=40)),
            [
                ['group4', 'pass4', 40, ['user2']],
            ])

    def test_all(self):
        self.assertItemsEqual(
            self.cache.retrieve({}),
            [
                ['group1', 'pass1', 10, ['user1', 'user2']],
                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
                ['group3', 'pass3', 30, []],
                ['group4', 'pass4', 40, ['user2']],
            ])

    def test_bymember(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(memberUid='user1')),
            [
                ['group1', 'pass1', 10, ['user1', 'user2']],
                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(memberUid='user2')),
            [
                ['group1', 'pass1', 10, ['user1', 'user2']],
                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
                ['group4', 'pass4', 40, ['user2']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(memberUid='user3')),
            [
                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
            ])


class TestHost(CacheTestCase):
    """Tests for host.Cache: lookups by name, by alias and by address."""

    def setUp(self):
        import host
        cache = host.Cache()
        cache.store('hostname1', [], ['127.0.0.1'])
        cache.store('hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3'])
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='hostname1')),
            [
                ['hostname1', [], ['127.0.0.1']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='hostname2')),
            [
                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])

    def test_by_alias(self):
        # aliases are queried through the same cn key as the primary name
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias1')),
            [
                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias2')),
            [
                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])

    def test_by_address(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipHostNumber='127.0.0.3')),
            [
                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])


class TestNetgroup(CacheTestCase):
    """Tests for netgroup.Cache: lookups by name."""

    def setUp(self):
        import netgroup
        cache = netgroup.Cache()
        cache.store(
            'netgroup1',
            ['(host1, user1,)', '(host1, user2,)', '(host2, user1,)'],
            ['netgroup2'])
        cache.store(
            'netgroup2', ['(host3, user1,)', '(host3, user3,)'], [])
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='netgroup1')),
            [
                [
                    'netgroup1',
                    ['(host1, user1,)', '(host1, user2,)', '(host2, user1,)'],
                    ['netgroup2'],
                ],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='netgroup2')),
            [
                ['netgroup2', ['(host3, user1,)', '(host3, user3,)'], []],
            ])


class TestNetwork(CacheTestCase):
    """Tests for network.Cache: lookups by name, by alias and by address."""

    def setUp(self):
        import network
        cache = network.Cache()
        cache.store('networkname1', [], ['127.0.0.1'])
        cache.store('networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3'])
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='networkname1')),
            [
                ['networkname1', [], ['127.0.0.1']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='networkname2')),
            [
                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])

    def test_by_alias(self):
        # aliases are queried through the same cn key as the primary name
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias1')),
            [
                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias2')),
            [
                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])

    def test_by_address(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipNetworkNumber='127.0.0.3')),
            [
                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
            ])


class TestPasswd(CacheTestCase):
    """Tests for passwd.Cache: lookups by name, by uid number and unfiltered."""

    def setUp(self):
        import passwd
        cache = passwd.Cache()
        cache.store('name', 'passwd', 100, 200, 'gecos', '/home/user', '/bin/bash')
        cache.store('name2', 'passwd2', 101, 202, 'gecos2', '/home/user2', '/bin/bash')
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(uid='name')),
            [
                [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
            ])

    def test_by_unknown_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(uid='notfound')),
            [])

    def test_by_number(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(uidNumber=100)),
            [
                [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(uidNumber=101)),
            [
                ['name2', 'passwd2', 101, 202, 'gecos2', '/home/user2', '/bin/bash'],
            ])

    def test_all(self):
        self.assertItemsEqual(
            self.cache.retrieve({}),
            [
                [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
                [u'name2', u'passwd2', 101, 202, u'gecos2', u'/home/user2', u'/bin/bash'],
            ])


class TestProtocol(CacheTestCase):
    """Tests for protocol.Cache: lookups by name, alias, number and unfiltered."""

    def setUp(self):
        import protocol
        cache = protocol.Cache()
        cache.store('protocol1', ['alias1', 'alias2'], 100)
        cache.store('protocol2', ['alias3'], 200)
        cache.store('protocol3', [], 300)
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='protocol1')),
            [
                ['protocol1', ['alias1', 'alias2'], 100],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='protocol2')),
            [
                ['protocol2', ['alias3'], 200],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='protocol3')),
            [
                ['protocol3', [], 300],
            ])

    def test_by_unknown_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='notfound')),
            [])

    def test_by_number(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipProtocolNumber=100)),
            [
                ['protocol1', ['alias1', 'alias2'], 100],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipProtocolNumber=200)),
            [
                ['protocol2', ['alias3'], 200],
            ])

    def test_by_alias(self):
        # aliases are queried through the same cn key as the primary name
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias1')),
            [
                ['protocol1', ['alias1', 'alias2'], 100],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias3')),
            [
                ['protocol2', ['alias3'], 200],
            ])

    def test_all(self):
        self.assertItemsEqual(
            self.cache.retrieve({}),
            [
                ['protocol1', ['alias1', 'alias2'], 100],
                ['protocol2', ['alias3'], 200],
                ['protocol3', [], 300],
            ])


class TestRpc(CacheTestCase):
    """Tests for rpc.Cache: lookups by name, alias, number and unfiltered."""

    def setUp(self):
        import rpc
        cache = rpc.Cache()
        cache.store('rpc1', ['alias1', 'alias2'], 100)
        cache.store('rpc2', ['alias3'], 200)
        cache.store('rpc3', [], 300)
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='rpc1')),
            [
                ['rpc1', ['alias1', 'alias2'], 100],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='rpc2')),
            [
                ['rpc2', ['alias3'], 200],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='rpc3')),
            [
                ['rpc3', [], 300],
            ])

    def test_by_unknown_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='notfound')),
            [])

    def test_by_number(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(oncRpcNumber=100)),
            [
                ['rpc1', ['alias1', 'alias2'], 100],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(oncRpcNumber=200)),
            [
                ['rpc2', ['alias3'], 200],
            ])

    def test_by_alias(self):
        # aliases are queried through the same cn key as the primary name
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias1')),
            [
                ['rpc1', ['alias1', 'alias2'], 100],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='alias3')),
            [
                ['rpc2', ['alias3'], 200],
            ])

    def test_all(self):
        self.assertItemsEqual(
            self.cache.retrieve({}),
            [
                ['rpc1', ['alias1', 'alias2'], 100],
                ['rpc2', ['alias3'], 200],
                ['rpc3', [], 300],
            ])


class TestService(CacheTestCase):
    """Tests for service.Cache: lookups by name, alias and port number,
    each with and without a protocol restriction, and unfiltered.

    The fixture stores service1 twice (once for tcp, once for udp) so that
    the protocol restriction can be seen to select a single row.
    """

    def setUp(self):
        import service
        cache = service.Cache()
        cache.store('service1', ['alias1', 'alias2'], 100, 'tcp')
        cache.store('service1', ['alias1', 'alias2'], 100, 'udp')
        cache.store('service2', ['alias3'], 200, 'udp')
        cache.store('service3', [], 300, 'udp')
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='service1')),
            [
                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
                ['service1', ['alias1', 'alias2'], 100, 'udp'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='service2')),
            [
                ['service2', ['alias3'], 200, 'udp'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='service3')),
            [
                ['service3', [], 300, 'udp'],
            ])

    def test_by_name_and_protocol(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='service1', ipServiceProtocol='udp')),
            [
                ['service1', ['alias1', 'alias2'], 100, 'udp'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='service1', ipServiceProtocol='tcp')),
            [
                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='service2', ipServiceProtocol='udp')),
            [
                ['service2', ['alias3'], 200, 'udp'],
            ])
        # service2 was only stored for udp
        self.assertItemsEqual(
            self.cache.retrieve(dict(cn='service2', ipServiceProtocol='tcp')),
            [])

    def test_by_unknown_name(self):
        self.assertItemsEqual(self.cache.retrieve(dict(cn='notfound')), [])

    def test_by_number(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipServicePort=100)),
            [
                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
                ['service1', ['alias1', 'alias2'], 100, 'udp'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipServicePort=200)),
            [
                ['service2', ['alias3'], 200, 'udp'],
            ])

    def test_by_number_and_protocol(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipServicePort=100, ipServiceProtocol='udp')),
            [
                ['service1', ['alias1', 'alias2'], 100, 'udp'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipServicePort=100, ipServiceProtocol='tcp')),
            [
                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipServicePort=200, ipServiceProtocol='udp')),
            [
                ['service2', ['alias3'], 200, 'udp'],
            ])
        # port 200 was only stored for udp
        self.assertItemsEqual(
            self.cache.retrieve(dict(ipServicePort=200, ipServiceProtocol='tcp')),
            [])

    def test_by_alias(self):
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
        ])
        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias3')), [
            ['service2', ['alias3'], 200, 'udp'],
        ])

    def test_all(self):
        self.assertItemsEqual(self.cache.retrieve({}), [
            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
            ['service1', ['alias1', 'alias2'], 100, 'udp'],
            ['service2', ['alias3'], 200, 'udp'],
            ['service3', [], 300, 'udp'],
        ])


class Testshadow(CacheTestCase):
    """Tests for shadow.Cache: lookups by name and unfiltered."""

    def setUp(self):
        import shadow
        cache = shadow.Cache()
        cache.store('name', 'passwd', 15639, 0, 7, -1, -1, -1, 0)
        cache.store('name2', 'passwd2', 15639, 0, 7, -1, -1, -1, 0)
        self.cache = cache

    def test_by_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(uid='name')),
            [
                [u'name', u'passwd', 15639, 0, 7, -1, -1, -1, 0],
            ])
        self.assertItemsEqual(
            self.cache.retrieve(dict(uid='name2')),
            [
                [u'name2', u'passwd2', 15639, 0, 7, -1, -1, -1, 0],
            ])

    def test_by_unknown_name(self):
        self.assertItemsEqual(
            self.cache.retrieve(dict(uid='notfound')),
            [])

    def test_all(self):
        self.assertItemsEqual(
            self.cache.retrieve({}), [
                [u'name', u'passwd', 15639, 0, 7, -1, -1, -1, 0],
                [u'name2', u'passwd2', 15639, 0, 7, -1, -1, -1, 0],
            ])


if __name__ == '__main__':
    unittest.main()