1#! /usr/bin/env python3 2{% extends '_base.py.j2' %} 3{% block content %} 4import argparse 5import os 6import libcst as cst 7import pathlib 8import sys 9from typing import (Any, Callable, Dict, List, Sequence, Tuple) 10 11 12def partition( 13 predicate: Callable[[Any], bool], 14 iterator: Sequence[Any] 15) -> Tuple[List[Any], List[Any]]: 16 """A stable, out-of-place partition.""" 17 results = ([], []) 18 19 for i in iterator: 20 results[int(predicate(i))].append(i) 21 22 # Returns trueList, falseList 23 return results[1], results[0] 24 25 26class {{ api.naming.module_name }}CallTransformer(cst.CSTTransformer): 27 CTRL_PARAMS: Tuple[str] = ('retry', 'timeout', 'metadata') 28 {% with all_methods = [] -%} 29 {% for service in api.services.values() %}{% for method in service.methods.values() -%} 30 {% do all_methods.append(method) -%} 31 {% endfor %}{% endfor -%} 32 METHOD_TO_PARAMS: Dict[str, Tuple[str]] = { 33 {% for method in all_methods|sort(attribute='name')|unique(attribute='name') -%} 34 '{{ method.name|snake_case }}': ({% for field in method.legacy_flattened_fields.values() %}'{{ field.name }}', {% endfor %}), 35 {% endfor -%} 36 {% if opts.add_iam_methods %} 37 'get_iam_policy': ('resource', 'options', ), 38 'set_iam_policy': ('resource', 'policy', ), 39 'test_iam_permissions': ('resource', 'permissions', ), 40 {% endif %} 41 } 42 {% endwith %} 43 44 def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode: 45 try: 46 key = original.func.attr.value 47 kword_params = self.METHOD_TO_PARAMS[key] 48 except (AttributeError, KeyError): 49 # Either not a method from the API or too convoluted to be sure. 50 return updated 51 52 53 # If the existing code is valid, keyword args come after positional args. 54 # Therefore, all positional args must map to the first parameters. 55 args, kwargs = partition(lambda a: not bool(a.keyword), updated.args) 56 if any(k.keyword.value == "request" for k in kwargs): 57 # We've already fixed this file, don't fix it again. 58 return updated 59 60 kwargs, ctrl_kwargs = partition( 61 lambda a: not a.keyword.value in self.CTRL_PARAMS, 62 kwargs 63 ) 64 65 args, ctrl_args = args[:len(kword_params)], args[len(kword_params):] 66 ctrl_kwargs.extend(cst.Arg(value=a.value, keyword=cst.Name(value=ctrl)) 67 for a, ctrl in zip(ctrl_args, self.CTRL_PARAMS)) 68 69 request_arg = cst.Arg( 70 value=cst.Dict([ 71 cst.DictElement( 72 cst.SimpleString("'{}'".format(name)), 73 {# Inline comments and formatting are currently stripped out. -#} 74 {# My current attempts at preverving comments and formatting -#} 75 {# keep the comments, but the formatting is run through a log -#} 76 {# chipper, and an extra comma gets added, which causes a -#} 77 {# parse error. -#} 78 cst.Element(value=arg.value) 79 ) 80 # Note: the args + kwargs looks silly, but keep in mind that 81 # the control parameters had to be stripped out, and that 82 # those could have been passed positionally or by keyword. 83 for name, arg in zip(kword_params, args + kwargs)]), 84 keyword=cst.Name("request") 85 ) 86 87 return updated.with_changes( 88 args=[request_arg] + ctrl_kwargs 89 ) 90 91 92def fix_files( 93 in_dir: pathlib.Path, 94 out_dir: pathlib.Path, 95 *, 96 transformer={{ api.naming.module_name }}CallTransformer(), 97): 98 """Duplicate the input dir to the output dir, fixing file method calls. 99 100 Preconditions: 101 * in_dir is a real directory 102 * out_dir is a real, empty directory 103 """ 104 pyfile_gen = ( 105 pathlib.Path(os.path.join(root, f)) 106 for root, _, files in os.walk(in_dir) 107 for f in files if os.path.splitext(f)[1] == ".py" 108 ) 109 110 for fpath in pyfile_gen: 111 with open(fpath, 'r') as f: 112 src = f.read() 113 114 # Parse the code and insert method call fixes. 115 tree = cst.parse_module(src) 116 updated = tree.visit(transformer) 117 118 # Create the path and directory structure for the new file. 119 updated_path = out_dir.joinpath(fpath.relative_to(in_dir)) 120 updated_path.parent.mkdir(parents=True, exist_ok=True) 121 122 # Generate the updated source file at the corresponding path. 123 with open(updated_path, 'w') as f: 124 f.write(updated.code) 125 126 127if __name__ == '__main__': 128 parser = argparse.ArgumentParser( 129 description="""Fix up source that uses the {{ api.naming.module_name }} client library. 130 131The existing sources are NOT overwritten but are copied to output_dir with changes made. 132 133Note: This tool operates at a best-effort level at converting positional 134 parameters in client method calls to keyword based parameters. 135 Cases where it WILL FAIL include 136 A) * or ** expansion in a method call. 137 B) Calls via function or method alias (includes free function calls) 138 C) Indirect or dispatched calls (e.g. the method is looked up dynamically) 139 140 These all constitute false negatives. The tool will also detect false 141 positives when an API method shares a name with another method. 142""") 143 parser.add_argument( 144 '-d', 145 '--input-directory', 146 required=True, 147 dest='input_dir', 148 help='the input directory to walk for python files to fix up', 149 ) 150 parser.add_argument( 151 '-o', 152 '--output-directory', 153 required=True, 154 dest='output_dir', 155 help='the directory to output files fixed via un-flattening', 156 ) 157 args = parser.parse_args() 158 input_dir = pathlib.Path(args.input_dir) 159 output_dir = pathlib.Path(args.output_dir) 160 if not input_dir.is_dir(): 161 print( 162 f"input directory '{input_dir}' does not exist or is not a directory", 163 file=sys.stderr, 164 ) 165 sys.exit(-1) 166 167 if not output_dir.is_dir(): 168 print( 169 f"output directory '{output_dir}' does not exist or is not a directory", 170 file=sys.stderr, 171 ) 172 sys.exit(-1) 173 174 if os.listdir(output_dir): 175 print( 176 f"output directory '{output_dir}' is not empty", 177 file=sys.stderr, 178 ) 179 sys.exit(-1) 180 181 fix_files(input_dir, output_dir) 182{% endblock %} 183