1#!/usr/bin/env python
2
3from __future__ import print_function
4
5import argparse
6import collections
7import copy
8import json
9import os
10import sys
11
12import spec_validator
13import util
14
15
def expand_pattern(expansion_pattern, test_expansion_schema):
    '''
    Expands one test expansion pattern into a dict mapping each artifact key
    to the list of values it covers:
    - '*' expands to every value allowed by `test_expansion_schema`,
    - a list is taken as-is,
    - a dict is expanded recursively and its value lists are flattened,
    - any other value becomes a one-element list.
    '''
    expanded = {}
    for key, pattern_value in expansion_pattern.items():
        if pattern_value == '*':
            expanded[key] = test_expansion_schema[key]
        elif isinstance(pattern_value, list):
            expanded[key] = pattern_value
        elif isinstance(pattern_value, dict):
            # Flattened expansion: concatenate the recursively expanded
            # sub-lists into a single list of values.
            sub_expansion = expand_pattern(pattern_value,
                                           test_expansion_schema[key])
            flattened = []
            for sub_values in sub_expansion.values():
                flattened += sub_values
            expanded[key] = flattened
        else:
            expanded[key] = [pattern_value]

    return expanded
35
36
def permute_expansion(expansion,
                      artifact_order,
                      selection=None,
                      artifact_index=0):
    '''
    Generates all selections, i.e. dicts mapping each artifact key in
    `artifact_order` to one value from `expansion[key]` (the Cartesian
    product of the value lists in `expansion`).

    Note: every yielded dict is the same (reused) object, mutated in place
    between yields; callers must copy a selection if they need to retain it.
    '''
    assert isinstance(artifact_order, list), "artifact_order should be a list"

    # Use a None sentinel rather than `selection={}`: a mutable default
    # argument is shared across top-level calls, so keys assigned to yielded
    # selections by one caller would leak into subsequent calls.
    if selection is None:
        selection = {}

    if artifact_index >= len(artifact_order):
        yield selection
        return

    artifact_key = artifact_order[artifact_index]

    for artifact_value in expansion[artifact_key]:
        selection[artifact_key] = artifact_value
        for next_selection in permute_expansion(expansion, artifact_order,
                                                selection, artifact_index + 1):
            yield next_selection
54
55
def dump_test_parameters(selection):
    '''
    Dumps the test config `selection` into a serialized JSON string:
    deterministic (sorted keys), pretty-printed (2-space indent), with
    util.CustomEncoder handling the project-specific objects.
    '''
    dump_kwargs = {
        'indent': 2,
        'separators': (',', ': '),
        'sort_keys': True,
        'cls': util.CustomEncoder,
    }
    return json.dumps(selection, **dump_kwargs)
64
65
def get_test_filename(spec_directory, spec_json, selection):
    '''Returns the filename of the main test HTML file for `selection`.'''

    # Work on a copy so the caller's selection stays untouched, and use
    # 'unset' rather than 'None' in test filenames.
    filename_selection = copy.deepcopy(selection)
    if filename_selection['delivery_value'] is None:
        filename_selection['delivery_value'] = 'unset'

    relative_path = spec_json['test_file_path_pattern'] % filename_selection
    return os.path.join(spec_directory, relative_path)
77
78
def get_csp_value(value):
    '''
    Returns actual CSP header values (e.g. "worker-src 'self'") for the
    given string used in PolicyDelivery's value (e.g. "worker-src-self").

    Raises an Exception for unknown values.
    '''

    csp_values = {
        # script-src
        # Test-related scripts like testharness.js and inline scripts
        # containing test bodies.
        # 'unsafe-inline' is added as a workaround here. This is probably not
        # so bad, as it shouldn't interfere with non-inline-script requests
        # that we want to test.
        'script-src-wildcard': "script-src * 'unsafe-inline'",
        'script-src-self': "script-src 'self' 'unsafe-inline'",
        # A workaround for "script-src 'none'" would be more complicated,
        # because
        # - "script-src 'none' 'unsafe-inline'" is handled somehow differently
        #   from "script-src 'none'", i.e.
        #   https://w3c.github.io/webappsec-csp/#match-url-to-source-list
        #   Step 3 handles the latter but not the former.
        # - We need nonce- or path-based additional values to allow same-origin
        #   test scripts like testharness.js.
        # Therefore, we disable 'script-src-none' tests for now in
        # `/content-security-policy/spec.src.json`.
        'script-src-none': "script-src 'none'",

        # worker-src
        'worker-src-wildcard': 'worker-src *',
        'worker-src-self': "worker-src 'self'",
        'worker-src-none': "worker-src 'none'",
    }

    try:
        return csp_values[value]
    except KeyError:
        raise Exception('Invalid delivery_value: %s' % value)
