1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5from __future__ import unicode_literals
6
7import os
8
9import mozunit
10
11from mozpack.copier import (
12    FileCopier,
13    FileRegistry,
14)
15from mozpack.manifests import (
16    InstallManifest,
17    UnreadableInstallManifest,
18)
19from mozpack.test.test_files import TestWithTmpDir
20
21
class TestInstallManifest(TestWithTmpDir):
    """Exercise InstallManifest on its own and together with FileCopier.

    The tests cover construction, duplicate detection, serialization
    round-trips, registry population, pattern expansion, in-place union
    (``|=``) and dependency tracking for preprocessed files.
    """

    # ------------------------------------------------------------------
    # Private helpers shared by the tests below.
    # ------------------------------------------------------------------

    def _create_empty_file(self, path):
        """Open ``path`` in append mode and close it again, writing nothing."""
        with open(path, 'a'):
            pass

    def _write_text(self, path, text):
        """Overwrite ``path`` (text mode) so that it contains ``text``."""
        with open(path, 'wt') as fh:
            fh.write(text)

    def _read_text(self, path):
        """Return the full contents of ``path`` read in text mode."""
        with open(path, 'rt') as fh:
            return fh.read()

    def _set_mtime_before(self, path, reference, seconds):
        """Backdate ``path`` relative to another file.

        The access and modification times of ``path`` are set to the
        current mtime of ``reference`` minus ``seconds``. ``reference`` may
        be ``path`` itself.
        """
        stamp = os.path.getmtime(reference) - seconds
        os.utime(path, (stamp, stamp))

    def _copy_manifest(self, manifest, dest):
        """Apply ``manifest`` to ``dest`` through a fresh FileCopier.

        Returns whatever ``FileCopier.copy`` returns.
        """
        copier = FileCopier()
        manifest.populate_registry(copier)
        return copier.copy(dest)

    def _get_test_manifest(self):
        """Return a manifest holding one entry of each kind used in tests.

        Source paths for the symlink, copy and preprocess entries live in
        the temporary directory; the tests that apply the manifest create
        those files themselves.
        """
        m = InstallManifest()
        m.add_symlink(self.tmppath('s_source'), 's_dest')
        m.add_copy(self.tmppath('c_source'), 'c_dest')
        m.add_preprocess(
            self.tmppath('p_source'),
            'p_dest',
            self.tmppath('p_source.pp'),
            '#',
            {'FOO': 'BAR', 'BAZ': 'QUX'},
        )
        m.add_required_exists('e_dest')
        m.add_optional_exists('o_dest')
        m.add_pattern_symlink('ps_base', '*', 'ps_dest')
        m.add_pattern_copy('pc_base', '**', 'pc_dest')
        m.add_content('the content\non\nmultiple lines', 'content')

        return m

    # ------------------------------------------------------------------
    # Tests.
    # ------------------------------------------------------------------

    def test_construct(self):
        """A manifest built without arguments has length zero."""
        m = InstallManifest()
        self.assertEqual(len(m), 0)

    def test_malformed(self):
        """Loading a junk file raises UnreadableInstallManifest."""
        f = self.tmppath('manifest')
        # Use a context manager so the data is flushed and the handle is
        # closed before the manifest is read back; the file is opened in
        # binary mode, hence the bytes literal.
        with open(f, 'wb') as fh:
            fh.write(b'junk\n')

        with self.assertRaises(UnreadableInstallManifest):
            InstallManifest(f)

    def test_adds(self):
        """Each add_* method registers an entry and rejects duplicates."""
        m = InstallManifest()
        m.add_symlink('s_source', 's_dest')
        m.add_copy('c_source', 'c_dest')
        m.add_required_exists('e_dest')
        m.add_optional_exists('o_dest')
        m.add_pattern_symlink('ps_base', 'ps/*', 'ps_dest')
        m.add_pattern_copy('pc_base', 'pc/**', 'pc_dest')
        m.add_preprocess('p_source', 'p_dest', 'p_source.pp')
        m.add_content('content', 'content')

        self.assertEqual(len(m), 8)
        for dest in ('s_dest', 'c_dest', 'p_dest', 'e_dest', 'o_dest',
                     'content'):
            self.assertIn(dest, m)

        # Re-adding an already registered destination must fail, whatever
        # the entry kind.
        with self.assertRaises(ValueError):
            m.add_symlink('s_other', 's_dest')

        with self.assertRaises(ValueError):
            m.add_copy('c_other', 'c_dest')

        with self.assertRaises(ValueError):
            m.add_preprocess('p_other', 'p_dest', 'p_other.pp')

        with self.assertRaises(ValueError):
            m.add_required_exists('e_dest')

        with self.assertRaises(ValueError):
            m.add_optional_exists('o_dest')

        with self.assertRaises(ValueError):
            m.add_pattern_symlink('ps_base', 'ps/*', 'ps_dest')

        with self.assertRaises(ValueError):
            m.add_pattern_copy('pc_base', 'pc/**', 'pc_dest')

        with self.assertRaises(ValueError):
            m.add_content('content', 'content')

    def test_serialization(self):
        """A written manifest reads back equal and re-serializes identically."""
        m = self._get_test_manifest()

        p = self.tmppath('m')
        m.write(path=p)
        self.assertTrue(os.path.isfile(p))

        with open(p, 'rb') as fh:
            c = fh.read()

        # The file is read in binary mode, so compare against bytes.
        self.assertEqual(c.count(b'\n'), 9)

        lines = c.splitlines()
        self.assertEqual(len(lines), 9)

        # The first line of the serialized form is expected to be '5'.
        self.assertEqual(lines[0], b'5')

        m2 = InstallManifest(path=p)
        self.assertEqual(m, m2)
        p2 = self.tmppath('m2')
        m2.write(path=p2)

        with open(p2, 'rb') as fh:
            c2 = fh.read()

        self.assertEqual(c, c2)

    def test_populate_registry(self):
        """populate_registry() fills a FileRegistry with the expected paths."""
        m = self._get_test_manifest()
        r = FileRegistry()
        m.populate_registry(r)

        self.assertEqual(len(r), 6)
        self.assertEqual(r.paths(), ['c_dest', 'content', 'e_dest', 'o_dest',
                                     'p_dest', 's_dest'])

    def test_pattern_expansion(self):
        """A '**' pattern symlink expands to the files found under its base."""
        source = self.tmppath('source')
        base = '%s/base' % source
        # Creates source, source/base and source/base/foo in one go.
        os.makedirs('%s/foo' % base)

        for name in ('file1', 'file2'):
            self._create_empty_file('%s/foo/%s' % (base, name))

        m = InstallManifest()
        m.add_pattern_symlink(base, '**', 'dest')

        c = FileCopier()
        m.populate_registry(c)
        self.assertEqual(c.paths(), ['dest/foo/file1', 'dest/foo/file2'])

    def test_or(self):
        """``m1 |= m2`` adds m2's entries to m1 and leaves m2 untouched."""
        m1 = self._get_test_manifest()
        orig_length = len(m1)
        m2 = InstallManifest()
        m2.add_symlink('s_source2', 's_dest2')
        m2.add_copy('c_source2', 'c_dest2')

        m1 |= m2

        self.assertEqual(len(m2), 2)
        self.assertEqual(len(m1), orig_length + 2)

        self.assertIn('s_dest2', m1)
        self.assertIn('c_dest2', m1)

    def test_copier_application(self):
        """Applying the test manifest via FileCopier yields the right tree."""
        dest = self.tmppath('dest')
        os.mkdir(dest)

        # A file the manifest does not mention; the assertions below expect
        # the copy to remove it.
        to_delete = self.tmppath('dest/to_delete')
        self._create_empty_file(to_delete)

        self._write_text(self.tmppath('s_source'), 'symlink!')
        self._write_text(self.tmppath('c_source'), 'copy!')
        self._write_text(self.tmppath('p_source'),
                         '#define FOO 1\npreprocess!')

        # Destinations of the required/optional "exists" entries.
        self._create_empty_file(self.tmppath('dest/e_dest'))
        self._create_empty_file(self.tmppath('dest/o_dest'))

        result = self._copy_manifest(self._get_test_manifest(), dest)

        for name in ('s_dest', 'c_dest', 'p_dest', 'e_dest', 'o_dest',
                     'content'):
            self.assertTrue(os.path.exists(self.tmppath('dest/%s' % name)))
        self.assertFalse(os.path.exists(to_delete))

        self.assertEqual(self._read_text(self.tmppath('dest/s_dest')),
                         'symlink!')
        self.assertEqual(self._read_text(self.tmppath('dest/c_dest')),
                         'copy!')
        self.assertEqual(self._read_text(self.tmppath('dest/p_dest')),
                         'preprocess!')

        self.assertEqual(
            result.updated_files,
            {self.tmppath(p) for p in (
                'dest/s_dest', 'dest/c_dest', 'dest/p_dest', 'dest/content')})
        self.assertEqual(
            result.existing_files,
            {self.tmppath('dest/e_dest'), self.tmppath('dest/o_dest')})
        self.assertEqual(result.removed_files, {to_delete})
        self.assertEqual(result.removed_directories, set())

    def test_preprocessor(self):
        """Preprocessed output is regenerated when its dependencies change."""
        manifest = self.tmppath('m')
        deps = self.tmppath('m.pp')
        dest = self.tmppath('dest')
        include = self.tmppath('p_incl')
        source = self.tmppath('p_source')
        destfile = self.tmppath('dest/p_dest')

        self._write_text(include, '#define INCL\n')
        self._set_mtime_before(include, include, 3)

        self._write_text(
            source,
            '#ifdef FOO\n#if BAZ == QUX\nPASS1\n#endif\n#endif\n'
            '#ifdef DEPTEST\nPASS2\n#endif\n'
            '#include p_incl\n#ifdef INCLTEST\nPASS3\n#endif\n')
        self._set_mtime_before(source, source, 3)

        # Create and write a manifest with the preprocessed file, then apply
        # it. This should write out our preprocessed file.
        m = InstallManifest()
        m.add_preprocess(source, 'p_dest', deps, '#',
                         {'FOO': 'BAR', 'BAZ': 'QUX'})
        m.write(path=manifest)

        m = InstallManifest(path=manifest)
        self._copy_manifest(m, dest)

        self.assertTrue(os.path.exists(destfile))
        self.assertEqual(self._read_text(destfile), 'PASS1\n')

        # Create a second manifest with the preprocessed file, then apply it.
        # Since this manifest does not exist on the disk, there should not be
        # a dependency on it, and the preprocessed file should not be
        # modified.
        m2 = InstallManifest()
        m2.add_preprocess(source, 'p_dest', deps, '#', {'DEPTEST': True})
        result = self._copy_manifest(m2, dest)

        self.assertNotIn(destfile, result.updated_files)
        self.assertIn(destfile, result.existing_files)

        # Write out the second manifest, then load it back in from the disk.
        # This should add the dependency on the manifest file, so our
        # preprocessed file should be regenerated with the new defines.
        # We also set the mtime on the destination file back, so it will be
        # older than the manifest file.
        m2.write(path=manifest)
        self._set_mtime_before(destfile, manifest, 1)
        m2 = InstallManifest(path=manifest)
        self.assertTrue(self._copy_manifest(m2, dest))

        self.assertEqual(self._read_text(destfile), 'PASS2\n')

        # Set the time on the manifest back, so it won't be picked up as
        # modified in the next test.
        self._set_mtime_before(manifest, manifest, 1)

        # Update the contents of a file included by the source file. This
        # should cause the destination to be regenerated.
        self._write_text(include, '#define INCLTEST\n')

        self._set_mtime_before(destfile, include, 1)
        self.assertTrue(self._copy_manifest(m2, dest))

        self.assertEqual(self._read_text(destfile), 'PASS2\nPASS3\n')

    def test_preprocessor_dependencies(self):
        """Changes to the source or to an included file are both detected."""
        manifest = self.tmppath('m')
        deps = self.tmppath('m.pp')
        dest = self.tmppath('dest')
        source = self.tmppath('p_source')
        destfile = self.tmppath('dest/p_dest')
        include = self.tmppath('p_incl')
        os.mkdir(dest)

        self._write_text(source, '#define SRC\nSOURCE\n')
        self._set_mtime_before(source, source, 3)

        # The include's timestamp is derived from the (already backdated)
        # source, which leaves it 3 seconds older than the source.
        self._write_text(include, 'INCLUDE\n')
        self._set_mtime_before(include, source, 3)

        # Create and write a manifest with the preprocessed file.
        m = InstallManifest()
        m.add_preprocess(source, 'p_dest', deps, '#',
                         {'FOO': 'BAR', 'BAZ': 'QUX'})
        m.write(path=manifest)

        self._set_mtime_before(manifest, source, 5)

        # Now read the manifest back in, and apply it. This should write out
        # our preprocessed file.
        m = InstallManifest(path=manifest)
        self.assertTrue(self._copy_manifest(m, dest))

        self.assertEqual(self._read_text(destfile), 'SOURCE\n')

        # Next, modify the source to #INCLUDE another file.
        self._write_text(source, 'SOURCE\n#include p_incl\n')
        self._set_mtime_before(destfile, source, 1)

        # Apply the manifest, and confirm that it also reads the newly
        # included file.
        m = InstallManifest(path=manifest)
        self._copy_manifest(m, dest)

        self.assertEqual(self._read_text(destfile), 'SOURCE\nINCLUDE\n')

        # Set the time on the source file back, so it won't be picked up as
        # modified in the next test.
        self._set_mtime_before(source, source, 1)

        # Now, modify the include file (but not the original source).
        self._write_text(include, 'INCLUDE MODIFIED\n')
        self._set_mtime_before(destfile, include, 1)

        # Apply the manifest, and confirm that the change to the include file
        # is detected. That should cause the preprocessor to run again.
        m = InstallManifest(path=manifest)
        self._copy_manifest(m, dest)

        self.assertEqual(self._read_text(destfile),
                         'SOURCE\nINCLUDE MODIFIED\n')

        # ORing an InstallManifest should copy file dependencies.
        m = InstallManifest()
        m |= InstallManifest(path=manifest)
        c = FileCopier()
        m.populate_registry(c)
        e = c._files['p_dest']
        self.assertEqual(e.extra_depends, [manifest])
373
if __name__ == '__main__':
    # Executed directly as a script: hand control to mozunit's runner.
    mozunit.main()
376