1#! /usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright 2020 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17import argparse
18import os
19import libcst as cst
20import pathlib
21import sys
22from typing import (Any, Callable, Dict, List, Sequence, Tuple)
23
24
25def partition(
26    predicate: Callable[[Any], bool],
27    iterator: Sequence[Any]
28) -> Tuple[List[Any], List[Any]]:
29    """A stable, out-of-place partition."""
30    results = ([], [])
31
32    for i in iterator:
33        results[int(predicate(i))].append(i)
34
35    # Returns trueList, falseList
36    return results[1], results[0]
37
38
39class bigtableCallTransformer(cst.CSTTransformer):
40    CTRL_PARAMS: Tuple[str] = ('retry', 'timeout', 'metadata')
41    METHOD_TO_PARAMS: Dict[str, Tuple[str]] = {
42        'check_and_mutate_row': ('table_name', 'row_key', 'app_profile_id', 'predicate_filter', 'true_mutations', 'false_mutations', ),
43        'mutate_row': ('table_name', 'row_key', 'mutations', 'app_profile_id', ),
44        'mutate_rows': ('table_name', 'entries', 'app_profile_id', ),
45        'read_modify_write_row': ('table_name', 'row_key', 'rules', 'app_profile_id', ),
46        'read_rows': ('table_name', 'app_profile_id', 'rows', 'filter', 'rows_limit', ),
47        'sample_row_keys': ('table_name', 'app_profile_id', ),
48    }
49
50    def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode:
51        try:
52            key = original.func.attr.value
53            kword_params = self.METHOD_TO_PARAMS[key]
54        except (AttributeError, KeyError):
55            # Either not a method from the API or too convoluted to be sure.
56            return updated
57
58        # If the existing code is valid, keyword args come after positional args.
59        # Therefore, all positional args must map to the first parameters.
60        args, kwargs = partition(lambda a: not bool(a.keyword), updated.args)
61        if any(k.keyword.value == "request" for k in kwargs):
62            # We've already fixed this file, don't fix it again.
63            return updated
64
65        kwargs, ctrl_kwargs = partition(
66            lambda a: a.keyword.value not in self.CTRL_PARAMS,
67            kwargs
68        )
69
70        args, ctrl_args = args[:len(kword_params)], args[len(kword_params):]
71        ctrl_kwargs.extend(cst.Arg(value=a.value, keyword=cst.Name(value=ctrl))
72                           for a, ctrl in zip(ctrl_args, self.CTRL_PARAMS))
73
74        request_arg = cst.Arg(
75            value=cst.Dict([
76                cst.DictElement(
77                    cst.SimpleString("'{}'".format(name)),
78cst.Element(value=arg.value)
79                )
80                # Note: the args + kwargs looks silly, but keep in mind that
81                # the control parameters had to be stripped out, and that
82                # those could have been passed positionally or by keyword.
83                for name, arg in zip(kword_params, args + kwargs)]),
84            keyword=cst.Name("request")
85        )
86
87        return updated.with_changes(
88            args=[request_arg] + ctrl_kwargs
89        )
90
91
92def fix_files(
93    in_dir: pathlib.Path,
94    out_dir: pathlib.Path,
95    *,
96    transformer=bigtableCallTransformer(),
97):
98    """Duplicate the input dir to the output dir, fixing file method calls.
99
100    Preconditions:
101    * in_dir is a real directory
102    * out_dir is a real, empty directory
103    """
104    pyfile_gen = (
105        pathlib.Path(os.path.join(root, f))
106        for root, _, files in os.walk(in_dir)
107        for f in files if os.path.splitext(f)[1] == ".py"
108    )
109
110    for fpath in pyfile_gen:
111        with open(fpath, 'r') as f:
112            src = f.read()
113
114        # Parse the code and insert method call fixes.
115        tree = cst.parse_module(src)
116        updated = tree.visit(transformer)
117
118        # Create the path and directory structure for the new file.
119        updated_path = out_dir.joinpath(fpath.relative_to(in_dir))
120        updated_path.parent.mkdir(parents=True, exist_ok=True)
121
122        # Generate the updated source file at the corresponding path.
123        with open(updated_path, 'w') as f:
124            f.write(updated.code)
125
126
127if __name__ == '__main__':
128    parser = argparse.ArgumentParser(
129        description="""Fix up source that uses the bigtable client library.
130
131The existing sources are NOT overwritten but are copied to output_dir with changes made.
132
133Note: This tool operates at a best-effort level at converting positional
134      parameters in client method calls to keyword based parameters.
135      Cases where it WILL FAIL include
136      A) * or ** expansion in a method call.
137      B) Calls via function or method alias (includes free function calls)
138      C) Indirect or dispatched calls (e.g. the method is looked up dynamically)
139
140      These all constitute false negatives. The tool will also detect false
141      positives when an API method shares a name with another method.
142""")
143    parser.add_argument(
144        '-d',
145        '--input-directory',
146        required=True,
147        dest='input_dir',
148        help='the input directory to walk for python files to fix up',
149    )
150    parser.add_argument(
151        '-o',
152        '--output-directory',
153        required=True,
154        dest='output_dir',
155        help='the directory to output files fixed via un-flattening',
156    )
157    args = parser.parse_args()
158    input_dir = pathlib.Path(args.input_dir)
159    output_dir = pathlib.Path(args.output_dir)
160    if not input_dir.is_dir():
161        print(
162            f"input directory '{input_dir}' does not exist or is not a directory",
163            file=sys.stderr,
164        )
165        sys.exit(-1)
166
167    if not output_dir.is_dir():
168        print(
169            f"output directory '{output_dir}' does not exist or is not a directory",
170            file=sys.stderr,
171        )
172        sys.exit(-1)
173
174    if os.listdir(output_dir):
175        print(
176            f"output directory '{output_dir}' is not empty",
177            file=sys.stderr,
178        )
179        sys.exit(-1)
180
181    fix_files(input_dir, output_dir)
182