115
def handle_deliveries(policy_deliveries):
    '''
    Generate <meta> elements and HTTP headers for the given list of
    PolicyDelivery.
    TODO(hiroshige): Merge duplicated code here, scope/document.py, etc.
    '''

    meta = ''
    headers = {}

    for delivery in policy_deliveries:
        if delivery.value is None:
            continue

        # For each recognized policy key, compute both the <meta> fragment and
        # the HTTP headers that would deliver the policy; which of the two is
        # actually used is decided by delivery_type afterwards.
        if delivery.key == 'referrerPolicy':
            meta_fragment = '<meta name="referrer" content="%s">' % \
                delivery.value
            new_headers = {
                'Referrer-Policy': delivery.value,
                # TODO(kristijanburnik): Limit to WPT origins.
                'Access-Control-Allow-Origin': '*',
            }
        elif delivery.key == 'mixedContent':
            assert (delivery.value == 'opt-in')
            meta_fragment = '<meta http-equiv="Content-Security-Policy" ' + \
                'content="block-all-mixed-content">'
            new_headers = {
                'Content-Security-Policy': 'block-all-mixed-content'
            }
        elif delivery.key == 'contentSecurityPolicy':
            csp_value = get_csp_value(delivery.value)
            meta_fragment = '<meta http-equiv="Content-Security-Policy" ' + \
                'content="' + csp_value + '">'
            new_headers = {'Content-Security-Policy': csp_value}
        elif delivery.key == 'upgradeInsecureRequests':
            # https://w3c.github.io/webappsec-upgrade-insecure-requests/#delivery
            assert (delivery.value == 'upgrade')
            meta_fragment = '<meta http-equiv="Content-Security-Policy" ' + \
                'content="upgrade-insecure-requests">'
            new_headers = {
                'Content-Security-Policy': 'upgrade-insecure-requests'
            }
        else:
            raise Exception('Invalid delivery_key: %s' % delivery.key)

        if delivery.delivery_type == 'meta':
            meta += meta_fragment
        elif delivery.delivery_type == 'http-rp':
            headers.update(new_headers)
        else:
            raise Exception(
                'Invalid delivery_type: %s' % delivery.delivery_type)

    return {"meta": meta, "headers": headers}
