1# Copyright 2007 Google Inc.
2#
3# This program is free software; you can redistribute it and/or
4# modify it under the terms of the GNU General Public License
5# as published by the Free Software Foundation; either version 2
6# of the License, or (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software Foundation,
15# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
16"""Unit tests for nss_cache/map_updater.py."""
17
18__author__ = ('vasilios@google.com (V Hoffman)',
19              'jaq@google.com (Jamie Wilkinson)')
20
21import os
22import shutil
23import tempfile
24import unittest
25from mox3 import mox
26
27from nss_cache.caches import caches
28from nss_cache.caches import files
29from nss_cache.sources import source
30from nss_cache.caches import cache_factory
31from nss_cache import config
32from nss_cache import error
33from nss_cache.maps import automount
34from nss_cache.maps import passwd
35
36from nss_cache.update import map_updater
37
38
39class SingleMapUpdaterTest(mox.MoxTestBase):
40    """Unit tests for FileMapUpdater class."""
41
42    def setUp(self):
43        super(SingleMapUpdaterTest, self).setUp()
44        self.workdir = tempfile.mkdtemp()
45        self.workdir2 = tempfile.mkdtemp()
46
47    def tearDown(self):
48        super(SingleMapUpdaterTest, self).tearDown()
49        shutil.rmtree(self.workdir)
50        shutil.rmtree(self.workdir2)
51
52    def testFullUpdate(self):
53        """A full update reads the source, writes to cache, and updates
54        times."""
55        original_modify_stamp = 1
56        new_modify_stamp = 2
57
58        updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {})
59        updater.WriteModifyTimestamp(original_modify_stamp)
60
61        map_entry = passwd.PasswdMapEntry({'name': 'foo', 'uid': 10, 'gid': 10})
62        password_map = passwd.PasswdMap([map_entry])
63        password_map.SetModifyTimestamp(new_modify_stamp)
64
65        cache_mock = self.mox.CreateMock(files.FilesCache)
66        cache_mock.WriteMap(map_data=password_map).AndReturn(0)
67
68        source_mock = self.mox.CreateMock(source.Source)
69        source_mock.GetMap(config.MAP_PASSWORD,
70                           location=None).AndReturn(password_map)
71
72        self.mox.ReplayAll()
73
74        self.assertEqual(
75            0,
76            updater.UpdateCacheFromSource(cache_mock, source_mock, False, False,
77                                          None))
78        self.assertEqual(updater.GetModifyTimestamp(), new_modify_stamp)
79        self.assertNotEqual(updater.GetUpdateTimestamp(), None)
80
81    def testIncrementalUpdate(self):
82        """An incremental update reads a partial map and merges it."""
83
84        # Unlike in a full update, we create a cache map and a source map, and
85        # let it merge them.  If it goes to write the merged map, we're good.
86        # Also check that timestamps were updated, as in testFullUpdate above.
87
88        def compare_function(map_object):
89            return len(map_object) == 2
90
91        original_modify_stamp = 1
92        new_modify_stamp = 2
93        updater = map_updater.MapUpdater(config.MAP_PASSWORD,
94                                         self.workdir, {},
95                                         can_do_incremental=True)
96        updater.WriteModifyTimestamp(original_modify_stamp)
97
98        cache_map_entry = passwd.PasswdMapEntry({
99            'name': 'bar',
100            'uid': 20,
101            'gid': 20
102        })
103        cache_map = passwd.PasswdMap([cache_map_entry])
104        cache_map.SetModifyTimestamp(original_modify_stamp)
105
106        cache_mock = self.mox.CreateMock(caches.Cache)
107        cache_mock.GetMap().AndReturn(cache_map)
108        cache_mock.WriteMap(map_data=mox.Func(compare_function)).AndReturn(0)
109
110        source_map_entry = passwd.PasswdMapEntry({
111            'name': 'foo',
112            'uid': 10,
113            'gid': 10
114        })
115        source_map = passwd.PasswdMap([source_map_entry])
116        source_map.SetModifyTimestamp(new_modify_stamp)
117
118        source_mock = self.mox.CreateMock(source.Source)
119        source_mock.GetMap(config.MAP_PASSWORD,
120                           location=None,
121                           since=original_modify_stamp).AndReturn(source_map)
122
123        self.mox.ReplayAll()
124
125        self.assertEqual(
126            0,
127            updater.UpdateCacheFromSource(cache_mock,
128                                          source_mock,
129                                          incremental=True,
130                                          force_write=False,
131                                          location=None))
132        self.assertEqual(updater.GetModifyTimestamp(), new_modify_stamp)
133        self.assertNotEqual(updater.GetUpdateTimestamp(), None)
134
135    def testFullUpdateOnMissingCache(self):
136        """We fault to a full update if our cache is missing."""
137
138        original_modify_stamp = 1
139        updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {})
140        updater.WriteModifyTimestamp(original_modify_stamp)
141
142        source_mock = self.mox.CreateMock(source.Source)
143        # Try incremental first.
144        source_mock.GetMap(config.MAP_PASSWORD,
145                           location=None,
146                           since=original_modify_stamp).AndReturn('first map')
147        # Try full second.
148        source_mock.GetMap(config.MAP_PASSWORD,
149                           location=None).AndReturn('second map')
150
151        updater = map_updater.MapUpdater(config.MAP_PASSWORD,
152                                         self.workdir, {},
153                                         can_do_incremental=True)
154        self.mox.StubOutWithMock(updater, 'GetModifyTimestamp')
155        updater.GetModifyTimestamp().AndReturn(original_modify_stamp)
156        self.mox.StubOutWithMock(updater, '_IncrementalUpdateFromMap')
157        # force a cache not found on incremental
158        updater._IncrementalUpdateFromMap('cache', 'first map').AndRaise(
159            error.CacheNotFound)
160        self.mox.StubOutWithMock(updater, 'FullUpdateFromMap')
161        updater.FullUpdateFromMap(mox.IgnoreArg(), 'second map',
162                                  False).AndReturn(0)
163
164        self.mox.ReplayAll()
165
166        self.assertEqual(
167            0,
168            updater.UpdateCacheFromSource('cache',
169                                          source_mock,
170                                          incremental=True,
171                                          force_write=False,
172                                          location=None))
173
174    def testFullUpdateOnMissingTimestamp(self):
175        """We fault to a full update if our modify timestamp is missing."""
176
177        updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {})
178        # We do not call WriteModifyTimestamp() so we force a full sync.
179
180        source_mock = self.mox.CreateMock(source.Source)
181        source_mock.GetMap(config.MAP_PASSWORD,
182                           location=None).AndReturn('second map')
183        updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {})
184        self.mox.StubOutWithMock(updater, 'FullUpdateFromMap')
185        updater.FullUpdateFromMap(mox.IgnoreArg(), 'second map',
186                                  False).AndReturn(0)
187
188        self.mox.ReplayAll()
189        self.assertEqual(
190            0,
191            updater.UpdateCacheFromSource('cache', source_mock, True, False,
192                                          None))
193
194
195class MapAutomountUpdaterTest(mox.MoxTestBase):
196    """Unit tests for AutomountUpdater class."""
197
198    def setUp(self):
199        super(MapAutomountUpdaterTest, self).setUp()
200        self.workdir = tempfile.mkdtemp()
201
202    def tearDown(self):
203        super(MapAutomountUpdaterTest, self).tearDown()
204        os.rmdir(self.workdir)
205
206    def testInit(self):
207        """An automount object correctly sets map-specific attributes."""
208        updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT,
209                                               self.workdir, {})
210        self.assertEqual(updater.local_master, False)
211
212        conf = {map_updater.AutomountUpdater.OPT_LOCAL_MASTER: 'yes'}
213        updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT,
214                                               self.workdir, conf)
215        self.assertEqual(updater.local_master, True)
216
217        conf = {map_updater.AutomountUpdater.OPT_LOCAL_MASTER: 'no'}
218        updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT,
219                                               self.workdir, conf)
220        self.assertEqual(updater.local_master, False)
221
222    def testUpdate(self):
223        """An update gets a master map and updates each entry."""
224        map_entry1 = automount.AutomountMapEntry()
225        map_entry2 = automount.AutomountMapEntry()
226        map_entry1.key = '/home'
227        map_entry2.key = '/auto'
228        map_entry1.location = 'ou=auto.home,ou=automounts'
229        map_entry2.location = 'ou=auto.auto,ou=automounts'
230        master_map = automount.AutomountMap([map_entry1, map_entry2])
231
232        source_mock = self.mox.CreateMock(source.Source)
233        # return the master map
234        source_mock.GetAutomountMasterMap().AndReturn(master_map)
235
236        # the auto.home cache
237        cache_home = self.mox.CreateMock(caches.Cache)
238        # GetMapLocation() is called, and set to the master map map_entry
239        cache_home.GetMapLocation().AndReturn('/etc/auto.home')
240
241        # the auto.auto cache
242        cache_auto = self.mox.CreateMock(caches.Cache)
243        # GetMapLocation() is called, and set to the master map map_entry
244        cache_auto.GetMapLocation().AndReturn('/etc/auto.auto')
245
246        # the auto.master cache
247        cache_master = self.mox.CreateMock(caches.Cache)
248
249        self.mox.StubOutWithMock(cache_factory, 'Create')
250        cache_factory.Create(mox.IgnoreArg(),
251                             'automount',
252                             automount_mountpoint='/home').AndReturn(cache_home)
253        cache_factory.Create(mox.IgnoreArg(),
254                             'automount',
255                             automount_mountpoint='/auto').AndReturn(cache_auto)
256        cache_factory.Create(mox.IgnoreArg(),
257                             'automount',
258                             automount_mountpoint=None).AndReturn(cache_master)
259
260        updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT,
261                                               self.workdir, {})
262
263        self.mox.StubOutClassWithMocks(map_updater, 'MapUpdater')
264        updater_home = map_updater.MapUpdater(config.MAP_AUTOMOUNT,
265                                              self.workdir, {},
266                                              automount_mountpoint='/home')
267        updater_home.UpdateCacheFromSource(
268            cache_home, source_mock, True, False,
269            'ou=auto.home,ou=automounts').AndReturn(0)
270        updater_auto = map_updater.MapUpdater(config.MAP_AUTOMOUNT,
271                                              self.workdir, {},
272                                              automount_mountpoint='/auto')
273        updater_auto.UpdateCacheFromSource(
274            cache_auto, source_mock, True, False,
275            'ou=auto.auto,ou=automounts').AndReturn(0)
276        updater_master = map_updater.MapUpdater(config.MAP_AUTOMOUNT,
277                                                self.workdir, {})
278        updater_master.FullUpdateFromMap(cache_master, master_map).AndReturn(0)
279
280        self.mox.ReplayAll()
281
282        updater.UpdateFromSource(source_mock)
283
284        self.assertEqual(map_entry1.location, '/etc/auto.home')
285        self.assertEqual(map_entry2.location, '/etc/auto.auto')
286
287    def testUpdateNoMaster(self):
288        """An update skips updating the master map, and approprate sub maps."""
289        source_entry1 = automount.AutomountMapEntry()
290        source_entry2 = automount.AutomountMapEntry()
291        source_entry1.key = '/home'
292        source_entry2.key = '/auto'
293        source_entry1.location = 'ou=auto.home,ou=automounts'
294        source_entry2.location = 'ou=auto.auto,ou=automounts'
295        source_master = automount.AutomountMap([source_entry1, source_entry2])
296
297        local_entry1 = automount.AutomountMapEntry()
298        local_entry2 = automount.AutomountMapEntry()
299        local_entry1.key = '/home'
300        local_entry2.key = '/auto'
301        local_entry1.location = '/etc/auto.home'
302        local_entry2.location = '/etc/auto.null'
303        local_master = automount.AutomountMap([local_entry1, local_entry2])
304
305        source_mock = self.mox.CreateMock(source.Source)
306        # return the source master map
307        source_mock.GetAutomountMasterMap().AndReturn(source_master)
308
309        # the auto.home cache
310        cache_home = self.mox.CreateMock(caches.Cache)
311        # GetMapLocation() is called, and set to the master map map_entry
312        cache_home.GetMapLocation().AndReturn('/etc/auto.home')
313
314        # the auto.auto cache
315        cache_auto = self.mox.CreateMock(caches.Cache)
316        # GetMapLocation() is called, and set to the master map map_entry
317        cache_auto.GetMapLocation().AndReturn('/etc/auto.auto')
318
319        # the auto.master cache, which should not be written to
320        cache_master = self.mox.CreateMock(caches.Cache)
321        cache_master.GetMap().AndReturn(local_master)
322
323        self.mox.StubOutWithMock(cache_factory, 'Create')
324        cache_factory.Create(mox.IgnoreArg(),
325                             mox.IgnoreArg(),
326                             automount_mountpoint=None).AndReturn(cache_master)
327        cache_factory.Create(mox.IgnoreArg(),
328                             mox.IgnoreArg(),
329                             automount_mountpoint='/home').AndReturn(cache_home)
330        cache_factory.Create(mox.IgnoreArg(),
331                             mox.IgnoreArg(),
332                             automount_mountpoint='/auto').AndReturn(cache_auto)
333
334        skip = map_updater.AutomountUpdater.OPT_LOCAL_MASTER
335        updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT,
336                                               self.workdir, {skip: 'yes'})
337
338        self.mox.StubOutClassWithMocks(map_updater, 'MapUpdater')
339        updater_home = map_updater.MapUpdater(config.MAP_AUTOMOUNT,
340                                              self.workdir,
341                                              {'local_automount_master': 'yes'},
342                                              automount_mountpoint='/home')
343        updater_home.UpdateCacheFromSource(
344            cache_home, source_mock, True, False,
345            'ou=auto.home,ou=automounts').AndReturn(0)
346
347        self.mox.ReplayAll()
348
349        updater.UpdateFromSource(source_mock)
350
351
352class AutomountUpdaterMoxTest(mox.MoxTestBase):
353
354    def setUp(self):
355        super(AutomountUpdaterMoxTest, self).setUp()
356        self.workdir = tempfile.mkdtemp()
357
358    def tearDown(self):
359        super(AutomountUpdaterMoxTest, self).tearDown()
360        shutil.rmtree(self.workdir)
361
362    def testUpdateCatchesMissingMaster(self):
363        """Gracefully handle a missing local master maps."""
364        # use an empty master map from the source, to avoid mocking out already
365        # tested code
366        master_map = automount.AutomountMap()
367
368        source_mock = self.mox.CreateMockAnything()
369        source_mock.GetAutomountMasterMap().AndReturn(master_map)
370
371        cache_mock = self.mox.CreateMock(caches.Cache)
372        # raise error on GetMap()
373        cache_mock.GetMap().AndRaise(error.CacheNotFound)
374
375        skip = map_updater.AutomountUpdater.OPT_LOCAL_MASTER
376        cache_options = {skip: 'yes'}
377
378        self.mox.StubOutWithMock(cache_factory, 'Create')
379        cache_factory.Create(cache_options,
380                             'automount',
381                             automount_mountpoint=None).AndReturn(cache_mock)
382
383        self.mox.ReplayAll()
384
385        updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT,
386                                               self.workdir, cache_options)
387
388        return_value = updater.UpdateFromSource(source_mock)
389
390        self.assertEqual(return_value, 1)
391
392
393if __name__ == '__main__':
394    unittest.main()
395