1"""The pyang library for parsing, validating, and converting YANG modules"""
2
3import os
4import string
5import sys
6import zlib
7import re
8import io
9
10from . import error
11from . import yang_parser
12from . import yin_parser
13from . import grammar
14from . import util
15from . import statements
16from . import syntax
17
18__version__ = '1.7.8'
19__date__ = '2019-01-21'
20
21class Context(object):
22    """Class which encapsulates a parse session"""
23
24    def __init__(self, repository):
25        """`repository` is a `Repository` instance"""
26
27        self.modules = {}
28        """dict of (modulename,revision):<class Statement>
29        contains all modules and submodule found"""
30
31        self.revs = {}
32        """dict of modulename:(revision,handle)
33        contains all modulenames and revisions found in the repository"""
34
35        self.strict = False
36        self.repository = repository
37        self.errors = []
38        self.canonical = False
39        self.max_line_len = None
40        self.max_identifier_len = None
41        self.implicit_errors = True
42        self.lax_quote_checks = False
43        self.lax_xpath_checks = False
44        self.deviation_modules = []
45        self.features = {}
46        self.max_status = None
47        self.keep_comments = False
48        self.keep_arg_substrings = False
49
50        for mod, rev, handle in self.repository.get_modules_and_revisions(self):
51            if mod not in self.revs:
52                self.revs[mod] = []
53            revs = self.revs[mod]
54            revs.append((rev, handle))
55
56    def add_module(self, ref, text, format=None,
57                   expect_modulename=None, expect_revision=None,
58                   expect_failure_error=True):
59        """Parse a module text and add the module data to the context
60
61        `ref` is a string which is used to identify the source of
62              the text for the user.  used in error messages
63        `text` is the raw text data
64        `format` is one of 'yang' or 'yin'.
65
66        Returns the parsed and validated module on success, and None on error.
67        """
68        if format == None:
69            format = util.guess_format(text)
70
71        if format == 'yin':
72            p = yin_parser.YinParser()
73        else:
74            p = yang_parser.YangParser()
75
76        module = p.parse(self, ref, text)
77        if module is None:
78            return None
79
80        if expect_modulename is not None:
81            if not re.match(syntax.re_identifier, expect_modulename):
82                error.err_add(self.errors, module.pos,
83                              'FILENAME_BAD_MODULE_NAME',
84                              (ref, expect_modulename, syntax.identifier))
85            elif expect_modulename != module.arg:
86                if expect_failure_error:
87                    error.err_add(self.errors, module.pos, 'BAD_MODULE_NAME',
88                                  (module.arg, ref, expect_modulename))
89                    return None
90                else:
91                    error.err_add(self.errors, module.pos, 'WBAD_MODULE_NAME',
92                                  (module.arg, ref, expect_modulename))
93
94        latest_rev = util.get_latest_revision(module)
95        if expect_revision is not None:
96            if not re.match(syntax.re_date, expect_revision):
97                error.err_add(self.errors, module.pos, 'FILENAME_BAD_REVISION',
98                              (ref, expect_revision, 'YYYY-MM-DD'))
99            elif expect_revision != latest_rev:
100                if expect_failure_error:
101                    error.err_add(self.errors, module.pos, 'BAD_REVISION',
102                                  (latest_rev, ref, expect_revision))
103                    return None
104                else:
105                    error.err_add(self.errors, module.pos, 'WBAD_REVISION',
106                                  (latest_rev, ref, expect_revision))
107
108        if module.arg not in self.revs:
109            self.revs[module.arg] = []
110            revs = self.revs[module.arg]
111            revs.append((latest_rev, None))
112
113        return self.add_parsed_module(module)
114
115    def add_parsed_module(self, module):
116        if module is None:
117            return None
118        if module.arg is None:
119            error.err_add(self.errors, module.pos,
120                          'EXPECTED_ARGUMENT', module.keyword)
121            return None
122        top_keywords = ['module', 'submodule']
123        if module.keyword not in top_keywords:
124            error.err_add(self.errors, module.pos,
125                          'UNEXPECTED_KEYWORD_N',
126                          (module.keyword, top_keywords))
127            return None
128
129        rev = util.get_latest_revision(module)
130        if (module.arg, rev) in self.modules:
131            other = self.modules[(module.arg, rev)]
132            return other
133
134        self.modules[(module.arg, rev)] = module
135        statements.validate_module(self, module)
136
137        return module
138
139    def del_module(self, module):
140        """Remove a module from the context"""
141        rev = util.get_latest_revision(module)
142        del self.modules[(module.arg, rev)]
143
144    def get_module(self, modulename, revision=None):
145        """Return the module if it exists in the context"""
146        if revision is None and modulename in self.revs:
147            (revision, _handle) = self._get_latest_rev(self.revs[modulename])
148        if revision is not None:
149            if (modulename,revision) in self.modules:
150                return self.modules[(modulename, revision)]
151        else:
152            return None
153
154    def _get_latest_rev(self, revs):
155        self._ensure_revs(revs)
156        latest = None
157        lhandle = None
158        for (rev, handle) in revs:
159            if rev is not None and (latest is None or rev > latest):
160                latest = rev
161                lhandle = handle
162        return (latest, lhandle)
163
164    def _ensure_revs(self, revs):
165        i = 0
166        length = len(revs)
167        while i < length:
168            (rev, handle) = revs[i]
169            if rev is None:
170                # now we must read the revision from the module
171                try:
172                    r = self.repository.get_module_from_handle(handle)
173                except self.repository.ReadError as ex:
174                    i += 1
175                    continue
176                (ref, format, text) = r
177
178                if format == None:
179                    format = util.guess_format(text)
180
181                if format == 'yin':
182                    yintext = text
183                    p = yin_parser.YinParser({'no_include':True,
184                                              'no_extensions':True})
185                else:
186                    yintext = None
187                    p = yang_parser.YangParser()
188
189                module = p.parse(self, ref, text)
190                if module is not None:
191                    rev = util.get_latest_revision(module)
192                    revs[i] = (rev, ('parsed', module, ref, yintext))
193            i += 1
194
195    def search_module(self, pos, modulename, revision=None):
196        """Searches for a module named `modulename` in the repository
197
198        If the module is found, it is added to the context.
199        Returns the module if found, and None otherwise"""
200
201        if modulename not in self.revs:
202            # this module doesn't exist in the repos at all
203            error.err_add(self.errors, pos, 'MODULE_NOT_FOUND', modulename)
204            # keep track of this to avoid multiple errors
205            self.revs[modulename] = []
206            return None
207        elif self.revs[modulename] == []:
208            # this module doesn't exist in the repos at all, error reported
209            return None
210
211        if revision is not None:
212            if (modulename,revision) in self.modules:
213                return self.modules[(modulename, revision)]
214            self._ensure_revs(self.revs[modulename])
215            x = util.keysearch(revision, 0, self.revs[modulename])
216            if x is not None:
217                (_revision, handle) = x
218                if handle == None:
219                    # this revision doesn't exist in the repos, error reported
220                    return None
221            else:
222                # this revision doesn't exist in the repos
223                error.err_add(self.errors, pos, 'MODULE_NOT_FOUND_REV',
224                              (modulename, revision))
225                # keep track of this to avoid multiple errors
226                self.revs[modulename].append((revision, None))
227                return None
228        else:
229            # get the latest revision
230            (revision, handle) = self._get_latest_rev(self.revs[modulename])
231            if (modulename, revision) in self.modules:
232                return self.modules[(modulename, revision)]
233
234        if handle is None:
235            module = None
236        elif handle[0] == 'parsed':
237            module = handle[1]
238            ref = handle[2]
239            yintext = handle[3]
240            if modulename != module.arg:
241                error.err_add(self.errors, module.pos, 'BAD_MODULE_NAME',
242                              (module.arg, ref, modulename))
243                module = None
244            elif yintext is None:
245                module = self.add_parsed_module(handle[1])
246            else:
247                p = yin_parser.YinParser()
248                self.yin_module_map[module.arg] = []
249                module = p.parse(self, ref, yintext)
250                if module is not None:
251                    module = self.add_parsed_module(module)
252        else:
253            # get it from the repos
254            try:
255                r = self.repository.get_module_from_handle(handle)
256                (ref, format, text) = r
257                module = self.add_module(ref, text, format,
258                                         modulename, revision)
259            except self.repository.ReadError as ex:
260                error.err_add(self.errors, pos, 'READ_ERROR', str(ex))
261                module = None
262
263        if module == None:
264            return None
265        # if modulename != module.arg:
266        #     error.err_add(self.errors, module.pos, 'BAD_MODULE_FILENAME',
267        #                   (module.arg, ref, modulename))
268        #     latest_rev = util.get_latest_revision(module)
269
270        #     if revision is not None and revision != latest_rev:
271        #         error.err_add(self.errors, module.pos, 'BAD_REVISION',
272        #                       (latest_rev, ref, revision))
273
274        #     self.del_module(module)
275        #     self.modules[(modulename, latest_rev)] = None
276        #     return None
277        return module
278
279    def read_module(self, modulename, revision=None, extra={}):
280        """Searches for a module named `modulename` in the repository
281
282        The module is just read, and not compiled at all.
283        Returns the module if found, and None otherwise"""
284
285        if modulename not in self.revs:
286            # this module doesn't exist in the repos at all
287            return None
288        elif self.revs[modulename] == []:
289            # this module doesn't exist in the repos at all, error reported
290            return None
291
292        if revision is not None:
293            if (modulename,revision) in self.modules:
294                return self.modules[(modulename, revision)]
295            self._ensure_revs(self.revs[modulename])
296            x = util.keysearch(revision, 1, self.revs[modulename])
297            if x is not None:
298                (_revision, handle) = x
299                if handle == None:
300                    # this revision doesn't exist in the repos, error reported
301                    return None
302            else:
303                # this revision doesn't exist in the repos
304                return None
305        else:
306            # get the latest revision
307            (revision, handle) = self._get_latest_rev(self.revs[modulename])
308            if (modulename, revision) in self.modules:
309                return self.modules[(modulename, revision)]
310
311        if handle[0] == 'parsed':
312            module = handle[1]
313            return module
314        else:
315            # get it from the repos
316            try:
317                r = self.repository.get_module_from_handle(handle)
318                (ref, format, text) = r
319                if format == None:
320                    format = util.guess_format(text)
321
322                if format == 'yin':
323                    p = yin_parser.YinParser(extra)
324                else:
325                    p = yang_parser.YangParser(extra)
326
327                return p.parse(self, ref, text)
328            except self.repository.ReadError as ex:
329                return None
330
331    def validate(self):
332        uris = {}
333        for k in self.modules:
334            m = self.modules[k]
335            if m != None:
336                namespace = m.search_one('namespace')
337                if namespace != None:
338                    uri = namespace.arg
339                    if uri in uris:
340                        if uris[uri] != m.arg:
341                            error.err_add(self.errors, namespace.pos,
342                                          'DUPLICATE_NAMESPACE',
343                                          (uri, uris[uri]))
344                    else:
345                        uris[uri] = m.arg
346
class Repository(object):
    """Abstract base class that represents a module repository"""

    def __init__(self):
        """The base class holds no state of its own."""

    def get_modules_and_revisions(self, ctx):
        """Return a list of all modules and their revisions

        Each item is a tuple (`modulename`, `revision`, `handle`), where
        `handle` is used in the call to get_module_from_handle() to
        retrieve the module.

        This base implementation returns None.
        """
        return None

    def get_module_from_handle(self, handle):
        """Return the raw module text from the repository

        Returns (`ref`, `format`, `text`) if found, or None if not found.
        `ref` is a string which is used to identify the source of
              the text for the user.  used in error messages
        `format` is one of 'yang' or 'yin' or None.
        `text` is the raw text data

        Raises `ReadError`

        This base implementation returns None.
        """
        return None

    class ReadError(Exception):
        """Signals that an error occurred during module retrieval"""

        def __init__(self, str):
            # exactly one message argument, forwarded to Exception
            Exception.__init__(self, str)
