1#!/usr/bin/env python
2
3# test_pynslcd_cache.py - tests for the pynslcd caching functionality
4#
5# Copyright (C) 2013-2019 Arthur de Jong
6#
7# This library is free software; you can redistribute it and/or
8# modify it under the terms of the GNU Lesser General Public
9# License as published by the Free Software Foundation; either
10# version 2.1 of the License, or (at your option) any later version.
11#
12# This library is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15# Lesser General Public License for more details.
16#
17# You should have received a copy of the GNU Lesser General Public
18# License along with this library; if not, write to the Free Software
19# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20# 02110-1301 USA
21
22import os
23import os.path
24import sys
25import unittest
26
27# fix the Python path
28sys.path.insert(1, os.path.abspath(os.path.join(sys.path[0], '..', 'pynslcd')))
29sys.path.insert(2, os.path.abspath(os.path.join('..', 'pynslcd')))
30
31
32# TODO: think about case-sensitivity of cache searches (have tests for that)
33
34
35class TestAlias(unittest.TestCase):
36
37    def setUp(self):
38        import alias
39        cache = alias.Cache()
40        cache.store('alias1', ['member1', 'member2'])
41        cache.store('alias2', ['member1', 'member3'])
42        cache.store('alias3', [])
43        self.cache = cache
44        if not hasattr(self, 'assertItemsEqual'):
45            self.assertItemsEqual = self.assertCountEqual
46
47    def test_by_name(self):
48        self.assertItemsEqual(
49            self.cache.retrieve(dict(cn='alias1')),
50            [
51                ['alias1', ['member1', 'member2']],
52            ])
53
54    def test_by_member(self):
55        self.assertItemsEqual(
56            self.cache.retrieve(dict(rfc822MailMember='member1')),
57            [
58                ['alias1', ['member1', 'member2']],
59                ['alias2', ['member1', 'member3']],
60            ])
61
62    def test_all(self):
63        self.assertItemsEqual(
64            self.cache.retrieve({}),
65            [
66                ['alias1', ['member1', 'member2']],
67                ['alias2', ['member1', 'member3']],
68                ['alias3', []],
69            ])
70
71
72class TestEther(unittest.TestCase):
73
74    def setUp(self):
75        import ether
76        cache = ether.Cache()
77        cache.store('name1', '0:18:8a:54:1a:11')
78        cache.store('name2', '0:18:8a:54:1a:22')
79        self.cache = cache
80        if not hasattr(self, 'assertItemsEqual'):
81            self.assertItemsEqual = self.assertCountEqual
82
83    def test_by_name(self):
84        self.assertItemsEqual(
85            self.cache.retrieve(dict(cn='name1')),
86            [
87                ['name1', '0:18:8a:54:1a:11'],
88            ])
89        self.assertItemsEqual(
90            self.cache.retrieve(dict(cn='name2')),
91            [
92                ['name2', '0:18:8a:54:1a:22'],
93            ])
94
95    def test_by_ether(self):
96        # ideally we should also support alternate representations
97        self.assertItemsEqual(
98            self.cache.retrieve(dict(macAddress='0:18:8a:54:1a:22')),
99            [
100                ['name2', '0:18:8a:54:1a:22'],
101            ])
102
103    def test_all(self):
104        self.assertItemsEqual(
105            self.cache.retrieve({}),
106            [
107                ['name1', '0:18:8a:54:1a:11'],
108                ['name2', '0:18:8a:54:1a:22'],
109            ])
110
111
112class TestGroup(unittest.TestCase):
113
114    def setUp(self):
115        import group
116        cache = group.Cache()
117        cache.store('group1', 'pass1', 10, ['user1', 'user2'])
118        cache.store('group2', 'pass2', 20, ['user1', 'user2', 'user3'])
119        cache.store('group3', 'pass3', 30, [])
120        cache.store('group4', 'pass4', 40, ['user2'])
121        self.cache = cache
122        if not hasattr(self, 'assertItemsEqual'):
123            self.assertItemsEqual = self.assertCountEqual
124
125    def test_by_name(self):
126        self.assertItemsEqual(
127            self.cache.retrieve(dict(cn='group1')),
128            [
129                ['group1', 'pass1', 10, ['user1', 'user2']],
130            ])
131        self.assertItemsEqual(
132            self.cache.retrieve(dict(cn='group3')),
133            [
134                ['group3', 'pass3', 30, []],
135            ])
136
137    def test_by_gid(self):
138        self.assertItemsEqual(
139            self.cache.retrieve(dict(gidNumber=10)),
140            [
141                ['group1', 'pass1', 10, ['user1', 'user2']],
142            ])
143        self.assertItemsEqual(
144            self.cache.retrieve(dict(gidNumber=40)),
145            [
146                ['group4', 'pass4', 40, ['user2']],
147            ])
148
149    def test_all(self):
150        self.assertItemsEqual(
151            self.cache.retrieve({}),
152            [
153                ['group1', 'pass1', 10, ['user1', 'user2']],
154                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
155                ['group3', 'pass3', 30, []],
156                ['group4', 'pass4', 40, ['user2']],
157            ])
158
159    def test_bymember(self):
160        self.assertItemsEqual(
161            self.cache.retrieve(dict(memberUid='user1')),
162            [
163                ['group1', 'pass1', 10, ['user1', 'user2']],
164                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
165            ])
166        self.assertItemsEqual(
167            self.cache.retrieve(dict(memberUid='user2')),
168            [
169                ['group1', 'pass1', 10, ['user1', 'user2']],
170                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
171                ['group4', 'pass4', 40, ['user2']],
172            ])
173        self.assertItemsEqual(
174            self.cache.retrieve(dict(memberUid='user3')),
175            [
176                ['group2', 'pass2', 20, ['user1', 'user2', 'user3']],
177            ])
178
179
180class TestHost(unittest.TestCase):
181
182    def setUp(self):
183        import host
184        cache = host.Cache()
185        cache.store('hostname1', [], ['127.0.0.1'])
186        cache.store('hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3'])
187        self.cache = cache
188        if not hasattr(self, 'assertItemsEqual'):
189            self.assertItemsEqual = self.assertCountEqual
190
191    def test_by_name(self):
192        self.assertItemsEqual(
193            self.cache.retrieve(dict(cn='hostname1')),
194            [
195                ['hostname1', [], ['127.0.0.1']],
196            ])
197        self.assertItemsEqual(
198            self.cache.retrieve(dict(cn='hostname2')),
199            [
200                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
201            ])
202
203    def test_by_alias(self):
204        self.assertItemsEqual(
205            self.cache.retrieve(dict(cn='alias1')),
206            [
207                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
208            ])
209        self.assertItemsEqual(
210            self.cache.retrieve(dict(cn='alias2')),
211            [
212                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
213            ])
214
215    def test_by_address(self):
216        self.assertItemsEqual(
217            self.cache.retrieve(dict(ipHostNumber='127.0.0.3')),
218            [
219                ['hostname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
220            ])
221
222
223class TestNetgroup(unittest.TestCase):
224
225    def setUp(self):
226        import netgroup
227        cache = netgroup.Cache()
228        cache.store(
229            'netgroup1',
230            ['(host1, user1,)', '(host1, user2,)', '(host2, user1,)'],
231            ['netgroup2'])
232        cache.store(
233            'netgroup2', ['(host3, user1,)', '(host3, user3,)'], [])
234        self.cache = cache
235        if not hasattr(self, 'assertItemsEqual'):
236            self.assertItemsEqual = self.assertCountEqual
237
238    def test_by_name(self):
239        self.assertItemsEqual(
240            self.cache.retrieve(dict(cn='netgroup1')),
241            [
242                [
243                    'netgroup1',
244                    ['(host1, user1,)', '(host1, user2,)', '(host2, user1,)'],
245                    ['netgroup2'],
246                ],
247            ])
248        self.assertItemsEqual(
249            self.cache.retrieve(dict(cn='netgroup2')),
250            [
251                ['netgroup2', ['(host3, user1,)', '(host3, user3,)'], []],
252            ])
253
254
255class TestNetwork(unittest.TestCase):
256
257    def setUp(self):
258        import network
259        cache = network.Cache()
260        cache.store('networkname1', [], ['127.0.0.1'])
261        cache.store('networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3'])
262        self.cache = cache
263        if not hasattr(self, 'assertItemsEqual'):
264            self.assertItemsEqual = self.assertCountEqual
265
266    def test_by_name(self):
267        self.assertItemsEqual(
268            self.cache.retrieve(dict(cn='networkname1')),
269            [
270                ['networkname1', [], ['127.0.0.1']],
271            ])
272        self.assertItemsEqual(
273            self.cache.retrieve(dict(cn='networkname2')),
274            [
275                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
276            ])
277
278    def test_by_alias(self):
279        self.assertItemsEqual(
280            self.cache.retrieve(dict(cn='alias1')),
281            [
282                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
283            ])
284        self.assertItemsEqual(
285            self.cache.retrieve(dict(cn='alias2')),
286            [
287                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
288            ])
289
290    def test_by_address(self):
291        self.assertItemsEqual(
292            self.cache.retrieve(dict(ipNetworkNumber='127.0.0.3')),
293            [
294                ['networkname2', ['alias1', 'alias2'], ['127.0.0.2', '127.0.0.3']],
295            ])
296
297
298class TestPasswd(unittest.TestCase):
299
300    def setUp(self):
301        import passwd
302        cache = passwd.Cache()
303        cache.store('name', 'passwd', 100, 200, 'gecos', '/home/user', '/bin/bash')
304        cache.store('name2', 'passwd2', 101, 202, 'gecos2', '/home/user2', '/bin/bash')
305        self.cache = cache
306        if not hasattr(self, 'assertItemsEqual'):
307            self.assertItemsEqual = self.assertCountEqual
308
309    def test_by_name(self):
310        self.assertItemsEqual(
311            self.cache.retrieve(dict(uid='name')),
312            [
313                [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
314            ])
315
316    def test_by_unknown_name(self):
317        self.assertItemsEqual(
318            self.cache.retrieve(dict(uid='notfound')),
319            [])
320
321    def test_by_number(self):
322        self.assertItemsEqual(
323            self.cache.retrieve(dict(uidNumber=100)),
324            [
325                [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
326            ])
327        self.assertItemsEqual(
328            self.cache.retrieve(dict(uidNumber=101)),
329            [
330                ['name2', 'passwd2', 101, 202, 'gecos2', '/home/user2', '/bin/bash'],
331            ])
332
333    def test_all(self):
334        self.assertItemsEqual(
335            self.cache.retrieve({}),
336            [
337                [u'name', u'passwd', 100, 200, u'gecos', u'/home/user', u'/bin/bash'],
338                [u'name2', u'passwd2', 101, 202, u'gecos2', u'/home/user2', u'/bin/bash'],
339            ])
340
341
342class TestProtocol(unittest.TestCase):
343
344    def setUp(self):
345        import protocol
346        cache = protocol.Cache()
347        cache.store('protocol1', ['alias1', 'alias2'], 100)
348        cache.store('protocol2', ['alias3'], 200)
349        cache.store('protocol3', [], 300)
350        self.cache = cache
351        if not hasattr(self, 'assertItemsEqual'):
352            self.assertItemsEqual = self.assertCountEqual
353
354    def test_by_name(self):
355        self.assertItemsEqual(
356            self.cache.retrieve(dict(cn='protocol1')),
357            [
358                ['protocol1', ['alias1', 'alias2'], 100],
359            ])
360        self.assertItemsEqual(
361            self.cache.retrieve(dict(cn='protocol2')),
362            [
363                ['protocol2', ['alias3'], 200],
364            ])
365        self.assertItemsEqual(
366            self.cache.retrieve(dict(cn='protocol3')),
367            [
368                ['protocol3', [], 300],
369            ])
370
371    def test_by_unknown_name(self):
372        self.assertItemsEqual(
373            self.cache.retrieve(dict(cn='notfound')),
374            [])
375
376    def test_by_number(self):
377        self.assertItemsEqual(
378            self.cache.retrieve(dict(ipProtocolNumber=100)),
379            [
380                ['protocol1', ['alias1', 'alias2'], 100],
381            ])
382        self.assertItemsEqual(
383            self.cache.retrieve(dict(ipProtocolNumber=200)),
384            [
385                ['protocol2', ['alias3'], 200],
386            ])
387
388    def test_by_alias(self):
389        self.assertItemsEqual(
390            self.cache.retrieve(dict(cn='alias1')),
391            [
392                ['protocol1', ['alias1', 'alias2'], 100],
393            ])
394        self.assertItemsEqual(
395            self.cache.retrieve(dict(cn='alias3')),
396            [
397                ['protocol2', ['alias3'], 200],
398            ])
399
400    def test_all(self):
401        self.assertItemsEqual(
402            self.cache.retrieve({}),
403            [
404                ['protocol1', ['alias1', 'alias2'], 100],
405                ['protocol2', ['alias3'], 200],
406                ['protocol3', [], 300],
407            ])
408
409
410class TestRpc(unittest.TestCase):
411
412    def setUp(self):
413        import rpc
414        cache = rpc.Cache()
415        cache.store('rpc1', ['alias1', 'alias2'], 100)
416        cache.store('rpc2', ['alias3'], 200)
417        cache.store('rpc3', [], 300)
418        self.cache = cache
419        if not hasattr(self, 'assertItemsEqual'):
420            self.assertItemsEqual = self.assertCountEqual
421
422    def test_by_name(self):
423        self.assertItemsEqual(
424            self.cache.retrieve(dict(cn='rpc1')),
425            [
426                ['rpc1', ['alias1', 'alias2'], 100],
427            ])
428        self.assertItemsEqual(
429            self.cache.retrieve(dict(cn='rpc2')),
430            [
431                ['rpc2', ['alias3'], 200],
432            ])
433        self.assertItemsEqual(
434            self.cache.retrieve(dict(cn='rpc3')),
435            [
436                ['rpc3', [], 300],
437            ])
438
439    def test_by_unknown_name(self):
440        self.assertItemsEqual(
441            self.cache.retrieve(dict(cn='notfound')),
442            [])
443
444    def test_by_number(self):
445        self.assertItemsEqual(
446            self.cache.retrieve(dict(oncRpcNumber=100)),
447            [
448                ['rpc1', ['alias1', 'alias2'], 100],
449            ])
450        self.assertItemsEqual(
451            self.cache.retrieve(dict(oncRpcNumber=200)),
452            [
453                ['rpc2', ['alias3'], 200],
454            ])
455
456    def test_by_alias(self):
457        self.assertItemsEqual(
458            self.cache.retrieve(dict(cn='alias1')),
459            [
460                ['rpc1', ['alias1', 'alias2'], 100],
461            ])
462        self.assertItemsEqual(
463            self.cache.retrieve(dict(cn='alias3')),
464            [
465                ['rpc2', ['alias3'], 200],
466            ])
467
468    def test_all(self):
469        self.assertItemsEqual(
470            self.cache.retrieve({}),
471            [
472                ['rpc1', ['alias1', 'alias2'], 100],
473                ['rpc2', ['alias3'], 200],
474                ['rpc3', [], 300],
475            ])
476
477
478class TestService(unittest.TestCase):
479
480    def setUp(self):
481        import service
482        cache = service.Cache()
483        cache.store('service1', ['alias1', 'alias2'], 100, 'tcp')
484        cache.store('service1', ['alias1', 'alias2'], 100, 'udp')
485        cache.store('service2', ['alias3'], 200, 'udp')
486        cache.store('service3', [], 300, 'udp')
487        self.cache = cache
488        if not hasattr(self, 'assertItemsEqual'):
489            self.assertItemsEqual = self.assertCountEqual
490
491    def test_by_name(self):
492        self.assertItemsEqual(
493            self.cache.retrieve(dict(cn='service1')),
494            [
495                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
496                ['service1', ['alias1', 'alias2'], 100, 'udp'],
497            ])
498        self.assertItemsEqual(
499            self.cache.retrieve(dict(cn='service2')),
500            [
501                ['service2', ['alias3'], 200, 'udp'],
502            ])
503        self.assertItemsEqual(
504            self.cache.retrieve(dict(cn='service3')),
505            [
506                ['service3', [], 300, 'udp'],
507            ])
508
509    def test_by_name_and_protocol(self):
510        self.assertItemsEqual(
511            self.cache.retrieve(dict(cn='service1', ipServiceProtocol='udp')),
512            [
513                ['service1', ['alias1', 'alias2'], 100, 'udp'],
514            ])
515        self.assertItemsEqual(
516            self.cache.retrieve(dict(cn='service1', ipServiceProtocol='tcp')),
517            [
518                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
519            ])
520        self.assertItemsEqual(
521            self.cache.retrieve(dict(cn='service2', ipServiceProtocol='udp')),
522            [
523                ['service2', ['alias3'], 200, 'udp'],
524            ])
525        self.assertItemsEqual(
526            self.cache.retrieve(dict(cn='service2', ipServiceProtocol='tcp')),
527            [])
528
529    def test_by_unknown_name(self):
530        self.assertItemsEqual(self.cache.retrieve(dict(cn='notfound')), [])
531
532    def test_by_number(self):
533        self.assertItemsEqual(
534            self.cache.retrieve(dict(ipServicePort=100)),
535            [
536                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
537                ['service1', ['alias1', 'alias2'], 100, 'udp'],
538            ])
539        self.assertItemsEqual(
540            self.cache.retrieve(dict(ipServicePort=200)),
541            [
542                ['service2', ['alias3'], 200, 'udp'],
543            ])
544
545    def test_by_number_and_protocol(self):
546        self.assertItemsEqual(
547            self.cache.retrieve(dict(ipServicePort=100, ipServiceProtocol='udp')),
548            [
549                ['service1', ['alias1', 'alias2'], 100, 'udp'],
550            ])
551        self.assertItemsEqual(
552            self.cache.retrieve(dict(ipServicePort=100, ipServiceProtocol='tcp')),
553            [
554                ['service1', ['alias1', 'alias2'], 100, 'tcp'],
555            ])
556        self.assertItemsEqual(
557            self.cache.retrieve(dict(ipServicePort=200, ipServiceProtocol='udp')),
558            [
559                ['service2', ['alias3'], 200, 'udp'],
560            ])
561        self.assertItemsEqual(
562            self.cache.retrieve(dict(ipServicePort=200, ipServiceProtocol='tcp')),
563            [])
564
565    def test_by_alias(self):
566        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias1')), [
567            ['service1', ['alias1', 'alias2'], 100, 'udp'],
568            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
569        ])
570        self.assertItemsEqual(self.cache.retrieve(dict(cn='alias3')), [
571            ['service2', ['alias3'], 200, 'udp'],
572        ])
573
574    def test_all(self):
575        self.assertItemsEqual(self.cache.retrieve({}), [
576            ['service1', ['alias1', 'alias2'], 100, 'tcp'],
577            ['service1', ['alias1', 'alias2'], 100, 'udp'],
578            ['service2', ['alias3'], 200, 'udp'],
579            ['service3', [], 300, 'udp'],
580        ])
581
582
583class Testshadow(unittest.TestCase):
584
585    def setUp(self):
586        import shadow
587        cache = shadow.Cache()
588        cache.store('name', 'passwd', 15639, 0, 7, -1, -1, -1, 0)
589        cache.store('name2', 'passwd2', 15639, 0, 7, -1, -1, -1, 0)
590        self.cache = cache
591        if not hasattr(self, 'assertItemsEqual'):
592            self.assertItemsEqual = self.assertCountEqual
593
594    def test_by_name(self):
595        self.assertItemsEqual(
596            self.cache.retrieve(dict(uid='name')),
597            [
598                [u'name', u'passwd', 15639, 0, 7, -1, -1, -1, 0],
599            ])
600        self.assertItemsEqual(
601            self.cache.retrieve(dict(uid='name2')),
602            [
603                [u'name2', u'passwd2', 15639, 0, 7, -1, -1, -1, 0],
604            ])
605
606    def test_by_unknown_name(self):
607        self.assertItemsEqual(
608            self.cache.retrieve(dict(uid='notfound')),
609            [])
610
611    def test_all(self):
612        self.assertItemsEqual(
613            self.cache.retrieve({}), [
614                [u'name', u'passwd', 15639, 0, 7, -1, -1, -1, 0],
615                [u'name2', u'passwd2', 15639, 0, 7, -1, -1, -1, 0],
616            ])
617
618
619if __name__ == '__main__':
620    unittest.main()
621