1# Copyright 2007 Google Inc. 2# 3# This program is free software; you can redistribute it and/or 4# modify it under the terms of the GNU General Public License 5# as published by the Free Software Foundation; either version 2 6# of the License, or (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software Foundation, 15# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 16"""Unit tests for nss_cache/map_updater.py.""" 17 18__author__ = ('vasilios@google.com (V Hoffman)', 19 'jaq@google.com (Jamie Wilkinson)') 20 21import os 22import shutil 23import tempfile 24import unittest 25from mox3 import mox 26 27from nss_cache.caches import caches 28from nss_cache.caches import files 29from nss_cache.sources import source 30from nss_cache.caches import cache_factory 31from nss_cache import config 32from nss_cache import error 33from nss_cache.maps import automount 34from nss_cache.maps import passwd 35 36from nss_cache.update import map_updater 37 38 39class SingleMapUpdaterTest(mox.MoxTestBase): 40 """Unit tests for FileMapUpdater class.""" 41 42 def setUp(self): 43 super(SingleMapUpdaterTest, self).setUp() 44 self.workdir = tempfile.mkdtemp() 45 self.workdir2 = tempfile.mkdtemp() 46 47 def tearDown(self): 48 super(SingleMapUpdaterTest, self).tearDown() 49 shutil.rmtree(self.workdir) 50 shutil.rmtree(self.workdir2) 51 52 def testFullUpdate(self): 53 """A full update reads the source, writes to cache, and updates 54 times.""" 55 original_modify_stamp = 1 56 new_modify_stamp = 2 57 58 updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {}) 59 updater.WriteModifyTimestamp(original_modify_stamp) 60 61 map_entry = passwd.PasswdMapEntry({'name': 'foo', 'uid': 10, 'gid': 10}) 62 password_map = passwd.PasswdMap([map_entry]) 63 password_map.SetModifyTimestamp(new_modify_stamp) 64 65 cache_mock = self.mox.CreateMock(files.FilesCache) 66 cache_mock.WriteMap(map_data=password_map).AndReturn(0) 67 68 source_mock = self.mox.CreateMock(source.Source) 69 source_mock.GetMap(config.MAP_PASSWORD, 70 location=None).AndReturn(password_map) 71 72 self.mox.ReplayAll() 73 74 self.assertEqual( 75 0, 76 updater.UpdateCacheFromSource(cache_mock, source_mock, False, False, 77 None)) 78 self.assertEqual(updater.GetModifyTimestamp(), new_modify_stamp) 79 self.assertNotEqual(updater.GetUpdateTimestamp(), None) 80 81 def testIncrementalUpdate(self): 82 """An incremental update reads a partial map and merges it.""" 83 84 # Unlike in a full update, we create a cache map and a source map, and 85 # let it merge them. If it goes to write the merged map, we're good. 86 # Also check that timestamps were updated, as in testFullUpdate above. 87 88 def compare_function(map_object): 89 return len(map_object) == 2 90 91 original_modify_stamp = 1 92 new_modify_stamp = 2 93 updater = map_updater.MapUpdater(config.MAP_PASSWORD, 94 self.workdir, {}, 95 can_do_incremental=True) 96 updater.WriteModifyTimestamp(original_modify_stamp) 97 98 cache_map_entry = passwd.PasswdMapEntry({ 99 'name': 'bar', 100 'uid': 20, 101 'gid': 20 102 }) 103 cache_map = passwd.PasswdMap([cache_map_entry]) 104 cache_map.SetModifyTimestamp(original_modify_stamp) 105 106 cache_mock = self.mox.CreateMock(caches.Cache) 107 cache_mock.GetMap().AndReturn(cache_map) 108 cache_mock.WriteMap(map_data=mox.Func(compare_function)).AndReturn(0) 109 110 source_map_entry = passwd.PasswdMapEntry({ 111 'name': 'foo', 112 'uid': 10, 113 'gid': 10 114 }) 115 source_map = passwd.PasswdMap([source_map_entry]) 116 source_map.SetModifyTimestamp(new_modify_stamp) 117 118 source_mock = self.mox.CreateMock(source.Source) 119 source_mock.GetMap(config.MAP_PASSWORD, 120 location=None, 121 since=original_modify_stamp).AndReturn(source_map) 122 123 self.mox.ReplayAll() 124 125 self.assertEqual( 126 0, 127 updater.UpdateCacheFromSource(cache_mock, 128 source_mock, 129 incremental=True, 130 force_write=False, 131 location=None)) 132 self.assertEqual(updater.GetModifyTimestamp(), new_modify_stamp) 133 self.assertNotEqual(updater.GetUpdateTimestamp(), None) 134 135 def testFullUpdateOnMissingCache(self): 136 """We fault to a full update if our cache is missing.""" 137 138 original_modify_stamp = 1 139 updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {}) 140 updater.WriteModifyTimestamp(original_modify_stamp) 141 142 source_mock = self.mox.CreateMock(source.Source) 143 # Try incremental first. 144 source_mock.GetMap(config.MAP_PASSWORD, 145 location=None, 146 since=original_modify_stamp).AndReturn('first map') 147 # Try full second. 148 source_mock.GetMap(config.MAP_PASSWORD, 149 location=None).AndReturn('second map') 150 151 updater = map_updater.MapUpdater(config.MAP_PASSWORD, 152 self.workdir, {}, 153 can_do_incremental=True) 154 self.mox.StubOutWithMock(updater, 'GetModifyTimestamp') 155 updater.GetModifyTimestamp().AndReturn(original_modify_stamp) 156 self.mox.StubOutWithMock(updater, '_IncrementalUpdateFromMap') 157 # force a cache not found on incremental 158 updater._IncrementalUpdateFromMap('cache', 'first map').AndRaise( 159 error.CacheNotFound) 160 self.mox.StubOutWithMock(updater, 'FullUpdateFromMap') 161 updater.FullUpdateFromMap(mox.IgnoreArg(), 'second map', 162 False).AndReturn(0) 163 164 self.mox.ReplayAll() 165 166 self.assertEqual( 167 0, 168 updater.UpdateCacheFromSource('cache', 169 source_mock, 170 incremental=True, 171 force_write=False, 172 location=None)) 173 174 def testFullUpdateOnMissingTimestamp(self): 175 """We fault to a full update if our modify timestamp is missing.""" 176 177 updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {}) 178 # We do not call WriteModifyTimestamp() so we force a full sync. 179 180 source_mock = self.mox.CreateMock(source.Source) 181 source_mock.GetMap(config.MAP_PASSWORD, 182 location=None).AndReturn('second map') 183 updater = map_updater.MapUpdater(config.MAP_PASSWORD, self.workdir, {}) 184 self.mox.StubOutWithMock(updater, 'FullUpdateFromMap') 185 updater.FullUpdateFromMap(mox.IgnoreArg(), 'second map', 186 False).AndReturn(0) 187 188 self.mox.ReplayAll() 189 self.assertEqual( 190 0, 191 updater.UpdateCacheFromSource('cache', source_mock, True, False, 192 None)) 193 194 195class MapAutomountUpdaterTest(mox.MoxTestBase): 196 """Unit tests for AutomountUpdater class.""" 197 198 def setUp(self): 199 super(MapAutomountUpdaterTest, self).setUp() 200 self.workdir = tempfile.mkdtemp() 201 202 def tearDown(self): 203 super(MapAutomountUpdaterTest, self).tearDown() 204 os.rmdir(self.workdir) 205 206 def testInit(self): 207 """An automount object correctly sets map-specific attributes.""" 208 updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT, 209 self.workdir, {}) 210 self.assertEqual(updater.local_master, False) 211 212 conf = {map_updater.AutomountUpdater.OPT_LOCAL_MASTER: 'yes'} 213 updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT, 214 self.workdir, conf) 215 self.assertEqual(updater.local_master, True) 216 217 conf = {map_updater.AutomountUpdater.OPT_LOCAL_MASTER: 'no'} 218 updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT, 219 self.workdir, conf) 220 self.assertEqual(updater.local_master, False) 221 222 def testUpdate(self): 223 """An update gets a master map and updates each entry.""" 224 map_entry1 = automount.AutomountMapEntry() 225 map_entry2 = automount.AutomountMapEntry() 226 map_entry1.key = '/home' 227 map_entry2.key = '/auto' 228 map_entry1.location = 'ou=auto.home,ou=automounts' 229 map_entry2.location = 'ou=auto.auto,ou=automounts' 230 master_map = automount.AutomountMap([map_entry1, map_entry2]) 231 232 source_mock = self.mox.CreateMock(source.Source) 233 # return the master map 234 source_mock.GetAutomountMasterMap().AndReturn(master_map) 235 236 # the auto.home cache 237 cache_home = self.mox.CreateMock(caches.Cache) 238 # GetMapLocation() is called, and set to the master map map_entry 239 cache_home.GetMapLocation().AndReturn('/etc/auto.home') 240 241 # the auto.auto cache 242 cache_auto = self.mox.CreateMock(caches.Cache) 243 # GetMapLocation() is called, and set to the master map map_entry 244 cache_auto.GetMapLocation().AndReturn('/etc/auto.auto') 245 246 # the auto.master cache 247 cache_master = self.mox.CreateMock(caches.Cache) 248 249 self.mox.StubOutWithMock(cache_factory, 'Create') 250 cache_factory.Create(mox.IgnoreArg(), 251 'automount', 252 automount_mountpoint='/home').AndReturn(cache_home) 253 cache_factory.Create(mox.IgnoreArg(), 254 'automount', 255 automount_mountpoint='/auto').AndReturn(cache_auto) 256 cache_factory.Create(mox.IgnoreArg(), 257 'automount', 258 automount_mountpoint=None).AndReturn(cache_master) 259 260 updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT, 261 self.workdir, {}) 262 263 self.mox.StubOutClassWithMocks(map_updater, 'MapUpdater') 264 updater_home = map_updater.MapUpdater(config.MAP_AUTOMOUNT, 265 self.workdir, {}, 266 automount_mountpoint='/home') 267 updater_home.UpdateCacheFromSource( 268 cache_home, source_mock, True, False, 269 'ou=auto.home,ou=automounts').AndReturn(0) 270 updater_auto = map_updater.MapUpdater(config.MAP_AUTOMOUNT, 271 self.workdir, {}, 272 automount_mountpoint='/auto') 273 updater_auto.UpdateCacheFromSource( 274 cache_auto, source_mock, True, False, 275 'ou=auto.auto,ou=automounts').AndReturn(0) 276 updater_master = map_updater.MapUpdater(config.MAP_AUTOMOUNT, 277 self.workdir, {}) 278 updater_master.FullUpdateFromMap(cache_master, master_map).AndReturn(0) 279 280 self.mox.ReplayAll() 281 282 updater.UpdateFromSource(source_mock) 283 284 self.assertEqual(map_entry1.location, '/etc/auto.home') 285 self.assertEqual(map_entry2.location, '/etc/auto.auto') 286 287 def testUpdateNoMaster(self): 288 """An update skips updating the master map, and approprate sub maps.""" 289 source_entry1 = automount.AutomountMapEntry() 290 source_entry2 = automount.AutomountMapEntry() 291 source_entry1.key = '/home' 292 source_entry2.key = '/auto' 293 source_entry1.location = 'ou=auto.home,ou=automounts' 294 source_entry2.location = 'ou=auto.auto,ou=automounts' 295 source_master = automount.AutomountMap([source_entry1, source_entry2]) 296 297 local_entry1 = automount.AutomountMapEntry() 298 local_entry2 = automount.AutomountMapEntry() 299 local_entry1.key = '/home' 300 local_entry2.key = '/auto' 301 local_entry1.location = '/etc/auto.home' 302 local_entry2.location = '/etc/auto.null' 303 local_master = automount.AutomountMap([local_entry1, local_entry2]) 304 305 source_mock = self.mox.CreateMock(source.Source) 306 # return the source master map 307 source_mock.GetAutomountMasterMap().AndReturn(source_master) 308 309 # the auto.home cache 310 cache_home = self.mox.CreateMock(caches.Cache) 311 # GetMapLocation() is called, and set to the master map map_entry 312 cache_home.GetMapLocation().AndReturn('/etc/auto.home') 313 314 # the auto.auto cache 315 cache_auto = self.mox.CreateMock(caches.Cache) 316 # GetMapLocation() is called, and set to the master map map_entry 317 cache_auto.GetMapLocation().AndReturn('/etc/auto.auto') 318 319 # the auto.master cache, which should not be written to 320 cache_master = self.mox.CreateMock(caches.Cache) 321 cache_master.GetMap().AndReturn(local_master) 322 323 self.mox.StubOutWithMock(cache_factory, 'Create') 324 cache_factory.Create(mox.IgnoreArg(), 325 mox.IgnoreArg(), 326 automount_mountpoint=None).AndReturn(cache_master) 327 cache_factory.Create(mox.IgnoreArg(), 328 mox.IgnoreArg(), 329 automount_mountpoint='/home').AndReturn(cache_home) 330 cache_factory.Create(mox.IgnoreArg(), 331 mox.IgnoreArg(), 332 automount_mountpoint='/auto').AndReturn(cache_auto) 333 334 skip = map_updater.AutomountUpdater.OPT_LOCAL_MASTER 335 updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT, 336 self.workdir, {skip: 'yes'}) 337 338 self.mox.StubOutClassWithMocks(map_updater, 'MapUpdater') 339 updater_home = map_updater.MapUpdater(config.MAP_AUTOMOUNT, 340 self.workdir, 341 {'local_automount_master': 'yes'}, 342 automount_mountpoint='/home') 343 updater_home.UpdateCacheFromSource( 344 cache_home, source_mock, True, False, 345 'ou=auto.home,ou=automounts').AndReturn(0) 346 347 self.mox.ReplayAll() 348 349 updater.UpdateFromSource(source_mock) 350 351 352class AutomountUpdaterMoxTest(mox.MoxTestBase): 353 354 def setUp(self): 355 super(AutomountUpdaterMoxTest, self).setUp() 356 self.workdir = tempfile.mkdtemp() 357 358 def tearDown(self): 359 super(AutomountUpdaterMoxTest, self).tearDown() 360 shutil.rmtree(self.workdir) 361 362 def testUpdateCatchesMissingMaster(self): 363 """Gracefully handle a missing local master maps.""" 364 # use an empty master map from the source, to avoid mocking out already 365 # tested code 366 master_map = automount.AutomountMap() 367 368 source_mock = self.mox.CreateMockAnything() 369 source_mock.GetAutomountMasterMap().AndReturn(master_map) 370 371 cache_mock = self.mox.CreateMock(caches.Cache) 372 # raise error on GetMap() 373 cache_mock.GetMap().AndRaise(error.CacheNotFound) 374 375 skip = map_updater.AutomountUpdater.OPT_LOCAL_MASTER 376 cache_options = {skip: 'yes'} 377 378 self.mox.StubOutWithMock(cache_factory, 'Create') 379 cache_factory.Create(cache_options, 380 'automount', 381 automount_mountpoint=None).AndReturn(cache_mock) 382 383 self.mox.ReplayAll() 384 385 updater = map_updater.AutomountUpdater(config.MAP_AUTOMOUNT, 386 self.workdir, cache_options) 387 388 return_value = updater.UpdateFromSource(source_mock) 389 390 self.assertEqual(return_value, 1) 391 392 393if __name__ == '__main__': 394 unittest.main() 395