378
379class FileRepository(Repository):
380    def __init__(self, path="", use_env=True, no_path_recurse=False,
381                 verbose=False):
382        """Create a Repository which searches the filesystem for modules
383
384        `path` is a `os.pathsep`-separated string of directories
385        """
386
387        Repository.__init__(self)
388        self.dirs = path.split(os.pathsep)
389        self.no_path_recurse = no_path_recurse
390        self.modules = None
391        self.verbose = verbose
392
393        if use_env:
394            modpath = os.getenv('YANG_MODPATH')
395            if modpath is not None:
396                self.dirs.extend(modpath.split(os.pathsep))
397
398            home = os.getenv('HOME')
399            if home is not None:
400                self.dirs.append(os.path.join(home, 'yang', 'modules'))
401
402            inst = os.getenv('YANG_INSTALL')
403            if inst is not None:
404                self.dirs.append(os.path.join(inst, 'yang', 'modules'))
405                return  # skip search if install location is indicated
406
407            default_install = os.path.join(sys.prefix,
408                                           'share','yang','modules')
409            if os.path.exists(default_install):
410                self.dirs.append(default_install)
411                return  # end search if default location exists
412
413            # for some systems, sys.prefix returns `/usr`
414            # but the real location is `/usr/local`
415            # if the package is installed with pip
416            # this information can be easily retrieved
417            import pkgutil
418            if not pkgutil.find_loader('pip'):
419                return  # abort search if pip is not installed
420
421            # hack below to handle pip 10 internals
422            # if someone knows pip and how to fix this, it would be great!
423            location = None
424            try:
425                import pip.locations as locations
426                location = locations.distutils_scheme('pyang')
427            except:
428                try:
429                    import pip._internal.locations as locations
430                    location = locations.distutils_scheme('pyang')
431                except:
432                    pass
433            if location is not None:
434                self.dirs.append(os.path.join(location['data'],
435                                              'share','yang','modules'))
436
437
438
439    def _setup(self, ctx):
440        # check all dirs for yang and yin files
441        self.modules = []
442        def add_files_from_dir(d):
443            try:
444                files = os.listdir(d)
445            except OSError:
446                files = []
447            for fname in files:
448                absfilename = os.path.join(d, fname)
449                if os.path.isfile(absfilename):
450                    m = syntax.re_filename.search(fname)
451                    if m is not None:
452                        (name, rev, format) = m.groups()
453                        if not os.access(absfilename, os.R_OK): continue
454                        if absfilename.startswith("./"):
455                            absfilename = absfilename[2:]
456                        handle = (format, absfilename)
457                        self.modules.append((name, rev, handle))
458                elif (not self.no_path_recurse
459                      and d != '.' and os.path.isdir(absfilename)):
460                    add_files_from_dir(absfilename)
461        for d in self.dirs:
462            add_files_from_dir(d)
463
464    # FIXME: bad strategy; when revisions are not used in the filename
465    # this code parses all modules :(  need to do this lazily
466    # FIXME: actually this function is never called and can be deleted
467    def _peek_revision(self, absfilename, format, ctx):
468        fd = None
469        try:
470            fd = io.open(absfilename, "r", encoding="utf-8")
471            text = fd.read()
472        except IOError as ex:
473            return None
474        except UnicodeDecodeError as ex:
475            return None
476        finally:
477            if fd is not None:
478                fd.close()
479
480        if format == 'yin':
481            p = yin_parser.YinParser()
482        else:
483            p = yang_parser.YangParser()
484
485        # FIXME: optimization - do not parse the entire text
486        # just to read the revisions...
487        module = p.parse(ctx, absfilename, text)
488        if module is None:
489            return None
490        return (util.get_latest_revision(module), module)
491
492    def get_modules_and_revisions(self, ctx):
493        if self.modules is None:
494            self._setup(ctx)
495        return self.modules
496
497    def get_module_from_handle(self, handle):
498        (format, absfilename) = handle
499        fd = None
500        try:
501            fd = io.open(absfilename, "r", encoding="utf-8")
502            text = fd.read()
503            if self.verbose:
504                util.report_file_read(absfilename)
505        except IOError as ex:
506            raise self.ReadError(absfilename + ": " + str(ex))
507        except UnicodeDecodeError as ex:
508            s = str(ex).replace('utf-8', 'utf8')
509            raise self.ReadError(absfilename + ": unicode error: " + s)
510        finally:
511            if fd is not None:
512                fd.close()
513
514        if format is None:
515            format = util.guess_format(text)
516        return (absfilename, format, text)
517