# Copyright 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""Unit tests for nss_cache/files_updater.py."""

__author__ = ('vasilios@google.com (V Hoffman)',
              'jaq@google.com (Jamie Wilkinson)',
              'blaedd@google.com (David MacKinnon)')

import os
import shutil
import tempfile
import time
import unittest
from mox3 import mox

from nss_cache import config
from nss_cache import error

from nss_cache.caches import cache_factory
from nss_cache.caches import files
from nss_cache.maps import automount
from nss_cache.maps import passwd
from nss_cache.sources import source

from nss_cache.update import files_updater


class SingleFileUpdaterTest(mox.MoxTestBase):
    """Unit tests for FileMapUpdater."""

    def setUp(self):
        """Create two scratch directories: updater state and cache files."""
        super(SingleFileUpdaterTest, self).setUp()
        self.workdir = tempfile.mkdtemp()
        self.workdir2 = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directories created in setUp."""
        super(SingleFileUpdaterTest, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.workdir2)

    @unittest.skip('timestamp isnt propagaged correctly')
    def testFullUpdate(self):
        """A full update returns 0 and records modify/update timestamps."""
        original_modify_stamp = 1
        new_modify_stamp = 2

        # Construct a fake source.
        def GetFile(map_name, dst_file, current_file, location):
            """Side effect: write a one-entry passwd file to dst_file."""
            with open(dst_file, 'w') as f:
                f.write('root:x:0:0:root:/root:/bin/bash\n')
            # Pin atime/mtime so the resulting modify stamp is predictable.
            os.utime(dst_file, (1, 2))
            return dst_file

        dst_file = mox.Value()
        source_mock = self.mox.CreateMock(source.FileSource)
        source_mock.GetFile(config.MAP_PASSWORD,
                            mox.Remember(dst_file),
                            current_file=mox.IgnoreArg(),
                            location=mox.IgnoreArg()).WithSideEffects(
                                GetFile).AndReturn(dst_file)

        # Construct the cache.
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})
        map_entry = passwd.PasswdMapEntry({
            'name': 'foo',
            'uid': 10,
            'gid': 10
        })
        password_map = passwd.PasswdMap()
        password_map.SetModifyTimestamp(new_modify_stamp)
        password_map.Add(map_entry)
        cache.Write(password_map)

        updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                               self.workdir, {
                                                   'name': 'files',
                                                   'dir': self.workdir2
                                               })
        updater.WriteModifyTimestamp(original_modify_stamp)

        self.mox.ReplayAll()

        self.assertEqual(
            0,
            updater.UpdateCacheFromSource(cache,
                                          source_mock,
                                          force_write=False,
                                          location=None))

        self.assertEqual(new_modify_stamp, updater.GetModifyTimestamp())
        self.assertIsNotNone(updater.GetUpdateTimestamp())

    @unittest.skip('source map empty during full update')
    def testFullUpdateOnEmptyCache(self):
        """A full update as above, but the initial cache is empty."""
        original_modify_stamp = 1
        new_modify_stamp = 2
        # Construct an updater
        self.updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                                    self.workdir, {
                                                        'name': 'files',
                                                        'dir': self.workdir2
                                                    })
        self.updater.WriteModifyTimestamp(original_modify_stamp)

        # Construct a cache
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})

        def GetFileEffects(map_name, dst_file, current_file, location):
            """Side effect: write a one-entry passwd file to dst_file."""
            with open(dst_file, 'w') as f:
                f.write('root:x:0:0:root:/root:/bin/bash\n')
            os.utime(dst_file, (1, 2))
            return dst_file

        source_mock = self.mox.CreateMock(source.FileSource)
        # current_file is given in keyword form, matching the other GetFile
        # expectations in this class.
        source_mock.GetFile(
            config.MAP_PASSWORD,
            mox.IgnoreArg(),
            current_file=mox.IgnoreArg(),
            location=None).WithSideEffects(GetFileEffects)

        # Switch the mock from record to replay mode before exercising the
        # updater; without this the call below is recorded, not replayed.
        self.mox.ReplayAll()

        self.assertEqual(
            0,
            self.updater.UpdateCacheFromSource(cache,
                                               source_mock,
                                               force_write=False,
                                               location=None))
        self.assertEqual(new_modify_stamp, self.updater.GetModifyTimestamp())
        self.assertIsNotNone(self.updater.GetUpdateTimestamp())

    def testFullUpdateOnEmptySource(self):
        """A full update as above, but instead, the initial source is empty."""
        original_modify_stamp = 1
        new_modify_stamp = 2
        # Construct an updater
        self.updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                                    self.workdir, {
                                                        'name': 'files',
                                                        'dir': self.workdir2
                                                    })
        self.updater.WriteModifyTimestamp(original_modify_stamp)

        # Construct a cache
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})
        map_entry = passwd.PasswdMapEntry({
            'name': 'foo',
            'uid': 10,
            'gid': 10
        })
        password_map = passwd.PasswdMap()
        password_map.SetModifyTimestamp(new_modify_stamp)
        password_map.Add(map_entry)
        cache.Write(password_map)

        # The source hands back no file at all.
        source_mock = self.mox.CreateMock(source.FileSource)
        source_mock.GetFile(config.MAP_PASSWORD,
                            mox.IgnoreArg(),
                            current_file=mox.IgnoreArg(),
                            location=None).AndReturn(None)
        self.mox.ReplayAll()

        self.assertRaises(error.EmptyMap,
                          self.updater.UpdateCacheFromSource,
                          cache,
                          source_mock,
                          force_write=False,
                          location=None)
        # The failed update must leave the recorded timestamps untouched.
        self.assertNotEqual(new_modify_stamp,
                            self.updater.GetModifyTimestamp())
        self.assertIsNone(self.updater.GetUpdateTimestamp())

    @unittest.skip('disabled')
    def testFullUpdateOnEmptySourceForceWrite(self):
        """A full update with an empty source, but with force_write=True."""
        # NOTE(review): this test uses `pmock`, which this module does not
        # import; it would raise NameError if un-skipped as-is.
        original_modify_stamp = time.gmtime(1)
        new_modify_stamp = time.gmtime(2)
        # Construct an updater
        self.updater = files_updater.FileMapUpdater(config.MAP_PASSWORD,
                                                    self.workdir, {
                                                        'name': 'files',
                                                        'dir': self.workdir2
                                                    })
        self.updater.WriteModifyTimestamp(original_modify_stamp)

        # Construct a cache
        cache = files.FilesPasswdMapHandler({'dir': self.workdir2})
        map_entry = passwd.PasswdMapEntry({
            'name': 'foo',
            'uid': 10,
            'gid': 10
        })
        password_map = passwd.PasswdMap()
        password_map.SetModifyTimestamp(new_modify_stamp)
        password_map.Add(map_entry)
        cache.Write(password_map)

        class MockSource(pmock.Mock):
            """Fake source whose GetFile writes an empty file."""

            def GetFile(self, map_name, dst_file, current_file, location=None):
                assert location is None
                assert map_name == config.MAP_PASSWORD
                with open(dst_file, 'w') as f:
                    f.write('')
                os.utime(dst_file, (1, 2))
                return dst_file

        source_mock = MockSource()
        self.assertEqual(
            0,
            self.updater.UpdateCacheFromSource(cache,
                                               source_mock,
                                               force_write=True,
                                               location=None))
        self.assertEqual(new_modify_stamp, self.updater.GetModifyTimestamp())
        self.assertIsNotNone(self.updater.GetUpdateTimestamp())


@unittest.skip('disabled')
class AutomountUpdaterTest(mox.MoxTestBase):
    """Unit tests for FileAutomountUpdater class."""

    # NOTE(review): several tests below reference `pmock`, `zsyncsource` and
    # `self.mock()`, none of which are imported or defined in this module;
    # they need porting to mox before the class-level skip can be lifted.

    def setUp(self):
        """Create a scratch directory for the updater."""
        super(AutomountUpdaterTest, self).setUp()
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory created in setUp."""
        shutil.rmtree(self.workdir)
        super(AutomountUpdaterTest, self).tearDown()

    def testInit(self):
        """An automount object correctly sets map-specific attributes."""
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, {})
        self.assertEqual(updater.local_master, False)

        conf = {files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER: 'yes'}
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, conf)
        self.assertEqual(updater.local_master, True)

        conf = {files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER: 'no'}
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, conf)
        self.assertEqual(updater.local_master, False)

    def testUpdate(self):
        """An update gets a master map and updates each entry."""
        map_entry1 = automount.AutomountMapEntry()
        map_entry2 = automount.AutomountMapEntry()
        map_entry1.key = '/home'
        map_entry2.key = '/auto'
        map_entry1.location = 'ou=auto.home,ou=automounts'
        map_entry2.location = 'ou=auto.auto,ou=automounts'
        master_map = automount.AutomountMap([map_entry1, map_entry2])

        # NOTE(review): `zsyncsource` is not imported by this module.
        source_mock = self.mox.CreateMock(zsyncsource.ZSyncSource)
        source_mock.GetAutomountMasterFile(
            mox.IgnoreArg()).AndReturn(master_map)

        # the auto.home cache
        cache_mock1 = self.mox.CreateMock(files.FilesCache)
        cache_mock1.GetCacheFilename().AndReturn(None)
        cache_mock1.GetMapLocation().AndReturn('/etc/auto.home')

        # the auto.auto cache
        cache_mock2 = self.mox.CreateMock(files.FilesCache)
        cache_mock2.GetMapLocation().AndReturn('/etc/auto.auto')
        cache_mock2.GetCacheFilename().AndReturn(None)

        # the auto.master cache
        cache_mock3 = self.mox.CreateMock(files.FilesCache)
        cache_mock3.GetMap().AndReturn(master_map)

        self.mox.StubOutWithMock(cache_factory, 'Create')
        cache_factory.Create(mox.IgnoreArg(), mox.IgnoreArg(),
                             None).AndReturn(cache_mock3)
        cache_factory.Create(
            mox.IgnoreArg(), mox.IgnoreArg(),
            automount_mountpoint='/auto').AndReturn(cache_mock2)
        cache_factory.Create(
            mox.IgnoreArg(), mox.IgnoreArg(),
            automount_mountpoint='/home').AndReturn(cache_mock1)

        self.mox.ReplayAll()

        options = {'name': 'files', 'dir': self.workdir}
        updater = files_updater.FileAutomountUpdater(config.MAP_AUTOMOUNT,
                                                     self.workdir, options)
        updater.UpdateFromSource(source_mock)

        self.assertEqual(map_entry1.location, '/etc/auto.home')
        self.assertEqual(map_entry2.location, '/etc/auto.auto')

    def testUpdateNoMaster(self):
        """An update skips updating the master map, and approprate sub maps."""
        source_entry1 = automount.AutomountMapEntry()
        source_entry2 = automount.AutomountMapEntry()
        source_entry1.key = '/home'
        source_entry2.key = '/auto'
        source_entry1.location = 'ou=auto.home,ou=automounts'
        source_entry2.location = 'ou=auto.auto,ou=automounts'
        source_master = automount.AutomountMap([source_entry1, source_entry2])

        local_entry1 = automount.AutomountMapEntry()
        local_entry2 = automount.AutomountMapEntry()
        local_entry1.key = '/home'
        local_entry2.key = '/auto'
        local_entry1.location = '/etc/auto.home'
        local_entry2.location = '/etc/auto.null'
        local_master = automount.AutomountMap([local_entry1, local_entry2])

        source_mock = self.mock()
        invocation = source_mock.expects(pmock.at_least_once())
        invocation._CalledUpdateCacheFromSource()
        # we should get called inside the DummyUpdater, too.

        # the auto.home cache
        cache_mock1 = self.mock()
        # GetMapLocation() is called, and set to the master map map_entry
        invocation = cache_mock1.expects(
            pmock.at_least_once()).GetMapLocation()
        invocation.will(pmock.return_value('/etc/auto.home'))
        # we should get called inside the DummyUpdater
        cache_mock1.expects(
            pmock.at_least_once())._CalledUpdateCacheFromSource()

        # the auto.auto cache
        cache_mock2 = self.mock()
        # GetMapLocation() is called, and set to the master map map_entry
        invocation = cache_mock2.expects(
            pmock.at_least_once()).GetMapLocation()
        invocation.will(pmock.return_value('/etc/auto.auto'))
        cache_mock2.expects(
            pmock.at_least_once())._CalledUpdateCacheFromSource()

        # the auto.master cache, which should not be written to
        cache_mock3 = self.mock()
        invocation = cache_mock3.expects(pmock.once())
        invocation = invocation.method('GetMap')
        invocation.will(pmock.return_value(local_master))
        invocation = cache_mock3.expects(pmock.once())
        invocation = invocation.method('GetMap')
        invocation.will(pmock.return_value(local_master))

        cache_mocks = {
            '/home': cache_mock1,
            '/auto': cache_mock2,
            None: cache_mock3
        }

        # Create needs to return our mock_caches
        def DummyCreate(unused_cache_options,
                        unused_map_name,
                        automount_mountpoint=None):
            # the order of the master_map iterable is not predictable, so we
            # use the automount_mountpoint as the key to return the right one.
            return cache_mocks[automount_mountpoint]

        skip = files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER
        options = {skip: 'yes', 'dir': self.workdir}

        original_create = cache_factory.Create
        cache_factory.Create = DummyCreate
        try:
            updater = files_updater.FileAutomountUpdater(
                config.MAP_AUTOMOUNT, self.workdir, options)
            updater.UpdateFromSource(source_mock)
        finally:
            # Always undo the monkeypatch so a failure here cannot leak the
            # fake factory into other tests.
            cache_factory.Create = original_create

    def testUpdateCatchesMissingMaster(self):
        """Gracefully handle a missing local master map."""
        # use an empty master map from the source, to avoid mocking out already
        # tested code
        source_mock = self.mock()

        cache_mock = self.mock()
        # raise error on GetMap()
        invocation = cache_mock.expects(pmock.once()).GetMap()
        invocation.will(pmock.raise_exception(error.CacheNotFound))

        # Create needs to return our mock_cache
        def DummyCreate(unused_cache_options,
                        unused_map_name,
                        automount_mountpoint=None):
            # Every request gets the same cache mock, whatever the mountpoint.
            return cache_mock

        skip = files_updater.FileAutomountUpdater.OPT_LOCAL_MASTER
        options = {skip: 'yes', 'dir': self.workdir}

        original_create = cache_factory.Create
        cache_factory.Create = DummyCreate
        try:
            updater = files_updater.FileAutomountUpdater(
                config.MAP_AUTOMOUNT, self.workdir, options)
            return_value = updater.UpdateFromSource(source_mock)
        finally:
            # Always undo the monkeypatch, even if the update raises.
            cache_factory.Create = original_create

        self.assertEqual(return_value, 1)


if __name__ == '__main__':
    unittest.main()