1# coding=utf8
2from __future__ import absolute_import
3
4import argparse
5import ast
6import six
7from six.moves import zip_longest
8
9from fluent.migrate import transforms
10from fluent.migrate.errors import MigrationError
11from fluent.migrate.helpers import transforms_from
12from fluent.syntax import ast as FTL
13from fluent.syntax.visitor import Visitor
14from compare_locales import mozpath
15
16
17class MigrateNotFoundException(Exception):
18    pass
19
20
21class BadContextAPIException(Exception):
22    pass
23
24
25def process_assign(node, context):
26    if isinstance(node.value, ast.Str):
27        val = node.value.s
28    elif isinstance(node.value, ast.Name):
29        val = context.get(node.value.id)
30    elif isinstance(node.value, ast.Call):
31        val = node.value
32    if val is None:
33        return
34    for target in node.targets:
35        if isinstance(target, ast.Name):
36            context[target.id] = val
37
38
39class Validator(object):
40    """Validate a migration recipe
41
42    Extract information from the migration recipe about which files to
43    migrate from, and which files to migrate to.
44    Also check for errors in the recipe, or bad API usage.
45    """
46
47    @classmethod
48    def validate(cls, path, code=None):
49        if code is None:
50            with open(path) as fh:
51                code = fh.read()
52        validator = cls(code, path)
53        return validator.inspect()
54
55    def __init__(self, code, path):
56        self.ast = ast.parse(code, path)
57
58    def inspect(self):
59        migrate_func = None
60        global_assigns = {}
61        for top_level in ast.iter_child_nodes(self.ast):
62            if (
63                isinstance(top_level, ast.FunctionDef)
64                and top_level.name == 'migrate'
65            ):
66                if migrate_func:
67                    raise MigrateNotFoundException(
68                        'Duplicate definition of migrate'
69                    )
70                migrate_func = top_level
71                details = self.inspect_migrate(migrate_func, global_assigns)
72            if isinstance(top_level, ast.Assign):
73                process_assign(top_level, global_assigns)
74            if isinstance(top_level, (ast.Import, ast.ImportFrom)):
75                if 'module' in top_level._fields:
76                    module = top_level.module
77                else:
78                    module = None
79                for alias in top_level.names:
80                    asname = alias.asname or alias.name
81                    dotted = alias.name
82                    if module:
83                        dotted = '{}.{}'.format(module, dotted)
84                    global_assigns[asname] = dotted
85        if not migrate_func:
86            raise MigrateNotFoundException(
87                'migrate function not found'
88            )
89        return details
90
91    def inspect_migrate(self, migrate_func, global_assigns):
92        if (
93                len(migrate_func.args.args) != 1 or
94                any(
95                    getattr(migrate_func.args, arg_field)
96                    for arg_field in migrate_func.args._fields
97                    if arg_field != 'args'
98                )
99        ):
100            raise MigrateNotFoundException(
101                'migrate takes only one positional argument'
102            )
103        arg = migrate_func.args.args[0]
104        if isinstance(arg, ast.Name):
105            ctx_var = arg.id  # python 2
106        else:
107            ctx_var = arg.arg  # python 3
108        visitor = MigrateAnalyzer(ctx_var, global_assigns)
109        visitor.visit(migrate_func)
110        return {
111            'references': visitor.references,
112            'issues': visitor.issues,
113        }
114
115
116def full_name(node, global_assigns):
117    leafs = []
118    while isinstance(node, ast.Attribute):
119        leafs.append(node.attr)
120        node = node.value
121    if isinstance(node, ast.Name):
122        leafs.append(global_assigns.get(node.id, node.id))
123    return '.'.join(reversed(leafs))
124
125
126PATH_TYPES = six.string_types + (ast.Call,)
127
128
129class MigrateAnalyzer(ast.NodeVisitor):
130    def __init__(self, ctx_var, global_assigns):
131        super(MigrateAnalyzer, self).__init__()
132        self.ctx_var = ctx_var
133        self.global_assigns = global_assigns
134        self.depth = 0
135        self.issues = []
136        self.references = set()
137
138    def generic_visit(self, node):
139        self.depth += 1
140        super(MigrateAnalyzer, self).generic_visit(node)
141        self.depth -= 1
142
143    def visit_Assign(self, node):
144        if self.depth == 1:
145            process_assign(node, self.global_assigns)
146        self.generic_visit(node)
147
148    def visit_Attribute(self, node):
149        if isinstance(node.value, ast.Name) and node.value.id == self.ctx_var:
150            if node.attr not in (
151                'add_transforms',
152                'locale',
153            ):
154                raise BadContextAPIException(
155                    'Unexpected attribute access on {}.{}'.format(
156                        self.ctx_var, node.attr
157                    )
158                )
159        self.generic_visit(node)
160
161    def visit_Call(self, node):
162        if (
163                isinstance(node.func, ast.Attribute) and
164                isinstance(node.func.value, ast.Name) and
165                node.func.value.id == self.ctx_var
166        ):
167            return self.call_ctx(node)
168        dotted = full_name(node.func, self.global_assigns)
169        if dotted == 'fluent.migrate.helpers.transforms_from':
170            return self.call_helpers_transforms_from(node)
171        if dotted.startswith('fluent.migrate.'):
172            return self.call_transform(node, dotted)
173        self.generic_visit(node)
174
175    def call_ctx(self, node):
176        if node.func.attr == 'add_transforms':
177            return self.call_add_transforms(node)
178        raise BadContextAPIException(
179            'Unexpected call on {}.{}'.format(
180                self.ctx_var, node.func.attr
181            )
182        )
183
184    def call_add_transforms(self, node):
185        args_msg = (
186            'Expected arguments to {}.add_transforms: '
187            'target_ftl_path, reference_ftl_path, list_of_transforms'
188        ).format(self.ctx_var)
189        ref_msg = (
190            'Expected second argument to {}.add_transforms: '
191            'reference should be string or variable with string value'
192        ).format(self.ctx_var)
193        # Just check call signature here, check actual types below
194        if not self.check_arguments(node, (ast.AST, ast.AST, ast.AST)):
195            self.issues.append({
196                'msg': args_msg,
197                'line': node.lineno,
198            })
199            return
200        in_reference = node.args[1]
201        if isinstance(in_reference, ast.Name):
202            in_reference = self.global_assigns.get(in_reference.id)
203        if isinstance(in_reference, ast.Str):
204            in_reference = in_reference.s
205        if not isinstance(in_reference, six.string_types):
206            self.issues.append({
207                'msg': ref_msg,
208                'line': node.args[1].lineno,
209            })
210            return
211        self.references.add(in_reference)
212        # Checked node.args[1].
213        # There's not a lot we can say about our target path,
214        # ignoring that.
215        # For our transforms, we want more checks.
216        self.generic_visit(node.args[2])
217
218    def call_transform(self, node, dotted):
219        module, called = dotted.rsplit('.', 1)
220        if module not in ('fluent.migrate', 'fluent.migrate.transforms'):
221            return
222        transform = getattr(transforms, called)
223        if not issubclass(transform, transforms.Source):
224            return
225        bad_args = '{} takes path and key as first two params'.format(called)
226        if not self.check_arguments(
227            node, ((ast.Str, ast.Name), (ast.Str, ast.Name),),
228            allow_more=True, check_kwargs=False
229        ):
230            self.issues.append({
231                'msg': bad_args,
232                'line': node.lineno
233            })
234            return
235        path = node.args[0]
236        if isinstance(path, ast.Str):
237            path = path.s
238        if isinstance(path, ast.Name):
239            path = self.global_assigns.get(path.id)
240        if not isinstance(path, PATH_TYPES):
241            self.issues.append({
242                'msg': bad_args,
243                'line': node.lineno
244            })
245
246    def call_helpers_transforms_from(self, node):
247        args_msg = (
248            'Expected arguments to transforms_from: '
249            'str, **substitions'
250        )
251        if not self.check_arguments(
252            node, (ast.Str,), check_kwargs=False
253        ):
254            self.issues.append({
255                'msg': args_msg,
256                'line': node.lineno,
257            })
258            return
259        kwargs = {}
260        found_bad_keywords = False
261        for keyword in node.keywords:
262            v = keyword.value
263            if isinstance(v, ast.Str):
264                v = v.s
265            if isinstance(v, ast.Name):
266                v = self.global_assigns.get(v.id)
267            if isinstance(v, ast.Call):
268                v = 'determined at runtime'
269            if not isinstance(v, PATH_TYPES):
270                msg = 'Bad keyword arg {} to transforms_from'.format(
271                    keyword.arg
272                )
273                self.issues.append({
274                    'msg': msg,
275                    'line': node.lineno,
276                })
277                found_bad_keywords = True
278            else:
279                kwargs[keyword.arg] = v
280        if found_bad_keywords:
281            return
282        try:
283            transforms = transforms_from(node.args[0].s, **kwargs)
284        except MigrationError as e:
285            self.issues.append({
286                'msg': str(e),
287                'line': node.lineno,
288            })
289            return
290        ti = TransformsInspector()
291        ti.visit(transforms)
292        self.issues.extend({
293            'msg': issue,
294            'line': node.lineno,
295        } for issue in set(ti.issues))
296
297    def check_arguments(
298        self, node, argspec, check_kwargs=True, allow_more=False
299    ):
300        if check_kwargs and (
301            node.keywords or
302            (hasattr(node, 'kwargs') and node.kwargs)
303        ):
304            return False
305        if hasattr(node, 'starargs') and node.starargs:
306            return False
307        for arg, NODE_TYPE in zip_longest(node.args, argspec):
308            if NODE_TYPE is None:
309                return True if allow_more else False
310            if not (isinstance(arg, NODE_TYPE)):
311                return False
312        return True
313
314
315class TransformsInspector(Visitor):
316    def __init__(self):
317        super(TransformsInspector, self).__init__()
318        self.issues = []
319
320    def generic_visit(self, node):
321        if isinstance(node, transforms.Source):
322            src = node.path
323            # Source needs paths to be normalized
324            # https://bugzilla.mozilla.org/show_bug.cgi?id=1568199
325            if src != mozpath.normpath(src):
326                self.issues.append(
327                    'Source "{}" needs to be a normalized path'.format(src)
328                )
329        super(TransformsInspector, self).generic_visit(node)
330
331
332def cli():
333    parser = argparse.ArgumentParser()
334    parser.add_argument('migration')
335    args = parser.parse_args()
336    issues = Validator.validate(args.migration)['issues']
337    for issue in issues:
338        print(issue['msg'], 'at line', issue['line'])
339    return 1 if issues else 0
340