1# This Source Code Form is subject to the terms of the Mozilla Public 2# License, v. 2.0. If a copy of the MPL was not distributed with this 3# file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5from __future__ import unicode_literals 6 7import os 8 9import mozunit 10 11from mozpack.copier import ( 12 FileCopier, 13 FileRegistry, 14) 15from mozpack.manifests import ( 16 InstallManifest, 17 UnreadableInstallManifest, 18) 19from mozpack.test.test_files import TestWithTmpDir 20 21 22class TestInstallManifest(TestWithTmpDir): 23 def test_construct(self): 24 m = InstallManifest() 25 self.assertEqual(len(m), 0) 26 27 def test_malformed(self): 28 f = self.tmppath('manifest') 29 open(f, 'wb').write('junk\n') 30 with self.assertRaises(UnreadableInstallManifest): 31 m = InstallManifest(f) 32 33 def test_adds(self): 34 m = InstallManifest() 35 m.add_symlink('s_source', 's_dest') 36 m.add_copy('c_source', 'c_dest') 37 m.add_required_exists('e_dest') 38 m.add_optional_exists('o_dest') 39 m.add_pattern_symlink('ps_base', 'ps/*', 'ps_dest') 40 m.add_pattern_copy('pc_base', 'pc/**', 'pc_dest') 41 m.add_preprocess('p_source', 'p_dest', 'p_source.pp') 42 m.add_content('content', 'content') 43 44 self.assertEqual(len(m), 8) 45 self.assertIn('s_dest', m) 46 self.assertIn('c_dest', m) 47 self.assertIn('p_dest', m) 48 self.assertIn('e_dest', m) 49 self.assertIn('o_dest', m) 50 self.assertIn('content', m) 51 52 with self.assertRaises(ValueError): 53 m.add_symlink('s_other', 's_dest') 54 55 with self.assertRaises(ValueError): 56 m.add_copy('c_other', 'c_dest') 57 58 with self.assertRaises(ValueError): 59 m.add_preprocess('p_other', 'p_dest', 'p_other.pp') 60 61 with self.assertRaises(ValueError): 62 m.add_required_exists('e_dest') 63 64 with self.assertRaises(ValueError): 65 m.add_optional_exists('o_dest') 66 67 with self.assertRaises(ValueError): 68 m.add_pattern_symlink('ps_base', 'ps/*', 'ps_dest') 69 70 with self.assertRaises(ValueError): 71 m.add_pattern_copy('pc_base', 'pc/**', 'pc_dest') 72 73 with self.assertRaises(ValueError): 74 m.add_content('content', 'content') 75 76 def _get_test_manifest(self): 77 m = InstallManifest() 78 m.add_symlink(self.tmppath('s_source'), 's_dest') 79 m.add_copy(self.tmppath('c_source'), 'c_dest') 80 m.add_preprocess(self.tmppath('p_source'), 'p_dest', self.tmppath('p_source.pp'), '#', {'FOO':'BAR', 'BAZ':'QUX'}) 81 m.add_required_exists('e_dest') 82 m.add_optional_exists('o_dest') 83 m.add_pattern_symlink('ps_base', '*', 'ps_dest') 84 m.add_pattern_copy('pc_base', '**', 'pc_dest') 85 m.add_content('the content\non\nmultiple lines', 'content') 86 87 return m 88 89 def test_serialization(self): 90 m = self._get_test_manifest() 91 92 p = self.tmppath('m') 93 m.write(path=p) 94 self.assertTrue(os.path.isfile(p)) 95 96 with open(p, 'rb') as fh: 97 c = fh.read() 98 99 self.assertEqual(c.count('\n'), 9) 100 101 lines = c.splitlines() 102 self.assertEqual(len(lines), 9) 103 104 self.assertEqual(lines[0], '5') 105 106 m2 = InstallManifest(path=p) 107 self.assertEqual(m, m2) 108 p2 = self.tmppath('m2') 109 m2.write(path=p2) 110 111 with open(p2, 'rb') as fh: 112 c2 = fh.read() 113 114 self.assertEqual(c, c2) 115 116 def test_populate_registry(self): 117 m = self._get_test_manifest() 118 r = FileRegistry() 119 m.populate_registry(r) 120 121 self.assertEqual(len(r), 6) 122 self.assertEqual(r.paths(), ['c_dest', 'content', 'e_dest', 'o_dest', 123 'p_dest', 's_dest']) 124 125 def test_pattern_expansion(self): 126 source = self.tmppath('source') 127 os.mkdir(source) 128 os.mkdir('%s/base' % source) 129 os.mkdir('%s/base/foo' % source) 130 131 with open('%s/base/foo/file1' % source, 'a'): 132 pass 133 134 with open('%s/base/foo/file2' % source, 'a'): 135 pass 136 137 m = InstallManifest() 138 m.add_pattern_symlink('%s/base' % source, '**', 'dest') 139 140 c = FileCopier() 141 m.populate_registry(c) 142 self.assertEqual(c.paths(), ['dest/foo/file1', 'dest/foo/file2']) 143 144 def test_or(self): 145 m1 = self._get_test_manifest() 146 orig_length = len(m1) 147 m2 = InstallManifest() 148 m2.add_symlink('s_source2', 's_dest2') 149 m2.add_copy('c_source2', 'c_dest2') 150 151 m1 |= m2 152 153 self.assertEqual(len(m2), 2) 154 self.assertEqual(len(m1), orig_length + 2) 155 156 self.assertIn('s_dest2', m1) 157 self.assertIn('c_dest2', m1) 158 159 def test_copier_application(self): 160 dest = self.tmppath('dest') 161 os.mkdir(dest) 162 163 to_delete = self.tmppath('dest/to_delete') 164 with open(to_delete, 'a'): 165 pass 166 167 with open(self.tmppath('s_source'), 'wt') as fh: 168 fh.write('symlink!') 169 170 with open(self.tmppath('c_source'), 'wt') as fh: 171 fh.write('copy!') 172 173 with open(self.tmppath('p_source'), 'wt') as fh: 174 fh.write('#define FOO 1\npreprocess!') 175 176 with open(self.tmppath('dest/e_dest'), 'a'): 177 pass 178 179 with open(self.tmppath('dest/o_dest'), 'a'): 180 pass 181 182 m = self._get_test_manifest() 183 c = FileCopier() 184 m.populate_registry(c) 185 result = c.copy(dest) 186 187 self.assertTrue(os.path.exists(self.tmppath('dest/s_dest'))) 188 self.assertTrue(os.path.exists(self.tmppath('dest/c_dest'))) 189 self.assertTrue(os.path.exists(self.tmppath('dest/p_dest'))) 190 self.assertTrue(os.path.exists(self.tmppath('dest/e_dest'))) 191 self.assertTrue(os.path.exists(self.tmppath('dest/o_dest'))) 192 self.assertTrue(os.path.exists(self.tmppath('dest/content'))) 193 self.assertFalse(os.path.exists(to_delete)) 194 195 with open(self.tmppath('dest/s_dest'), 'rt') as fh: 196 self.assertEqual(fh.read(), 'symlink!') 197 198 with open(self.tmppath('dest/c_dest'), 'rt') as fh: 199 self.assertEqual(fh.read(), 'copy!') 200 201 with open(self.tmppath('dest/p_dest'), 'rt') as fh: 202 self.assertEqual(fh.read(), 'preprocess!') 203 204 self.assertEqual(result.updated_files, set(self.tmppath(p) for p in ( 205 'dest/s_dest', 'dest/c_dest', 'dest/p_dest', 'dest/content'))) 206 self.assertEqual(result.existing_files, 207 set([self.tmppath('dest/e_dest'), self.tmppath('dest/o_dest')])) 208 self.assertEqual(result.removed_files, {to_delete}) 209 self.assertEqual(result.removed_directories, set()) 210 211 def test_preprocessor(self): 212 manifest = self.tmppath('m') 213 deps = self.tmppath('m.pp') 214 dest = self.tmppath('dest') 215 include = self.tmppath('p_incl') 216 217 with open(include, 'wt') as fh: 218 fh.write('#define INCL\n') 219 time = os.path.getmtime(include) - 3 220 os.utime(include, (time, time)) 221 222 with open(self.tmppath('p_source'), 'wt') as fh: 223 fh.write('#ifdef FOO\n#if BAZ == QUX\nPASS1\n#endif\n#endif\n') 224 fh.write('#ifdef DEPTEST\nPASS2\n#endif\n') 225 fh.write('#include p_incl\n#ifdef INCLTEST\nPASS3\n#endif\n') 226 time = os.path.getmtime(self.tmppath('p_source')) - 3 227 os.utime(self.tmppath('p_source'), (time, time)) 228 229 # Create and write a manifest with the preprocessed file, then apply it. 230 # This should write out our preprocessed file. 231 m = InstallManifest() 232 m.add_preprocess(self.tmppath('p_source'), 'p_dest', deps, '#', {'FOO':'BAR', 'BAZ':'QUX'}) 233 m.write(path=manifest) 234 235 m = InstallManifest(path=manifest) 236 c = FileCopier() 237 m.populate_registry(c) 238 c.copy(dest) 239 240 self.assertTrue(os.path.exists(self.tmppath('dest/p_dest'))) 241 242 with open(self.tmppath('dest/p_dest'), 'rt') as fh: 243 self.assertEqual(fh.read(), 'PASS1\n') 244 245 # Create a second manifest with the preprocessed file, then apply it. 246 # Since this manifest does not exist on the disk, there should not be a 247 # dependency on it, and the preprocessed file should not be modified. 248 m2 = InstallManifest() 249 m2.add_preprocess(self.tmppath('p_source'), 'p_dest', deps, '#', {'DEPTEST':True}) 250 c = FileCopier() 251 m2.populate_registry(c) 252 result = c.copy(dest) 253 254 self.assertFalse(self.tmppath('dest/p_dest') in result.updated_files) 255 self.assertTrue(self.tmppath('dest/p_dest') in result.existing_files) 256 257 # Write out the second manifest, then load it back in from the disk. 258 # This should add the dependency on the manifest file, so our 259 # preprocessed file should be regenerated with the new defines. 260 # We also set the mtime on the destination file back, so it will be 261 # older than the manifest file. 262 m2.write(path=manifest) 263 time = os.path.getmtime(manifest) - 1 264 os.utime(self.tmppath('dest/p_dest'), (time, time)) 265 m2 = InstallManifest(path=manifest) 266 c = FileCopier() 267 m2.populate_registry(c) 268 self.assertTrue(c.copy(dest)) 269 270 with open(self.tmppath('dest/p_dest'), 'rt') as fh: 271 self.assertEqual(fh.read(), 'PASS2\n') 272 273 # Set the time on the manifest back, so it won't be picked up as 274 # modified in the next test 275 time = os.path.getmtime(manifest) - 1 276 os.utime(manifest, (time, time)) 277 278 # Update the contents of a file included by the source file. This should 279 # cause the destination to be regenerated. 280 with open(include, 'wt') as fh: 281 fh.write('#define INCLTEST\n') 282 283 time = os.path.getmtime(include) - 1 284 os.utime(self.tmppath('dest/p_dest'), (time, time)) 285 c = FileCopier() 286 m2.populate_registry(c) 287 self.assertTrue(c.copy(dest)) 288 289 with open(self.tmppath('dest/p_dest'), 'rt') as fh: 290 self.assertEqual(fh.read(), 'PASS2\nPASS3\n') 291 292 def test_preprocessor_dependencies(self): 293 manifest = self.tmppath('m') 294 deps = self.tmppath('m.pp') 295 dest = self.tmppath('dest') 296 source = self.tmppath('p_source') 297 destfile = self.tmppath('dest/p_dest') 298 include = self.tmppath('p_incl') 299 os.mkdir(dest) 300 301 with open(source, 'wt') as fh: 302 fh.write('#define SRC\nSOURCE\n') 303 time = os.path.getmtime(source) - 3 304 os.utime(source, (time, time)) 305 306 with open(include, 'wt') as fh: 307 fh.write('INCLUDE\n') 308 time = os.path.getmtime(source) - 3 309 os.utime(include, (time, time)) 310 311 # Create and write a manifest with the preprocessed file. 312 m = InstallManifest() 313 m.add_preprocess(source, 'p_dest', deps, '#', {'FOO':'BAR', 'BAZ':'QUX'}) 314 m.write(path=manifest) 315 316 time = os.path.getmtime(source) - 5 317 os.utime(manifest, (time, time)) 318 319 # Now read the manifest back in, and apply it. This should write out 320 # our preprocessed file. 321 m = InstallManifest(path=manifest) 322 c = FileCopier() 323 m.populate_registry(c) 324 self.assertTrue(c.copy(dest)) 325 326 with open(destfile, 'rt') as fh: 327 self.assertEqual(fh.read(), 'SOURCE\n') 328 329 # Next, modify the source to #INCLUDE another file. 330 with open(source, 'wt') as fh: 331 fh.write('SOURCE\n#include p_incl\n') 332 time = os.path.getmtime(source) - 1 333 os.utime(destfile, (time, time)) 334 335 # Apply the manifest, and confirm that it also reads the newly included 336 # file. 337 m = InstallManifest(path=manifest) 338 c = FileCopier() 339 m.populate_registry(c) 340 c.copy(dest) 341 342 with open(destfile, 'rt') as fh: 343 self.assertEqual(fh.read(), 'SOURCE\nINCLUDE\n') 344 345 # Set the time on the source file back, so it won't be picked up as 346 # modified in the next test. 347 time = os.path.getmtime(source) - 1 348 os.utime(source, (time, time)) 349 350 # Now, modify the include file (but not the original source). 351 with open(include, 'wt') as fh: 352 fh.write('INCLUDE MODIFIED\n') 353 time = os.path.getmtime(include) - 1 354 os.utime(destfile, (time, time)) 355 356 # Apply the manifest, and confirm that the change to the include file 357 # is detected. That should cause the preprocessor to run again. 358 m = InstallManifest(path=manifest) 359 c = FileCopier() 360 m.populate_registry(c) 361 c.copy(dest) 362 363 with open(destfile, 'rt') as fh: 364 self.assertEqual(fh.read(), 'SOURCE\nINCLUDE MODIFIED\n') 365 366 # ORing an InstallManifest should copy file dependencies 367 m = InstallManifest() 368 m |= InstallManifest(path=manifest) 369 c = FileCopier() 370 m.populate_registry(c) 371 e = c._files['p_dest'] 372 self.assertEqual(e.extra_depends, [manifest]) 373 374if __name__ == '__main__': 375 mozunit.main() 376