1# Copyright 2009 Shikhar Bhushan
2# Copyright 2011 Leonidas Poulopoulos
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#    http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"Methods for creating, parsing, and dealing with XML and ElementTree objects."
17
18
19import io
20import sys
21import six
22import types
23from six import StringIO
24from io import BytesIO
25from lxml import etree
26
27# In case issues come up with XML generation/parsing
28# make sure you have the ElementTree v1.2.7+ lib as
29# well as lxml v3.0+
30
31from ncclient import NCClientError
32
# Shared module-level parsers. Both are strict (``recover=False``); the second
# one additionally enables lxml's ``huge_tree`` option.
parser = etree.XMLParser(recover=False)
huge_parser = etree.XMLParser(recover=False, huge_tree=True)


def _get_parser(huge_tree=False):
    """Return the shared ``huge_parser`` when *huge_tree* is truthy, else ``parser``."""
    if huge_tree:
        return huge_parser
    return parser
39
40
class XMLError(NCClientError):
    """Error raised by the XML helpers in this module (e.g. :func:`validated_element`)."""
43
44### Namespace-related
45
#: Base NETCONF namespace
BASE_NS_1_0 = "urn:ietf:params:xml:ns:netconf:base:1.0"
#: YANG (RFC 6020/RFC 7950) namespace
YANG_NS_1_0 = "urn:ietf:params:xml:ns:yang:1"
#: Namespace for Cisco NX-OS 1.0
NXOS_1_0 = "http://www.cisco.com/nxos:1.0"
#: Namespace for Cisco NX-OS 1.0 interface manager (``if_manager``)
NXOS_IF = "http://www.cisco.com/nxos:1.0:if_manager"
#: Namespace for Tail-f core data model
TAILF_AAA_1_1 = "http://tail-f.com/ns/aaa/1.1"
#: Namespace for Tail-f execd data model
TAILF_EXECD_1_1 = "http://tail-f.com/ns/execd/1.1"
#: Namespace for Cisco data model
CISCO_CPI_1_0 = "http://www.cisco.com/cpi_10/schema"
#: Namespace for Flowmon data model
FLOWMON_1_0 = "http://www.liberouter.org/ns/netopeer/flowmon/1.0"
#: Namespace for Juniper 9.6R4. Tested with Junos 9.6R4+
JUNIPER_1_1 = "http://xml.juniper.net/xnm/1.1/xnm"
#: Namespace for Huawei data model
HUAWEI_NS = "http://www.huawei.com/netconf/vrp"
#: Namespace for Huawei private
HW_PRIVATE_NS = "http://www.huawei.com/netconf/capability/base/1.0"
#: Namespace for H3C data model
H3C_DATA_1_0 = "http://www.h3c.com/netconf/data:1.0"
#: Namespace for H3C config model
H3C_CONFIG_1_0 = "http://www.h3c.com/netconf/config:1.0"
#: Namespace for H3C action model
H3C_ACTION_1_0 = "http://www.h3c.com/netconf/action:1.0"
#: Namespace for netconf monitoring
NETCONF_MONITORING_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring"
#: Namespace for netconf notifications
NETCONF_NOTIFICATION_NS = "urn:ietf:params:xml:ns:netconf:notification:1.0"
#: Namespace for netconf with-defaults (RFC 6243)
NETCONF_WITH_DEFAULTS_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults"
#: Namespace for Alcatel-Lucent SR OS Base r13 YANG models
ALU_CONFIG = "urn:alcatel-lucent.com:sros:ns:yang:conf-r13"
#: Namespace for Nokia SR OS global operations
SROS_GLOBAL_OPS_NS = "urn:nokia.com:sros:ns:yang:sr:oper-global"
84
85
# Use lxml's own prefix registry when it exists; otherwise fall back to
# writing into the stdlib ElementTree prefix map.
if hasattr(etree, 'register_namespace'):
    register_namespace = etree.register_namespace
else:
    def register_namespace(prefix, uri):
        """Associate *prefix* with namespace *uri* in ElementTree's prefix map."""
        from xml.etree import ElementTree
        # cElementTree shares ElementTree's _namespace_map, so one write is enough
        ElementTree._namespace_map[uri] = prefix

