1#! /usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright 2020 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17import argparse
18import os
19import libcst as cst
20import pathlib
21import sys
22from typing import (Any, Callable, Dict, List, Sequence, Tuple)
23
24
def partition(
    predicate: Callable[[Any], bool],
    iterator: Sequence[Any]
) -> Tuple[List[Any], List[Any]]:
    """A stable, out-of-place partition.

    Args:
        predicate: Called once per element; its result is interpreted by
            truthiness, so it does not have to return a strict ``bool``.
        iterator: The elements to split. It is iterated exactly once and is
            not modified.

    Returns:
        A ``(true_list, false_list)`` tuple. Elements keep their original
        relative order inside each list.
    """
    matched: List[Any] = []
    unmatched: List[Any] = []

    for item in iterator:
        # Branch on truthiness instead of indexing with int(predicate(item)):
        # a truthy result such as 2 or "x" would otherwise raise
        # (IndexError / ValueError) rather than count as "true".
        if predicate(item):
            matched.append(item)
        else:
            unmatched.append(item)

    # Returns trueList, falseList
    return matched, unmatched
37
38
class dlpCallTransformer(cst.CSTTransformer):
    """Rewrite flattened dlp client method calls to pass a ``request`` dict.

    A call whose attribute name appears in ``METHOD_TO_PARAMS`` has its
    non-control arguments folded into a single ``request={...}`` keyword
    argument; the control parameters listed in ``CTRL_PARAMS`` are kept as
    separate keyword arguments.
    """

    # Parameters that are never moved into the request dict.
    CTRL_PARAMS: Tuple[str, ...] = ('retry', 'timeout', 'metadata')
    # Method name -> ordered request parameter names used as dict keys.
    METHOD_TO_PARAMS: Dict[str, Tuple[str, ...]] = {
        'activate_job_trigger': ('name', ),
        'cancel_dlp_job': ('name', ),
        'create_deidentify_template': ('parent', 'deidentify_template', 'template_id', 'location_id', ),
        'create_dlp_job': ('parent', 'inspect_job', 'risk_job', 'job_id', 'location_id', ),
        'create_inspect_template': ('parent', 'inspect_template', 'template_id', 'location_id', ),
        'create_job_trigger': ('parent', 'job_trigger', 'trigger_id', 'location_id', ),
        'create_stored_info_type': ('parent', 'config', 'stored_info_type_id', 'location_id', ),
        'deidentify_content': ('parent', 'deidentify_config', 'inspect_config', 'item', 'inspect_template_name', 'deidentify_template_name', 'location_id', ),
        'delete_deidentify_template': ('name', ),
        'delete_dlp_job': ('name', ),
        'delete_inspect_template': ('name', ),
        'delete_job_trigger': ('name', ),
        'delete_stored_info_type': ('name', ),
        'finish_dlp_job': ('name', ),
        'get_deidentify_template': ('name', ),
        'get_dlp_job': ('name', ),
        'get_inspect_template': ('name', ),
        'get_job_trigger': ('name', ),
        'get_stored_info_type': ('name', ),
        'hybrid_inspect_dlp_job': ('name', 'hybrid_item', ),
        'hybrid_inspect_job_trigger': ('name', 'hybrid_item', ),
        'inspect_content': ('parent', 'inspect_config', 'item', 'inspect_template_name', 'location_id', ),
        'list_deidentify_templates': ('parent', 'page_token', 'page_size', 'order_by', 'location_id', ),
        'list_dlp_jobs': ('parent', 'filter', 'page_size', 'page_token', 'type_', 'order_by', 'location_id', ),
        'list_info_types': ('parent', 'language_code', 'filter', 'location_id', ),
        'list_inspect_templates': ('parent', 'page_token', 'page_size', 'order_by', 'location_id', ),
        'list_job_triggers': ('parent', 'page_token', 'page_size', 'order_by', 'filter', 'location_id', ),
        'list_stored_info_types': ('parent', 'page_token', 'page_size', 'order_by', 'location_id', ),
        'redact_image': ('parent', 'location_id', 'inspect_config', 'image_redaction_configs', 'include_findings', 'byte_item', ),
        'reidentify_content': ('parent', 'reidentify_config', 'inspect_config', 'item', 'inspect_template_name', 'reidentify_template_name', 'location_id', ),
        'update_deidentify_template': ('name', 'deidentify_template', 'update_mask', ),
        'update_inspect_template': ('name', 'inspect_template', 'update_mask', ),
        'update_job_trigger': ('name', 'job_trigger', 'update_mask', ),
        'update_stored_info_type': ('name', 'config', 'update_mask', ),
    }

    def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode:
        """Rewrite a flattened API call into the ``request=...`` form.

        Returns ``updated`` untouched when the called attribute name is not a
        key of ``METHOD_TO_PARAMS``, when the call uses ``*`` or ``**``
        expansion, or when a ``request`` keyword is already present.
        Otherwise returns a copy of ``updated`` whose arguments are a single
        ``request`` dict followed by any control parameters.
        """
        try:
            key = original.func.attr.value
            kword_params = self.METHOD_TO_PARAMS[key]
        except (AttributeError, KeyError):
            # Either not a method from the API or too convoluted to be sure.
            return updated

        if any(a.star for a in updated.args):
            # * or ** expansion: such an argument has no keyword, so it would
            # be mistaken for a positional value and buried inside the request
            # dict. The real parameters cannot be determined statically, so
            # leave the call exactly as written.
            return updated

        # If the existing code is valid, keyword args come after positional args.
        # Therefore, all positional args must map to the first parameters.
        args, kwargs = partition(lambda a: not bool(a.keyword), updated.args)
        if any(k.keyword.value == "request" for k in kwargs):
            # We've already fixed this file, don't fix it again.
            return updated

        kwargs, ctrl_kwargs = partition(
            lambda a: a.keyword.value not in self.CTRL_PARAMS,
            kwargs
        )

        # Positional args beyond the request parameters are taken to be
        # control parameters, matched to CTRL_PARAMS in order.
        args, ctrl_args = args[:len(kword_params)], args[len(kword_params):]
        ctrl_kwargs.extend(cst.Arg(value=a.value, keyword=cst.Name(value=ctrl))
                           for a, ctrl in zip(ctrl_args, self.CTRL_PARAMS))

        # NOTE(review): keyword arguments are paired with kword_params by
        # position (right after the positional ones), not by their own keyword
        # name, and zip() silently drops any surplus. A call whose keyword
        # arguments are reordered or skip a parameter gets mismatched keys --
        # confirm whether that is intended before relying on the output.
        # NOTE(review): the dict value is wrapped in cst.Element; presumably
        # the bare expression would also do -- verify against libcst.
        request_arg = cst.Arg(
            value=cst.Dict([
                cst.DictElement(
                    cst.SimpleString("'{}'".format(name)),
                    cst.Element(value=arg.value)
                )
                # Note: the args + kwargs looks silly, but keep in mind that
                # the control parameters had to be stripped out, and that
                # those could have been passed positionally or by keyword.
                for name, arg in zip(kword_params, args + kwargs)]),
            keyword=cst.Name("request")
        )

        return updated.with_changes(
            args=[request_arg] + ctrl_kwargs
        )