175
176
def generate_selection(spec_json, selection):
    '''
    Returns a scenario object (with a top-level source_context_list entry,
    which will be removed in generate_test_file() later).

    `selection` is mutated in place and returned: its `delivery_type`,
    `delivery_key` and `delivery_value` entries are consumed (deleted) and
    replaced by parsed `source_context_list` and
    `subresource_policy_deliveries` entries, plus a generated
    `test_description`.

    Raises `util.ShouldSkip` when the selection describes an unsupported
    combination and should not generate a test.
    '''

    # Bundle the three delivery_* artifacts into one PolicyDelivery object,
    # then drop them from the selection dict.
    target_policy_delivery = util.PolicyDelivery(selection['delivery_type'],
                                                 selection['delivery_key'],
                                                 selection['delivery_value'])
    del selection['delivery_type']
    del selection['delivery_key']
    del selection['delivery_value']

    # Parse source context list and policy deliveries of source contexts.
    # `util.ShouldSkip()` exceptions are raised if e.g. unsupported
    # combinations of source contexts and policy deliveries are used.
    source_context_list_scheme = spec_json['source_context_list_schema'][
        selection['source_context_list']]
    selection['source_context_list'] = [
        util.SourceContext.from_json(source_context, target_policy_delivery,
                                     spec_json['source_context_schema'])
        for source_context in source_context_list_scheme['sourceContextList']
    ]

    # Check if the subresource is supported by the innermost source context.
    # '*' in the schema means every subresource is supported.
    innermost_source_context = selection['source_context_list'][-1]
    supported_subresource = spec_json['source_context_schema'][
        'supported_subresource'][innermost_source_context.source_context_type]
    if supported_subresource != '*':
        if selection['subresource'] not in supported_subresource:
            raise util.ShouldSkip()

    # Parse subresource policy deliveries.
    selection[
        'subresource_policy_deliveries'] = util.PolicyDelivery.list_from_json(
            source_context_list_scheme['subresourcePolicyDeliveries'],
            target_policy_delivery, spec_json['subresource_schema']
            ['supported_delivery_type'][selection['subresource']])

    # Generate per-scenario test description by %-formatting the template
    # with the (now fully populated) selection dict.
    selection['test_description'] = spec_json[
        'test_description_template'] % selection

    return selection
221
222
def generate_test_file(spec_directory, test_helper_filenames,
                       test_html_template_basename, test_filename, scenarios):
    '''
    Generates a test HTML file (and possibly its associated .headers file)
    from `scenarios`.

    All scenarios must share the same `source_context_list`. The shared
    top-level source context is popped from every scenario (mutating
    `scenarios`); its policy deliveries are emitted either as <meta>
    elements in the HTML or as HTTP headers in `test_filename + ".headers"`.
    '''

    # Scenarios for the same file should have the same `source_context_list`,
    # including the top-level one.
    # Note: currently, non-top-level source contexts aren't necessarily required
    # to be the same, but we set this requirement as it will be useful e.g. when
    # we e.g. reuse a worker among multiple scenarios.
    for scenario in scenarios:
        assert (scenario['source_context_list'] == scenarios[0]
                ['source_context_list'])

    # We process the top source context below, and do not include it in
    # the JSON objects (i.e. `scenarios`) in generated HTML files.
    top_source_context = scenarios[0]['source_context_list'].pop(0)
    assert (top_source_context.source_context_type == 'top')
    for scenario in scenarios[1:]:
        assert (scenario['source_context_list'].pop(0) == top_source_context)

    parameters = {}

    # Sort scenarios, to avoid unnecessary diffs due to different orders in
    # `scenarios`.
    serialized_scenarios = sorted(
        [dump_test_parameters(scenario) for scenario in scenarios])

    # Indent the serialized scenarios to fit the template nicely.
    parameters['scenarios'] = ",\n".join(serialized_scenarios).replace(
        "\n", "\n" + " " * 10)

    test_directory = os.path.dirname(test_filename)

    # <script> tags for the helper files, with paths relative to the test
    # directory.
    parameters['helper_js'] = ""
    for test_helper_filename in test_helper_filenames:
        parameters['helper_js'] += '    <script src="%s"></script>\n' % (
            os.path.relpath(test_helper_filename, test_directory))
    parameters['sanity_checker_js'] = os.path.relpath(
        os.path.join(spec_directory, 'generic', 'sanity-checker.js'),
        test_directory)
    parameters['spec_json_js'] = os.path.relpath(
        os.path.join(spec_directory, 'generic', 'spec_json.js'),
        test_directory)

    test_headers_filename = test_filename + ".headers"

    test_html_template = util.get_template(test_html_template_basename)
    disclaimer_template = util.get_template('disclaimer.template')

    generated_disclaimer = disclaimer_template \
        % {'generating_script_filename': os.path.relpath(sys.argv[0],
                                                         util.test_root_directory),
           'spec_directory': os.path.relpath(spec_directory,
                                             util.test_root_directory)}

    # Adjust the template for the test invoking JS. Indent it to look nice.
    parameters['generated_disclaimer'] = generated_disclaimer.rstrip()

    # Directory for the test files. Catch only OSError (rather than a bare
    # `except:`, which would also swallow e.g. KeyboardInterrupt); the common
    # failure here is the directory already existing, which is harmless --
    # any other problem will surface when the files are written below.
    try:
        os.makedirs(test_directory)
    except OSError:
        pass

    delivery = handle_deliveries(top_source_context.policy_deliveries)

    # Write the .headers side-car file only when there are headers to deliver.
    if len(delivery['headers']) > 0:
        with open(test_headers_filename, "w") as f:
            for header in delivery['headers']:
                f.write('%s: %s\n' % (header, delivery['headers'][header]))

    parameters['meta_delivery_method'] = delivery['meta']
    # Obey the lint and pretty format.
    if len(parameters['meta_delivery_method']) > 0:
        parameters['meta_delivery_method'] = "\n    " + \
                                            parameters['meta_delivery_method']

    # Write out the generated HTML file.
    util.write_file(test_filename, test_html_template % parameters)