# Register the default prefixes used when serializing these namespaces.
for (ns, pre) in (
    (BASE_NS_1_0, 'nc'),
    (NETCONF_MONITORING_NS, 'ncm'),
    (NXOS_1_0, 'nxos'),
    (NXOS_IF, 'if'),
    (TAILF_AAA_1_1, 'aaa'),
    (TAILF_EXECD_1_1, 'execd'),
    (CISCO_CPI_1_0, 'cpi'),
    (FLOWMON_1_0, 'fm'),
    (JUNIPER_1_1, 'junos'),
):
    register_namespace(pre, ns)
106
def qualify(tag, ns=BASE_NS_1_0):
    """Qualify a *tag* name with a *namespace*, in :mod:`~xml.etree.ElementTree` fashion i.e. *{namespace}tagname*.

    *ns* defaults to the base NETCONF namespace. When *ns* is ``None`` the
    *tag* is returned unchanged (unqualified).
    """
    if ns is None:
        return tag
    return "{%s}%s" % (ns, tag)
109
110
def to_xml(ele, encoding="UTF-8", pretty_print=False):
    """Convert and return the XML for an *ele* (:class:`~xml.etree.ElementTree.Element`) with specified *encoding*.

    The element is serialized with :func:`lxml.etree.tostring`. If the
    serialized output does not already start with an XML declaration, one
    naming *encoding* is prepended.

    *pretty_print*: forwarded to :func:`lxml.etree.tostring`.

    On Python 3 the result is a text string; on Python 2 it is the byte
    string produced by the serializer.
    """
    xml = etree.tostring(ele, encoding=encoding, pretty_print=pretty_print)
    if sys.version_info[0] < 3:
        return xml if xml.startswith('<?xml') else '<?xml version="1.0" encoding="%s"?>%s' % (encoding, xml)

    # Decode with the encoding that was actually used to serialize; decoding
    # everything as UTF-8 breaks (or garbles) any other requested encoding.
    try:
        text = xml.decode(encoding)
    except LookupError:
        # Encoding name unknown to Python's codec registry: keep the
        # historical UTF-8 behaviour rather than failing outright.
        text = xml.decode('UTF-8')
    if text.startswith('<?xml'):
        return text
    return '<?xml version="1.0" encoding="%s"?>%s' % (encoding, text)
119
120
def to_ele(x, huge_tree=False):
    """Convert and return the :class:`~xml.etree.ElementTree.Element` for the XML document *x*. If *x* is already an :class:`~xml.etree.ElementTree.Element` simply returns that.

    *x*: an element, or an XML document as a string. On Python 3 both text
    (encoded to UTF-8 before parsing) and ``bytes`` (parsed as-is) are accepted.

    *huge_tree*: parse XML with very deep trees and very long text content
    """
    if etree.iselement(x):
        return x
    # On Python 3, text is handed to the parser as UTF-8 bytes; input that is
    # already bytes needs no conversion (and has no ``encode`` method).
    if sys.version_info[0] >= 3 and not isinstance(x, bytes):
        x = x.encode('UTF-8')
    return etree.fromstring(x, parser=_get_parser(huge_tree))
130
131
def parse_root(raw):
    """Parse only the root element of the *raw* XML document.

    Returns a ``(tag, attrib)`` tuple for the first element whose start is
    reported by the incremental parser, i.e. the root's qualified name and
    its attribute mapping.
    """
    # Python 2 feeds the string as-is; Python 3 feeds UTF-8 encoded bytes.
    source = StringIO(raw) if sys.version < '3' else BytesIO(raw.encode('UTF-8'))
    start_events = etree.iterparse(source, events=('start',))
    # Only the first 'start' event is needed; the rest is never parsed.
    first = next(iter(start_events), None)
    if first is not None:
        _event, root = first
        return (root.tag, root.attrib)
140
def validated_element(x, tags=None, attrs=None):
    """Check the root element of an XML document or Element against the given criteria.

    *tags*, when given, is one allowed tag name or a sequence of allowed tag names.

    *attrs*, when given, is a sequence of required attributes; each entry is
    either one attribute name or a sequence of acceptable alternatives, of
    which at least one must be present.

    Returns the element. Raises :exc:`XMLError` if the requirements are not met.
    """
    ele = to_ele(x)
    if tags:
        # A lone name is treated as a one-item list of alternatives.
        allowed = [tags] if isinstance(tags, (str, bytes)) else tags
        if ele.tag not in allowed:
            raise XMLError("Element [%s] does not meet requirement" % ele.tag)
    if attrs:
        for req in attrs:
            alternatives = [req] if isinstance(req, (str, bytes)) else req
            if not any(alt in ele.attrib for alt in alternatives):
                raise XMLError("Element [%s] does not have required attributes" % ele.tag)
    return ele
