1# coding=utf8 2from __future__ import absolute_import 3 4import argparse 5import ast 6import six 7from six.moves import zip_longest 8 9from fluent.migrate import transforms 10from fluent.migrate.errors import MigrationError 11from fluent.migrate.helpers import transforms_from 12from fluent.syntax import ast as FTL 13from fluent.syntax.visitor import Visitor 14from compare_locales import mozpath 15 16 17class MigrateNotFoundException(Exception): 18 pass 19 20 21class BadContextAPIException(Exception): 22 pass 23 24 25def process_assign(node, context): 26 if isinstance(node.value, ast.Str): 27 val = node.value.s 28 elif isinstance(node.value, ast.Name): 29 val = context.get(node.value.id) 30 elif isinstance(node.value, ast.Call): 31 val = node.value 32 if val is None: 33 return 34 for target in node.targets: 35 if isinstance(target, ast.Name): 36 context[target.id] = val 37 38 39class Validator(object): 40 """Validate a migration recipe 41 42 Extract information from the migration recipe about which files to 43 migrate from, and which files to migrate to. 44 Also check for errors in the recipe, or bad API usage. 45 """ 46 47 @classmethod 48 def validate(cls, path, code=None): 49 if code is None: 50 with open(path) as fh: 51 code = fh.read() 52 validator = cls(code, path) 53 return validator.inspect() 54 55 def __init__(self, code, path): 56 self.ast = ast.parse(code, path) 57 58 def inspect(self): 59 migrate_func = None 60 global_assigns = {} 61 for top_level in ast.iter_child_nodes(self.ast): 62 if ( 63 isinstance(top_level, ast.FunctionDef) 64 and top_level.name == 'migrate' 65 ): 66 if migrate_func: 67 raise MigrateNotFoundException( 68 'Duplicate definition of migrate' 69 ) 70 migrate_func = top_level 71 details = self.inspect_migrate(migrate_func, global_assigns) 72 if isinstance(top_level, ast.Assign): 73 process_assign(top_level, global_assigns) 74 if isinstance(top_level, (ast.Import, ast.ImportFrom)): 75 if 'module' in top_level._fields: 76 module = top_level.module 77 else: 78 module = None 79 for alias in top_level.names: 80 asname = alias.asname or alias.name 81 dotted = alias.name 82 if module: 83 dotted = '{}.{}'.format(module, dotted) 84 global_assigns[asname] = dotted 85 if not migrate_func: 86 raise MigrateNotFoundException( 87 'migrate function not found' 88 ) 89 return details 90 91 def inspect_migrate(self, migrate_func, global_assigns): 92 if ( 93 len(migrate_func.args.args) != 1 or 94 any( 95 getattr(migrate_func.args, arg_field) 96 for arg_field in migrate_func.args._fields 97 if arg_field != 'args' 98 ) 99 ): 100 raise MigrateNotFoundException( 101 'migrate takes only one positional argument' 102 ) 103 arg = migrate_func.args.args[0] 104 if isinstance(arg, ast.Name): 105 ctx_var = arg.id # python 2 106 else: 107 ctx_var = arg.arg # python 3 108 visitor = MigrateAnalyzer(ctx_var, global_assigns) 109 visitor.visit(migrate_func) 110 return { 111 'references': visitor.references, 112 'issues': visitor.issues, 113 } 114 115 116def full_name(node, global_assigns): 117 leafs = [] 118 while isinstance(node, ast.Attribute): 119 leafs.append(node.attr) 120 node = node.value 121 if isinstance(node, ast.Name): 122 leafs.append(global_assigns.get(node.id, node.id)) 123 return '.'.join(reversed(leafs)) 124 125 126PATH_TYPES = six.string_types + (ast.Call,) 127 128 129class MigrateAnalyzer(ast.NodeVisitor): 130 def __init__(self, ctx_var, global_assigns): 131 super(MigrateAnalyzer, self).__init__() 132 self.ctx_var = ctx_var 133 self.global_assigns = global_assigns 134 self.depth = 0 135 self.issues = [] 136 self.references = set() 137 138 def generic_visit(self, node): 139 self.depth += 1 140 super(MigrateAnalyzer, self).generic_visit(node) 141 self.depth -= 1 142 143 def visit_Assign(self, node): 144 if self.depth == 1: 145 process_assign(node, self.global_assigns) 146 self.generic_visit(node) 147 148 def visit_Attribute(self, node): 149 if isinstance(node.value, ast.Name) and node.value.id == self.ctx_var: 150 if node.attr not in ( 151 'add_transforms', 152 'locale', 153 ): 154 raise BadContextAPIException( 155 'Unexpected attribute access on {}.{}'.format( 156 self.ctx_var, node.attr 157 ) 158 ) 159 self.generic_visit(node) 160 161 def visit_Call(self, node): 162 if ( 163 isinstance(node.func, ast.Attribute) and 164 isinstance(node.func.value, ast.Name) and 165 node.func.value.id == self.ctx_var 166 ): 167 return self.call_ctx(node) 168 dotted = full_name(node.func, self.global_assigns) 169 if dotted == 'fluent.migrate.helpers.transforms_from': 170 return self.call_helpers_transforms_from(node) 171 if dotted.startswith('fluent.migrate.'): 172 return self.call_transform(node, dotted) 173 self.generic_visit(node) 174 175 def call_ctx(self, node): 176 if node.func.attr == 'add_transforms': 177 return self.call_add_transforms(node) 178 raise BadContextAPIException( 179 'Unexpected call on {}.{}'.format( 180 self.ctx_var, node.func.attr 181 ) 182 ) 183 184 def call_add_transforms(self, node): 185 args_msg = ( 186 'Expected arguments to {}.add_transforms: ' 187 'target_ftl_path, reference_ftl_path, list_of_transforms' 188 ).format(self.ctx_var) 189 ref_msg = ( 190 'Expected second argument to {}.add_transforms: ' 191 'reference should be string or variable with string value' 192 ).format(self.ctx_var) 193 # Just check call signature here, check actual types below 194 if not self.check_arguments(node, (ast.AST, ast.AST, ast.AST)): 195 self.issues.append({ 196 'msg': args_msg, 197 'line': node.lineno, 198 }) 199 return 200 in_reference = node.args[1] 201 if isinstance(in_reference, ast.Name): 202 in_reference = self.global_assigns.get(in_reference.id) 203 if isinstance(in_reference, ast.Str): 204 in_reference = in_reference.s 205 if not isinstance(in_reference, six.string_types): 206 self.issues.append({ 207 'msg': ref_msg, 208 'line': node.args[1].lineno, 209 }) 210 return 211 self.references.add(in_reference) 212 # Checked node.args[1]. 213 # There's not a lot we can say about our target path, 214 # ignoring that. 215 # For our transforms, we want more checks. 216 self.generic_visit(node.args[2]) 217 218 def call_transform(self, node, dotted): 219 module, called = dotted.rsplit('.', 1) 220 if module not in ('fluent.migrate', 'fluent.migrate.transforms'): 221 return 222 transform = getattr(transforms, called) 223 if not issubclass(transform, transforms.Source): 224 return 225 bad_args = '{} takes path and key as first two params'.format(called) 226 if not self.check_arguments( 227 node, ((ast.Str, ast.Name), (ast.Str, ast.Name),), 228 allow_more=True, check_kwargs=False 229 ): 230 self.issues.append({ 231 'msg': bad_args, 232 'line': node.lineno 233 }) 234 return 235 path = node.args[0] 236 if isinstance(path, ast.Str): 237 path = path.s 238 if isinstance(path, ast.Name): 239 path = self.global_assigns.get(path.id) 240 if not isinstance(path, PATH_TYPES): 241 self.issues.append({ 242 'msg': bad_args, 243 'line': node.lineno 244 }) 245 246 def call_helpers_transforms_from(self, node): 247 args_msg = ( 248 'Expected arguments to transforms_from: ' 249 'str, **substitions' 250 ) 251 if not self.check_arguments( 252 node, (ast.Str,), check_kwargs=False 253 ): 254 self.issues.append({ 255 'msg': args_msg, 256 'line': node.lineno, 257 }) 258 return 259 kwargs = {} 260 found_bad_keywords = False 261 for keyword in node.keywords: 262 v = keyword.value 263 if isinstance(v, ast.Str): 264 v = v.s 265 if isinstance(v, ast.Name): 266 v = self.global_assigns.get(v.id) 267 if isinstance(v, ast.Call): 268 v = 'determined at runtime' 269 if not isinstance(v, PATH_TYPES): 270 msg = 'Bad keyword arg {} to transforms_from'.format( 271 keyword.arg 272 ) 273 self.issues.append({ 274 'msg': msg, 275 'line': node.lineno, 276 }) 277 found_bad_keywords = True 278 else: 279 kwargs[keyword.arg] = v 280 if found_bad_keywords: 281 return 282 try: 283 transforms = transforms_from(node.args[0].s, **kwargs) 284 except MigrationError as e: 285 self.issues.append({ 286 'msg': str(e), 287 'line': node.lineno, 288 }) 289 return 290 ti = TransformsInspector() 291 ti.visit(transforms) 292 self.issues.extend({ 293 'msg': issue, 294 'line': node.lineno, 295 } for issue in set(ti.issues)) 296 297 def check_arguments( 298 self, node, argspec, check_kwargs=True, allow_more=False 299 ): 300 if check_kwargs and ( 301 node.keywords or 302 (hasattr(node, 'kwargs') and node.kwargs) 303 ): 304 return False 305 if hasattr(node, 'starargs') and node.starargs: 306 return False 307 for arg, NODE_TYPE in zip_longest(node.args, argspec): 308 if NODE_TYPE is None: 309 return True if allow_more else False 310 if not (isinstance(arg, NODE_TYPE)): 311 return False 312 return True 313 314 315class TransformsInspector(Visitor): 316 def __init__(self): 317 super(TransformsInspector, self).__init__() 318 self.issues = [] 319 320 def generic_visit(self, node): 321 if isinstance(node, transforms.Source): 322 src = node.path 323 # Source needs paths to be normalized 324 # https://bugzilla.mozilla.org/show_bug.cgi?id=1568199 325 if src != mozpath.normpath(src): 326 self.issues.append( 327 'Source "{}" needs to be a normalized path'.format(src) 328 ) 329 super(TransformsInspector, self).generic_visit(node) 330 331 332def cli(): 333 parser = argparse.ArgumentParser() 334 parser.add_argument('migration') 335 args = parser.parse_args() 336 issues = Validator.validate(args.migration)['issues'] 337 for issue in issues: 338 print(issue['msg'], 'at line', issue['line']) 339 return 1 if issues else 0 340