1# This Source Code Form is subject to the terms of the Mozilla Public 2# License, v. 2.0. If a copy of the MPL was not distributed with this 3# file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5from __future__ import absolute_import, print_function, unicode_literals 6 7import os 8 9import mozunit 10 11from mozpack.copier import ( 12 FileCopier, 13 FileRegistry, 14) 15from mozpack.manifests import ( 16 InstallManifest, 17 UnreadableInstallManifest, 18) 19from mozpack.test.test_files import TestWithTmpDir 20 21 22class TestInstallManifest(TestWithTmpDir): 23 def test_construct(self): 24 m = InstallManifest() 25 self.assertEqual(len(m), 0) 26 27 def test_malformed(self): 28 f = self.tmppath("manifest") 29 open(f, "wt").write("junk\n") 30 with self.assertRaises(UnreadableInstallManifest): 31 InstallManifest(f) 32 33 def test_adds(self): 34 m = InstallManifest() 35 m.add_link("s_source", "s_dest") 36 m.add_copy("c_source", "c_dest") 37 m.add_required_exists("e_dest") 38 m.add_optional_exists("o_dest") 39 m.add_pattern_link("ps_base", "ps/*", "ps_dest") 40 m.add_pattern_copy("pc_base", "pc/**", "pc_dest") 41 m.add_preprocess("p_source", "p_dest", "p_source.pp") 42 m.add_content("content", "content") 43 44 self.assertEqual(len(m), 8) 45 self.assertIn("s_dest", m) 46 self.assertIn("c_dest", m) 47 self.assertIn("p_dest", m) 48 self.assertIn("e_dest", m) 49 self.assertIn("o_dest", m) 50 self.assertIn("content", m) 51 52 with self.assertRaises(ValueError): 53 m.add_link("s_other", "s_dest") 54 55 with self.assertRaises(ValueError): 56 m.add_copy("c_other", "c_dest") 57 58 with self.assertRaises(ValueError): 59 m.add_preprocess("p_other", "p_dest", "p_other.pp") 60 61 with self.assertRaises(ValueError): 62 m.add_required_exists("e_dest") 63 64 with self.assertRaises(ValueError): 65 m.add_optional_exists("o_dest") 66 67 with self.assertRaises(ValueError): 68 m.add_pattern_link("ps_base", "ps/*", "ps_dest") 69 70 with self.assertRaises(ValueError): 71 m.add_pattern_copy("pc_base", "pc/**", "pc_dest") 72 73 with self.assertRaises(ValueError): 74 m.add_content("content", "content") 75 76 def _get_test_manifest(self): 77 m = InstallManifest() 78 m.add_link(self.tmppath("s_source"), "s_dest") 79 m.add_copy(self.tmppath("c_source"), "c_dest") 80 m.add_preprocess( 81 self.tmppath("p_source"), 82 "p_dest", 83 self.tmppath("p_source.pp"), 84 "#", 85 {"FOO": "BAR", "BAZ": "QUX"}, 86 ) 87 m.add_required_exists("e_dest") 88 m.add_optional_exists("o_dest") 89 m.add_pattern_link("ps_base", "*", "ps_dest") 90 m.add_pattern_copy("pc_base", "**", "pc_dest") 91 m.add_content("the content\non\nmultiple lines", "content") 92 93 return m 94 95 def test_serialization(self): 96 m = self._get_test_manifest() 97 98 p = self.tmppath("m") 99 m.write(path=p) 100 self.assertTrue(os.path.isfile(p)) 101 102 with open(p, "r") as fh: 103 c = fh.read() 104 105 self.assertEqual(c.count("\n"), 9) 106 107 lines = c.splitlines() 108 self.assertEqual(len(lines), 9) 109 110 self.assertEqual(lines[0], "5") 111 112 m2 = InstallManifest(path=p) 113 self.assertEqual(m, m2) 114 p2 = self.tmppath("m2") 115 m2.write(path=p2) 116 117 with open(p2, "r") as fh: 118 c2 = fh.read() 119 120 self.assertEqual(c, c2) 121 122 def test_populate_registry(self): 123 m = self._get_test_manifest() 124 r = FileRegistry() 125 m.populate_registry(r) 126 127 self.assertEqual(len(r), 6) 128 self.assertEqual( 129 r.paths(), ["c_dest", "content", "e_dest", "o_dest", "p_dest", "s_dest"] 130 ) 131 132 def test_pattern_expansion(self): 133 source = self.tmppath("source") 134 os.mkdir(source) 135 os.mkdir("%s/base" % source) 136 os.mkdir("%s/base/foo" % source) 137 138 with open("%s/base/foo/file1" % source, "a"): 139 pass 140 141 with open("%s/base/foo/file2" % source, "a"): 142 pass 143 144 m = InstallManifest() 145 m.add_pattern_link("%s/base" % source, "**", "dest") 146 147 c = FileCopier() 148 m.populate_registry(c) 149 self.assertEqual(c.paths(), ["dest/foo/file1", "dest/foo/file2"]) 150 151 def test_write_expand_pattern(self): 152 source = self.tmppath("source") 153 os.mkdir(source) 154 os.mkdir("%s/base" % source) 155 os.mkdir("%s/base/foo" % source) 156 157 with open("%s/base/foo/file1" % source, "a"): 158 pass 159 160 with open("%s/base/foo/file2" % source, "a"): 161 pass 162 163 m = InstallManifest() 164 m.add_pattern_link("%s/base" % source, "**", "dest") 165 166 track = self.tmppath("track") 167 m.write(path=track, expand_pattern=True) 168 169 m = InstallManifest(path=track) 170 self.assertEqual( 171 sorted(dest for dest in m._dests), ["dest/foo/file1", "dest/foo/file2"] 172 ) 173 174 def test_or(self): 175 m1 = self._get_test_manifest() 176 orig_length = len(m1) 177 m2 = InstallManifest() 178 m2.add_link("s_source2", "s_dest2") 179 m2.add_copy("c_source2", "c_dest2") 180 181 m1 |= m2 182 183 self.assertEqual(len(m2), 2) 184 self.assertEqual(len(m1), orig_length + 2) 185 186 self.assertIn("s_dest2", m1) 187 self.assertIn("c_dest2", m1) 188 189 def test_copier_application(self): 190 dest = self.tmppath("dest") 191 os.mkdir(dest) 192 193 to_delete = self.tmppath("dest/to_delete") 194 with open(to_delete, "a"): 195 pass 196 197 with open(self.tmppath("s_source"), "wt") as fh: 198 fh.write("symlink!") 199 200 with open(self.tmppath("c_source"), "wt") as fh: 201 fh.write("copy!") 202 203 with open(self.tmppath("p_source"), "wt") as fh: 204 fh.write("#define FOO 1\npreprocess!") 205 206 with open(self.tmppath("dest/e_dest"), "a"): 207 pass 208 209 with open(self.tmppath("dest/o_dest"), "a"): 210 pass 211 212 m = self._get_test_manifest() 213 c = FileCopier() 214 m.populate_registry(c) 215 result = c.copy(dest) 216 217 self.assertTrue(os.path.exists(self.tmppath("dest/s_dest"))) 218 self.assertTrue(os.path.exists(self.tmppath("dest/c_dest"))) 219 self.assertTrue(os.path.exists(self.tmppath("dest/p_dest"))) 220 self.assertTrue(os.path.exists(self.tmppath("dest/e_dest"))) 221 self.assertTrue(os.path.exists(self.tmppath("dest/o_dest"))) 222 self.assertTrue(os.path.exists(self.tmppath("dest/content"))) 223 self.assertFalse(os.path.exists(to_delete)) 224 225 with open(self.tmppath("dest/s_dest"), "rt") as fh: 226 self.assertEqual(fh.read(), "symlink!") 227 228 with open(self.tmppath("dest/c_dest"), "rt") as fh: 229 self.assertEqual(fh.read(), "copy!") 230 231 with open(self.tmppath("dest/p_dest"), "rt") as fh: 232 self.assertEqual(fh.read(), "preprocess!") 233 234 self.assertEqual( 235 result.updated_files, 236 set( 237 self.tmppath(p) 238 for p in ("dest/s_dest", "dest/c_dest", "dest/p_dest", "dest/content") 239 ), 240 ) 241 self.assertEqual( 242 result.existing_files, 243 set([self.tmppath("dest/e_dest"), self.tmppath("dest/o_dest")]), 244 ) 245 self.assertEqual(result.removed_files, {to_delete}) 246 self.assertEqual(result.removed_directories, set()) 247 248 def test_preprocessor(self): 249 manifest = self.tmppath("m") 250 deps = self.tmppath("m.pp") 251 dest = self.tmppath("dest") 252 include = self.tmppath("p_incl") 253 254 with open(include, "wt") as fh: 255 fh.write("#define INCL\n") 256 time = os.path.getmtime(include) - 3 257 os.utime(include, (time, time)) 258 259 with open(self.tmppath("p_source"), "wt") as fh: 260 fh.write("#ifdef FOO\n#if BAZ == QUX\nPASS1\n#endif\n#endif\n") 261 fh.write("#ifdef DEPTEST\nPASS2\n#endif\n") 262 fh.write("#include p_incl\n#ifdef INCLTEST\nPASS3\n#endif\n") 263 time = os.path.getmtime(self.tmppath("p_source")) - 3 264 os.utime(self.tmppath("p_source"), (time, time)) 265 266 # Create and write a manifest with the preprocessed file, then apply it. 267 # This should write out our preprocessed file. 268 m = InstallManifest() 269 m.add_preprocess( 270 self.tmppath("p_source"), "p_dest", deps, "#", {"FOO": "BAR", "BAZ": "QUX"} 271 ) 272 m.write(path=manifest) 273 274 m = InstallManifest(path=manifest) 275 c = FileCopier() 276 m.populate_registry(c) 277 c.copy(dest) 278 279 self.assertTrue(os.path.exists(self.tmppath("dest/p_dest"))) 280 281 with open(self.tmppath("dest/p_dest"), "rt") as fh: 282 self.assertEqual(fh.read(), "PASS1\n") 283 284 # Create a second manifest with the preprocessed file, then apply it. 285 # Since this manifest does not exist on the disk, there should not be a 286 # dependency on it, and the preprocessed file should not be modified. 287 m2 = InstallManifest() 288 m2.add_preprocess( 289 self.tmppath("p_source"), "p_dest", deps, "#", {"DEPTEST": True} 290 ) 291 c = FileCopier() 292 m2.populate_registry(c) 293 result = c.copy(dest) 294 295 self.assertFalse(self.tmppath("dest/p_dest") in result.updated_files) 296 self.assertTrue(self.tmppath("dest/p_dest") in result.existing_files) 297 298 # Write out the second manifest, then load it back in from the disk. 299 # This should add the dependency on the manifest file, so our 300 # preprocessed file should be regenerated with the new defines. 301 # We also set the mtime on the destination file back, so it will be 302 # older than the manifest file. 303 m2.write(path=manifest) 304 time = os.path.getmtime(manifest) - 1 305 os.utime(self.tmppath("dest/p_dest"), (time, time)) 306 m2 = InstallManifest(path=manifest) 307 c = FileCopier() 308 m2.populate_registry(c) 309 self.assertTrue(c.copy(dest)) 310 311 with open(self.tmppath("dest/p_dest"), "rt") as fh: 312 self.assertEqual(fh.read(), "PASS2\n") 313 314 # Set the time on the manifest back, so it won't be picked up as 315 # modified in the next test 316 time = os.path.getmtime(manifest) - 1 317 os.utime(manifest, (time, time)) 318 319 # Update the contents of a file included by the source file. This should 320 # cause the destination to be regenerated. 321 with open(include, "wt") as fh: 322 fh.write("#define INCLTEST\n") 323 324 time = os.path.getmtime(include) - 1 325 os.utime(self.tmppath("dest/p_dest"), (time, time)) 326 c = FileCopier() 327 m2.populate_registry(c) 328 self.assertTrue(c.copy(dest)) 329 330 with open(self.tmppath("dest/p_dest"), "rt") as fh: 331 self.assertEqual(fh.read(), "PASS2\nPASS3\n") 332 333 def test_preprocessor_dependencies(self): 334 manifest = self.tmppath("m") 335 deps = self.tmppath("m.pp") 336 dest = self.tmppath("dest") 337 source = self.tmppath("p_source") 338 destfile = self.tmppath("dest/p_dest") 339 include = self.tmppath("p_incl") 340 os.mkdir(dest) 341 342 with open(source, "wt") as fh: 343 fh.write("#define SRC\nSOURCE\n") 344 time = os.path.getmtime(source) - 3 345 os.utime(source, (time, time)) 346 347 with open(include, "wt") as fh: 348 fh.write("INCLUDE\n") 349 time = os.path.getmtime(source) - 3 350 os.utime(include, (time, time)) 351 352 # Create and write a manifest with the preprocessed file. 353 m = InstallManifest() 354 m.add_preprocess(source, "p_dest", deps, "#", {"FOO": "BAR", "BAZ": "QUX"}) 355 m.write(path=manifest) 356 357 time = os.path.getmtime(source) - 5 358 os.utime(manifest, (time, time)) 359 360 # Now read the manifest back in, and apply it. This should write out 361 # our preprocessed file. 362 m = InstallManifest(path=manifest) 363 c = FileCopier() 364 m.populate_registry(c) 365 self.assertTrue(c.copy(dest)) 366 367 with open(destfile, "rt") as fh: 368 self.assertEqual(fh.read(), "SOURCE\n") 369 370 # Next, modify the source to #INCLUDE another file. 371 with open(source, "wt") as fh: 372 fh.write("SOURCE\n#include p_incl\n") 373 time = os.path.getmtime(source) - 1 374 os.utime(destfile, (time, time)) 375 376 # Apply the manifest, and confirm that it also reads the newly included 377 # file. 378 m = InstallManifest(path=manifest) 379 c = FileCopier() 380 m.populate_registry(c) 381 c.copy(dest) 382 383 with open(destfile, "rt") as fh: 384 self.assertEqual(fh.read(), "SOURCE\nINCLUDE\n") 385 386 # Set the time on the source file back, so it won't be picked up as 387 # modified in the next test. 388 time = os.path.getmtime(source) - 1 389 os.utime(source, (time, time)) 390 391 # Now, modify the include file (but not the original source). 392 with open(include, "wt") as fh: 393 fh.write("INCLUDE MODIFIED\n") 394 time = os.path.getmtime(include) - 1 395 os.utime(destfile, (time, time)) 396 397 # Apply the manifest, and confirm that the change to the include file 398 # is detected. That should cause the preprocessor to run again. 399 m = InstallManifest(path=manifest) 400 c = FileCopier() 401 m.populate_registry(c) 402 c.copy(dest) 403 404 with open(destfile, "rt") as fh: 405 self.assertEqual(fh.read(), "SOURCE\nINCLUDE MODIFIED\n") 406 407 # ORing an InstallManifest should copy file dependencies 408 m = InstallManifest() 409 m |= InstallManifest(path=manifest) 410 c = FileCopier() 411 m.populate_registry(c) 412 e = c._files["p_dest"] 413 self.assertEqual(e.extra_depends, [manifest]) 414 415 def test_add_entries_from(self): 416 source = self.tmppath("source") 417 os.mkdir(source) 418 os.mkdir("%s/base" % source) 419 os.mkdir("%s/base/foo" % source) 420 421 with open("%s/base/foo/file1" % source, "a"): 422 pass 423 424 with open("%s/base/foo/file2" % source, "a"): 425 pass 426 427 m = InstallManifest() 428 m.add_pattern_link("%s/base" % source, "**", "dest") 429 430 p = InstallManifest() 431 p.add_entries_from(m) 432 self.assertEqual(len(p), 1) 433 434 c = FileCopier() 435 p.populate_registry(c) 436 self.assertEqual(c.paths(), ["dest/foo/file1", "dest/foo/file2"]) 437 438 q = InstallManifest() 439 q.add_entries_from(m, base="target") 440 self.assertEqual(len(q), 1) 441 442 d = FileCopier() 443 q.populate_registry(d) 444 self.assertEqual(d.paths(), ["target/dest/foo/file1", "target/dest/foo/file2"]) 445 446 # Some of the values in an InstallManifest include destination 447 # information that is present in the keys. Verify that we can 448 # round-trip serialization. 449 r = InstallManifest() 450 r.add_entries_from(m) 451 r.add_entries_from(m, base="target") 452 self.assertEqual(len(r), 2) 453 454 temp_path = self.tmppath("temp_path") 455 r.write(path=temp_path) 456 457 s = InstallManifest(path=temp_path) 458 e = FileCopier() 459 s.populate_registry(e) 460 461 self.assertEqual( 462 e.paths(), 463 [ 464 "dest/foo/file1", 465 "dest/foo/file2", 466 "target/dest/foo/file1", 467 "target/dest/foo/file2", 468 ], 469 ) 470 471 472if __name__ == "__main__": 473 mozunit.main() 474