165
#: Prefix-to-namespace map that is always available to :meth:`NCElement.xpath`.
XPATH_NAMESPACES = {
    're': 'http://exslt.org/regular-expressions'
}


class NCElement(object):
    """Wrap an RPC reply together with a transformed document built from it.

    The transformed document is what :meth:`xpath`, :meth:`find`,
    :meth:`findtext`, :meth:`findall`, :attr:`tostring` and :attr:`data_xml`
    operate on.
    """

    def __init__(self, result, transform_reply, huge_tree=False):
        """Build the transformed document.

        Args:
            result: The reply object. Its ``_root`` attribute is passed to
                *transform_reply* when that is callable; otherwise
                ``str(result)`` is parsed and run through the stylesheet.
            transform_reply: Either a callable taking the reply root and
                returning the document, or an XSLT stylesheet as bytes.
            huge_tree: Enable lxml's ``huge_tree`` option on the parsers
                created by this object.
        """
        self.__result = result
        self.__transform_reply = transform_reply
        self.__huge_tree = huge_tree
        # Accept any callable (plain function, bound method, partial, ...);
        # a stylesheet given as bytes is never callable.
        if callable(transform_reply):
            self.__doc = self.__transform_reply(result._root)
        else:
            self.__doc = self.remove_namespaces(self.__result)

    def xpath(self, expression, namespaces=None):
        """Perform XPath navigation on an object

        Args:
            expression: A string representing a compliant XPath
                expression.
            namespaces: A dict of caller supplied prefix/xmlns to
                add to the static dict of XPath namespaces for this
                call only.
        Returns:
            The result of evaluating the expression on the wrapped
            document; for node-selecting expressions this is a list of
            'lxml.etree._Element' (empty when nothing matches).
        """
        self.__expression = expression
        # Work on a copy: updating XPATH_NAMESPACES itself would leak the
        # caller's prefixes into every later call and every other instance.
        merged = dict(XPATH_NAMESPACES)
        if namespaces:
            merged.update(namespaces)
        self.__namespaces = merged
        return self.__doc.xpath(self.__expression, namespaces=self.__namespaces)

    def find(self, expression):
        """return result for a call to lxml ElementPath find()"""
        self.__expression = expression
        return self.__doc.find(self.__expression)

    def findtext(self, expression):
        """return result for a call to lxml ElementPath findtext()"""
        self.__expression = expression
        return self.__doc.findtext(self.__expression)

    def findall(self, expression):
        """return result for a call to lxml ElementPath findall()"""
        self.__expression = expression
        return self.__doc.findall(self.__expression)

    def __str__(self):
        """syntactic sugar for str() - alias to tostring"""
        if sys.version_info[0] < 3:
            return self.tostring
        # etree.tostring() yields bytes on Python 3; str() must return text.
        return self.tostring.decode('UTF-8')

    @property
    def tostring(self):
        """return a pretty-printed serialization of the transformed document"""
        # Re-parse with blank text removed so pretty_print can re-indent.
        blank_stripping_parser = etree.XMLParser(remove_blank_text=True, huge_tree=self.__huge_tree)
        outputtree = etree.XML(etree.tostring(self.__doc), blank_stripping_parser)
        return etree.tostring(outputtree, pretty_print=True)

    @property
    def data_xml(self):
        """return the transformed document serialized with to_xml() (no pretty-printing)"""
        return to_xml(self.__doc)

    def remove_namespaces(self, rpc_reply):
        """Apply the XSLT stylesheet held in *transform_reply* to *rpc_reply*.

        ``str(rpc_reply)`` is parsed, transformed, and the transformed output
        is parsed again; the resulting root element is returned.
        """
        self.__xslt = self.__transform_reply
        self.__parser = etree.XMLParser(remove_blank_text=True, huge_tree=self.__huge_tree)
        self.__xslt_doc = etree.parse(io.BytesIO(self.__xslt), self.__parser)
        self.__transform = etree.XSLT(self.__xslt_doc)
        reply_tree = etree.parse(StringIO(str(rpc_reply)), parser=self.__parser)
        self.__root = etree.fromstring(str(self.__transform(reply_tree)),
                                       parser=self.__parser)
        return self.__root
