1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5from __future__ import absolute_import, print_function, unicode_literals
6
7import os
8
9import mozunit
10
11from mozpack.copier import (
12    FileCopier,
13    FileRegistry,
14)
15from mozpack.manifests import (
16    InstallManifest,
17    UnreadableInstallManifest,
18)
19from mozpack.test.test_files import TestWithTmpDir
20
21
22class TestInstallManifest(TestWithTmpDir):
23    def test_construct(self):
24        m = InstallManifest()
25        self.assertEqual(len(m), 0)
26
27    def test_malformed(self):
28        f = self.tmppath("manifest")
29        open(f, "wt").write("junk\n")
30        with self.assertRaises(UnreadableInstallManifest):
31            InstallManifest(f)
32
33    def test_adds(self):
34        m = InstallManifest()
35        m.add_link("s_source", "s_dest")
36        m.add_copy("c_source", "c_dest")
37        m.add_required_exists("e_dest")
38        m.add_optional_exists("o_dest")
39        m.add_pattern_link("ps_base", "ps/*", "ps_dest")
40        m.add_pattern_copy("pc_base", "pc/**", "pc_dest")
41        m.add_preprocess("p_source", "p_dest", "p_source.pp")
42        m.add_content("content", "content")
43
44        self.assertEqual(len(m), 8)
45        self.assertIn("s_dest", m)
46        self.assertIn("c_dest", m)
47        self.assertIn("p_dest", m)
48        self.assertIn("e_dest", m)
49        self.assertIn("o_dest", m)
50        self.assertIn("content", m)
51
52        with self.assertRaises(ValueError):
53            m.add_link("s_other", "s_dest")
54
55        with self.assertRaises(ValueError):
56            m.add_copy("c_other", "c_dest")
57
58        with self.assertRaises(ValueError):
59            m.add_preprocess("p_other", "p_dest", "p_other.pp")
60
61        with self.assertRaises(ValueError):
62            m.add_required_exists("e_dest")
63
64        with self.assertRaises(ValueError):
65            m.add_optional_exists("o_dest")
66
67        with self.assertRaises(ValueError):
68            m.add_pattern_link("ps_base", "ps/*", "ps_dest")
69
70        with self.assertRaises(ValueError):
71            m.add_pattern_copy("pc_base", "pc/**", "pc_dest")
72
73        with self.assertRaises(ValueError):
74            m.add_content("content", "content")
75
76    def _get_test_manifest(self):
77        m = InstallManifest()
78        m.add_link(self.tmppath("s_source"), "s_dest")
79        m.add_copy(self.tmppath("c_source"), "c_dest")
80        m.add_preprocess(
81            self.tmppath("p_source"),
82            "p_dest",
83            self.tmppath("p_source.pp"),
84            "#",
85            {"FOO": "BAR", "BAZ": "QUX"},
86        )
87        m.add_required_exists("e_dest")
88        m.add_optional_exists("o_dest")
89        m.add_pattern_link("ps_base", "*", "ps_dest")
90        m.add_pattern_copy("pc_base", "**", "pc_dest")
91        m.add_content("the content\non\nmultiple lines", "content")
92
93        return m
94
95    def test_serialization(self):
96        m = self._get_test_manifest()
97
98        p = self.tmppath("m")
99        m.write(path=p)
100        self.assertTrue(os.path.isfile(p))
101
102        with open(p, "r") as fh:
103            c = fh.read()
104
105        self.assertEqual(c.count("\n"), 9)
106
107        lines = c.splitlines()
108        self.assertEqual(len(lines), 9)
109
110        self.assertEqual(lines[0], "5")
111
112        m2 = InstallManifest(path=p)
113        self.assertEqual(m, m2)
114        p2 = self.tmppath("m2")
115        m2.write(path=p2)
116
117        with open(p2, "r") as fh:
118            c2 = fh.read()
119
120        self.assertEqual(c, c2)
121
122    def test_populate_registry(self):
123        m = self._get_test_manifest()
124        r = FileRegistry()
125        m.populate_registry(r)
126
127        self.assertEqual(len(r), 6)
128        self.assertEqual(
129            r.paths(), ["c_dest", "content", "e_dest", "o_dest", "p_dest", "s_dest"]
130        )
131
132    def test_pattern_expansion(self):
133        source = self.tmppath("source")
134        os.mkdir(source)
135        os.mkdir("%s/base" % source)
136        os.mkdir("%s/base/foo" % source)
137
138        with open("%s/base/foo/file1" % source, "a"):
139            pass
140
141        with open("%s/base/foo/file2" % source, "a"):
142            pass
143
144        m = InstallManifest()
145        m.add_pattern_link("%s/base" % source, "**", "dest")
146
147        c = FileCopier()
148        m.populate_registry(c)
149        self.assertEqual(c.paths(), ["dest/foo/file1", "dest/foo/file2"])
150
151    def test_write_expand_pattern(self):
152        source = self.tmppath("source")
153        os.mkdir(source)
154        os.mkdir("%s/base" % source)
155        os.mkdir("%s/base/foo" % source)
156
157        with open("%s/base/foo/file1" % source, "a"):
158            pass
159
160        with open("%s/base/foo/file2" % source, "a"):
161            pass
162
163        m = InstallManifest()
164        m.add_pattern_link("%s/base" % source, "**", "dest")
165
166        track = self.tmppath("track")
167        m.write(path=track, expand_pattern=True)
168
169        m = InstallManifest(path=track)
170        self.assertEqual(
171            sorted(dest for dest in m._dests), ["dest/foo/file1", "dest/foo/file2"]
172        )
173
174    def test_or(self):
175        m1 = self._get_test_manifest()
176        orig_length = len(m1)
177        m2 = InstallManifest()
178        m2.add_link("s_source2", "s_dest2")
179        m2.add_copy("c_source2", "c_dest2")
180
181        m1 |= m2
182
183        self.assertEqual(len(m2), 2)
184        self.assertEqual(len(m1), orig_length + 2)
185
186        self.assertIn("s_dest2", m1)
187        self.assertIn("c_dest2", m1)
188
189    def test_copier_application(self):
190        dest = self.tmppath("dest")
191        os.mkdir(dest)
192
193        to_delete = self.tmppath("dest/to_delete")
194        with open(to_delete, "a"):
195            pass
196
197        with open(self.tmppath("s_source"), "wt") as fh:
198            fh.write("symlink!")
199
200        with open(self.tmppath("c_source"), "wt") as fh:
201            fh.write("copy!")
202
203        with open(self.tmppath("p_source"), "wt") as fh:
204            fh.write("#define FOO 1\npreprocess!")
205
206        with open(self.tmppath("dest/e_dest"), "a"):
207            pass
208
209        with open(self.tmppath("dest/o_dest"), "a"):
210            pass
211
212        m = self._get_test_manifest()
213        c = FileCopier()
214        m.populate_registry(c)
215        result = c.copy(dest)
216
217        self.assertTrue(os.path.exists(self.tmppath("dest/s_dest")))
218        self.assertTrue(os.path.exists(self.tmppath("dest/c_dest")))
219        self.assertTrue(os.path.exists(self.tmppath("dest/p_dest")))
220        self.assertTrue(os.path.exists(self.tmppath("dest/e_dest")))
221        self.assertTrue(os.path.exists(self.tmppath("dest/o_dest")))
222        self.assertTrue(os.path.exists(self.tmppath("dest/content")))
223        self.assertFalse(os.path.exists(to_delete))
224
225        with open(self.tmppath("dest/s_dest"), "rt") as fh:
226            self.assertEqual(fh.read(), "symlink!")
227
228        with open(self.tmppath("dest/c_dest"), "rt") as fh:
229            self.assertEqual(fh.read(), "copy!")
230
231        with open(self.tmppath("dest/p_dest"), "rt") as fh:
232            self.assertEqual(fh.read(), "preprocess!")
233
234        self.assertEqual(
235            result.updated_files,
236            set(
237                self.tmppath(p)
238                for p in ("dest/s_dest", "dest/c_dest", "dest/p_dest", "dest/content")
239            ),
240        )
241        self.assertEqual(
242            result.existing_files,
243            set([self.tmppath("dest/e_dest"), self.tmppath("dest/o_dest")]),
244        )
245        self.assertEqual(result.removed_files, {to_delete})
246        self.assertEqual(result.removed_directories, set())
247
248    def test_preprocessor(self):
249        manifest = self.tmppath("m")
250        deps = self.tmppath("m.pp")
251        dest = self.tmppath("dest")
252        include = self.tmppath("p_incl")
253
254        with open(include, "wt") as fh:
255            fh.write("#define INCL\n")
256        time = os.path.getmtime(include) - 3
257        os.utime(include, (time, time))
258
259        with open(self.tmppath("p_source"), "wt") as fh:
260            fh.write("#ifdef FOO\n#if BAZ == QUX\nPASS1\n#endif\n#endif\n")
261            fh.write("#ifdef DEPTEST\nPASS2\n#endif\n")
262            fh.write("#include p_incl\n#ifdef INCLTEST\nPASS3\n#endif\n")
263        time = os.path.getmtime(self.tmppath("p_source")) - 3
264        os.utime(self.tmppath("p_source"), (time, time))
265
266        # Create and write a manifest with the preprocessed file, then apply it.
267        # This should write out our preprocessed file.
268        m = InstallManifest()
269        m.add_preprocess(
270            self.tmppath("p_source"), "p_dest", deps, "#", {"FOO": "BAR", "BAZ": "QUX"}
271        )
272        m.write(path=manifest)
273
274        m = InstallManifest(path=manifest)
275        c = FileCopier()
276        m.populate_registry(c)
277        c.copy(dest)
278
279        self.assertTrue(os.path.exists(self.tmppath("dest/p_dest")))
280
281        with open(self.tmppath("dest/p_dest"), "rt") as fh:
282            self.assertEqual(fh.read(), "PASS1\n")
283
284        # Create a second manifest with the preprocessed file, then apply it.
285        # Since this manifest does not exist on the disk, there should not be a
286        # dependency on it, and the preprocessed file should not be modified.
287        m2 = InstallManifest()
288        m2.add_preprocess(
289            self.tmppath("p_source"), "p_dest", deps, "#", {"DEPTEST": True}
290        )
291        c = FileCopier()
292        m2.populate_registry(c)
293        result = c.copy(dest)
294
295        self.assertFalse(self.tmppath("dest/p_dest") in result.updated_files)
296        self.assertTrue(self.tmppath("dest/p_dest") in result.existing_files)
297
298        # Write out the second manifest, then load it back in from the disk.
299        # This should add the dependency on the manifest file, so our
300        # preprocessed file should be regenerated with the new defines.
301        # We also set the mtime on the destination file back, so it will be
302        # older than the manifest file.
303        m2.write(path=manifest)
304        time = os.path.getmtime(manifest) - 1
305        os.utime(self.tmppath("dest/p_dest"), (time, time))
306        m2 = InstallManifest(path=manifest)
307        c = FileCopier()
308        m2.populate_registry(c)
309        self.assertTrue(c.copy(dest))
310
311        with open(self.tmppath("dest/p_dest"), "rt") as fh:
312            self.assertEqual(fh.read(), "PASS2\n")
313
314        # Set the time on the manifest back, so it won't be picked up as
315        # modified in the next test
316        time = os.path.getmtime(manifest) - 1
317        os.utime(manifest, (time, time))
318
319        # Update the contents of a file included by the source file. This should
320        # cause the destination to be regenerated.
321        with open(include, "wt") as fh:
322            fh.write("#define INCLTEST\n")
323
324        time = os.path.getmtime(include) - 1
325        os.utime(self.tmppath("dest/p_dest"), (time, time))
326        c = FileCopier()
327        m2.populate_registry(c)
328        self.assertTrue(c.copy(dest))
329
330        with open(self.tmppath("dest/p_dest"), "rt") as fh:
331            self.assertEqual(fh.read(), "PASS2\nPASS3\n")
332
333    def test_preprocessor_dependencies(self):
334        manifest = self.tmppath("m")
335        deps = self.tmppath("m.pp")
336        dest = self.tmppath("dest")
337        source = self.tmppath("p_source")
338        destfile = self.tmppath("dest/p_dest")
339        include = self.tmppath("p_incl")
340        os.mkdir(dest)
341
342        with open(source, "wt") as fh:
343            fh.write("#define SRC\nSOURCE\n")
344        time = os.path.getmtime(source) - 3
345        os.utime(source, (time, time))
346
347        with open(include, "wt") as fh:
348            fh.write("INCLUDE\n")
349        time = os.path.getmtime(source) - 3
350        os.utime(include, (time, time))
351
352        # Create and write a manifest with the preprocessed file.
353        m = InstallManifest()
354        m.add_preprocess(source, "p_dest", deps, "#", {"FOO": "BAR", "BAZ": "QUX"})
355        m.write(path=manifest)
356
357        time = os.path.getmtime(source) - 5
358        os.utime(manifest, (time, time))
359
360        # Now read the manifest back in, and apply it. This should write out
361        # our preprocessed file.
362        m = InstallManifest(path=manifest)
363        c = FileCopier()
364        m.populate_registry(c)
365        self.assertTrue(c.copy(dest))
366
367        with open(destfile, "rt") as fh:
368            self.assertEqual(fh.read(), "SOURCE\n")
369
370        # Next, modify the source to #INCLUDE another file.
371        with open(source, "wt") as fh:
372            fh.write("SOURCE\n#include p_incl\n")
373        time = os.path.getmtime(source) - 1
374        os.utime(destfile, (time, time))
375
376        # Apply the manifest, and confirm that it also reads the newly included
377        # file.
378        m = InstallManifest(path=manifest)
379        c = FileCopier()
380        m.populate_registry(c)
381        c.copy(dest)
382
383        with open(destfile, "rt") as fh:
384            self.assertEqual(fh.read(), "SOURCE\nINCLUDE\n")
385
386        # Set the time on the source file back, so it won't be picked up as
387        # modified in the next test.
388        time = os.path.getmtime(source) - 1
389        os.utime(source, (time, time))
390
391        # Now, modify the include file (but not the original source).
392        with open(include, "wt") as fh:
393            fh.write("INCLUDE MODIFIED\n")
394        time = os.path.getmtime(include) - 1
395        os.utime(destfile, (time, time))
396
397        # Apply the manifest, and confirm that the change to the include file
398        # is detected. That should cause the preprocessor to run again.
399        m = InstallManifest(path=manifest)
400        c = FileCopier()
401        m.populate_registry(c)
402        c.copy(dest)
403
404        with open(destfile, "rt") as fh:
405            self.assertEqual(fh.read(), "SOURCE\nINCLUDE MODIFIED\n")
406
407        # ORing an InstallManifest should copy file dependencies
408        m = InstallManifest()
409        m |= InstallManifest(path=manifest)
410        c = FileCopier()
411        m.populate_registry(c)
412        e = c._files["p_dest"]
413        self.assertEqual(e.extra_depends, [manifest])
414
415    def test_add_entries_from(self):
416        source = self.tmppath("source")
417        os.mkdir(source)
418        os.mkdir("%s/base" % source)
419        os.mkdir("%s/base/foo" % source)
420
421        with open("%s/base/foo/file1" % source, "a"):
422            pass
423
424        with open("%s/base/foo/file2" % source, "a"):
425            pass
426
427        m = InstallManifest()
428        m.add_pattern_link("%s/base" % source, "**", "dest")
429
430        p = InstallManifest()
431        p.add_entries_from(m)
432        self.assertEqual(len(p), 1)
433
434        c = FileCopier()
435        p.populate_registry(c)
436        self.assertEqual(c.paths(), ["dest/foo/file1", "dest/foo/file2"])
437
438        q = InstallManifest()
439        q.add_entries_from(m, base="target")
440        self.assertEqual(len(q), 1)
441
442        d = FileCopier()
443        q.populate_registry(d)
444        self.assertEqual(d.paths(), ["target/dest/foo/file1", "target/dest/foo/file2"])
445
446        # Some of the values in an InstallManifest include destination
447        # information that is present in the keys.  Verify that we can
448        # round-trip serialization.
449        r = InstallManifest()
450        r.add_entries_from(m)
451        r.add_entries_from(m, base="target")
452        self.assertEqual(len(r), 2)
453
454        temp_path = self.tmppath("temp_path")
455        r.write(path=temp_path)
456
457        s = InstallManifest(path=temp_path)
458        e = FileCopier()
459        s.populate_registry(e)
460
461        self.assertEqual(
462            e.paths(),
463            [
464                "dest/foo/file1",
465                "dest/foo/file2",
466                "target/dest/foo/file1",
467                "target/dest/foo/file2",
468            ],
469        )
470
471
472if __name__ == "__main__":
473    mozunit.main()
474