1# Copyright 2009 Shikhar Bhushan 2# Copyright 2011 Leonidas Poulopoulos 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"Methods for creating, parsing, and dealing with XML and ElementTree objects." 17 18 19import io 20import sys 21import six 22import types 23from six import StringIO 24from io import BytesIO 25from lxml import etree 26 27# In case issues come up with XML generation/parsing 28# make sure you have the ElementTree v1.2.7+ lib as 29# well as lxml v3.0+ 30 31from ncclient import NCClientError 32 33parser = etree.XMLParser(recover=False) 34huge_parser = etree.XMLParser(recover=False, huge_tree=True) 35 36 37def _get_parser(huge_tree=False): 38 return huge_parser if huge_tree else parser 39 40 41class XMLError(NCClientError): 42 pass 43 44### Namespace-related 45 46#: Base NETCONF namespace 47BASE_NS_1_0 = "urn:ietf:params:xml:ns:netconf:base:1.0" 48#: YANG (RFC 6020/RFC 7950) namespace 49YANG_NS_1_0 = "urn:ietf:params:xml:ns:yang:1" 50#: NXOS_1_0 51NXOS_1_0 = "http://www.cisco.com/nxos:1.0" 52#: NXOS_IF 53NXOS_IF = "http://www.cisco.com/nxos:1.0:if_manager" 54#: Namespace for Tail-f core data model 55TAILF_AAA_1_1 = "http://tail-f.com/ns/aaa/1.1" 56#: Namespace for Tail-f execd data model 57TAILF_EXECD_1_1 = "http://tail-f.com/ns/execd/1.1" 58#: Namespace for Cisco data model 59CISCO_CPI_1_0 = "http://www.cisco.com/cpi_10/schema" 60#: Namespace for Flowmon data model 61FLOWMON_1_0 = "http://www.liberouter.org/ns/netopeer/flowmon/1.0" 62#: Namespace for Juniper 9.6R4. Tested with Junos 9.6R4+ 63JUNIPER_1_1 = "http://xml.juniper.net/xnm/1.1/xnm" 64#: Namespace for Huawei data model 65HUAWEI_NS = "http://www.huawei.com/netconf/vrp" 66#: Namespace for Huawei private 67HW_PRIVATE_NS = "http://www.huawei.com/netconf/capability/base/1.0" 68#: Namespace for H3C data model 69H3C_DATA_1_0 = "http://www.h3c.com/netconf/data:1.0" 70#: Namespace for H3C config model 71H3C_CONFIG_1_0 = "http://www.h3c.com/netconf/config:1.0" 72#: Namespace for H3C action model 73H3C_ACTION_1_0 = "http://www.h3c.com/netconf/action:1.0" 74#: Namespace for netconf monitoring 75NETCONF_MONITORING_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring" 76#: Namespace for netconf notifications 77NETCONF_NOTIFICATION_NS = "urn:ietf:params:xml:ns:netconf:notification:1.0" 78#: Namespace for netconf with-defaults (RFC 6243) 79NETCONF_WITH_DEFAULTS_NS = "urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults" 80#: Namespace for Alcatel-Lucent SR OS Base r13 YANG models 81ALU_CONFIG = "urn:alcatel-lucent.com:sros:ns:yang:conf-r13" 82#: Namespace for Nokia SR OS global operations 83SROS_GLOBAL_OPS_NS = "urn:nokia.com:sros:ns:yang:sr:oper-global" 84 85 86try: 87 register_namespace = etree.register_namespace 88except AttributeError: 89 def register_namespace(prefix, uri): 90 from xml.etree import ElementTree 91 # cElementTree uses ElementTree's _namespace_map, so that's ok 92 ElementTree._namespace_map[uri] = prefix 93 94for (ns, pre) in six.iteritems({ 95 BASE_NS_1_0: 'nc', 96 NETCONF_MONITORING_NS: 'ncm', 97 NXOS_1_0: 'nxos', 98 NXOS_IF: 'if', 99 TAILF_AAA_1_1: 'aaa', 100 TAILF_EXECD_1_1: 'execd', 101 CISCO_CPI_1_0: 'cpi', 102 FLOWMON_1_0: 'fm', 103 JUNIPER_1_1: 'junos', 104}): 105 register_namespace(pre, ns) 106 107qualify = lambda tag, ns=BASE_NS_1_0: tag if ns is None else "{%s}%s" % (ns, tag) 108"""Qualify a *tag* name with a *namespace*, in :mod:`~xml.etree.ElementTree` fashion i.e. *{namespace}tagname*.""" 109 110 111def to_xml(ele, encoding="UTF-8", pretty_print=False): 112 "Convert and return the XML for an *ele* (:class:`~xml.etree.ElementTree.Element`) with specified *encoding*." 113 xml = etree.tostring(ele, encoding=encoding, pretty_print=pretty_print) 114 if sys.version < '3': 115 return xml if xml.startswith('<?xml') else '<?xml version="1.0" encoding="%s"?>%s' % (encoding, xml) 116 else: 117 return xml.decode('UTF-8') if xml.startswith(b'<?xml') \ 118 else '<?xml version="1.0" encoding="%s"?>%s' % (encoding, xml.decode('UTF-8')) 119 120 121def to_ele(x, huge_tree=False): 122 """Convert and return the :class:`~xml.etree.ElementTree.Element` for the XML document *x*. If *x* is already an :class:`~xml.etree.ElementTree.Element` simply returns that. 123 124 *huge_tree*: parse XML with very deep trees and very long text content 125 """ 126 if sys.version < '3': 127 return x if etree.iselement(x) else etree.fromstring(x, parser=_get_parser(huge_tree)) 128 else: 129 return x if etree.iselement(x) else etree.fromstring(x.encode('UTF-8'), parser=_get_parser(huge_tree)) 130 131 132def parse_root(raw): 133 "Efficiently parses the root element of a *raw* XML document, returning a tuple of its qualified name and attribute dictionary." 134 if sys.version < '3': 135 fp = StringIO(raw) 136 else: 137 fp = BytesIO(raw.encode('UTF-8')) 138 for event, element in etree.iterparse(fp, events=('start',)): 139 return (element.tag, element.attrib) 140 141def validated_element(x, tags=None, attrs=None): 142 """Checks if the root element of an XML document or Element meets the supplied criteria. 143 144 *tags* if specified is either a single allowable tag name or sequence of allowable alternatives 145 146 *attrs* if specified is a sequence of required attributes, each of which may be a sequence of several allowable alternatives 147 148 Raises :exc:`XMLError` if the requirements are not met. 149 """ 150 ele = to_ele(x) 151 if tags: 152 if isinstance(tags, (str, bytes)): 153 tags = [tags] 154 if ele.tag not in tags: 155 raise XMLError("Element [%s] does not meet requirement" % ele.tag) 156 if attrs: 157 for req in attrs: 158 if isinstance(req, (str, bytes)): req = [req] 159 for alt in req: 160 if alt in ele.attrib: 161 break 162 else: 163 raise XMLError("Element [%s] does not have required attributes" % ele.tag) 164 return ele 165 166XPATH_NAMESPACES = { 167 're':'http://exslt.org/regular-expressions' 168} 169 170 171class NCElement(object): 172 def __init__(self, result, transform_reply, huge_tree=False): 173 self.__result = result 174 self.__transform_reply = transform_reply 175 self.__huge_tree = huge_tree 176 if isinstance(transform_reply, types.FunctionType): 177 self.__doc = self.__transform_reply(result._root) 178 else: 179 self.__doc = self.remove_namespaces(self.__result) 180 181 def xpath(self, expression, namespaces={}): 182 """Perform XPath navigation on an object 183 184 Args: 185 expression: A string representing a compliant XPath 186 expression. 187 namespaces: A dict of caller supplied prefix/xmlns to 188 append to the static dict of XPath namespaces. 189 Returns: 190 A list of 'lxml.etree._Element' should a match on the 191 expression be successful. Otherwise, an empty list will 192 be returned to the caller. 193 """ 194 self.__expression = expression 195 self.__namespaces = XPATH_NAMESPACES 196 self.__namespaces.update(namespaces) 197 return self.__doc.xpath(self.__expression, namespaces=self.__namespaces) 198 199 def find(self, expression): 200 """return result for a call to lxml ElementPath find()""" 201 self.__expression = expression 202 return self.__doc.find(self.__expression) 203 204 def findtext(self, expression): 205 """return result for a call to lxml ElementPath findtext()""" 206 self.__expression = expression 207 return self.__doc.findtext(self.__expression) 208 209 def findall(self, expression): 210 """return result for a call to lxml ElementPath findall()""" 211 self.__expression = expression 212 return self.__doc.findall(self.__expression) 213 214 def __str__(self): 215 """syntactic sugar for str() - alias to tostring""" 216 if sys.version < '3': 217 return self.tostring 218 else: 219 return self.tostring.decode('UTF-8') 220 221 @property 222 def tostring(self): 223 """return a pretty-printed string output for rpc reply""" 224 parser = etree.XMLParser(remove_blank_text=True, huge_tree=self.__huge_tree) 225 outputtree = etree.XML(etree.tostring(self.__doc), parser) 226 return etree.tostring(outputtree, pretty_print=True) 227 228 @property 229 def data_xml(self): 230 """return an unmodified output for rpc reply""" 231 return to_xml(self.__doc) 232 233 def remove_namespaces(self, rpc_reply): 234 """remove xmlns attributes from rpc reply""" 235 self.__xslt=self.__transform_reply 236 self.__parser = etree.XMLParser(remove_blank_text=True, huge_tree=self.__huge_tree) 237 self.__xslt_doc = etree.parse(io.BytesIO(self.__xslt), self.__parser) 238 self.__transform = etree.XSLT(self.__xslt_doc) 239 self.__root = etree.fromstring(str(self.__transform(etree.parse(StringIO(str(rpc_reply)), 240 parser=self.__parser))), 241 parser=self.__parser) 242 return self.__root 243 244def parent_ns(node): 245 if node.prefix: 246 return node.nsmap[node.prefix] 247 return None 248 249def yang_action(name, attrs): 250 """Instantiate a YANG action element 251 252 Args: 253 name: A string representing the first descendant name of the 254 XML element for the YANG action. 255 attrs: A dict of attributes to apply to the XML element 256 (e.g. namespaces). 257 Returns: 258 A tuple of 'lxml.etree._Element' values. The first value 259 represents the top-level YANG action element and the second 260 represents the caller supplied initial node. 261 """ 262 node = new_ele('action', attrs={'xmlns': YANG_NS_1_0}) 263 return (node, sub_ele(node, name, attrs)) 264 265 266def replace_namespace(root, old_ns, new_ns): 267 """ 268 Substitute old_ns with new_ns for all the xml elements including and below root 269 :param root: top element (root for this change) 270 :param old_ns: old namespace 271 :param new_ns: new namespace 272 :return: 273 """ 274 for elem in root.getiterator(): 275 # Comments don't have a namespace 276 if elem.tag is not etree.Comment: 277 # handle tag 278 qtag = etree.QName(elem) 279 if qtag.namespace == old_ns: 280 elem.tag = etree.QName(new_ns, qtag.localname) 281 282 # handle attributes 283 attribs_dict = elem.attrib 284 for attr in attribs_dict.keys(): 285 qattr = etree.QName(attr) 286 if qattr.namespace == old_ns: 287 attribs_dict[etree.QName(new_ns, qattr.localname)] = attribs_dict.pop(attr) 288 289 290new_ele_nsmap = lambda tag, nsmap, attrs={}, **extra: etree.Element(qualify(tag), attrs, nsmap, **extra) 291 292new_ele = lambda tag, attrs={}, **extra: etree.Element(qualify(tag), attrs, **extra) 293 294new_ele_ns = lambda tag, ns, attrs={}, **extra: etree.Element(qualify(tag,ns), attrs, **extra) 295 296sub_ele = lambda parent, tag, attrs={}, **extra: etree.SubElement(parent, qualify(tag, parent_ns(parent)), attrs, **extra) 297 298sub_ele_ns = lambda parent, tag, ns, attrs={}, **extra: etree.SubElement(parent, qualify(tag, ns), attrs, **extra) 299