243
def parent_ns(node):
    """Return the namespace URI mapped to *node*'s prefix, or ``None`` if it has no prefix."""
    prefix = node.prefix
    return node.nsmap[prefix] if prefix else None
248
def yang_action(name, attrs):
    """Build a YANG ``action`` element with one initial child.

    Args:
        name: Tag name of the child element created directly under
            the ``action`` element.
        attrs: A dict of attributes for that child element
            (e.g. namespaces).
    Returns:
        A 2-tuple of 'lxml.etree._Element' values: the ``action``
        element (created with an ``xmlns`` attribute set to the YANG
        namespace) followed by the newly created child.
    """
    action = new_ele('action', attrs={'xmlns': YANG_NS_1_0})
    child = sub_ele(action, name, attrs)
    return (action, child)
264
265
def replace_namespace(root, old_ns, new_ns):
    """
    Substitute old_ns with new_ns for all the xml elements including and below root

    Both element tags and attribute names are rewritten; the tree is modified in place.

    :param root: top element (root for this change)
    :param old_ns: old namespace
    :param new_ns: new namespace
    :return: None
    """
    # iter() is the supported replacement for the deprecated getiterator().
    for elem in root.iter():
        # Comments (and other special nodes such as processing instructions)
        # expose a factory function as their tag instead of a name, so they
        # have no namespace to rewrite.
        if callable(elem.tag):
            continue

        # handle tag
        qtag = etree.QName(elem)
        if qtag.namespace == old_ns:
            elem.tag = etree.QName(new_ns, qtag.localname)

        # handle attributes
        attribs_dict = elem.attrib
        # Snapshot the keys: entries are popped and re-added inside the loop.
        for attr in list(attribs_dict.keys()):
            qattr = etree.QName(attr)
            if qattr.namespace == old_ns:
                attribs_dict[etree.QName(new_ns, qattr.localname)] = attribs_dict.pop(attr)
288
289
def new_ele_nsmap(tag, nsmap, attrs=None, **extra):
    """Create an element named *tag*, qualified with the default namespace of :func:`qualify`, using the prefix map *nsmap*.

    *attrs* is an optional dict of attributes; *extra* keyword arguments are
    forwarded to :func:`lxml.etree.Element`.
    """
    return etree.Element(qualify(tag), {} if attrs is None else attrs, nsmap, **extra)


def new_ele(tag, attrs=None, **extra):
    """Create an element named *tag*, qualified with the default namespace of :func:`qualify`.

    *attrs* is an optional dict of attributes; *extra* keyword arguments are
    forwarded to :func:`lxml.etree.Element`.
    """
    return etree.Element(qualify(tag), {} if attrs is None else attrs, **extra)


def new_ele_ns(tag, ns, attrs=None, **extra):
    """Create an element named *tag* qualified with namespace *ns* (unqualified when *ns* is ``None``).

    *attrs* is an optional dict of attributes; *extra* keyword arguments are
    forwarded to :func:`lxml.etree.Element`.
    """
    return etree.Element(qualify(tag, ns), {} if attrs is None else attrs, **extra)


def sub_ele(parent, tag, attrs=None, **extra):
    """Append a child named *tag* to *parent*, qualified with the namespace :func:`parent_ns` reports for *parent*.

    *attrs* is an optional dict of attributes; *extra* keyword arguments are
    forwarded to :func:`lxml.etree.SubElement`.
    """
    return etree.SubElement(parent, qualify(tag, parent_ns(parent)),
                            {} if attrs is None else attrs, **extra)


def sub_ele_ns(parent, tag, ns, attrs=None, **extra):
    """Append a child named *tag* to *parent*, qualified with namespace *ns* (unqualified when *ns* is ``None``).

    *attrs* is an optional dict of attributes; *extra* keyword arguments are
    forwarded to :func:`lxml.etree.SubElement`.
    """
    return etree.SubElement(parent, qualify(tag, ns), {} if attrs is None else attrs, **extra)
299