1"""The pyang library for parsing, validating, and converting YANG modules""" 2 3import os 4import string 5import sys 6import zlib 7import re 8import io 9 10from . import error 11from . import yang_parser 12from . import yin_parser 13from . import grammar 14from . import util 15from . import statements 16from . import syntax 17 18__version__ = '1.7.8' 19__date__ = '2019-01-21' 20 21class Context(object): 22 """Class which encapsulates a parse session""" 23 24 def __init__(self, repository): 25 """`repository` is a `Repository` instance""" 26 27 self.modules = {} 28 """dict of (modulename,revision):<class Statement> 29 contains all modules and submodule found""" 30 31 self.revs = {} 32 """dict of modulename:(revision,handle) 33 contains all modulenames and revisions found in the repository""" 34 35 self.strict = False 36 self.repository = repository 37 self.errors = [] 38 self.canonical = False 39 self.max_line_len = None 40 self.max_identifier_len = None 41 self.implicit_errors = True 42 self.lax_quote_checks = False 43 self.lax_xpath_checks = False 44 self.deviation_modules = [] 45 self.features = {} 46 self.max_status = None 47 self.keep_comments = False 48 self.keep_arg_substrings = False 49 50 for mod, rev, handle in self.repository.get_modules_and_revisions(self): 51 if mod not in self.revs: 52 self.revs[mod] = [] 53 revs = self.revs[mod] 54 revs.append((rev, handle)) 55 56 def add_module(self, ref, text, format=None, 57 expect_modulename=None, expect_revision=None, 58 expect_failure_error=True): 59 """Parse a module text and add the module data to the context 60 61 `ref` is a string which is used to identify the source of 62 the text for the user. used in error messages 63 `text` is the raw text data 64 `format` is one of 'yang' or 'yin'. 65 66 Returns the parsed and validated module on success, and None on error. 67 """ 68 if format == None: 69 format = util.guess_format(text) 70 71 if format == 'yin': 72 p = yin_parser.YinParser() 73 else: 74 p = yang_parser.YangParser() 75 76 module = p.parse(self, ref, text) 77 if module is None: 78 return None 79 80 if expect_modulename is not None: 81 if not re.match(syntax.re_identifier, expect_modulename): 82 error.err_add(self.errors, module.pos, 83 'FILENAME_BAD_MODULE_NAME', 84 (ref, expect_modulename, syntax.identifier)) 85 elif expect_modulename != module.arg: 86 if expect_failure_error: 87 error.err_add(self.errors, module.pos, 'BAD_MODULE_NAME', 88 (module.arg, ref, expect_modulename)) 89 return None 90 else: 91 error.err_add(self.errors, module.pos, 'WBAD_MODULE_NAME', 92 (module.arg, ref, expect_modulename)) 93 94 latest_rev = util.get_latest_revision(module) 95 if expect_revision is not None: 96 if not re.match(syntax.re_date, expect_revision): 97 error.err_add(self.errors, module.pos, 'FILENAME_BAD_REVISION', 98 (ref, expect_revision, 'YYYY-MM-DD')) 99 elif expect_revision != latest_rev: 100 if expect_failure_error: 101 error.err_add(self.errors, module.pos, 'BAD_REVISION', 102 (latest_rev, ref, expect_revision)) 103 return None 104 else: 105 error.err_add(self.errors, module.pos, 'WBAD_REVISION', 106 (latest_rev, ref, expect_revision)) 107 108 if module.arg not in self.revs: 109 self.revs[module.arg] = [] 110 revs = self.revs[module.arg] 111 revs.append((latest_rev, None)) 112 113 return self.add_parsed_module(module) 114 115 def add_parsed_module(self, module): 116 if module is None: 117 return None 118 if module.arg is None: 119 error.err_add(self.errors, module.pos, 120 'EXPECTED_ARGUMENT', module.keyword) 121 return None 122 top_keywords = ['module', 'submodule'] 123 if module.keyword not in top_keywords: 124 error.err_add(self.errors, module.pos, 125 'UNEXPECTED_KEYWORD_N', 126 (module.keyword, top_keywords)) 127 return None 128 129 rev = util.get_latest_revision(module) 130 if (module.arg, rev) in self.modules: 131 other = self.modules[(module.arg, rev)] 132 return other 133 134 self.modules[(module.arg, rev)] = module 135 statements.validate_module(self, module) 136 137 return module 138 139 def del_module(self, module): 140 """Remove a module from the context""" 141 rev = util.get_latest_revision(module) 142 del self.modules[(module.arg, rev)] 143 144 def get_module(self, modulename, revision=None): 145 """Return the module if it exists in the context""" 146 if revision is None and modulename in self.revs: 147 (revision, _handle) = self._get_latest_rev(self.revs[modulename]) 148 if revision is not None: 149 if (modulename,revision) in self.modules: 150 return self.modules[(modulename, revision)] 151 else: 152 return None 153 154 def _get_latest_rev(self, revs): 155 self._ensure_revs(revs) 156 latest = None 157 lhandle = None 158 for (rev, handle) in revs: 159 if rev is not None and (latest is None or rev > latest): 160 latest = rev 161 lhandle = handle 162 return (latest, lhandle) 163 164 def _ensure_revs(self, revs): 165 i = 0 166 length = len(revs) 167 while i < length: 168 (rev, handle) = revs[i] 169 if rev is None: 170 # now we must read the revision from the module 171 try: 172 r = self.repository.get_module_from_handle(handle) 173 except self.repository.ReadError as ex: 174 i += 1 175 continue 176 (ref, format, text) = r 177 178 if format == None: 179 format = util.guess_format(text) 180 181 if format == 'yin': 182 yintext = text 183 p = yin_parser.YinParser({'no_include':True, 184 'no_extensions':True}) 185 else: 186 yintext = None 187 p = yang_parser.YangParser() 188 189 module = p.parse(self, ref, text) 190 if module is not None: 191 rev = util.get_latest_revision(module) 192 revs[i] = (rev, ('parsed', module, ref, yintext)) 193 i += 1 194 195 def search_module(self, pos, modulename, revision=None): 196 """Searches for a module named `modulename` in the repository 197 198 If the module is found, it is added to the context. 199 Returns the module if found, and None otherwise""" 200 201 if modulename not in self.revs: 202 # this module doesn't exist in the repos at all 203 error.err_add(self.errors, pos, 'MODULE_NOT_FOUND', modulename) 204 # keep track of this to avoid multiple errors 205 self.revs[modulename] = [] 206 return None 207 elif self.revs[modulename] == []: 208 # this module doesn't exist in the repos at all, error reported 209 return None 210 211 if revision is not None: 212 if (modulename,revision) in self.modules: 213 return self.modules[(modulename, revision)] 214 self._ensure_revs(self.revs[modulename]) 215 x = util.keysearch(revision, 0, self.revs[modulename]) 216 if x is not None: 217 (_revision, handle) = x 218 if handle == None: 219 # this revision doesn't exist in the repos, error reported 220 return None 221 else: 222 # this revision doesn't exist in the repos 223 error.err_add(self.errors, pos, 'MODULE_NOT_FOUND_REV', 224 (modulename, revision)) 225 # keep track of this to avoid multiple errors 226 self.revs[modulename].append((revision, None)) 227 return None 228 else: 229 # get the latest revision 230 (revision, handle) = self._get_latest_rev(self.revs[modulename]) 231 if (modulename, revision) in self.modules: 232 return self.modules[(modulename, revision)] 233 234 if handle is None: 235 module = None 236 elif handle[0] == 'parsed': 237 module = handle[1] 238 ref = handle[2] 239 yintext = handle[3] 240 if modulename != module.arg: 241 error.err_add(self.errors, module.pos, 'BAD_MODULE_NAME', 242 (module.arg, ref, modulename)) 243 module = None 244 elif yintext is None: 245 module = self.add_parsed_module(handle[1]) 246 else: 247 p = yin_parser.YinParser() 248 self.yin_module_map[module.arg] = [] 249 module = p.parse(self, ref, yintext) 250 if module is not None: 251 module = self.add_parsed_module(module) 252 else: 253 # get it from the repos 254 try: 255 r = self.repository.get_module_from_handle(handle) 256 (ref, format, text) = r 257 module = self.add_module(ref, text, format, 258 modulename, revision) 259 except self.repository.ReadError as ex: 260 error.err_add(self.errors, pos, 'READ_ERROR', str(ex)) 261 module = None 262 263 if module == None: 264 return None 265 # if modulename != module.arg: 266 # error.err_add(self.errors, module.pos, 'BAD_MODULE_FILENAME', 267 # (module.arg, ref, modulename)) 268 # latest_rev = util.get_latest_revision(module) 269 270 # if revision is not None and revision != latest_rev: 271 # error.err_add(self.errors, module.pos, 'BAD_REVISION', 272 # (latest_rev, ref, revision)) 273 274 # self.del_module(module) 275 # self.modules[(modulename, latest_rev)] = None 276 # return None 277 return module 278 279 def read_module(self, modulename, revision=None, extra={}): 280 """Searches for a module named `modulename` in the repository 281 282 The module is just read, and not compiled at all. 283 Returns the module if found, and None otherwise""" 284 285 if modulename not in self.revs: 286 # this module doesn't exist in the repos at all 287 return None 288 elif self.revs[modulename] == []: 289 # this module doesn't exist in the repos at all, error reported 290 return None 291 292 if revision is not None: 293 if (modulename,revision) in self.modules: 294 return self.modules[(modulename, revision)] 295 self._ensure_revs(self.revs[modulename]) 296 x = util.keysearch(revision, 1, self.revs[modulename]) 297 if x is not None: 298 (_revision, handle) = x 299 if handle == None: 300 # this revision doesn't exist in the repos, error reported 301 return None 302 else: 303 # this revision doesn't exist in the repos 304 return None 305 else: 306 # get the latest revision 307 (revision, handle) = self._get_latest_rev(self.revs[modulename]) 308 if (modulename, revision) in self.modules: 309 return self.modules[(modulename, revision)] 310 311 if handle[0] == 'parsed': 312 module = handle[1] 313 return module 314 else: 315 # get it from the repos 316 try: 317 r = self.repository.get_module_from_handle(handle) 318 (ref, format, text) = r 319 if format == None: 320 format = util.guess_format(text) 321 322 if format == 'yin': 323 p = yin_parser.YinParser(extra) 324 else: 325 p = yang_parser.YangParser(extra) 326 327 return p.parse(self, ref, text) 328 except self.repository.ReadError as ex: 329 return None 330 331 def validate(self): 332 uris = {} 333 for k in self.modules: 334 m = self.modules[k] 335 if m != None: 336 namespace = m.search_one('namespace') 337 if namespace != None: 338 uri = namespace.arg 339 if uri in uris: 340 if uris[uri] != m.arg: 341 error.err_add(self.errors, namespace.pos, 342 'DUPLICATE_NAMESPACE', 343 (uri, uris[uri])) 344 else: 345 uris[uri] = m.arg 346 347class Repository(object): 348 """Abstract base class that represents a module repository""" 349 350 def __init__(self): 351 pass 352 353 def get_modules_and_revisions(self, ctx): 354 """Return a list of all modules and their revisons 355 356 Returns a tuple (`modulename`, `revision`, `handle`), where 357 `handle' is used in the call to get_module_from_handle() to 358 retrieve the module. 359 """ 360 361 def get_module_from_handle(self, handle): 362 """Return the raw module text from the repository 363 364 Returns (`ref`, `format`, `text`) if found, or None if not found. 365 `ref` is a string which is used to identify the source of 366 the text for the user. used in error messages 367 `format` is one of 'yang' or 'yin' or None. 368 `text` is the raw text data 369 370 Raises `ReadError` 371 """ 372 373 class ReadError(Exception): 374 """Signals that an error occured during module retrieval""" 375 376 def __init__(self, str): 377 Exception.__init__(self, str) 378 379class FileRepository(Repository): 380 def __init__(self, path="", use_env=True, no_path_recurse=False, 381 verbose=False): 382 """Create a Repository which searches the filesystem for modules 383 384 `path` is a `os.pathsep`-separated string of directories 385 """ 386 387 Repository.__init__(self) 388 self.dirs = path.split(os.pathsep) 389 self.no_path_recurse = no_path_recurse 390 self.modules = None 391 self.verbose = verbose 392 393 if use_env: 394 modpath = os.getenv('YANG_MODPATH') 395 if modpath is not None: 396 self.dirs.extend(modpath.split(os.pathsep)) 397 398 home = os.getenv('HOME') 399 if home is not None: 400 self.dirs.append(os.path.join(home, 'yang', 'modules')) 401 402 inst = os.getenv('YANG_INSTALL') 403 if inst is not None: 404 self.dirs.append(os.path.join(inst, 'yang', 'modules')) 405 return # skip search if install location is indicated 406 407 default_install = os.path.join(sys.prefix, 408 'share','yang','modules') 409 if os.path.exists(default_install): 410 self.dirs.append(default_install) 411 return # end search if default location exists 412 413 # for some systems, sys.prefix returns `/usr` 414 # but the real location is `/usr/local` 415 # if the package is installed with pip 416 # this information can be easily retrieved 417 import pkgutil 418 if not pkgutil.find_loader('pip'): 419 return # abort search if pip is not installed 420 421 # hack below to handle pip 10 internals 422 # if someone knows pip and how to fix this, it would be great! 423 location = None 424 try: 425 import pip.locations as locations 426 location = locations.distutils_scheme('pyang') 427 except: 428 try: 429 import pip._internal.locations as locations 430 location = locations.distutils_scheme('pyang') 431 except: 432 pass 433 if location is not None: 434 self.dirs.append(os.path.join(location['data'], 435 'share','yang','modules')) 436 437 438 439 def _setup(self, ctx): 440 # check all dirs for yang and yin files 441 self.modules = [] 442 def add_files_from_dir(d): 443 try: 444 files = os.listdir(d) 445 except OSError: 446 files = [] 447 for fname in files: 448 absfilename = os.path.join(d, fname) 449 if os.path.isfile(absfilename): 450 m = syntax.re_filename.search(fname) 451 if m is not None: 452 (name, rev, format) = m.groups() 453 if not os.access(absfilename, os.R_OK): continue 454 if absfilename.startswith("./"): 455 absfilename = absfilename[2:] 456 handle = (format, absfilename) 457 self.modules.append((name, rev, handle)) 458 elif (not self.no_path_recurse 459 and d != '.' and os.path.isdir(absfilename)): 460 add_files_from_dir(absfilename) 461 for d in self.dirs: 462 add_files_from_dir(d) 463 464 # FIXME: bad strategy; when revisions are not used in the filename 465 # this code parses all modules :( need to do this lazily 466 # FIXME: actually this function is never called and can be deleted 467 def _peek_revision(self, absfilename, format, ctx): 468 fd = None 469 try: 470 fd = io.open(absfilename, "r", encoding="utf-8") 471 text = fd.read() 472 except IOError as ex: 473 return None 474 except UnicodeDecodeError as ex: 475 return None 476 finally: 477 if fd is not None: 478 fd.close() 479 480 if format == 'yin': 481 p = yin_parser.YinParser() 482 else: 483 p = yang_parser.YangParser() 484 485 # FIXME: optimization - do not parse the entire text 486 # just to read the revisions... 487 module = p.parse(ctx, absfilename, text) 488 if module is None: 489 return None 490 return (util.get_latest_revision(module), module) 491 492 def get_modules_and_revisions(self, ctx): 493 if self.modules is None: 494 self._setup(ctx) 495 return self.modules 496 497 def get_module_from_handle(self, handle): 498 (format, absfilename) = handle 499 fd = None 500 try: 501 fd = io.open(absfilename, "r", encoding="utf-8") 502 text = fd.read() 503 if self.verbose: 504 util.report_file_read(absfilename) 505 except IOError as ex: 506 raise self.ReadError(absfilename + ": " + str(ex)) 507 except UnicodeDecodeError as ex: 508 s = str(ex).replace('utf-8', 'utf8') 509 raise self.ReadError(absfilename + ": unicode error: " + s) 510 finally: 511 if fd is not None: 512 fd.close() 513 514 if format is None: 515 format = util.guess_format(text) 516 return (absfilename, format, text) 517