306
307
def generate_test_source_files(spec_directory, test_helper_filenames,
                               spec_json, target):
    '''
    Generates all test files described by `spec_json` under `spec_directory`.

    `target` selects the HTML template ("release" or "debug"); for "debug",
    additional debug output files are also written under
    `spec_directory`/generic.
    '''
    test_expansion_schema = spec_json['test_expansion_schema']
    specification = spec_json['specification']

    if target == "debug":
        spec_json_js_template = util.get_template('spec_json.js.template')
        util.write_file(
            os.path.join(spec_directory, "generic", "spec_json.js"),
            spec_json_js_template % {'spec_json': json.dumps(spec_json)})
        util.write_file(
            os.path.join(spec_directory, "generic",
                         "debug-output.spec.src.json"),
            json.dumps(spec_json, indent=2, separators=(',', ': ')))

    # Choose a debug/release template depending on the target.
    html_template = "test.%s.html.template" % target

    # Materialize the keys into a list: under Python 3, dict.keys() returns
    # a view object that has no remove() method.
    artifact_order = list(test_expansion_schema.keys())
    artifact_order.remove('expansion')

    excluded_selection_pattern = ''
    for key in artifact_order:
        excluded_selection_pattern += '%(' + key + ')s/'

    # Create the set of excluded tests (serialized selection paths).
    exclusion_set = set()
    for excluded_pattern in spec_json['excluded_tests']:
        excluded_expansion = \
            expand_pattern(excluded_pattern, test_expansion_schema)
        for excluded_selection in permute_expansion(excluded_expansion,
                                                    artifact_order):
            excluded_selection['delivery_key'] = spec_json['delivery_key']
            exclusion_set.add(excluded_selection_pattern % excluded_selection)

    # `scenarios[filename]` represents the list of scenario objects to be
    # generated into `filename`.
    scenarios = {}

    for spec in specification:
        # Used to make entries with expansion="override" override preceding
        # entries with the same |selection_path|.
        output_dict = {}

        for expansion_pattern in spec['test_expansion']:
            expansion = expand_pattern(expansion_pattern,
                                       test_expansion_schema)
            for selection in permute_expansion(expansion, artifact_order):
                selection['delivery_key'] = spec_json['delivery_key']
                selection_path = spec_json['selection_pattern'] % selection
                if selection_path in output_dict:
                    if expansion_pattern['expansion'] != 'override':
                        print("Error: expansion is default in:")
                        print(dump_test_parameters(selection))
                        print("but overrides:")
                        print(dump_test_parameters(
                            output_dict[selection_path]))
                        sys.exit(1)
                # `selection` is a reused dict mutated by permute_expansion();
                # deep-copy to retain this particular selection.
                output_dict[selection_path] = copy.deepcopy(selection)

        for selection_path in output_dict:
            selection = output_dict[selection_path]
            if (excluded_selection_pattern % selection) in exclusion_set:
                print('Excluding selection:', selection_path)
                continue
            try:
                test_filename = get_test_filename(spec_directory, spec_json,
                                                  selection)
                scenario = generate_selection(spec_json, selection)
                scenarios[test_filename] = scenarios.get(test_filename,
                                                         []) + [scenario]
            except util.ShouldSkip:
                continue

    for filename in scenarios:
        generate_test_file(spec_directory, test_helper_filenames,
                           html_template, filename, scenarios[filename])
