1from __future__ import absolute_import 2from operator import attrgetter 3from colorama import Fore 4import itertools 5import warnings 6import socket 7import time 8import sys 9import re 10import os 11 12if sys.version_info >= (3, 0, 0,): 13 from collections.abc import MutableSequence 14else: 15 ## This syntax is not supported in Python 3... 16 from collections import MutableSequence 17 18from ciscoconfparse.protocol_values import ASA_TCP_PORTS, ASA_UDP_PORTS 19import ciscoconfparse 20from dns.exception import DNSException 21from dns.resolver import Resolver 22from dns import reversename, query, zone 23 24if sys.version_info[0] < 3: 25 from ipaddr import IPv4Network, IPv6Network, IPv4Address, IPv6Address 26 import ipaddr 27 28else: 29 from ipaddress import IPv4Network, IPv6Network, IPv4Address, IPv6Address 30 import ipaddress 31 32""" ccp_util.py - Parse, Query, Build, and Modify IOS-style configurations 33 34 Copyright (C) 2020-2021 David Michael Pennington at Cisco Systems 35 Copyright (C) 2019 David Michael Pennington at ThousandEyes 36 Copyright (C) 2014-2019 David Michael Pennington at Samsung Data Services 37 38 This program is free software: you can redistribute it and/or modify 39 it under the terms of the GNU General Public License as published by 40 the Free Software Foundation, either version 3 of the License, or 41 (at your option) any later version. 42 43 This program is distributed in the hope that it will be useful, 44 but WITHOUT ANY WARRANTY; without even the implied warranty of 45 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 46 GNU General Public License for more details. 47 48 You should have received a copy of the GNU General Public License 49 along with this program. If not, see <http://www.gnu.org/licenses/>. 50 51 If you need to contact the author, you can do so by emailing: 52 mike [~at~] pennington [/dot\] net 53""" 54 55class UnsupportedFeatureWarning(SyntaxWarning): 56 pass 57 58def as_text_list(object_list): 59 """This is a helper-function to convert a list of configuration objects into a list of text config lines. 60 61 Examples 62 -------- 63 64 >>> from ciscoconfparse.ccp_util import as_text_list 65 >>> from ciscoconfparse import CiscoConfParse 66 >>> 67 >>> config = [ 68 ... 'interface GigabitEthernet1/13', 69 ... ' ip address 192.0.2.1/30', 70 ... ' vrf member ThisRestrictedVrf', 71 ... ' no ip redirects', 72 ... ' no ipv6 redirects', 73 ... ] 74 >>> parse = CiscoConfParse(config) 75 >>> interface_object = parse.find_objects("^interface")[0] 76 >>> interface_config_objects = interface_object.all_children 77 >>> interface_config_objects 78 [<IOSCfgLine # 1 ' ip address 192.0.2.1/30' (parent is # 0)>, <IOSCfgLine # 2 ' vrf member ThisRestrictedVrf' (parent is # 0)>, <IOSCfgLine # 3 ' no ip redirects' (parent is # 0)>, <IOSCfgLine # 4 ' no ipv6 redirects' (parent is # 0)>] 79 >>> 80 >>> as_text_list(interface_config_objects) 81 [' ip address 192.0.2.1/30', ' vrf member ThisRestrictedVrf', ' no ip redirects', ' no ipv6 redirects'] 82 >>> 83 84 """ 85 assert isinstance(object_list, list) or isinstance(object_list, tuple) 86 for obj in object_list: 87 assert isinstance(obj.linenum, int) 88 assert isinstance(obj.text, str) 89 return list(map(attrgetter("text"), object_list)) 90 91def junos_unsupported(func): 92 """A function wrapper to warn junos users of unsupported features""" 93 def wrapper(*args, **kwargs): 94 color_warn = Fore.YELLOW+"syntax='junos' does not fully support config modifications such as .{}(); see Github Issue #185. https://github.com/mpenning/ciscoconfparse/issues/185".format(func.__name__)+Fore.RESET 95 syntax = "" 96 if len(args)>=1: 97 if isinstance(args[0], ciscoconfparse.IOSConfigList): 98 syntax = args[0].syntax 99 else: 100 #print("TYPE", type(args[0])) 101 syntax = args[0].confobj.syntax 102 if syntax=="junos": 103 warnings.warn(color_warn, UnsupportedFeatureWarning) 104 func(*args, **kwargs) 105 return wrapper 106 107 108_IPV6_REGEX_STR = r"""(?!:::\S+?$) # Negative Lookahead for 3 colons 109 (?P<addr> # Begin a group named 'addr' 110 (?P<opt1>{0}(?::{0}){{7}}) # no double colons, option 1 111|(?P<opt2>(?:{0}:){{1}}(?::{0}){{1,6}}) # match fe80::1 112|(?P<opt3>(?:{0}:){{2}}(?::{0}){{1,5}}) # match fe80:a::1 113|(?P<opt4>(?:{0}:){{3}}(?::{0}){{1,4}}) # match fe80:a:b::1 114|(?P<opt5>(?:{0}:){{4}}(?::{0}){{1,3}}) # match fe80:a:b:c::1 115|(?P<opt6>(?:{0}:){{5}}(?::{0}){{1,2}}) # match fe80:a:b:c:d::1 116|(?P<opt7>(?:{0}:){{6}}(?::{0}){{1,1}}) # match fe80:a:b:c:d:e::1 117|(?P<opt8>:(?::{0}){{1,7}}) # leading double colons 118|(?P<opt9>(?:{0}:){{1,7}}:) # trailing double colons 119|(?P<opt10>(?:::)) # bare double colons (default route) 120) # End group named 'addr' 121""".format( 122 r"[0-9a-fA-F]{1,4}" 123) 124_IPV6_REGEX_STR_COMPRESSED1 = r"""(?!:::\S+?$)(?P<addr1>(?P<opt1_1>{0}(?::{0}){{7}})|(?P<opt1_2>(?:{0}:){{1}}(?::{0}){{1,6}})|(?P<opt1_3>(?:{0}:){{2}}(?::{0}){{1,5}})|(?P<opt1_4>(?:{0}:){{3}}(?::{0}){{1,4}})|(?P<opt1_5>(?:{0}:){{4}}(?::{0}){{1,3}})|(?P<opt1_6>(?:{0}:){{5}}(?::{0}){{1,2}})|(?P<opt1_7>(?:{0}:){{6}}(?::{0}){{1,1}})|(?P<opt1_8>:(?::{0}){{1,7}})|(?P<opt1_9>(?:{0}:){{1,7}}:)|(?P<opt1_10>(?:::)))""".format( 125 r"[0-9a-fA-F]{1,4}" 126) 127_IPV6_REGEX_STR_COMPRESSED2 = r"""(?!:::\S+?$)(?P<addr2>(?P<opt2_1>{0}(?::{0}){{7}})|(?P<opt2_2>(?:{0}:){{1}}(?::{0}){{1,6}})|(?P<opt2_3>(?:{0}:){{2}}(?::{0}){{1,5}})|(?P<opt2_4>(?:{0}:){{3}}(?::{0}){{1,4}})|(?P<opt2_5>(?:{0}:){{4}}(?::{0}){{1,3}})|(?P<opt2_6>(?:{0}:){{5}}(?::{0}){{1,2}})|(?P<opt2_7>(?:{0}:){{6}}(?::{0}){{1,1}})|(?P<opt2_8>:(?::{0}){{1,7}})|(?P<opt2_9>(?:{0}:){{1,7}}:)|(?P<opt2_10>(?:::)))""".format( 128 r"[0-9a-fA-F]{1,4}" 129) 130_IPV6_REGEX_STR_COMPRESSED3 = r"""(?!:::\S+?$)(?P<addr3>(?P<opt3_1>{0}(?::{0}){{7}})|(?P<opt3_2>(?:{0}:){{1}}(?::{0}){{1,6}})|(?P<opt3_3>(?:{0}:){{2}}(?::{0}){{1,5}})|(?P<opt3_4>(?:{0}:){{3}}(?::{0}){{1,4}})|(?P<opt3_5>(?:{0}:){{4}}(?::{0}){{1,3}})|(?P<opt3_6>(?:{0}:){{5}}(?::{0}){{1,2}})|(?P<opt3_7>(?:{0}:){{6}}(?::{0}){{1,1}})|(?P<opt3_8>:(?::{0}){{1,7}})|(?P<opt3_9>(?:{0}:){{1,7}}:)|(?P<opt3_10>(?:::)))""".format( 131 r"[0-9a-fA-F]{1,4}" 132) 133 134_CISCO_RANGE_ATOM_STR = r"""\d+\s*\-*\s*\d*""" 135_CISCO_RANGE_STR = r"""^(?P<line_prefix>[a-zA-Z\s]*)(?P<slot_prefix>[\d\/]*\d+\/)*(?P<range_text>(\s*{0})*)$""".format( 136 _CISCO_RANGE_ATOM_STR 137) 138 139_RGX_IPV6ADDR = re.compile(_IPV6_REGEX_STR, re.VERBOSE) 140 141_RGX_IPV4ADDR = re.compile(r"^(?P<addr>\d+\.\d+\.\d+\.\d+)") 142_RGX_IPV4ADDR_NETMASK = re.compile( 143 r""" 144 (?: 145 ^(?P<addr0>\d+\.\d+\.\d+\.\d+)$ 146 |(?:^ 147 (?:(?P<addr1>\d+\.\d+\.\d+\.\d+))(?:\s+|\/)(?:(?P<netmask>\d+\.\d+\.\d+\.\d+)) 148 $) 149 |^(?:\s*(?P<addr2>\d+\.\d+\.\d+\.\d+)(?:\/(?P<masklen>\d+))\s*)$ 150 ) 151 """, 152 re.VERBOSE, 153) 154 155_RGX_CISCO_RANGE = re.compile(_CISCO_RANGE_STR) 156 157 158def is_valid_ipv4_addr(input_str=""): 159 """Check if this is a valid IPv4 string. 160 161 Returns 162 ------- 163 bool 164 A boolean indicating whether this is a valid IPv4 string 165 """ 166 assert input_str != "" 167 if _RGX_IPV4ADDR.search(input_str): 168 return True 169 return False 170 171 172def is_valid_ipv6_addr(input_str=""): 173 """Check if this is a valid IPv6 string. 174 175 Returns 176 ------- 177 bool 178 A boolean indicating whether this is a valid IPv6 string 179 """ 180 assert input_str != "" 181 if _RGX_IPV6ADDR.search(input_str): 182 return True 183 return False 184 185def collapse_addresses(network_list): 186 """ 187 This is a ciscoconfparse proxy for ipaddress.collapse_addresses() 188 189 It attempts to summarize network_list into the closest network(s) 190 containing prefixes in `network_list`. 191 192 Return an iterator of the collapsed IPv4Network or IPv6Network objects. 193 addresses is an iterator of IPv4Network or IPv6Network objects. A 194 TypeError is raised if addresses contains mixed version objects. 195 """ 196 assert isinstance(network_list, list) or isinstance(network_list, tuple) 197 198 def ip_net(arg): 199 if isinstance(arg, IPv4Obj): 200 return arg.network 201 elif isinstance(arg, IPv4Network): 202 return arg 203 elif isinstance(arg, IPv6Obj): 204 return arg.network 205 elif isinstance(arg, IPv6Network): 206 return arg 207 else: 208 ValueError("collapse_addresses() isn't sure how to handle %s" % arg) 209 210 return ipaddress.collapse_addresses([ip_net(ii) for ii in network_list]) 211 212## Emulate the old behavior of ipaddr.IPv4Network in Python2, which can use 213## IPv4Network with a host address. Google removed that in Python3's 214## ipaddress.py module 215class IPv4Obj(object): 216 def __init__(self, arg="127.0.0.1/32", strict=False): 217 """An object to represent IPv4 addresses and IPv4 networks. 218 219 When :class:`~ccp_util.IPv4Obj` objects are compared or sorted, network numbers are sorted lower to higher. If network numbers are the same, shorter masks are lower than longer masks. After comparing mask length, numerically higher IP addresses are greater than numerically lower IP addresses.. Comparisons between :class:`~ccp_util.IPv4Obj` instances was chosen so it's easy to find the longest-match for a given prefix (see examples below). 220 221 This object emulates the behavior of ipaddr.IPv4Network (in Python2) where host-bits were retained in the IPv4Network() object. :class:`ipaddress.IPv4Network` in Python3 does not retain host-bits; the desire to retain host-bits in both Python2 and Python3 ip network objects was the genesis of this API. 222 223 Parameters 224 ---------- 225 arg : str or int 226 A string (or integer) containing an IPv4 address, and optionally a netmask or masklength. Integers are also accepted. The following address/netmask formats are supported: "10.1.1.1/24", "10.1.1.1 255.255.255.0", "10.1.1.1/255.255.255.0" 227 strict: bool 228 When `strict` is True, the value of `arg` must not have host-bits set. The default value is False. 229 230 231 Examples 232 -------- 233 234 >>> from ciscoconfparse.ccp_util import IPv4Obj 235 >>> ## Parse from an integer... 236 >>> net = IPv4Obj(2886729984) 237 >>> net 238 <IPv4Obj 172.16.1.0/32> 239 >>> net.prefixlen = 24 240 >>> net 241 <IPv4Obj 172.16.1.0/24> 242 >>> ## Parse from an string... 243 >>> net = IPv4Obj('172.16.1.0/24') 244 >>> net 245 <IPv4Obj 172.16.1.0/24> 246 >>> net.ip 247 IPv4Address('172.16.1.0') 248 >>> net.ip + 1 249 IPv4Address('172.16.1.1') 250 >>> str(net.ip+1) 251 '172.16.1.1' 252 >>> net.network 253 IPv4Network('172.16.1.0/24') 254 >>> net.network_object 255 IPv4Network('172.16.1.0/24') 256 >>> str(net.network_object) 257 '172.16.1.0/24' 258 >>> net.prefixlen 259 24 260 >>> net.network_object.iterhosts() 261 <generator object iterhosts at 0x7f00bfcce730> 262 >>> 263 >>> # Example of finding the longest-match IPv4 route for an addr... 264 >>> prefix_list = ['0.0.0.0/0', '4.0.0.0/8', '2.0.0.0/7', '4.0.0.0/16', '2.0.0.0/32'] 265 >>> rt_table = sorted([IPv4Obj(ii) for ii in prefix_list], reverse=True) 266 >>> addr = IPv4Obj('4.0.1.1') 267 >>> for route in rt_table: 268 ... if addr in route: 269 ... break 270 ... 271 >>> # The longest match is contained in route 272 >>> route 273 <IPv4Obj 4.0.0.0/16> 274 >>> 275 276 277 Attributes 278 ---------- 279 network : :class:`ipaddress.IPv4Network` 280 Returns an :class:`ipaddress.IPv4Network` with the network of this object 281 network_object : :class:`ipaddress.IPv4Network` 282 Returns an :class:`ipaddress.IPv4Network` with the network of this object 283 ip_object : :class:`ipaddress.IPv4Address` 284 Returns an :class:`ipaddress.IPv4Address` with the host address of this object 285 ip : :class:`ipaddress.IPv4Address` 286 Returns an :class:`ipaddress.IPv4Address` with the host address of this object 287 as_binary_tuple : :py:class:`tuple` 288 The address as a tuple of zero-padded binary strings 289 as_hex_tuple : tuple 290 The address as a tuple of zero-padded 8-bit hex strings 291 as_decimal : int 292 The ip address as a decimal integer 293 as_decimal_network : int 294 The network address as a decimal integer 295 as_zeropadded : str 296 Return a zero-padded string of the ip address (example: '10.1.1.1' returns '010.001.001.001') 297 as_zeropadded_network : str 298 Return a zero-padded string of the ip network (example: '10.1.1.1' returns '010.001.001.000') 299 netmask : :class:`ipaddress.IPv4Address` 300 An :class:`ipaddress.IPv4Address` object containing the netmask 301 prefixlen : int 302 An integer representing the length of the netmask 303 prefixlength : int 304 An integer representing the length of the netmask 305 broadcast : str 306 A string representing the broadcast address 307 hostmask : :class:`ipaddress.IPv4Address` 308 A :class:`ipaddress.IPv4Address` representing the hostmask 309 numhosts : int 310 An integer representing the number of hosts contained in the network 311 312 """ 313 314 # RGX_IPV4ADDR = re.compile(r'^(\d+\.\d+\.\d+\.\d+)') 315 # RGX_IPV4ADDR_NETMASK = re.compile(r'(\d+\.\d+\.\d+\.\d+)\s+(\d+\.\d+\.\d+\.\d+)') 316 317 self.arg = arg 318 self.dna = "IPv4Obj" 319 self.ip_object = None 320 self.network_object = None 321 322 try: 323 mm = _RGX_IPV4ADDR_NETMASK.search(arg) 324 except TypeError: 325 if isinstance(arg, int): 326 self.ip_object = IPv4Address(arg) 327 self.network_object = IPv4Network( 328 str(self.ip_object) + "/32", strict=False 329 ) 330 return None 331 elif getattr(arg, "dna", "") == "IPv4Obj": 332 ip_str = "{0}/{1}".format(str(arg.ip_object), arg.prefixlen) 333 self.network_object = IPv4Network(ip_str, strict=False) 334 self.ip_object = IPv4Address(str(arg.ip_object)) 335 return None 336 elif isinstance(arg, IPv4Network): 337 self.network_object = arg 338 self.ip_object = IPv4Address(str(arg).split("/")[0]) 339 return None 340 elif isinstance(arg, IPv4Address): 341 self.network_object = IPv4Network(str(arg) + "/32") 342 self.ip_object = IPv4Address(str(arg).split("/")[0]) 343 return None 344 else: 345 raise ValueError( 346 "IPv4Obj doesn't understand how to parse {0}".format(arg) 347 ) 348 349 ERROR = "IPv4Obj couldn't parse '{0}'".format(arg) 350 assert not (mm is None), ERROR 351 352 mm_result = mm.groupdict() 353 addr = ( 354 mm_result["addr0"] 355 or mm_result["addr1"] 356 or mm_result["addr2"] 357 or "127.0.0.1" 358 ) 359 360 ## Normalize addr if we get zero-padded strings, i.e. 172.001.001.001 361 addr = ".".join([str(int(ii)) for ii in addr.split(".")]) 362 363 masklen = int(mm_result["masklen"] or 32) 364 netmask = mm_result["netmask"] 365 if netmask: 366 ## ALWAYS check for the netmask first 367 self.network_object = IPv4Network( 368 "{0}/{1}".format(addr, netmask), strict=strict 369 ) 370 self.ip_object = IPv4Address("{0}".format(addr)) 371 else: 372 self.network_object = IPv4Network( 373 "{0}/{1}".format(addr, masklen), strict=strict 374 ) 375 self.ip_object = IPv4Address("{0}".format(addr)) 376 377 def __repr__(self): 378 return """<IPv4Obj {0}/{1}>""".format(str(self.ip_object), self.prefixlen) 379 380 def __eq__(self, val): 381 try: 382 # Code to fix Github issue #180 383 for obj in [self, val]: 384 for attr_name in ["as_decimal", "prefixlen"]: 385 try: 386 assert getattr(obj, attr_name, None) is not None 387 except AssertionError: 388 return False 389 390 # Compare objects numerically... 391 if self.as_decimal == val.as_decimal and self.prefixlen == val.prefixlen: 392 return True 393 return False 394 except AttributeError as e: 395 errmsg = "'{0}' cannot compare itself to '{1}': {2}".format( 396 self.__repr__(), val, e 397 ) 398 raise AttributeError(errmsg) 399 400 def __ne__(self, val): 401 return not self.__eq__(val) 402 403 def __gt__(self, val): 404 try: 405 for obj in [self, val]: 406 for attr_name in ["as_decimal", "as_decimal_network", "prefixlen"]: 407 try: 408 assert getattr(obj, attr_name, None) is not None 409 except (AssertionError) as ee: 410 error_str = "Cannot compare {} with '{}'".format(self, type(obj)) 411 raise AssertionError(error_str) 412 413 val_prefixlen = int(getattr(val, "prefixlen")) 414 self_prefixlen = int(getattr(self, "prefixlen")) 415 val_ndec = int(getattr(val, "as_decimal_network")) 416 self_ndec = int(getattr(self, "as_decimal_network")) 417 val_dec = int(getattr(val, "as_decimal")) 418 self_dec = int(getattr(self, "as_decimal")) 419 420 if self_ndec == val_ndec and self_prefixlen == val_prefixlen: 421 return self_dec > val_dec 422 423 # for the same network, longer prefixlens sort "higher" than shorter prefixlens 424 elif self_ndec == val_ndec: 425 return self_prefixlen > val_prefixlen 426 427 else: 428 return self_ndec > val_ndec 429 430 except: 431 errmsg = "{0} cannot compare itself to '{1}'".format(self.__repr__(), val) 432 raise ValueError(errmsg) 433 434 def __lt__(self, val): 435 try: 436 for obj in [self, val]: 437 for attr_name in ["as_decimal", "as_decimal_network", "prefixlen"]: 438 try: 439 assert getattr(obj, attr_name, None) is not None 440 except (AssertionError) as ee: 441 error_str = "Cannot compare {} with '{}'".format(self, type(obj)) 442 raise AssertionError(error_str) 443 444 val_prefixlen = int(getattr(val, "prefixlen")) 445 self_prefixlen = int(getattr(self, "prefixlen")) 446 val_ndec = int(getattr(val, "as_decimal_network")) 447 self_ndec = int(getattr(self, "as_decimal_network")) 448 val_dec = int(getattr(val, "as_decimal")) 449 self_dec = int(getattr(self, "as_decimal")) 450 451 if self_ndec == val_ndec and self_prefixlen == val_prefixlen: 452 return self_dec < val_dec 453 454 # for the same network, longer prefixlens sort "higher" than shorter prefixlens 455 elif self_ndec == val_ndec: 456 return self_prefixlen < val_prefixlen 457 458 else: 459 return self_ndec < val_ndec 460 461 except Exception as ee: 462 print(ee) 463 errmsg = "{0} cannot compare itself to '{1}'".format(self.__repr__(), val) 464 raise ValueError(errmsg) 465 466 def __int__(self): 467 """Return this object as an integer""" 468 if getattr(self, "as_decimal", None) is not None: 469 return self.as_decimal 470 else: 471 return False 472 473 def __index__(self): 474 """Return this object as an integer (used for hex() and bin() operations)""" 475 if getattr(self, "as_decimal", None) is not None: 476 return self.as_decimal 477 else: 478 return False 479 480 def __add__(self, val): 481 """Add an integer to IPv4Obj() and return an IPv4Obj()""" 482 assert isinstance(val, int), "Cannot add type: '{}' to {}".format( 483 type(val), self 484 ) 485 orig_prefixlen = self.prefixlen 486 total = self.as_decimal + val 487 assert total <= 4294967295, "Max IPv4 integer exceeded" 488 assert total >= 0, "Min IPv4 integer exceeded" 489 retval = IPv4Obj(total) 490 retval.prefixlen = orig_prefixlen 491 return retval 492 493 def __sub__(self, val): 494 """Subtract an integer from IPv4Obj() and return an IPv4Obj()""" 495 assert isinstance(val, int), "Cannot subtract type: '{}' from {}".format( 496 type(val), self 497 ) 498 orig_prefixlen = self.prefixlen 499 total = self.as_decimal - val 500 assert total <= 4294967295, "Max IPv4 integer exceeded" 501 assert total >= 0, "Min IPv4 integer exceeded" 502 retval = IPv4Obj(total) 503 retval.prefixlen = orig_prefixlen 504 return retval 505 506 def __contains__(self, val): 507 # Used for "foo in bar"... python calls bar.__contains__(foo) 508 try: 509 if self.network_object.prefixlen == 0: 510 return True 511 elif self.network_object.prefixlen > val.network_object.prefixlen: 512 # obvious shortcut... if this object's mask is longer than 513 # val, this object cannot contain val 514 return False 515 else: 516 # return (val.network in self.network) 517 # 518 ## Last used: 2020-07-12... version 1.5.6 519 # return (self.network <= val.network) and ( 520 # self.broadcast >= val.broadcast 521 # ) 522 return (self.as_decimal_network <= val.as_decimal_network) and ( 523 (self.as_decimal_network + self.numhosts - 1) 524 >= (val.as_decimal_network + val.numhosts - 1) 525 ) 526 527 except ValueError as e: 528 raise ValueError( 529 "Could not check whether '{0}' is contained in '{1}': {2}".format( 530 val, self, e 531 ) 532 ) 533 534 def __hash__(self): 535 # Python3 needs __hash__() 536 return hash(str(self.ip_object)) + hash(str(self.prefixlen)) 537 538 def __iter__(self): 539 return self.network_object.__iter__() 540 541 def __next__(self): 542 ## For Python3 iteration... 543 return self.network_object.__next__() 544 545 def next(self): 546 ## For Python2 iteration... 547 return self.network_object.__next__() 548 549 @property 550 def _version(self): 551 """ 552 Fix github issue #203... build a `_prefixlen` attribute... 553 """ 554 return self.version 555 556 @property 557 def _prefixlen(self): 558 """ 559 Fix github issue #203... build a `_prefixlen` attribute... 560 """ 561 return self.prefixlen 562 563 @property 564 def _max_prefixlen(self): 565 """ 566 Fix github issue #203... build a `_prefixlen` attribute... 567 """ 568 if self.version == 4: 569 return 32 570 else: 571 return 128 572 573 @property 574 def ip(self): 575 """Returns the address as an :class:`ipaddress.IPv4Address` object.""" 576 return self.ip_object 577 578 @property 579 def netmask(self): 580 """Returns the network mask as an :class:`ipaddress.IPv4Address` object.""" 581 return self.network_object.netmask 582 583 @property 584 def prefixlen(self): 585 """Returns the length of the network mask as an integer.""" 586 return int(self.network_object.prefixlen) 587 588 @prefixlen.setter 589 def prefixlen(self, arg): 590 """prefixlen setter method""" 591 self.network_object = IPv4Network( 592 "{0}/{1}".format(str(self.ip_object), arg), strict=False 593 ) 594 595 @property 596 def prefixlength(self): 597 """Returns the length of the network mask as an integer.""" 598 return self.prefixlen 599 600 @property 601 def exploded(self): 602 """Returns the IPv4 Address object in exploded form""" 603 return self.ip_object.exploded 604 605 @property 606 def packed(self): 607 """Returns the IPv4 object in packed binary form""" 608 return self.ip_object.packed 609 610 @property 611 def broadcast(self): 612 """Returns the broadcast address as an :class:`ipaddress.IPv4Address` object.""" 613 if sys.version_info[0] < 3: 614 return self.network_object.broadcast 615 else: 616 return self.network_object.broadcast_address 617 618 @property 619 def network(self): 620 """Returns an :class:`ipaddress.IPv4Network` object, which represents this network. 621 """ 622 if sys.version_info[0] < 3: 623 return self.network_object.network 624 else: 625 ## The ipaddress module returns an "IPAddress" object in Python3... 626 return IPv4Network("{0}".format(self.network_object.compressed)) 627 628 @property 629 def as_decimal_network(self): 630 """Returns an integer calculated from the network address... 631 """ 632 num_strings = str(self.network).split(".") 633 num_strings.reverse() # reverse the order 634 return sum( 635 [int(num, 16) * (65536 ** idx) for idx, num in enumerate(num_strings)] 636 ) 637 638 @property 639 def hostmask(self): 640 """Returns the host mask as an :class:`ipaddress.IPv4Address` object.""" 641 return self.network_object.hostmask 642 643 @property 644 def inverse_netmask(self): 645 """Returns the host mask as an :class:`ipaddress.IPv4Address` object.""" 646 return self.network_object.hostmask 647 648 @property 649 def version(self): 650 """Returns the IP version of the object as an integer. i.e. 4""" 651 return 4 652 653 @property 654 def numhosts(self): 655 """Returns the total number of IP addresses in this network, including broadcast and the "subnet zero" address""" 656 if sys.version_info[0] < 3: 657 return self.network_object.numhosts 658 else: 659 return 2 ** (32 - self.network_object.prefixlen) 660 661 @property 662 def as_decimal(self): 663 """Returns the IP address as a decimal integer""" 664 num_strings = str(self.ip).split(".") 665 num_strings.reverse() # reverse the order 666 return sum([int(num) * (256 ** idx) for idx, num in enumerate(num_strings)]) 667 668 @property 669 def as_decimal_network(self): 670 """Returns the IP address as a decimal integer""" 671 num_strings = str(self.network).split("/")[0].split(".") 672 num_strings.reverse() # reverse the order 673 return sum([int(num) * (256 ** idx) for idx, num in enumerate(num_strings)]) 674 675 @property 676 def as_zeropadded(self): 677 """Returns the IP address as a zero-padded string (useful when sorting in a text-file)""" 678 num_strings = str(self.ip).split(".") 679 return ".".join(["{0:03}".format(int(num)) for num in num_strings]) 680 681 @property 682 def as_zeropadded_network(self): 683 """Returns the IP network as a zero-padded string (useful when sorting in a text-file)""" 684 num_strings = self.as_cidr_net.split("/")[0].split(".") 685 return ( 686 ".".join(["{0:03}".format(int(num)) for num in num_strings]) 687 + "/" 688 + str(self.prefixlen) 689 ) 690 691 @property 692 def as_binary_tuple(self): 693 """Returns the IP address as a tuple of zero-padded binary strings""" 694 return tuple(["{0:08b}".format(int(num)) for num in str(self.ip).split(".")]) 695 696 @property 697 def as_hex_tuple(self): 698 """Returns the IP address as a tuple of zero-padded hex strings""" 699 return tuple(["{0:02x}".format(int(num)) for num in str(self.ip).split(".")]) 700 701 @property 702 def as_cidr_addr(self): 703 """Returns a string with the address in CIDR notation""" 704 return str(self.ip) + "/" + str(self.prefixlen) 705 706 @property 707 def as_cidr_net(self): 708 """Returns a string with the network in CIDR notation""" 709 if sys.version_info[0] < 3: 710 return str(self.network) + "/" + str(self.prefixlen) 711 else: 712 return str(self.network) 713 714 @property 715 def is_multicast(self): 716 """Returns a boolean for whether this is a multicast address""" 717 return self.network_object.is_multicast 718 719 @property 720 def is_private(self): 721 """Returns a boolean for whether this is a private address""" 722 return self.network_object.is_private 723 724 @property 725 def is_reserved(self): 726 """Returns a boolean for whether this is a reserved address""" 727 return self.network_object.is_reserved 728 729 730## Emulate the old behavior of ipaddr.IPv6Network in Python2, which can use 731## IPv6Network with a host address. Google removed that in Python3's 732## ipaddress.py module 733class IPv6Obj(object): 734 def __init__(self, arg="::1/128", strict=False): 735 """An object to represent IPv6 addresses and IPv6 networks. 736 737 When :class:`~ccp_util.IPv6Obj` objects are compared or sorted, network numbers are sorted lower to higher. If network numbers are the same, shorter masks are lower than longer masks. After comparing mask length, numerically higher IP addresses are greater than numerically lower IP addresses. Comparisons between :class:`~ccp_util.IPv6Obj` instances was chosen so it's easy to find the longest-match for a given prefix. 738 739 This object emulates the behavior of ipaddr.IPv6Network() (in Python2) where host-bits were retained in the IPv6Network() object. :class:`ipaddress.IPv6Network` in Python3 does not retain host-bits; the desire to retain host-bits in both Python2 and Python3 ip network objects was the genesis of this API. 740 741 Parameters 742 ---------- 743 arg : str or int 744 A string containing an IPv6 address, and optionally a netmask or masklength. Integers are also accepted. The following address/netmask formats are supported: "2001::dead:beef", "2001::dead:beef/64", 745 strict : bool 746 When `strict` is True, the value of `arg` must not have host-bits set. The default value is False. 747 748 Examples 749 -------- 750 751 >>> from ciscoconfparse.ccp_util import IPv6Obj 752 >>> net = IPv6Obj(42540488161975842760550356429036175087) 753 >>> net 754 <IPv6Obj 2001::dead:beef/64> 755 >>> net = IPv6Obj("2001::dead:beef/64") 756 >>> net 757 <IPv6Obj 2001::dead:beef/64> 758 >>> 759 760 Attributes 761 ---------- 762 network : :class:`ipaddress.IPv6Network` 763 Returns an :class:`ipaddress.IPv6Network` with the network of this object 764 network_object : :class:`ipaddress.IPv6Network` 765 Returns an :class:`ipaddress.IPv6Network` with the network of this object 766 ip_object : :class:`ipaddress.IPv6Address` 767 Returns an :class:`ipaddress.IPv6Address` with the host address of this object 768 ip : :class:`ipaddress.IPv6Address` 769 Returns an :class:`ipaddress.IPv6Address` with the host address of this object 770 as_binary_tuple : tuple 771 The ipv6 address as a tuple of zero-padded binary strings 772 as_decimal : int 773 The ipv6 address as a decimal integer 774 as_decimal_network : int 775 The network address as a decimal integer 776 as_hex_tuple : tuple 777 The ipv6 address as a tuple of zero-padded 8-bit hex strings 778 netmask : :class:`ipaddress.IPv6Address` 779 An :class:`ipaddress.IPv6Address` object containing the netmask 780 prefixlen : int 781 An integer representing the length of the netmask 782 broadcast: raises `NotImplementedError`; IPv6 doesn't use broadcast addresses 783 hostmask : :class:`ipaddress.IPv6Address` 784 An :class:`ipaddress.IPv6Address` representing the hostmask 785 numhosts : int 786 An integer representing the number of hosts contained in the network 787 788 """ 789 790 # arg= _RGX_IPV6ADDR_NETMASK.sub(r'\1/\2', arg) # mangle IOS: 'addr mask' 791 self.arg = arg 792 self.dna = "IPv6Obj" 793 self.ip_object = None 794 self.network_object = None 795 796 try: 797 mm = _RGX_IPV6ADDR.search(arg) 798 except TypeError: 799 if isinstance(arg, int): 800 self.ip_object = IPv6Address(arg) 801 self.network_object = IPv6Network( 802 str(self.ip_object) + "/128", strict=False 803 ) 804 return None 805 elif getattr(arg, "dna", "") == "IPv6Obj": 806 ip_str = "{0}/{1}".format(str(arg.ip_object), arg.prefixlen) 807 self.network_object = IPv6Network(ip_str, strict=False) 808 self.ip_object = IPv6Address(str(arg.ip_object)) 809 return None 810 elif isinstance(arg, IPv6Network): 811 self.network_object = arg 812 self.ip_object = IPv6Address(str(arg).split("/")[0]) 813 return None 814 elif isinstance(arg, IPv6Address): 815 self.network_object = IPv6Network(str(arg) + "/128") 816 self.ip_object = IPv6Address(str(arg).split("/")[0]) 817 return None 818 else: 819 raise ValueError( 820 "IPv6Obj doesn't understand how to parse {0}".format(arg) 821 ) 822 823 824 ERROR = "IPv6Obj couldn't parse {0}".format(arg) 825 assert not (mm is None), ERROR 826 self.network_object = IPv6Network(arg, strict=strict) 827 self.ip_object = IPv6Address(mm.group(1)) 828 829 # 'address_exclude', 'compare_networks', 'hostmask', 'ipv4_mapped', 'iter_subnets', 'iterhosts', 'masked', 'max_prefixlen', 'netmask', 'network', 'numhosts', 'overlaps', 'prefixlen', 'sixtofour', 'subnet', 'supernet', 'teredo', 'with_hostmask', 'with_netmask', 'with_prefixlen' 830 831 def __repr__(self): 832 return """<IPv6Obj {0}/{1}>""".format(str(self.ip_object), self.prefixlen) 833 834 def __eq__(self, val): 835 try: 836 for obj in [self, val]: 837 for attr_name in ["as_decimal", "prefixlen"]: 838 try: 839 assert getattr(obj, attr_name, None) is not None 840 except AssertionError: 841 return False 842 843 # Compare objects numerically... 844 if self.as_decimal == val.as_decimal and self.prefixlen == val.prefixlen: 845 return True 846 return False 847 except (Exception) as e: 848 errmsg = "'{0}' cannot compare itself to '{1}': {2}".format( 849 self.__repr__(), val, e 850 ) 851 raise ValueError(errmsg) 852 853 def __ne__(self, val): 854 return not self.__eq__(val) 855 856 def __gt__(self, val): 857 try: 858 for obj in [self, val]: 859 for attr_name in ["as_decimal", "as_decimal_network", "prefixlen"]: 860 try: 861 assert getattr(obj, attr_name, None) is not None 862 except (AssertionError) as ee: 863 error_str = "Cannot compare {} with '{}'".format(self, type(obj)) 864 raise AssertionError(error_str) 865 866 val_prefixlen = int(getattr(val, "prefixlen")) 867 self_prefixlen = int(getattr(self, "prefixlen")) 868 val_ndec = int(getattr(val, "as_decimal_network")) 869 self_ndec = int(getattr(self, "as_decimal_network")) 870 val_dec = int(getattr(val, "as_decimal")) 871 self_dec = int(getattr(self, "as_decimal")) 872 873 if self_ndec == val_ndec and self_prefixlen == val_prefixlen: 874 return self_dec > val_dec 875 876 # for the same network, longer prefixlens sort "higher" than shorter prefixlens 877 elif self_ndec == val_ndec: 878 return self_prefixlen > val_prefixlen 879 880 else: 881 return self_ndec > val_ndec 882 883 except: 884 errmsg = "{0} cannot compare itself to '{1}'".format(self.__repr__(), val) 885 raise ValueError(errmsg) 886 887 def __lt__(self, val): 888 try: 889 for obj in [self, val]: 890 for attr_name in ["as_decimal", "prefixlen"]: 891 try: 892 assert getattr(obj, attr_name, None) is not None 893 except (AssertionError) as ee: 894 error_str = "Cannot compare {} with '{}'".format(self, type(obj)) 895 raise AssertionError(error_str) 896 897 val_prefixlen = int(getattr(val, "prefixlen")) 898 self_prefixlen = int(getattr(self, "prefixlen")) 899 val_ndec = int(getattr(val, "as_decimal_network")) 900 self_ndec = int(getattr(self, "as_decimal_network")) 901 val_dec = int(getattr(val, "as_decimal")) 902 self_dec = int(getattr(self, "as_decimal")) 903 904 if self_ndec == val_ndec and self_prefixlen == val_prefixlen: 905 return self_dec < val_dec 906 907 # for the same network, longer prefixlens sort "higher" than shorter prefixlens 908 elif self_ndec == val_ndec: 909 return self_prefixlen < val_prefixlen 910 911 else: 912 return self_ndec < val_ndec 913 914 except: 915 errmsg = "{0} cannot compare itself to '{1}'".format(self.__repr__(), val) 916 raise ValueError(errmsg) 917 918 def __int__(self): 919 """Return this object as an integer""" 920 if getattr(self, "as_decimal", None) is not None: 921 return self.as_decimal 922 else: 923 return False 924 925 def __index__(self): 926 """Return this object as an integer (used for hex() and bin() operations)""" 927 if getattr(self, "as_decimal", None) is not None: 928 return self.as_decimal 929 else: 930 return False 931 932 def __add__(self, val): 933 """Add an integer to IPv6Obj() and return an IPv6Obj()""" 934 assert isinstance(val, int), "Cannot add type: '{}' to {}".format( 935 type(val), self 936 ) 937 orig_prefixlen = self.prefixlen 938 total = self.as_decimal + val 939 error = "Max IPv6 integer exceeded" 940 assert total <= 340282366920938463463374607431768211455, error 941 assert total >= 0, "Min IPv6 integer exceeded" 942 retval = IPv6Obj(total) 943 retval.prefixlen = orig_prefixlen 944 return retval 945 946 def __sub__(self, val): 947 """Subtract an integer from IPv6Obj() and return an IPv6Obj()""" 948 assert isinstance(val, int), "Cannot subtract type: '{}' from {}".format( 949 type(val), self 950 ) 951 orig_prefixlen = self.prefixlen 952 total = self.as_decimal - val 953 error = "Max IPv6 integer exceeded" 954 assert total <= 340282366920938463463374607431768211455, error 955 assert total >= 0, "Min IPv6 integer exceeded" 956 retval = IPv6Obj(total) 957 retval.prefixlen = orig_prefixlen 958 return retval 959 960 def __contains__(self, val): 961 # Used for "foo in bar"... python calls bar.__contains__(foo) 962 try: 963 if self.network_object.prefixlen == 0: 964 return True 965 elif self.network_object.prefixlen > val.network_object.prefixlen: 966 # obvious shortcut... if this object's mask is longer than 967 # val, this object cannot contain val 968 return False 969 else: 970 # NOTE: We cannot use the same algorithm as IPv4Obj.__contains__() because IPv6Obj doesn't have .broadcast 971 # return (val.network in self.network) 972 # 973 ## Last used: 2020-07-12... version 1.5.6 974 # return (self.network <= val.network) and ( 975 # (self.as_decimal + self.numhosts - 1) 976 # >= (val.as_decimal + val.numhosts - 1) 977 # ) 978 return (self.as_decimal_network <= val.as_decimal_network) and ( 979 (self.as_decimal_network + self.numhosts - 1) 980 >= (val.as_decimal_network + val.numhosts - 1) 981 ) 982 983 except (Exception) as e: 984 raise ValueError( 985 "Could not check whether '{0}' is contained in '{1}': {2}".format( 986 val, self, e 987 ) 988 ) 989 990 def __hash__(self): 991 # Python3 needs __hash__() 992 return hash(str(self.ip_object)) + hash(str(self.prefixlen)) 993 994 def __iter__(self): 995 return self.network_object.__iter__() 996 997 def __next__(self): 998 ## For Python3 iteration... 999 return self.network_object.__next__() 1000 1001 def next(self): 1002 ## For Python2 iteration... 1003 return self.network_object.__next__() 1004 1005 @property 1006 def _version(self): 1007 """ 1008 Fix github issue #203... build a `_prefixlen` attribute... 1009 """ 1010 return self.version 1011 1012 @property 1013 def _prefixlen(self): 1014 """ 1015 Fix github issue #203... build a `_prefixlen` attribute... 1016 """ 1017 return self.prefixlen 1018 1019 @property 1020 def _max_prefixlen(self): 1021 """ 1022 Fix github issue #203... build a `_prefixlen` attribute... 1023 """ 1024 if self.version == 4: 1025 return 32 1026 else: 1027 return 128 1028 1029 @property 1030 def ip(self): 1031 """Returns the address as an :class:`ipaddress.IPv6Address` object.""" 1032 return self.ip_object 1033 1034 @property 1035 def netmask(self): 1036 """Returns the network mask as an :class:`ipaddress.IPv6Address` object.""" 1037 return self.network_object.netmask 1038 1039 @property 1040 def prefixlen(self): 1041 """Returns the length of the network mask as an integer.""" 1042 return int(self.network_object.prefixlen) 1043 1044 @prefixlen.setter 1045 def prefixlen(self, arg): 1046 """prefixlen setter method""" 1047 self.network_object = IPv6Network( 1048 "{0}/{1}".format(str(self.ip_object), arg), strict=False 1049 ) 1050 1051 @property 1052 def prefixlength(self): 1053 """Returns the length of the network mask as an integer.""" 1054 return self.prefixlen 1055 1056 @property 1057 def compressed(self): 1058 """Returns the IPv6 Network object in compressed form""" 1059 return self.network_object.compressed 1060 1061 @property 1062 def exploded(self): 1063 """Returns the IPv6 Address object in exploded form""" 1064 return self.ip_object.exploded 1065 1066 @property 1067 def packed(self): 1068 """Returns the IPv6 Address object in packed binary form""" 1069 return self.ip_object.packed 1070 1071 @property 1072 def broadcast(self): 1073 raise NotImplementedError("IPv6 does not have broadcasts") 1074 1075 @property 1076 def network(self): 1077 """Returns an :class:`ipaddress.IPv6Network` object, which represents this network. 1078 """ 1079 if sys.version_info[0] < 3: 1080 return self.network_object.network 1081 else: 1082 ## The ipaddress module returns an "IPAddress" object in Python3... 1083 return IPv6Network("{0}".format(self.network_object.compressed)) 1084 1085 @property 1086 def as_decimal_network(self): 1087 """Returns the IP network as a decimal integer""" 1088 num_strings = str(self.network.exploded).split("/")[0].split(":") 1089 num_strings.reverse() # reverse the order 1090 return sum( 1091 [int(num, 16) * (65536 ** idx) for idx, num in enumerate(num_strings)] 1092 ) 1093 1094 @property 1095 def hostmask(self): 1096 """Returns the host mask as an :class:`ipaddress.IPv6Address` object.""" 1097 return self.network_object.hostmask 1098 1099 @property 1100 def inverse_netmask(self): 1101 """Returns the host mask as an :class:`ipaddress.IPv4Address` object.""" 1102 return self.network_object.hostmask 1103 1104 @property 1105 def version(self): 1106 """Returns the IP version of the object as an integer. i.e. 6""" 1107 return 6 1108 1109 @property 1110 def numhosts(self): 1111 """Returns the total number of IP addresses in this network, including broadcast and the "subnet zero" address""" 1112 if sys.version_info[0] < 3: 1113 return self.network_object.numhosts 1114 else: 1115 return 2 ** (128 - self.network_object.prefixlen) 1116 1117 @property 1118 def as_decimal(self): 1119 """Returns the IP address as a decimal integer""" 1120 num_strings = str(self.ip.exploded).split(":") 1121 num_strings.reverse() # reverse the order 1122 return sum( 1123 [int(num, 16) * (65536 ** idx) for idx, num in enumerate(num_strings)] 1124 ) 1125 1126 @property 1127 def as_binary_tuple(self): 1128 """Returns the IPv6 address as a tuple of zero-padded 16-bit binary strings""" 1129 result_list = ["{0:016b}".format(int(ii, 16)) for ii in self.as_hex_tuple] 1130 return tuple(result_list) 1131 1132 @property 1133 def as_hex_tuple(self): 1134 """Returns the IPv6 address as a tuple of zero-padded 16-bit hex strings""" 1135 result_list = str(self.ip.exploded).split(":") 1136 return tuple(result_list) 1137 1138 @property 1139 def as_cidr_addr(self): 1140 """Returns a string with the address in CIDR notation""" 1141 return str(self.ip) + "/" + str(self.prefixlen) 1142 1143 @property 1144 def as_cidr_net(self): 1145 """Returns a string with the network in CIDR notation""" 1146 if sys.version_info[0] < 3: 1147 return str(self.network) + "/" + str(self.prefixlen) 1148 else: 1149 return str(self.network) 1150 1151 @property 1152 def is_multicast(self): 1153 """Returns a boolean for whether this is a multicast address""" 1154 return self.network_object.is_multicast 1155 1156 @property 1157 def is_private(self): 1158 """Returns a boolean for whether this is a private address""" 1159 return self.network_object.is_private 1160 1161 @property 1162 def is_reserved(self): 1163 """Returns a boolean for whether this is a reserved address""" 1164 return self.network_object.is_reserved 1165 1166 @property 1167 def is_link_local(self): 1168 """Returns a boolean for whether this is an IPv6 link-local address""" 1169 return self.network_object.is_link_local 1170 1171 @property 1172 def is_site_local(self): 1173 """Returns a boolean for whether this is an IPv6 site-local address""" 1174 return self.network_object.is_site_local 1175 1176 @property 1177 def is_unspecified(self): 1178 """Returns a boolean for whether this address is not otherwise 1179 classified""" 1180 return self.network_object.is_unspecified 1181 1182 @property 1183 def teredo(self): 1184 return self.network_object.teredo 1185 1186 @property 1187 def sixtofour(self): 1188 return self.network_object.sixtofour 1189 1190 1191class L4Object(object): 1192 """Object for Transport-layer protocols; the object ensures that logical operators (such as le, gt, eq, and ne) are parsed correctly, as well as mapping service names to port numbers 1193 1194 Examples 1195 -------- 1196 >>> from ciscoconfparse.ccp_util import L4Object 1197 >>> obj = L4Object(protocol="tcp", port_spec="range ssh smtp", syntax="asa") 1198 >>> obj 1199 <L4Object tcp ports: 22-25> 1200 >>> obj.protocol 1201 'tcp' 1202 >>> 25 in obj.port_list 1203 True 1204 >>> 1205 """ 1206 1207 def __init__(self, protocol="", port_spec="", syntax=""): 1208 self.protocol = protocol 1209 self.port_list = list() 1210 self.syntax = syntax 1211 1212 try: 1213 port_spec = port_spec.strip() 1214 except: 1215 port_spec = port_spec 1216 1217 if syntax == "asa": 1218 if protocol == "tcp": 1219 ports = ASA_TCP_PORTS 1220 elif protocol == "udp": 1221 ports = ASA_UDP_PORTS 1222 else: 1223 raise NotImplementedError( 1224 "'{0}' is not supported: '{0}'".format(protocol) 1225 ) 1226 else: 1227 raise NotImplementedError("This syntax is unknown: '{0}'".format(syntax)) 1228 1229 if "eq " in port_spec.strip(): 1230 port_tmp = re.split("\s+", port_spec)[-1].strip() 1231 eq_port = int(ports.get(port_tmp, port_tmp)) 1232 assert 1 <= eq_port <= 65535 1233 self.port_list = [eq_port] 1234 elif re.search(r"^\S+$", port_spec.strip()): 1235 # Technically, 'eq ' is optional... 1236 eq_port = int(ports.get(port_spec.strip(), port_spec.strip())) 1237 assert 1 <= eq_port <= 65535 1238 self.port_list = [eq_port] 1239 elif "range " in port_spec.strip(): 1240 port_tmp = re.split("\s+", port_spec)[1:] 1241 low_port = int(ports.get(port_tmp[0], port_tmp[0])) 1242 high_port = int(ports.get(port_tmp[1], port_tmp[1])) 1243 assert low_port <= high_port 1244 self.port_list = sorted(range(low_port, high_port+1)) 1245 elif "lt " in port_spec.strip(): 1246 port_tmp = re.split("\s+", port_spec)[-1] 1247 high_port = int(ports.get(port_tmp, port_tmp)) 1248 assert 65536 >= high_port >= 2 1249 self.port_list = sorted(range(1, high_port)) 1250 elif "gt " in port_spec.strip(): 1251 port_tmp = re.split("\s+", port_spec)[-1] 1252 low_port = int(ports.get(port_tmp, port_tmp)) 1253 assert 0 < low_port < 65535 1254 self.port_list = sorted(range(low_port+1, 65536)) 1255 elif "neq " in port_spec.strip(): 1256 port_str = re.split("\s+", port_spec)[-1] 1257 tmp = set(range(1, 65536)) 1258 tmp.remove(int(port_str)) 1259 self.port_list = sorted(tmp) 1260 else: 1261 raise NotImplementedError("This port_spec is unknown: '{0}'".format(port_spec)) 1262 1263 def __eq__(self, val): 1264 if (self.protocol == val.protocol) and (self.port_list == val.port_list): 1265 return True 1266 return False 1267 1268 def __repr__(self): 1269 crobj = CiscoRange() 1270 crobj._list = self.port_list 1271 return "<L4Object {0} ports: {1}>".format(self.protocol, crobj.compressed_str) 1272 1273 1274class DNSResponse(object): 1275 """A universal DNS Response object 1276 1277 Parameters 1278 ---------- 1279 query_type : str 1280 A string containing the DNS record type to lookup 1281 result_str : str 1282 A string containing the DNS Response 1283 input_str : str 1284 The DNS query string 1285 duration : float 1286 The query duration in seconds 1287 1288 Attributes 1289 ---------- 1290 query_type : str 1291 A string containing the DNS record type to lookup 1292 result_str : str 1293 A string containing the DNS Response 1294 input_str : str 1295 The DNS query string 1296 has_error : bool 1297 Indicates the query resulted in an error when True 1298 error_str : str 1299 The error returned by dnspython 1300 duration : float 1301 The query duration in seconds 1302 preference : int 1303 The MX record's preference (default: -1) 1304 1305 Returns 1306 ------- 1307 A :class:`~ccp_util.DNSResponse` instance 1308""" 1309 1310 def __init__(self, query_type="", result_str="", input_str="", duration=0.0): 1311 self.query_type = query_type 1312 self.result_str = result_str 1313 self.input_str = input_str 1314 self.duration = duration # Query duration in seconds 1315 1316 self.has_error = False 1317 self.error_str = "" 1318 self.preference = -1 # MX Preference 1319 1320 def __str__(self): 1321 return self.result_str 1322 1323 def __repr__(self): 1324 if not self.has_error: 1325 return '<DNSResponse "{0}" result_str="{1}">'.format( 1326 self.query_type, self.result_str 1327 ) 1328 else: 1329 return '<DNSResponse "{0}" error="{1}">'.format( 1330 self.query_type, self.error_str 1331 ) 1332 1333 1334def dns_query(input_str="", query_type="", server="", timeout=2.0): 1335 """A unified IPv4 & IPv6 DNS lookup interface; this is essentially just a wrapper around dnspython's API. When you query a PTR record, you can use an IPv4 or IPv6 address (which will automatically be converted into an in-addr.arpa name. This wrapper only supports a subset of DNS records: 'A', 'AAAA', 'CNAME', 'MX', 'NS', 'PTR', and 'TXT' 1336 1337 Paremeters 1338 ---------- 1339 input_str : str 1340 A string containing the DNS record to lookup 1341 query_type : str 1342 A string containing the DNS record type (SOA not supported) 1343 server : str 1344 A string containing the fqdn or IP address of the dns server 1345 timeout : float 1346 DNS lookup timeout duration (default: 2.0 seconds) 1347 1348 Returns 1349 ------- 1350 A set([]) of :class:`~ccp_util.DNSResponse` instances. Refer to the DNSResponse object in these docs for more information. 1351 1352 Examples 1353 -------- 1354 >>> from ciscoconfparse.ccp_util import dns_query 1355 >>> dns_query('www.pennington.net', "A", "4.2.2.2", timeout=0.5) 1356 {<DNSResponse "A" result_str="65.19.187.2">} 1357 >>> response_set = dns_query('www.pennington.net', 'A', '4.2.2.2') 1358 >>> aa = response_set.pop() 1359 >>> aa.result_str 1360 '65.19.187.2' 1361 >>> aa.error_str 1362 '' 1363 >>> 1364 """ 1365 1366 valid_records = set(["A", "AAAA", "AXFR", "CNAME", "MX", "NS", "PTR", "TXT"]) 1367 query_type = query_type.upper() 1368 assert query_type in valid_records 1369 assert server != "" 1370 assert float(timeout) > 0 1371 assert input_str != "" 1372 intput = input_str.strip() 1373 retval = set([]) 1374 resolver = Resolver() 1375 resolver.server = [socket.gethostbyname(server)] 1376 resolver.timeout = float(timeout) 1377 resolver.lifetime = float(timeout) 1378 start = time.time() 1379 if (query_type == "A") or (query_type == "AAAA"): 1380 try: 1381 answer = resolver.query(input_str, query_type) 1382 duration = time.time() - start 1383 for result in answer: 1384 response = DNSResponse( 1385 query_type=query_type, 1386 duration=duration, 1387 input_str=input_str, 1388 result_str=str(result.address), 1389 ) 1390 retval.add(response) 1391 except DNSException as e: 1392 duration = time.time() - start 1393 response = DNSResponse( 1394 input_str=input_str, duration=duration, query_type=query_type 1395 ) 1396 response.has_error = True 1397 response.error_str = e 1398 retval.add(response) 1399 elif query_type == "AXFR": 1400 """This is a hack: return text of zone transfer, instead of axfr objs""" 1401 _zone = zone.from_xfr(query.xfr(server, input_str, lifetime=timeout)) 1402 return [_zone[node].to_text(node) for node in _zone.nodes.keys()] 1403 elif query_type == "CNAME": 1404 try: 1405 answer = resolver.query(input_str, query_type) 1406 duration = time.time() - start 1407 for result in answer: 1408 response = DNSResponse( 1409 query_type=query_type, 1410 duration=duration, 1411 input_str=input_str, 1412 result_str=str(result.target), 1413 ) 1414 retval.add(response) 1415 except DNSException as e: 1416 duration = time.time() - start 1417 response = DNSResponse( 1418 input_str=input_str, duration=duration, query_type=query_type 1419 ) 1420 response.has_error = True 1421 response.error_str = e 1422 retval.add(response) 1423 elif query_type == "MX": 1424 try: 1425 answer = resolver.query(input_str, query_type) 1426 duration = time.time() - start 1427 for result in answer: 1428 response = DNSResponse( 1429 query_type=query_type, input_str=input_str, result_str=str(result.target) 1430 ) 1431 response.preference = int(result.preference) 1432 retval.add(response) 1433 except DNSException as e: 1434 duration = time.time() - start 1435 response = DNSResponse( 1436 input_str=input_str, duration=duration, query_type=query_type 1437 ) 1438 response.has_error = True 1439 response.error_str = e 1440 retval.add(response) 1441 elif query_type == "NS": 1442 try: 1443 answer = resolver.query(input_str, query_type) 1444 duration = time.time() - start 1445 for result in answer: 1446 response = DNSResponse( 1447 query_type=query_type, 1448 duration=duration, 1449 input_str=input_str, 1450 result_str=str(result.target), 1451 ) 1452 retval.add(response) 1453 except DNSException as e: 1454 duration = time.time() - start 1455 response = DNSResponse( 1456 input_str=input_str, duration=duration, query_type=query_type 1457 ) 1458 response.has_error = True 1459 response.error_str = e 1460 retval.add(response) 1461 elif query_type == "PTR": 1462 if is_valid_ipv4_addr(input_str) or is_valid_ipv6_addr(input_str): 1463 inaddr = reversename.from_address(input_str) 1464 elif "in-addr.arpa" in input_str.lower(): 1465 inaddr = input_str 1466 else: 1467 raise ValueError('Cannot query PTR record for "{0}"'.format(input_str)) 1468 1469 try: 1470 answer = resolver.query(inaddr, query_type) 1471 duration = time.time() - start 1472 for result in answer: 1473 response = DNSResponse( 1474 query_type=query_type, 1475 duration=duration, 1476 input_str=inaddr, 1477 result_str=str(result.target), 1478 ) 1479 retval.add(response) 1480 except DNSException as e: 1481 duration = time.time() - start 1482 response = DNSResponse( 1483 input_str=input_str, duration=duration, query_type=query_type 1484 ) 1485 response.has_error = True 1486 response.error_str = e 1487 retval.add(response) 1488 elif query_type == "TXT": 1489 try: 1490 answer = resolver.query(input_str, query_type) 1491 duration = time.time() - start 1492 for result in answer: 1493 response = DNSResponse( 1494 query_type=query_type, 1495 duration=duration, 1496 input_str=inaddr, 1497 result_str=str(result.strings), 1498 ) 1499 retval.add(response) 1500 except DNSException as e: 1501 duration = time.time() - start 1502 response = DNSResponse( 1503 input_str=input_str, duration=duration, query_type=query_type 1504 ) 1505 response.has_error = True 1506 response.error_str = e 1507 retval.add(response) 1508 return retval 1509 1510 1511def dns_lookup(input_str, timeout=3, server=""): 1512 """Perform a simple DNS lookup, return results in a dictionary""" 1513 resolver = Resolver() 1514 resolver.timeout = float(timeout) 1515 resolver.lifetime = float(timeout) 1516 if server: 1517 resolver.nameservers = [server] 1518 try: 1519 records = resolver.query(input_str, "A") 1520 return { 1521 "addrs": [ii.address for ii in records], 1522 "error": "", 1523 "name": input_str, 1524 } 1525 except DNSException as e: 1526 return { 1527 "addrs": [], 1528 "error": repr(e), 1529 "name": input_str, 1530 } 1531 1532 1533def dns6_lookup(input_str, timeout=3, server=""): 1534 """Perform a simple DNS lookup, return results in a dictionary""" 1535 resolver = Resolver() 1536 resolver.timeout = float(timeout) 1537 resolver.lifetime = float(timeout) 1538 if server: 1539 resolver.nameservers = [server] 1540 try: 1541 records = resolver.query(input_str, "AAAA") 1542 return { 1543 "addrs": [ii.address for ii in records], 1544 "error": "", 1545 "name": input_str, 1546 } 1547 except DNSException as e: 1548 return { 1549 "addrs": [], 1550 "error": repr(e), 1551 "name": input_str, 1552 } 1553 1554 1555_REVERSE_DNS_REGEX = re.compile(r"^\s*\d+\.\d+\.\d+\.\d+\s*$") 1556 1557 1558def reverse_dns_lookup(input_str, timeout=3, server=""): 1559 """Perform a simple reverse DNS lookup, return results in a dictionary""" 1560 assert _REVERSE_DNS_REGEX.search(input_str), "Invalid address format: '{0}'".format( 1561 input_str 1562 ) 1563 resolver = Resolver() 1564 resolver.timeout = float(timeout) 1565 resolver.lifetime = float(timeout) 1566 if server: 1567 resolver.nameservers = [server] 1568 try: 1569 tmp = input_str.strip().split(".") 1570 tmp.reverse() 1571 inaddr = ".".join(tmp) + ".in-addr.arpa" 1572 records = resolver.query(inaddr, "PTR") 1573 return { 1574 "name": records[0].to_text(), 1575 "lookup": inaddr, 1576 "error": "", 1577 "addr": input_str, 1578 } 1579 except DNSException as e: 1580 return { 1581 "addrs": [], 1582 "lookup": inaddr, 1583 "error": repr(e), 1584 "name": input_str, 1585 } 1586 1587 1588class CiscoRange(MutableSequence): 1589 """Explode Cisco ranges into a list of explicit items... examples below... 1590 1591 Examples 1592 -------- 1593 1594 >>> from ciscoconfparse.ccp_util import CiscoRange 1595 >>> CiscoRange('1-3,5,9-11,13') 1596 <CiscoRange 1-3,5,9-11,13> 1597 >>> for ii in CiscoRange('Eth2/1-3,5,9-10'): 1598 ... print(ii) 1599 ... 1600 Eth2/1 1601 Eth2/2 1602 Eth2/3 1603 Eth2/5 1604 Eth2/9 1605 Eth2/10 1606 >>> CiscoRange('Eth2/1-3,7') 1607 <CiscoRange Eth2/1-3,7> 1608 >>> CiscoRange() 1609 <CiscoRange []> 1610 """ 1611 1612 def __init__(self, text="", result_type=str): 1613 super(CiscoRange, self).__init__() 1614 self.text = text 1615 self.result_type = result_type 1616 if text: 1617 ( 1618 self.line_prefix, 1619 self.slot_prefix, 1620 self.range_text, 1621 ) = self._parse_range_text() 1622 self._list = self._range() 1623 else: 1624 self.line_prefix = "" 1625 self.slot_prefix = "" 1626 self._list = list() 1627 1628 def __repr__(self): 1629 if len(self._list) == 0: 1630 return """<CiscoRange []>""" 1631 else: 1632 return """<CiscoRange {0}>""".format(self.compressed_str) 1633 1634 def __len__(self): 1635 return len(self._list) 1636 1637 def __getitem__(self, ii): 1638 return self._list[ii] 1639 1640 def __delitem__(self, ii): 1641 del self._list[ii] 1642 1643 def __setitem__(self, ii, val): 1644 return self._list[ii] 1645 1646 def __str__(self): 1647 return self.__repr__() 1648 1649 # Github issue #124 1650 def __eq__(self, other): 1651 assert hasattr(other, "line_prefix") 1652 self_prefix_str = self.line_prefix + self.slot_prefix 1653 other_prefix_str = other.line_prefix + other.slot_prefix 1654 cmp1 = self_prefix_str.lower() == other_prefix_str.lower() 1655 cmp2 = sorted(self._list) == sorted(other._list) 1656 return cmp1 and cmp2 1657 1658 def insert(self, ii, val): 1659 ## Insert something at index ii 1660 for idx, obj in enumerate(CiscoRange(val, result_type=self.result_type)): 1661 self._list.insert(ii + idx, obj) 1662 1663 # Prune out any duplicate entries, and sort... 1664 self._list = sorted(map(self.result_type, set(self._list))) 1665 return self 1666 1667 def append(self, val): 1668 list_idx = len(self._list) 1669 self.insert(list_idx, val) 1670 return self 1671 1672 def _normalize_and_split_text(self): 1673 """Split self.text on commas, then remove all common string prefixes in the list (except on the first element). Return a 'normalized' list of strings with common_prefix removed except on the first element in the list (i.e. "Eth1/1,Eth1/4,Eth1/7" -> ["Eth1/1", "4", "7"]).""" 1674 tmp = self.text.split(",") 1675 1676 # Handle case of "Eth1/1,Eth1/5-7"... remove the common_prefix... 1677 common_prefix = os.path.commonprefix(tmp) 1678 1679 # Ensure that we don't capture trailing digits into common_prefix 1680 mm = re.search(r"^(\D.*?)\d*$", common_prefix.strip()) 1681 if mm is not None: 1682 common_prefix = mm.group(1) 1683 # Keep the common_prefix on the first element... 1684 _tmp = [tmp[0]] 1685 1686 # Remove the common_prefix from all other list elements... 1687 for idx, ii in enumerate(tmp): 1688 if idx > 0: 1689 1690 # Unicode is the only type with .isnumeric()... 1691 if sys.version_info < (3, 0, 0): 1692 prefix_removed = unicode(ii[len(common_prefix):], "utf-8") 1693 else: 1694 prefix_removed = ii[len(common_prefix):] 1695 1696 if prefix_removed.isnumeric(): 1697 _tmp.append(prefix_removed) 1698 elif re.search(r"^\d+\s*-\s*\d+$", prefix_removed.strip()): 1699 _tmp.append(prefix_removed) 1700 else: 1701 ERROR = "CiscoRange() couldn't parse '{0}'".format(self.text) 1702 raise ValueError(ERROR) 1703 tmp = _tmp 1704 return tmp 1705 1706 def _parse_range_text(self): 1707 tmp = self._normalize_and_split_text() 1708 1709 mm = _RGX_CISCO_RANGE.search(tmp[0]) 1710 1711 ERROR = "CiscoRange() couldn't parse '{0}'".format(self.text) 1712 assert (mm is not None), ERROR 1713 1714 mm_result = mm.groupdict() 1715 line_prefix = mm_result.get("line_prefix", "") or "" 1716 slot_prefix = mm_result.get("slot_prefix", "") or "" 1717 if len(tmp[1:]) > 1: 1718 range_text = mm_result["range_text"] + "," + ",".join(tmp[1:]) 1719 elif len(tmp[1:]) == 1: 1720 range_text = mm_result["range_text"] + "," + tmp[1] 1721 elif len(tmp[1:]) == 0: 1722 range_text = mm_result["range_text"] 1723 return line_prefix, slot_prefix, range_text 1724 1725 def _parse_dash_range(self, text): 1726 """Parse a dash Cisco range into a discrete list of items""" 1727 retval = list() 1728 for range_atom in text.split(","): 1729 try: 1730 begin, end = range_atom.split("-") 1731 except ValueError: 1732 ## begin and end are the same number 1733 begin, end = range_atom, range_atom 1734 begin, end = int(begin.strip()), int(end.strip()) + 1 1735 assert begin > -1 1736 assert end > begin 1737 retval.extend(range(begin, end)) 1738 return list(set(retval)) 1739 1740 def _range(self): 1741 """Enumerate all values in the CiscoRange()""" 1742 1743 def combine(arg): 1744 return self.line_prefix + self.slot_prefix + str(arg) 1745 1746 return [ 1747 self.result_type(ii) 1748 for ii in map(combine, self._parse_dash_range(self.range_text)) 1749 ] 1750 1751 def remove(self, arg): 1752 remove_obj = CiscoRange(arg) 1753 for ii in remove_obj: 1754 try: 1755 ## Remove arg, even if duplicated... Ref Github issue #126 1756 while True: 1757 index = self.index(self.result_type(ii)) 1758 self.pop(index) 1759 except ValueError: 1760 pass 1761 return self 1762 1763 @property 1764 def as_list(self): 1765 return self._list 1766 1767 ## Github issue #125 1768 @property 1769 def compressed_str(self): 1770 """Return a text string with a compressed csv of values 1771 1772>>> from ciscoconfparse.ccp_util import CiscoRange 1773>>> range_obj = CiscoRange('1,3,5,6,7') 1774>>> range_obj.compressed_str 1775'1,3,5-7' 1776>>> 1777 """ 1778 retval = list() 1779 prefix_str = self.line_prefix.strip() + self.slot_prefix.strip() 1780 prefix_str_len = len(prefix_str) 1781 1782 # Build a list of integers (without prefix_str) 1783 input_str = list() 1784 for ii in self._list: 1785 # Removed try / except which is slower than sys.version_info 1786 if sys.version_info < (3, 0, 0): 1787 unicode_ii = unicode(str(ii)) # Python2.7... 1788 else: 1789 unicode_ii = str(ii) 1790 1791 # Removed this in version 1.5.27 because it's so slow... 1792 #trailing_digits = re.sub(r"^{0}(\d+)$".format(prefix_str), "\g<1>", unicode_ii) 1793 1794 complete_len = len(unicode_ii) 1795 # Assign ii to the trailing number after prefix_str... 1796 # this is much faster than regexp processing... 1797 trailing_digits_len = complete_len - prefix_str_len 1798 trailing_digits = unicode_ii[-1*trailing_digits_len:] 1799 input_str.append(int(trailing_digits)) 1800 1801 if len(input_str) == 0: # Special case, handle empty list 1802 return "" 1803 1804 # source - https://stackoverflow.com/a/51227915/667301 1805 input_str = sorted(list(set(input_str))) 1806 range_list = [input_str[0]] 1807 for ii in range(len(input_str)): 1808 if ii + 1 < len(input_str) and ii - 1 > -1: 1809 if (input_str[ii] - input_str[ii - 1] == 1) and ( 1810 input_str[ii + 1] - input_str[ii] == 1 1811 ): 1812 if range_list[-1] != "-": 1813 range_list += ["-"] 1814 else: 1815 range_list = range_list 1816 else: 1817 range_list += [input_str[ii]] 1818 if len(input_str) > 1: 1819 range_list += [input_str[len(input_str) - 1]] 1820 1821 # Build the return value from range_list... 1822 retval = prefix_str + str(range_list[0]) 1823 for ii in range(1, len(range_list)): 1824 if str(type(range_list[ii])) != str(type(range_list[ii - 1])): 1825 retval += str(range_list[ii]) 1826 else: 1827 retval += "," + str(range_list[ii]) 1828 1829 return retval 1830