1# Copyright 2010 Google Inc.
2#
3# This program is free software; you can redistribute it and/or
4# modify it under the terms of the GNU General Public License
5# as published by the Free Software Foundation; either version 2
6# of the License, or (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software Foundation,
15# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
16"""Unit tests for nss_cache/files_updater.py."""
17
# Module authorship metadata: a tuple of "email (name)" strings.
__author__ = ('vasilios@google.com (V Hoffman)',
              'jaq@google.com (Jamie Wilkinson)',
              'blaedd@google.com (David MacKinnon)')
21
22import os
23import shutil
24import tempfile
25import time
26import unittest
27from mox3 import mox
28
29from nss_cache import config
30from nss_cache import error
31
32from nss_cache.caches import cache_factory
33from nss_cache.caches import files
34from nss_cache.maps import automount
35from nss_cache.maps import passwd
36from nss_cache.sources import source
37
38from nss_cache.update import files_updater
39
40
class SingleFileUpdaterTest(mox.MoxTestBase):
    """Unit tests for FileMapUpdater.

    Every test gets two scratch directories: ``self.workdir`` is passed to the
    updater as its second constructor argument, and ``self.workdir2`` is used
    as the ``dir`` option of both the updater and the files cache.
    """

    def setUp(self):
        """Create the two temporary directories used by each test."""
        super(SingleFileUpdaterTest, self).setUp()
        self.workdir = tempfile.mkdtemp()
        self.workdir2 = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directories created in setUp."""
        super(SingleFileUpdaterTest, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.workdir2)

    @unittest.skip('timestamp isnt propagaged correctly')
    def testFullUpdate(self):
        """A full update from a non-empty source into a populated cache."""
        original_modify_stamp = 1
        new_modify_stamp = 2

        # Construct a fake source: the side effect writes a one-line passwd
        # file to the destination the updater asks for.
        def GetFile(map_name, dst_file, current_file, location):
            print('GetFile: %s' % dst_file)
            # Use a context manager so the handle is closed even if the
            # write fails.
            with open(dst_file, 'w') as f:
                f.write('root:x:0:0:root:/root:/bin/bash\n')
            os.utime(dst_file, (1, 2))
            return dst_file

        dst_file = mox.Value()
        source_mock = self.mox.CreateMock(source.FileSource)
        source_mock.GetFile(config.MAP_PASSWORD,
                            mox.Remember(dst_file),
                            current_file=mox.IgnoreArg(),
                            location=mox.IgnoreArg()).WithSideEffects(
                                GetFile).AndReturn(dst_file)

        # Construct the cache and pre-populate it with a single entry.
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})
        map_entry = passwd.PasswdMapEntry({'name': 'foo', 'uid': 10, 'gid': 10})
        password_map = passwd.PasswdMap()
        password_map.SetModifyTimestamp(new_modify_stamp)
        password_map.Add(map_entry)
        cache.Write(password_map)

        updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                               self.workdir, {
                                                   'name': 'files',
                                                   'dir': self.workdir2
                                               })
        updater.WriteModifyTimestamp(original_modify_stamp)

        self.mox.ReplayAll()

        self.assertEqual(
            0,
            updater.UpdateCacheFromSource(cache,
                                          source_mock,
                                          force_write=False,
                                          location=None))

        self.assertEqual(new_modify_stamp, updater.GetModifyTimestamp())
        self.assertIsNotNone(updater.GetUpdateTimestamp())

    @unittest.skip('source map empty during full update')
    def testFullUpdateOnEmptyCache(self):
        """A full update as above, but the initial cache is empty."""
        original_modify_stamp = 1
        new_modify_stamp = 2
        # Construct an updater
        self.updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                                    self.workdir, {
                                                        'name': 'files',
                                                        'dir': self.workdir2
                                                    })
        self.updater.WriteModifyTimestamp(original_modify_stamp)

        # Construct a cache; nothing is written to it beforehand.
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})

        def GetFileEffects(map_name, dst_file, current_file, location):
            with open(dst_file, 'w') as f:
                f.write('root:x:0:0:root:/root:/bin/bash\n')
            os.utime(dst_file, (1, 2))
            return dst_file

        # current_file is matched as a keyword argument, consistent with the
        # expectations recorded by the other tests in this class.
        source_mock = self.mox.CreateMock(source.FileSource)
        source_mock.GetFile(config.MAP_PASSWORD,
                            mox.IgnoreArg(),
                            current_file=mox.IgnoreArg(),
                            location=None).WithSideEffects(GetFileEffects)

        # Switch the mock from record to replay mode; without this the call
        # made by the updater is only recorded and the side effect that
        # writes the source file never runs.
        # NOTE(review): the test is still skipped and has not been re-run
        # against the updater -- confirm it passes before re-enabling.
        self.mox.ReplayAll()

        self.assertEqual(
            0,
            self.updater.UpdateCacheFromSource(cache,
                                               source_mock,
                                               force_write=False,
                                               location=None))
        self.assertEqual(new_modify_stamp, self.updater.GetModifyTimestamp())
        self.assertIsNotNone(self.updater.GetUpdateTimestamp())

    def testFullUpdateOnEmptySource(self):
        """A full update as above, but instead, the initial source is empty."""
        original_modify_stamp = 1
        new_modify_stamp = 2
        # Construct an updater
        self.updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                                    self.workdir, {
                                                        'name': 'files',
                                                        'dir': self.workdir2
                                                    })
        self.updater.WriteModifyTimestamp(original_modify_stamp)

        # Construct a cache
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})
        map_entry = passwd.PasswdMapEntry({'name': 'foo', 'uid': 10, 'gid': 10})
        password_map = passwd.PasswdMap()
        password_map.SetModifyTimestamp(new_modify_stamp)
        password_map.Add(map_entry)
        cache.Write(password_map)

        # The mocked source returns None and writes nothing to the
        # destination file.
        source_mock = self.mox.CreateMock(source.FileSource)
        source_mock.GetFile(config.MAP_PASSWORD,
                            mox.IgnoreArg(),
                            current_file=mox.IgnoreArg(),
                            location=None).AndReturn(None)
        self.mox.ReplayAll()
        self.assertRaises(error.EmptyMap,
                          self.updater.UpdateCacheFromSource,
                          cache,
                          source_mock,
                          force_write=False,
                          location=None)
        # The failed update must not have advanced either timestamp.
        self.assertNotEqual(new_modify_stamp, self.updater.GetModifyTimestamp())
        self.assertIsNone(self.updater.GetUpdateTimestamp())

    @unittest.skip('disabled')
    def testFullUpdateOnEmptySourceForceWrite(self):
        """An empty source, as above, but with force_write=True."""
        original_modify_stamp = time.gmtime(1)
        new_modify_stamp = time.gmtime(2)
        # Construct an updater
        self.updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                                    self.workdir, {
                                                        'name': 'files',
                                                        'dir': self.workdir2
                                                    })
        self.updater.WriteModifyTimestamp(original_modify_stamp)

        # Construct a cache
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})
        map_entry = passwd.PasswdMapEntry({'name': 'foo', 'uid': 10, 'gid': 10})
        password_map = passwd.PasswdMap()
        password_map.SetModifyTimestamp(new_modify_stamp)
        password_map.Add(map_entry)
        cache.Write(password_map)

        # A hand-written stand-in for the source.  It previously derived from
        # pmock.Mock, but pmock is not imported by this module, so the class
        # definition itself would have raised NameError.
        # NOTE(review): assumes the updater only calls GetFile() on the
        # source -- confirm before re-enabling this test.
        class MockSource(object):

            def GetFile(self, map_name, dst_file, current_file, location=None):
                assert location is None
                assert map_name == config.MAP_PASSWORD
                # Produce an empty source file.
                with open(dst_file, 'w') as f:
                    f.write('')
                os.utime(dst_file, (1, 2))
                return dst_file

        source_mock = MockSource()
        self.assertEqual(
            0,
            self.updater.UpdateCacheFromSource(cache,
                                               source_mock,
                                               force_write=True,
                                               location=None))
        self.assertEqual(new_modify_stamp, self.updater.GetModifyTimestamp())
        self.assertIsNotNone(self.updater.GetUpdateTimestamp())
219
220
@unittest.skip('disabled')
class AutomountUpdaterTest(mox.MoxTestBase):
    """Unit tests for FileAutomountUpdater class.

    The whole class is skipped.
    NOTE(review): several tests reference ``zsyncsource`` and ``pmock``, which
    this module does not import, and call ``self.mock()``, which looks like a
    leftover from a pmock-based test case -- these need porting to mox before
    the skip can be removed.
    """

    def setUp(self):
        """Create the temporary working directory used by each test."""
        super(AutomountUpdaterTest, self).setUp()
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary working directory."""
        shutil.rmtree(self.workdir)
        super(AutomountUpdaterTest, self).tearDown()

    def testInit(self):
        """An automount object correctly sets map-specific attributes."""
        # With no options, local_master is False.
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, {})
        self.assertEqual(updater.local_master, False)

        conf = {files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER: 'yes'}
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, conf)
        self.assertEqual(updater.local_master, True)

        conf = {files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER: 'no'}
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, conf)
        self.assertEqual(updater.local_master, False)

    def testUpdate(self):
        """An update gets a master map and updates each entry."""
        map_entry1 = automount.AutomountMapEntry()
        map_entry2 = automount.AutomountMapEntry()
        map_entry1.key = '/home'
        map_entry2.key = '/auto'
        map_entry1.location = 'ou=auto.home,ou=automounts'
        map_entry2.location = 'ou=auto.auto,ou=automounts'
        master_map = automount.AutomountMap([map_entry1, map_entry2])

        # NOTE(review): zsyncsource is not imported by this module.
        source_mock = self.mox.CreateMock(zsyncsource.ZSyncSource)
        source_mock.GetAutomountMasterFile(
            mox.IgnoreArg()).AndReturn(master_map)

        # the auto.home cache
        cache_mock1 = self.mox.CreateMock(files.FilesCache)
        cache_mock1.GetCacheFilename().AndReturn(None)
        cache_mock1.GetMapLocation().AndReturn('/etc/auto.home')

        # the auto.auto cache
        cache_mock2 = self.mox.CreateMock(files.FilesCache)
        cache_mock2.GetMapLocation().AndReturn('/etc/auto.auto')
        cache_mock2.GetCacheFilename().AndReturn(None)

        # the auto.master cache
        cache_mock3 = self.mox.CreateMock(files.FilesCache)
        cache_mock3.GetMap().AndReturn(master_map)

        # Stub the factory so the updater receives the mocks above.
        self.mox.StubOutWithMock(cache_factory, 'Create')
        cache_factory.Create(mox.IgnoreArg(), mox.IgnoreArg(),
                             None).AndReturn(cache_mock3)
        cache_factory.Create(
            mox.IgnoreArg(), mox.IgnoreArg(),
            automount_mountpoint='/auto').AndReturn(cache_mock2)
        cache_factory.Create(
            mox.IgnoreArg(), mox.IgnoreArg(),
            automount_mountpoint='/home').AndReturn(cache_mock1)

        self.mox.ReplayAll()

        options = {'name': 'files', 'dir': self.workdir}
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, options)
        updater.UpdateFromSource(source_mock)

        # The entries' locations are expected to have been rewritten to the
        # values returned by each cache's GetMapLocation().
        self.assertEqual(map_entry1.location, '/etc/auto.home')
        self.assertEqual(map_entry2.location, '/etc/auto.auto')

    def testUpdateNoMaster(self):
        """An update skips updating the master map, and appropriate sub maps."""
        source_entry1 = automount.AutomountMapEntry()
        source_entry2 = automount.AutomountMapEntry()
        source_entry1.key = '/home'
        source_entry2.key = '/auto'
        source_entry1.location = 'ou=auto.home,ou=automounts'
        source_entry2.location = 'ou=auto.auto,ou=automounts'
        source_master = automount.AutomountMap([source_entry1, source_entry2])

        local_entry1 = automount.AutomountMapEntry()
        local_entry2 = automount.AutomountMapEntry()
        local_entry1.key = '/home'
        local_entry2.key = '/auto'
        local_entry1.location = '/etc/auto.home'
        local_entry2.location = '/etc/auto.null'
        local_master = automount.AutomountMap([local_entry1, local_entry2])
        # NOTE(review): self.mock() and pmock are not defined/imported in this
        # module; the expectations below still use the pmock API.
        source_mock = self.mock()
        invocation = source_mock.expects(pmock.at_least_once())
        invocation._CalledUpdateCacheFromSource()
        # we should get called inside the DummyUpdater, too.

        # the auto.home cache
        cache_mock1 = self.mock()
        # GetMapLocation() is called, and set to the master map map_entry
        invocation = cache_mock1.expects(pmock.at_least_once()).GetMapLocation()
        invocation.will(pmock.return_value('/etc/auto.home'))
        # we should get called inside the DummyUpdater
        cache_mock1.expects(
            pmock.at_least_once())._CalledUpdateCacheFromSource()

        # the auto.auto cache
        cache_mock2 = self.mock()
        # GetMapLocation() is called, and set to the master map map_entry
        invocation = cache_mock2.expects(pmock.at_least_once()).GetMapLocation()
        invocation.will(pmock.return_value('/etc/auto.auto'))
        invocation = cache_mock2.expects(
            pmock.at_least_once())._CalledUpdateCacheFromSource()
        # the auto.master cache, which should not be written to
        cache_mock3 = self.mock()
        invocation = cache_mock3.expects(pmock.once())
        invocation = invocation.method('GetMap')
        invocation.will(pmock.return_value(local_master))
        invocation = cache_mock3.expects(pmock.once())
        invocation = invocation.method('GetMap')
        invocation.will(pmock.return_value(local_master))

        cache_mocks = {
            '/home': cache_mock1,
            '/auto': cache_mock2,
            None: cache_mock3
        }

        # Create needs to return our mock_caches
        def DummyCreate(unused_cache_options,
                        unused_map_name,
                        automount_mountpoint=None):
            # the order of the master_map iterable is not predictable, so we use the
            # automount_mountpoint as the key to return the right one.
            return cache_mocks[automount_mountpoint]

        original_create = cache_factory.Create
        cache_factory.Create = DummyCreate
        try:
            skip = files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER
            options = {skip: 'yes', 'dir': self.workdir}
            updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                         self.workdir, options)
            updater.UpdateFromSource(source_mock)
        finally:
            # Always restore the real factory so a failure here cannot leak
            # the stub into other tests.
            cache_factory.Create = original_create

    def testUpdateCatchesMissingMaster(self):
        """Gracefully handle a missing local master map."""
        # use an empty master map from the source, to avoid mocking out already
        # tested code
        # NOTE(review): self.mock() and pmock are not defined/imported in this
        # module; the expectations below still use the pmock API.
        source_mock = self.mock()

        cache_mock = self.mock()
        # raise error on GetMap()
        invocation = cache_mock.expects(pmock.once()).GetMap()
        invocation.will(pmock.raise_exception(error.CacheNotFound))

        # Create needs to return our mock_cache
        def DummyCreate(unused_cache_options,
                        unused_map_name,
                        automount_mountpoint=None):
            # Every request gets the same cache mock, whose GetMap() raises
            # CacheNotFound.
            return cache_mock

        original_create = cache_factory.Create
        cache_factory.Create = DummyCreate
        try:
            skip = files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER
            options = {skip: 'yes', 'dir': self.workdir}
            updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                         self.workdir, options)

            return_value = updater.UpdateFromSource(source_mock)
        finally:
            # Always restore the real factory, even if the update raises.
            cache_factory.Create = original_create

        self.assertEqual(return_value, 1)
401
402
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
405