118
119
def fix_files(
    in_dir: pathlib.Path,
    out_dir: pathlib.Path,
    *,
    transformer=dlpCallTransformer(),
):
    """Duplicate the input dir to the output dir, fixing file method calls.

    Every ``.py`` file found while walking ``in_dir`` is parsed, visited with
    ``transformer`` and written to the same relative path under ``out_dir``.
    Files with any other extension are not copied.

    Args:
        in_dir: Directory tree to read Python sources from.
        out_dir: Directory that receives the rewritten sources.
        transformer: The libcst transformer applied to each parsed module.

    Preconditions:
    * in_dir is a real directory
    * out_dir is a real, empty directory
    """
    # Imported locally because only this function needs it.
    import tokenize

    pyfile_gen = (
        pathlib.Path(os.path.join(root, f))
        for root, _, files in os.walk(in_dir)
        for f in files if os.path.splitext(f)[1] == ".py"
    )

    for fpath in pyfile_gen:
        # tokenize.open decodes the file the way Python itself would (BOM or
        # coding cookie, UTF-8 otherwise) instead of using the locale default.
        with tokenize.open(fpath) as f:
            src = f.read()
            src_encoding = f.encoding

        # Parse the code and insert method call fixes.
        tree = cst.parse_module(src)
        updated = tree.visit(transformer)

        # Create the path and directory structure for the new file.
        updated_path = out_dir.joinpath(fpath.relative_to(in_dir))
        updated_path.parent.mkdir(parents=True, exist_ok=True)

        # Generate the updated source file at the corresponding path, using
        # the encoding the input was read with so any coding cookie stays true.
        with open(updated_path, 'w', encoding=src_encoding) as f:
            f.write(updated.code)
153
154
if __name__ == '__main__':
    # Command-line entry point: validate both directories, then run the fixer.
    cli = argparse.ArgumentParser(
        description="""Fix up source that uses the dlp client library.

The existing sources are NOT overwritten but are copied to output_dir with changes made.

Note: This tool operates at a best-effort level at converting positional
      parameters in client method calls to keyword based parameters.
      Cases where it WILL FAIL include
      A) * or ** expansion in a method call.
      B) Calls via function or method alias (includes free function calls)
      C) Indirect or dispatched calls (e.g. the method is looked up dynamically)

      These all constitute false negatives. The tool will also detect false
      positives when an API method shares a name with another method.
""")

    # Both options are mandatory; declare them from one table.
    option_table = (
        ('-d', '--input-directory', 'input_dir',
         'the input directory to walk for python files to fix up'),
        ('-o', '--output-directory', 'output_dir',
         'the directory to output files fixed via un-flattening'),
    )
    for short_flag, long_flag, dest_name, help_text in option_table:
        cli.add_argument(
            short_flag,
            long_flag,
            required=True,
            dest=dest_name,
            help=help_text,
        )

    options = cli.parse_args()
    input_dir = pathlib.Path(options.input_dir)
    output_dir = pathlib.Path(options.output_dir)

    # The checks run in a fixed order and only the first failure is reported.
    problem = None
    if not input_dir.is_dir():
        problem = f"input directory '{input_dir}' does not exist or is not a directory"
    elif not output_dir.is_dir():
        problem = f"output directory '{output_dir}' does not exist or is not a directory"
    elif os.listdir(output_dir):
        problem = f"output directory '{output_dir}' is not empty"

    if problem is not None:
        print(problem, file=sys.stderr)
        sys.exit(-1)

    fix_files(input_dir, output_dir)
210