#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import os
import libcst as cst
import pathlib
import sys
from typing import (Any, Callable, Dict, List, Sequence, Tuple)


def partition(
    predicate: Callable[[Any], bool],
    iterator: Sequence[Any]
) -> Tuple[List[Any], List[Any]]:
    """A stable, out-of-place partition.

    Args:
        predicate: Called once per item; its result is coerced with ``int()``
            to pick the destination list.
        iterator: The items to split. It is iterated once and not modified.

    Returns:
        A ``(true_list, false_list)`` pair. Each list keeps the items in the
        order they were encountered.
    """
    results = ([], [])

    for i in iterator:
        # int(False) == 0 and int(True) == 1, so index 1 collects the matches.
        results[int(predicate(i))].append(i)

    # Returns trueList, falseList
    return results[1], results[0]


class dlpCallTransformer(cst.CSTTransformer):
    """Rewrite flattened dlp client method calls to use a ``request`` dict.

    A call whose attribute name is a key of ``METHOD_TO_PARAMS`` has its
    non-control arguments folded into a single ``request={...}`` keyword
    argument, while the control parameters listed in ``CTRL_PARAMS`` are kept
    as separate keyword arguments.
    """

    # Parameters that stay outside of the request dict.
    CTRL_PARAMS: Tuple[str, ...] = ('retry', 'timeout', 'metadata')
    # Method name -> names of its flattened parameters, in positional order.
    METHOD_TO_PARAMS: Dict[str, Tuple[str, ...]] = {
        'activate_job_trigger': ('name', ),
        'cancel_dlp_job': ('name', ),
        'create_deidentify_template': ('parent', 'deidentify_template', 'template_id', 'location_id', ),
        'create_dlp_job': ('parent', 'inspect_job', 'risk_job', 'job_id', 'location_id', ),
        'create_inspect_template': ('parent', 'inspect_template', 'template_id', 'location_id', ),
        'create_job_trigger': ('parent', 'job_trigger', 'trigger_id', 'location_id', ),
        'create_stored_info_type': ('parent', 'config', 'stored_info_type_id', 'location_id', ),
        'deidentify_content': ('parent', 'deidentify_config', 'inspect_config', 'item', 'inspect_template_name', 'deidentify_template_name', 'location_id', ),
        'delete_deidentify_template': ('name', ),
        'delete_dlp_job': ('name', ),
        'delete_inspect_template': ('name', ),
        'delete_job_trigger': ('name', ),
        'delete_stored_info_type': ('name', ),
        'finish_dlp_job': ('name', ),
        'get_deidentify_template': ('name', ),
        'get_dlp_job': ('name', ),
        'get_inspect_template': ('name', ),
        'get_job_trigger': ('name', ),
        'get_stored_info_type': ('name', ),
        'hybrid_inspect_dlp_job': ('name', 'hybrid_item', ),
        'hybrid_inspect_job_trigger': ('name', 'hybrid_item', ),
        'inspect_content': ('parent', 'inspect_config', 'item', 'inspect_template_name', 'location_id', ),
        'list_deidentify_templates': ('parent', 'page_token', 'page_size', 'order_by', 'location_id', ),
        'list_dlp_jobs': ('parent', 'filter', 'page_size', 'page_token', 'type_', 'order_by', 'location_id', ),
        'list_info_types': ('parent', 'language_code', 'filter', 'location_id', ),
        'list_inspect_templates': ('parent', 'page_token', 'page_size', 'order_by', 'location_id', ),
        'list_job_triggers': ('parent', 'page_token', 'page_size', 'order_by', 'filter', 'location_id', ),
        'list_stored_info_types': ('parent', 'page_token', 'page_size', 'order_by', 'location_id', ),
        'redact_image': ('parent', 'location_id', 'inspect_config', 'image_redaction_configs', 'include_findings', 'byte_item', ),
        'reidentify_content': ('parent', 'reidentify_config', 'inspect_config', 'item', 'inspect_template_name', 'reidentify_template_name', 'location_id', ),
        'update_deidentify_template': ('name', 'deidentify_template', 'update_mask', ),
        'update_inspect_template': ('name', 'inspect_template', 'update_mask', ),
        'update_job_trigger': ('name', 'job_trigger', 'update_mask', ),
        'update_stored_info_type': ('name', 'config', 'update_mask', ),
    }

    def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode:
        """Fold the flattened arguments of a known method call into ``request``.

        Args:
            original: The call node as it appeared before its children were
                visited; only used to look up the method name.
            updated: The call node after its children were visited.

        Returns:
            ``updated`` unchanged when the call is not a recognised method,
            already passes ``request``, or uses ``*``/``**`` expansion;
            otherwise a copy of ``updated`` whose arguments are
            ``request={...}`` followed by the control keyword arguments.
        """
        try:
            key = original.func.attr.value
            kword_params = self.METHOD_TO_PARAMS[key]
        except (AttributeError, KeyError):
            # Either not a method from the API or too convoluted to be sure.
            return updated

        if any(a.star for a in updated.args):
            # `*args` / `**kwargs` expansions cannot be matched to parameter
            # names statically; leave the call alone rather than guess.
            return updated

        # If the existing code is valid, keyword args come after positional args.
        # Therefore, all positional args must map to the first parameters.
        args, kwargs = partition(lambda a: not bool(a.keyword), updated.args)
        if any(k.keyword.value == "request" for k in kwargs):
            # We've already fixed this file, don't fix it again.
            return updated

        kwargs, ctrl_kwargs = partition(
            lambda a: a.keyword.value not in self.CTRL_PARAMS,
            kwargs
        )

        # Positional arguments beyond the flattened parameters are control
        # parameters passed positionally; turn them into keyword arguments.
        args, ctrl_args = args[:len(kword_params)], args[len(kword_params):]
        ctrl_kwargs.extend(cst.Arg(value=a.value, keyword=cst.Name(value=ctrl))
                           for a, ctrl in zip(ctrl_args, self.CTRL_PARAMS))

        # Positional arguments take their field name from their position.
        # Keyword arguments keep the name they were passed with, so calls that
        # skip or reorder keywords still map to the right request field.
        request_fields = [
            (name, arg.value) for name, arg in zip(kword_params, args)
        ]
        request_fields.extend((arg.keyword.value, arg.value) for arg in kwargs)

        request_arg = cst.Arg(
            value=cst.Dict([
                cst.DictElement(
                    cst.SimpleString("'{}'".format(name)),
                    value,
                )
                for name, value in request_fields
            ]),
            keyword=cst.Name("request")
        )

        return updated.with_changes(
            args=[request_arg] + ctrl_kwargs
        )