385
386
def merge_json(base, child):
    '''
    Recursively merges `child` into `base` in place.

    Lists are concatenated, nested dicts are merged recursively, and any
    other pair of values is resolved by letting `child` override `base`.
    '''
    for key, child_value in child.items():
        if key not in base:
            base[key] = child_value
            continue
        # Both `base[key]` and `child_value` exist; merge them.
        base_value = base[key]
        if isinstance(base_value, list) and isinstance(child_value, list):
            base_value.extend(child_value)
        elif isinstance(base_value, dict) and isinstance(child_value, dict):
            merge_json(base_value, child_value)
        else:
            base[key] = child_value
399
400
def main():
    '''
    Parses command-line arguments, collects the relevant spec.src.json
    files and test helper scripts, merges the spec JSON, and generates the
    test source files.
    '''
    parser = argparse.ArgumentParser(
        description='Test suite generator utility')
    parser.add_argument(
        '-t',
        '--target',
        type=str,
        choices=("release", "debug"),
        default="release",
        help='Sets the appropriate template for generating tests')
    parser.add_argument(
        '-s',
        '--spec',
        type=str,
        default=os.getcwd(),
        help='Specify a file used for describing and generating the tests')
    # TODO(kristijanburnik): Add option for the spec_json file.
    args = parser.parse_args()

    spec_directory = os.path.abspath(args.spec)

    # Read `spec.src.json` files, starting from `spec_directory`, and
    # continuing to parent directories as long as `spec.src.json` exists.
    # NOTE(review): the path-length comparison serves as a proxy for "still
    # at or below util.test_root_directory" -- presumably spec_directory is
    # always inside the test root; confirm against callers.
    spec_filenames = []
    test_helper_filenames = []
    spec_src_directory = spec_directory
    while len(spec_src_directory) >= len(util.test_root_directory):
        spec_filename = os.path.join(spec_src_directory, "spec.src.json")
        if not os.path.exists(spec_filename):
            break
        spec_filenames.append(spec_filename)
        # Every directory with a spec.src.json is expected to also carry its
        # per-directory test helper script.
        test_filename = os.path.join(spec_src_directory, 'generic',
                                     'test-case.sub.js')
        assert (os.path.exists(test_filename))
        test_helper_filenames.append(test_filename)
        spec_src_directory = os.path.abspath(
            os.path.join(spec_src_directory, ".."))

    # Reverse so that entries closer to the root come first: they are merged
    # first below and overridden by deeper directories.
    spec_filenames = list(reversed(spec_filenames))
    test_helper_filenames = list(reversed(test_helper_filenames))

    if len(spec_filenames) == 0:
        print('Error: No spec.src.json is found at %s.' % spec_directory)
        return

    # Load the default spec JSON file, ...
    default_spec_filename = os.path.join(util.script_directory,
                                         'spec.src.json')
    spec_json = collections.OrderedDict()
    if os.path.exists(default_spec_filename):
        spec_json = util.load_spec_json(default_spec_filename)

    # ... and then make spec JSON files in subdirectories override the default.
    for spec_filename in spec_filenames:
        child_spec_json = util.load_spec_json(spec_filename)
        merge_json(spec_json, child_spec_json)

    # Validate the merged spec before generating anything from it.
    spec_validator.assert_valid_spec_json(spec_json)
    generate_test_source_files(spec_directory, test_helper_filenames,
                               spec_json, args.target)
461
462
# Entry point guard: run the generator only when invoked as a script, so the
# module can be imported without side effects.
if __name__ == '__main__':
    main()
465