def fix_files(
    in_dir: pathlib.Path,
    out_dir: pathlib.Path,
    *,
    transformer=dlpCallTransformer(),
):
    """Duplicate the input dir to the output dir, fixing file method calls.

    Only files whose extension is ``.py`` are processed; each one is parsed,
    transformed, and written to the same relative path under ``out_dir``.

    Preconditions:
    * in_dir is a real directory
    * out_dir is a real, empty directory

    Args:
        in_dir: Directory that is walked recursively for ``.py`` files.
        out_dir: Directory that receives the rewritten files.
        transformer: The libcst transformer applied to every module. The
            default instance is created once, when this function is defined,
            and is shared by all calls that do not pass their own.
    """
    pyfile_gen = (
        pathlib.Path(os.path.join(root, f))
        for root, _, files in os.walk(in_dir)
        for f in files if os.path.splitext(f)[1] == ".py"
    )

    for fpath in pyfile_gen:
        # Parse from bytes so libcst determines the source encoding itself
        # instead of relying on the platform's default text encoding.
        tree = cst.parse_module(fpath.read_bytes())

        # Insert method call fixes.
        updated = tree.visit(transformer)

        # Create the path and directory structure for the new file.
        updated_path = out_dir.joinpath(fpath.relative_to(in_dir))
        updated_path.parent.mkdir(parents=True, exist_ok=True)

        # Generate the updated source file at the corresponding path, encoded
        # the same way as the module that was parsed.
        updated_path.write_bytes(updated.bytes)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="""Fix up source that uses the dlp client library.

The existing sources are NOT overwritten but are copied to output_dir with changes made.

Note: This tool operates at a best-effort level at converting positional
      parameters in client method calls to keyword based parameters.
      Cases where it WILL FAIL include
      A) * or ** expansion in a method call.
      B) Calls via function or method alias (includes free function calls)
      C) Indirect or dispatched calls (e.g. the method is looked up dynamically)

      These all constitute false negatives. The tool will also detect false
      positives when an API method shares a name with another method.
""")
    parser.add_argument(
        '-d',
        '--input-directory',
        required=True,
        dest='input_dir',
        help='the input directory to walk for python files to fix up',
    )
    parser.add_argument(
        '-o',
        '--output-directory',
        required=True,
        dest='output_dir',
        help='the directory to output files fixed via un-flattening',
    )
    args = parser.parse_args()
    input_dir = pathlib.Path(args.input_dir)
    output_dir = pathlib.Path(args.output_dir)

    # Validate the preconditions of fix_files before touching any file.
    if not input_dir.is_dir():
        print(
            f"input directory '{input_dir}' does not exist or is not a directory",
            file=sys.stderr,
        )
        sys.exit(-1)

    if not output_dir.is_dir():
        print(
            f"output directory '{output_dir}' does not exist or is not a directory",
            file=sys.stderr,
        )
        sys.exit(-1)

    if os.listdir(output_dir):
        print(
            f"output directory '{output_dir}' is not empty",
            file=sys.stderr,
        )
        sys.exit(-1)

    fix_files(input_dir, output_dir)