1# (c) 2014, Maciej Delmanowski <drybjed@gmail.com> 2# 3# This file is part of Ansible 4# 5# Ansible is free software: you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation, either version 3 of the License, or 8# (at your option) any later version. 9# 10# Ansible is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with Ansible. If not, see <http://www.gnu.org/licenses/>. 17 18# Make coding more python3-ish 19from __future__ import absolute_import, division, print_function 20 21__metaclass__ = type 22 23from functools import partial 24import types 25 26try: 27 import netaddr 28except ImportError: 29 # in this case, we'll make the filters return error messages (see bottom) 30 netaddr = None 31else: 32 33 class mac_linux(netaddr.mac_unix): 34 pass 35 36 mac_linux.word_fmt = "%.2x" 37 38from ansible import errors 39 40 41# ---- IP address and network query helpers ---- 42def _empty_ipaddr_query(v, vtype): 43 # We don't have any query to process, so just check what type the user 44 # expects, and return the IP address in a correct format 45 if v: 46 if vtype == "address": 47 return str(v.ip) 48 elif vtype == "network": 49 return str(v) 50 51 52def _first_last(v): 53 if v.size == 2: 54 first_usable = int(netaddr.IPAddress(v.first)) 55 last_usable = int(netaddr.IPAddress(v.last)) 56 return first_usable, last_usable 57 elif v.size > 1: 58 first_usable = int(netaddr.IPAddress(v.first + 1)) 59 last_usable = int(netaddr.IPAddress(v.last - 1)) 60 return first_usable, last_usable 61 62 63def _6to4_query(v, vtype, value): 64 if v.version == 4: 65 66 if v.size == 1: 67 ipconv = str(v.ip) 68 elif v.size > 1: 69 if v.ip != v.network: 70 ipconv = str(v.ip) 71 else: 72 ipconv = False 73 74 if ipaddr(ipconv, "public"): 75 numbers = list(map(int, ipconv.split("."))) 76 77 try: 78 return "2002:{:02x}{:02x}:{:02x}{:02x}::1/48".format(*numbers) 79 except Exception: 80 return False 81 82 elif v.version == 6: 83 if vtype == "address": 84 if ipaddr(str(v), "2002::/16"): 85 return value 86 elif vtype == "network": 87 if v.ip != v.network: 88 if ipaddr(str(v.ip), "2002::/16"): 89 return value 90 else: 91 return False 92 93 94def _ip_query(v): 95 if v.size == 1: 96 return str(v.ip) 97 if v.size > 1: 98 # /31 networks in netaddr have no broadcast address 99 if v.ip != v.network or not v.broadcast: 100 return str(v.ip) 101 102 103def _gateway_query(v): 104 if v.size > 1: 105 if v.ip != v.network: 106 return str(v.ip) + "/" + str(v.prefixlen) 107 108 109def _address_prefix_query(v): 110 if v.size > 1: 111 if v.ip != v.network: 112 return str(v.ip) + "/" + str(v.prefixlen) 113 114 115def _bool_ipaddr_query(v): 116 if v: 117 return True 118 119 120def _broadcast_query(v): 121 if v.size > 2: 122 return str(v.broadcast) 123 124 125def _cidr_query(v): 126 return str(v) 127 128 129def _cidr_lookup_query(v, iplist, value): 130 try: 131 if v in iplist: 132 return value 133 except Exception: 134 return False 135 136 137def _first_usable_query(v, vtype): 138 if vtype == "address": 139 "Does it make sense to raise an error" 140 raise errors.AnsibleFilterError("Not a network address") 141 elif vtype == "network": 142 if v.size == 2: 143 return str(netaddr.IPAddress(int(v.network))) 144 elif v.size > 1: 145 return str(netaddr.IPAddress(int(v.network) + 1)) 146 147 148def _host_query(v): 149 if v.size == 1: 150 return str(v) 151 elif v.size > 1: 152 if v.ip != v.network: 153 return str(v.ip) + "/" + str(v.prefixlen) 154 155 156def _hostmask_query(v): 157 return str(v.hostmask) 158 159 160def _int_query(v, vtype): 161 if vtype == "address": 162 return int(v.ip) 163 elif vtype == "network": 164 return str(int(v.ip)) + "/" + str(int(v.prefixlen)) 165 166 167def _ip_prefix_query(v): 168 if v.size == 2: 169 return str(v.ip) + "/" + str(v.prefixlen) 170 elif v.size > 1: 171 if v.ip != v.network: 172 return str(v.ip) + "/" + str(v.prefixlen) 173 174 175def _ip_netmask_query(v): 176 if v.size == 2: 177 return str(v.ip) + " " + str(v.netmask) 178 elif v.size > 1: 179 if v.ip != v.network: 180 return str(v.ip) + " " + str(v.netmask) 181 182 183""" 184def _ip_wildcard_query(v): 185 if v.size == 2: 186 return str(v.ip) + ' ' + str(v.hostmask) 187 elif v.size > 1: 188 if v.ip != v.network: 189 return str(v.ip) + ' ' + str(v.hostmask) 190""" 191 192 193def _ipv4_query(v, value): 194 if v.version == 6: 195 try: 196 return str(v.ipv4()) 197 except Exception: 198 return False 199 else: 200 return value 201 202 203def _ipv6_query(v, value): 204 if v.version == 4: 205 return str(v.ipv6()) 206 else: 207 return value 208 209 210def _last_usable_query(v, vtype): 211 if vtype == "address": 212 "Does it make sense to raise an error" 213 raise errors.AnsibleFilterError("Not a network address") 214 elif vtype == "network": 215 if v.size > 1: 216 first_usable, last_usable = _first_last(v) 217 return str(netaddr.IPAddress(last_usable)) 218 219 220def _link_local_query(v, value): 221 v_ip = netaddr.IPAddress(str(v.ip)) 222 if v.version == 4: 223 if ipaddr(str(v_ip), "169.254.0.0/24"): 224 return value 225 226 elif v.version == 6: 227 if ipaddr(str(v_ip), "fe80::/10"): 228 return value 229 230 231def _loopback_query(v, value): 232 v_ip = netaddr.IPAddress(str(v.ip)) 233 if v_ip.is_loopback(): 234 return value 235 236 237def _multicast_query(v, value): 238 if v.is_multicast(): 239 return value 240 241 242def _net_query(v): 243 if v.size > 1: 244 if v.ip == v.network: 245 return str(v.network) + "/" + str(v.prefixlen) 246 247 248def _netmask_query(v): 249 return str(v.netmask) 250 251 252def _network_query(v): 253 """Return the network of a given IP or subnet""" 254 return str(v.network) 255 256 257def _network_id_query(v): 258 """Return the network of a given IP or subnet""" 259 return str(v.network) 260 261 262def _network_netmask_query(v): 263 return str(v.network) + " " + str(v.netmask) 264 265 266def _network_wildcard_query(v): 267 return str(v.network) + " " + str(v.hostmask) 268 269 270def _next_usable_query(v, vtype): 271 if vtype == "address": 272 "Does it make sense to raise an error" 273 raise errors.AnsibleFilterError("Not a network address") 274 elif vtype == "network": 275 if v.size > 1: 276 first_usable, last_usable = _first_last(v) 277 next_ip = int(netaddr.IPAddress(int(v.ip) + 1)) 278 if next_ip >= first_usable and next_ip <= last_usable: 279 return str(netaddr.IPAddress(int(v.ip) + 1)) 280 281 282def _peer_query(v, vtype): 283 if vtype == "address": 284 raise errors.AnsibleFilterError("Not a network address") 285 elif vtype == "network": 286 if v.size == 2: 287 return str(netaddr.IPAddress(int(v.ip) ^ 1)) 288 if v.size == 4: 289 if int(v.ip) % 4 == 0: 290 raise errors.AnsibleFilterError( 291 "Network address of /30 has no peer" 292 ) 293 if int(v.ip) % 4 == 3: 294 raise errors.AnsibleFilterError( 295 "Broadcast address of /30 has no peer" 296 ) 297 return str(netaddr.IPAddress(int(v.ip) ^ 3)) 298 raise errors.AnsibleFilterError("Not a point-to-point network") 299 300 301def _prefix_query(v): 302 return int(v.prefixlen) 303 304 305def _previous_usable_query(v, vtype): 306 if vtype == "address": 307 "Does it make sense to raise an error" 308 raise errors.AnsibleFilterError("Not a network address") 309 elif vtype == "network": 310 if v.size > 1: 311 first_usable, last_usable = _first_last(v) 312 previous_ip = int(netaddr.IPAddress(int(v.ip) - 1)) 313 if previous_ip >= first_usable and previous_ip <= last_usable: 314 return str(netaddr.IPAddress(int(v.ip) - 1)) 315 316 317def _private_query(v, value): 318 if v.is_private(): 319 return value 320 321 322def _public_query(v, value): 323 v_ip = netaddr.IPAddress(str(v.ip)) 324 if ( 325 v_ip.is_unicast() 326 and not v_ip.is_private() 327 and not v_ip.is_loopback() 328 and not v_ip.is_netmask() 329 and not v_ip.is_hostmask() 330 ): 331 return value 332 333 334def _range_usable_query(v, vtype): 335 if vtype == "address": 336 "Does it make sense to raise an error" 337 raise errors.AnsibleFilterError("Not a network address") 338 elif vtype == "network": 339 if v.size > 1: 340 first_usable, last_usable = _first_last(v) 341 first_usable = str(netaddr.IPAddress(first_usable)) 342 last_usable = str(netaddr.IPAddress(last_usable)) 343 return "{0}-{1}".format(first_usable, last_usable) 344 345 346def _revdns_query(v): 347 v_ip = netaddr.IPAddress(str(v.ip)) 348 return v_ip.reverse_dns 349 350 351def _size_query(v): 352 return v.size 353 354 355def _size_usable_query(v): 356 if v.size == 1: 357 return 0 358 elif v.size == 2: 359 return 2 360 return v.size - 2 361 362 363def _subnet_query(v): 364 return str(v.cidr) 365 366 367def _type_query(v): 368 if v.size == 1: 369 return "address" 370 if v.size > 1: 371 if v.ip != v.network: 372 return "address" 373 else: 374 return "network" 375 376 377def _unicast_query(v, value): 378 if v.is_unicast(): 379 return value 380 381 382def _version_query(v): 383 return v.version 384 385 386def _wrap_query(v, vtype, value): 387 if v.version == 6: 388 if vtype == "address": 389 return "[" + str(v.ip) + "]" 390 elif vtype == "network": 391 return "[" + str(v.ip) + "]/" + str(v.prefixlen) 392 else: 393 return value 394 395 396# ---- HWaddr query helpers ---- 397def _bare_query(v): 398 v.dialect = netaddr.mac_bare 399 return str(v) 400 401 402def _bool_hwaddr_query(v): 403 if v: 404 return True 405 406 407def _int_hwaddr_query(v): 408 return int(v) 409 410 411def _cisco_query(v): 412 v.dialect = netaddr.mac_cisco 413 return str(v) 414 415 416def _empty_hwaddr_query(v, value): 417 if v: 418 return value 419 420 421def _linux_query(v): 422 v.dialect = mac_linux 423 return str(v) 424 425 426def _postgresql_query(v): 427 v.dialect = netaddr.mac_pgsql 428 return str(v) 429 430 431def _unix_query(v): 432 v.dialect = netaddr.mac_unix 433 return str(v) 434 435 436def _win_query(v): 437 v.dialect = netaddr.mac_eui48 438 return str(v) 439 440 441# ---- IP address and network filters ---- 442 443# Returns a minified list of subnets or a single subnet that spans all of 444# the inputs. 445def cidr_merge(value, action="merge"): 446 if not hasattr(value, "__iter__"): 447 raise errors.AnsibleFilterError( 448 "cidr_merge: expected iterable, got " + repr(value) 449 ) 450 451 if action == "merge": 452 try: 453 return [str(ip) for ip in netaddr.cidr_merge(value)] 454 except Exception as e: 455 raise errors.AnsibleFilterError( 456 "cidr_merge: error in netaddr:\n%s" % e 457 ) 458 459 elif action == "span": 460 # spanning_cidr needs at least two values 461 if len(value) == 0: 462 return None 463 elif len(value) == 1: 464 try: 465 return str(netaddr.IPNetwork(value[0])) 466 except Exception as e: 467 raise errors.AnsibleFilterError( 468 "cidr_merge: error in netaddr:\n%s" % e 469 ) 470 else: 471 try: 472 return str(netaddr.spanning_cidr(value)) 473 except Exception as e: 474 raise errors.AnsibleFilterError( 475 "cidr_merge: error in netaddr:\n%s" % e 476 ) 477 478 else: 479 raise errors.AnsibleFilterError( 480 "cidr_merge: invalid action '%s'" % action 481 ) 482 483 484def ipaddr(value, query="", version=False, alias="ipaddr"): 485 """ Check if string is an IP address or network and filter it """ 486 487 query_func_extra_args = { 488 "": ("vtype",), 489 "6to4": ("vtype", "value"), 490 "cidr_lookup": ("iplist", "value"), 491 "first_usable": ("vtype",), 492 "int": ("vtype",), 493 "ipv4": ("value",), 494 "ipv6": ("value",), 495 "last_usable": ("vtype",), 496 "link-local": ("value",), 497 "loopback": ("value",), 498 "lo": ("value",), 499 "multicast": ("value",), 500 "next_usable": ("vtype",), 501 "peer": ("vtype",), 502 "previous_usable": ("vtype",), 503 "private": ("value",), 504 "public": ("value",), 505 "unicast": ("value",), 506 "range_usable": ("vtype",), 507 "wrap": ("vtype", "value"), 508 } 509 510 query_func_map = { 511 "": _empty_ipaddr_query, 512 "6to4": _6to4_query, 513 "address": _ip_query, 514 "address/prefix": _address_prefix_query, # deprecate 515 "bool": _bool_ipaddr_query, 516 "broadcast": _broadcast_query, 517 "cidr": _cidr_query, 518 "cidr_lookup": _cidr_lookup_query, 519 "first_usable": _first_usable_query, 520 "gateway": _gateway_query, # deprecate 521 "gw": _gateway_query, # deprecate 522 "host": _host_query, 523 "host/prefix": _address_prefix_query, # deprecate 524 "hostmask": _hostmask_query, 525 "hostnet": _gateway_query, # deprecate 526 "int": _int_query, 527 "ip": _ip_query, 528 "ip/prefix": _ip_prefix_query, 529 "ip_netmask": _ip_netmask_query, 530 # 'ip_wildcard': _ip_wildcard_query, built then could not think of use case 531 "ipv4": _ipv4_query, 532 "ipv6": _ipv6_query, 533 "last_usable": _last_usable_query, 534 "link-local": _link_local_query, 535 "lo": _loopback_query, 536 "loopback": _loopback_query, 537 "multicast": _multicast_query, 538 "net": _net_query, 539 "next_usable": _next_usable_query, 540 "netmask": _netmask_query, 541 "network": _network_query, 542 "network_id": _network_id_query, 543 "network/prefix": _subnet_query, 544 "network_netmask": _network_netmask_query, 545 "network_wildcard": _network_wildcard_query, 546 "peer": _peer_query, 547 "prefix": _prefix_query, 548 "previous_usable": _previous_usable_query, 549 "private": _private_query, 550 "public": _public_query, 551 "range_usable": _range_usable_query, 552 "revdns": _revdns_query, 553 "router": _gateway_query, # deprecate 554 "size": _size_query, 555 "size_usable": _size_usable_query, 556 "subnet": _subnet_query, 557 "type": _type_query, 558 "unicast": _unicast_query, 559 "v4": _ipv4_query, 560 "v6": _ipv6_query, 561 "version": _version_query, 562 "wildcard": _hostmask_query, 563 "wrap": _wrap_query, 564 } 565 566 vtype = None 567 568 if not value: 569 return False 570 571 elif value is True: 572 return False 573 574 # Check if value is a list and parse each element 575 elif isinstance(value, (list, tuple, types.GeneratorType)): 576 577 _ret = [] 578 for element in value: 579 if ipaddr(element, str(query), version): 580 _ret.append(ipaddr(element, str(query), version)) 581 582 if _ret: 583 return _ret 584 else: 585 return list() 586 587 # Check if value is a number and convert it to an IP address 588 elif str(value).isdigit(): 589 590 # We don't know what IP version to assume, so let's check IPv4 first, 591 # then IPv6 592 try: 593 if (not version) or (version and version == 4): 594 v = netaddr.IPNetwork("0.0.0.0/0") 595 v.value = int(value) 596 v.prefixlen = 32 597 elif version and version == 6: 598 v = netaddr.IPNetwork("::/0") 599 v.value = int(value) 600 v.prefixlen = 128 601 602 # IPv4 didn't work the first time, so it definitely has to be IPv6 603 except Exception: 604 try: 605 v = netaddr.IPNetwork("::/0") 606 v.value = int(value) 607 v.prefixlen = 128 608 609 # The value is too big for IPv6. Are you a nanobot? 610 except Exception: 611 return False 612 613 # We got an IP address, let's mark it as such 614 value = str(v) 615 vtype = "address" 616 617 # value has not been recognized, check if it's a valid IP string 618 else: 619 try: 620 v = netaddr.IPNetwork(value) 621 622 # value is a valid IP string, check if user specified 623 # CIDR prefix or just an IP address, this will indicate default 624 # output format 625 try: 626 address, prefix = value.split("/") 627 vtype = "network" 628 except Exception: 629 vtype = "address" 630 631 # value hasn't been recognized, maybe it's a numerical CIDR? 632 except Exception: 633 try: 634 address, prefix = value.split("/") 635 address.isdigit() 636 address = int(address) 637 prefix.isdigit() 638 prefix = int(prefix) 639 640 # It's not numerical CIDR, give up 641 except Exception: 642 return False 643 644 # It is something, so let's try and build a CIDR from the parts 645 try: 646 v = netaddr.IPNetwork("0.0.0.0/0") 647 v.value = address 648 v.prefixlen = prefix 649 650 # It's not a valid IPv4 CIDR 651 except Exception: 652 try: 653 v = netaddr.IPNetwork("::/0") 654 v.value = address 655 v.prefixlen = prefix 656 657 # It's not a valid IPv6 CIDR. Give up. 658 except Exception: 659 return False 660 661 # We have a valid CIDR, so let's write it in correct format 662 value = str(v) 663 vtype = "network" 664 665 # We have a query string but it's not in the known query types. Check if 666 # that string is a valid subnet, if so, we can check later if given IP 667 # address/network is inside that specific subnet 668 try: 669 # ?? 6to4 and link-local were True here before. Should they still? 670 if ( 671 query 672 and (query not in query_func_map or query == "cidr_lookup") 673 and not str(query).isdigit() 674 and ipaddr(query, "network") 675 ): 676 iplist = netaddr.IPSet([netaddr.IPNetwork(query)]) 677 query = "cidr_lookup" 678 except Exception: 679 pass 680 681 # This code checks if value maches the IP version the user wants, ie. if 682 # it's any version ("ipaddr()"), IPv4 ("ipv4()") or IPv6 ("ipv6()") 683 # If version does not match, return False 684 if version and v.version != version: 685 return False 686 687 extras = [] 688 for arg in query_func_extra_args.get(query, tuple()): 689 extras.append(locals()[arg]) 690 try: 691 return query_func_map[query](v, *extras) 692 except KeyError: 693 try: 694 float(query) 695 if v.size == 1: 696 if vtype == "address": 697 return str(v.ip) 698 elif vtype == "network": 699 return str(v) 700 701 elif v.size > 1: 702 try: 703 return str(v[query]) + "/" + str(v.prefixlen) 704 except Exception: 705 return False 706 707 else: 708 return value 709 710 except Exception: 711 raise errors.AnsibleFilterError( 712 alias + ": unknown filter type: %s" % query 713 ) 714 715 return False 716 717 718def ipmath(value, amount): 719 try: 720 if "/" in value: 721 ip = netaddr.IPNetwork(value).ip 722 else: 723 ip = netaddr.IPAddress(value) 724 except (netaddr.AddrFormatError, ValueError): 725 msg = "You must pass a valid IP address; {0} is invalid".format(value) 726 raise errors.AnsibleFilterError(msg) 727 728 if not isinstance(amount, int): 729 msg = ( 730 "You must pass an integer for arithmetic; " 731 "{0} is not a valid integer" 732 ).format(amount) 733 raise errors.AnsibleFilterError(msg) 734 735 return str(ip + amount) 736 737 738def ipwrap(value, query=""): 739 try: 740 if isinstance(value, (list, tuple, types.GeneratorType)): 741 _ret = [] 742 for element in value: 743 if ipaddr(element, query, version=False, alias="ipwrap"): 744 _ret.append(ipaddr(element, "wrap")) 745 else: 746 _ret.append(element) 747 748 return _ret 749 else: 750 _ret = ipaddr(value, query, version=False, alias="ipwrap") 751 if _ret: 752 return ipaddr(_ret, "wrap") 753 else: 754 return value 755 756 except Exception: 757 return value 758 759 760def ipv4(value, query=""): 761 return ipaddr(value, query, version=4, alias="ipv4") 762 763 764def ipv6(value, query=""): 765 return ipaddr(value, query, version=6, alias="ipv6") 766 767 768# Split given subnet into smaller subnets or find out the biggest subnet of 769# a given IP address with given CIDR prefix 770# Usage: 771# 772# - address or address/prefix | ipsubnet 773# returns CIDR subnet of a given input 774# 775# - address/prefix | ipsubnet(cidr) 776# returns number of possible subnets for given CIDR prefix 777# 778# - address/prefix | ipsubnet(cidr, index) 779# returns new subnet with given CIDR prefix 780# 781# - address | ipsubnet(cidr) 782# returns biggest subnet with given CIDR prefix that address belongs to 783# 784# - address | ipsubnet(cidr, index) 785# returns next indexed subnet which contains given address 786# 787# - address/prefix | ipsubnet(subnet/prefix) 788# return the index of the subnet in the subnet 789def ipsubnet(value, query="", index="x"): 790 """ Manipulate IPv4/IPv6 subnets """ 791 792 try: 793 vtype = ipaddr(value, "type") 794 if vtype == "address": 795 v = ipaddr(value, "cidr") 796 elif vtype == "network": 797 v = ipaddr(value, "subnet") 798 799 value = netaddr.IPNetwork(v) 800 except Exception: 801 return False 802 query_string = str(query) 803 if not query: 804 return str(value) 805 806 elif query_string.isdigit(): 807 vsize = ipaddr(v, "size") 808 query = int(query) 809 810 try: 811 float(index) 812 index = int(index) 813 814 if vsize > 1: 815 try: 816 return str(list(value.subnet(query))[index]) 817 except Exception: 818 return False 819 820 elif vsize == 1: 821 try: 822 return str(value.supernet(query)[index]) 823 except Exception: 824 return False 825 826 except Exception: 827 if vsize > 1: 828 try: 829 return str(len(list(value.subnet(query)))) 830 except Exception: 831 return False 832 833 elif vsize == 1: 834 try: 835 return str(value.supernet(query)[0]) 836 except Exception: 837 return False 838 839 elif query_string: 840 vtype = ipaddr(query, "type") 841 if vtype == "address": 842 v = ipaddr(query, "cidr") 843 elif vtype == "network": 844 v = ipaddr(query, "subnet") 845 else: 846 msg = "You must pass a valid subnet or IP address; {0} is invalid".format( 847 query_string 848 ) 849 raise errors.AnsibleFilterError(msg) 850 query = netaddr.IPNetwork(v) 851 for i, subnet in enumerate(query.subnet(value.prefixlen), 1): 852 if subnet == value: 853 return str(i) 854 msg = "{0} is not in the subnet {1}".format(value.cidr, query.cidr) 855 raise errors.AnsibleFilterError(msg) 856 return False 857 858 859# Returns the nth host within a network described by value. 860# Usage: 861# 862# - address or address/prefix | nthhost(nth) 863# returns the nth host within the given network 864def nthhost(value, query=""): 865 """ Get the nth host within a given network """ 866 try: 867 vtype = ipaddr(value, "type") 868 if vtype == "address": 869 v = ipaddr(value, "cidr") 870 elif vtype == "network": 871 v = ipaddr(value, "subnet") 872 873 value = netaddr.IPNetwork(v) 874 except Exception: 875 return False 876 877 if not query: 878 return False 879 880 try: 881 nth = int(query) 882 if value.size > nth: 883 return value[nth] 884 885 except ValueError: 886 return False 887 888 return False 889 890 891# Returns the next nth usable ip within a network described by value. 892def next_nth_usable(value, offset): 893 try: 894 vtype = ipaddr(value, "type") 895 if vtype == "address": 896 v = ipaddr(value, "cidr") 897 elif vtype == "network": 898 v = ipaddr(value, "subnet") 899 900 v = netaddr.IPNetwork(v) 901 except Exception: 902 return False 903 904 if type(offset) != int: 905 raise errors.AnsibleFilterError("Must pass in an integer") 906 if v.size > 1: 907 first_usable, last_usable = _first_last(v) 908 nth_ip = int(netaddr.IPAddress(int(v.ip) + offset)) 909 if nth_ip >= first_usable and nth_ip <= last_usable: 910 return str(netaddr.IPAddress(int(v.ip) + offset)) 911 912 913# Returns the previous nth usable ip within a network described by value. 914def previous_nth_usable(value, offset): 915 try: 916 vtype = ipaddr(value, "type") 917 if vtype == "address": 918 v = ipaddr(value, "cidr") 919 elif vtype == "network": 920 v = ipaddr(value, "subnet") 921 922 v = netaddr.IPNetwork(v) 923 except Exception: 924 return False 925 926 if type(offset) != int: 927 raise errors.AnsibleFilterError("Must pass in an integer") 928 if v.size > 1: 929 first_usable, last_usable = _first_last(v) 930 nth_ip = int(netaddr.IPAddress(int(v.ip) - offset)) 931 if nth_ip >= first_usable and nth_ip <= last_usable: 932 return str(netaddr.IPAddress(int(v.ip) - offset)) 933 934 935def _range_checker(ip_check, first, last): 936 """ 937 Tests whether an ip address is within the bounds of the first and last address. 938 939 :param ip_check: The ip to test if it is within first and last. 940 :param first: The first IP in the range to test against. 941 :param last: The last IP in the range to test against. 942 943 :return: bool 944 """ 945 if ip_check >= first and ip_check <= last: 946 return True 947 else: 948 return False 949 950 951def _address_normalizer(value): 952 """ 953 Used to validate an address or network type and return it in a consistent format. 954 This is being used for future use cases not currently available such as an address range. 955 956 :param value: The string representation of an address or network. 957 958 :return: The address or network in the normalized form. 959 """ 960 try: 961 vtype = ipaddr(value, "type") 962 if vtype == "address" or vtype == "network": 963 v = ipaddr(value, "subnet") 964 except Exception: 965 return False 966 967 return v 968 969 970def network_in_usable(value, test): 971 """ 972 Checks whether 'test' is a useable address or addresses in 'value' 973 974 :param: value: The string representation of an address or network to test against. 975 :param test: The string representation of an address or network to validate if it is within the range of 'value'. 976 977 :return: bool 978 """ 979 # normalize value and test variables into an ipaddr 980 v = _address_normalizer(value) 981 w = _address_normalizer(test) 982 983 # get first and last addresses as integers to compare value and test; or cathes value when case is /32 984 v_first = ipaddr(ipaddr(v, "first_usable") or ipaddr(v, "address"), "int") 985 v_last = ipaddr(ipaddr(v, "last_usable") or ipaddr(v, "address"), "int") 986 w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int") 987 w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int") 988 989 if _range_checker(w_first, v_first, v_last) and _range_checker( 990 w_last, v_first, v_last 991 ): 992 return True 993 else: 994 return False 995 996 997def network_in_network(value, test): 998 """ 999 Checks whether the 'test' address or addresses are in 'value', including broadcast and network 1000 1001 :param: value: The network address or range to test against. 1002 :param test: The address or network to validate if it is within the range of 'value'. 1003 1004 :return: bool 1005 """ 1006 # normalize value and test variables into an ipaddr 1007 v = _address_normalizer(value) 1008 w = _address_normalizer(test) 1009 1010 # get first and last addresses as integers to compare value and test; or cathes value when case is /32 1011 v_first = ipaddr(ipaddr(v, "network") or ipaddr(v, "address"), "int") 1012 v_last = ipaddr(ipaddr(v, "broadcast") or ipaddr(v, "address"), "int") 1013 w_first = ipaddr(ipaddr(w, "network") or ipaddr(w, "address"), "int") 1014 w_last = ipaddr(ipaddr(w, "broadcast") or ipaddr(w, "address"), "int") 1015 1016 if _range_checker(w_first, v_first, v_last) and _range_checker( 1017 w_last, v_first, v_last 1018 ): 1019 return True 1020 else: 1021 return False 1022 1023 1024def reduce_on_network(value, network): 1025 """ 1026 Reduces a list of addresses to only the addresses that match a given network. 1027 1028 :param: value: The list of addresses to filter on. 1029 :param: network: The network to validate against. 1030 1031 :return: The reduced list of addresses. 1032 """ 1033 # normalize network variable into an ipaddr 1034 n = _address_normalizer(network) 1035 1036 # get first and last addresses as integers to compare value and test; or cathes value when case is /32 1037 n_first = ipaddr(ipaddr(n, "network") or ipaddr(n, "address"), "int") 1038 n_last = ipaddr(ipaddr(n, "broadcast") or ipaddr(n, "address"), "int") 1039 1040 # create an empty list to fill and return 1041 r = [] 1042 1043 for address in value: 1044 # normalize address variables into an ipaddr 1045 a = _address_normalizer(address) 1046 1047 # get first and last addresses as integers to compare value and test; or cathes value when case is /32 1048 a_first = ipaddr(ipaddr(a, "network") or ipaddr(a, "address"), "int") 1049 a_last = ipaddr(ipaddr(a, "broadcast") or ipaddr(a, "address"), "int") 1050 1051 if _range_checker(a_first, n_first, n_last) and _range_checker( 1052 a_last, n_first, n_last 1053 ): 1054 r.append(address) 1055 1056 return r 1057 1058 1059# Returns the SLAAC address within a network for a given HW/MAC address. 1060# Usage: 1061# 1062# - prefix | slaac(mac) 1063def slaac(value, query=""): 1064 """ Get the SLAAC address within given network """ 1065 try: 1066 vtype = ipaddr(value, "type") 1067 if vtype == "address": 1068 v = ipaddr(value, "cidr") 1069 elif vtype == "network": 1070 v = ipaddr(value, "subnet") 1071 1072 if ipaddr(value, "version") != 6: 1073 return False 1074 1075 value = netaddr.IPNetwork(v) 1076 except Exception: 1077 return False 1078 1079 if not query: 1080 return False 1081 1082 try: 1083 mac = hwaddr(query, alias="slaac") 1084 1085 eui = netaddr.EUI(mac) 1086 except Exception: 1087 return False 1088 1089 return eui.ipv6(value.network) 1090 1091 1092# ---- HWaddr / MAC address filters ---- 1093def hwaddr(value, query="", alias="hwaddr"): 1094 """ Check if string is a HW/MAC address and filter it """ 1095 1096 query_func_extra_args = {"": ("value",)} 1097 1098 query_func_map = { 1099 "": _empty_hwaddr_query, 1100 "bare": _bare_query, 1101 "bool": _bool_hwaddr_query, 1102 "int": _int_hwaddr_query, 1103 "cisco": _cisco_query, 1104 "eui48": _win_query, 1105 "linux": _linux_query, 1106 "pgsql": _postgresql_query, 1107 "postgresql": _postgresql_query, 1108 "psql": _postgresql_query, 1109 "unix": _unix_query, 1110 "win": _win_query, 1111 } 1112 1113 try: 1114 v = netaddr.EUI(value) 1115 except Exception: 1116 if query and query != "bool": 1117 raise errors.AnsibleFilterError( 1118 alias + ": not a hardware address: %s" % value 1119 ) 1120 1121 extras = [] 1122 for arg in query_func_extra_args.get(query, tuple()): 1123 extras.append(locals()[arg]) 1124 try: 1125 return query_func_map[query](v, *extras) 1126 except KeyError: 1127 raise errors.AnsibleFilterError( 1128 alias + ": unknown filter type: %s" % query 1129 ) 1130 1131 return False 1132 1133 1134def macaddr(value, query=""): 1135 return hwaddr(value, query, alias="macaddr") 1136 1137 1138def _need_netaddr(f_name, *args, **kwargs): 1139 raise errors.AnsibleFilterError( 1140 "The %s filter requires python's netaddr be " 1141 "installed on the ansible controller" % f_name 1142 ) 1143 1144 1145def ip4_hex(arg, delimiter=""): 1146 """ Convert an IPv4 address to Hexadecimal notation """ 1147 numbers = list(map(int, arg.split("."))) 1148 return "{0:02x}{sep}{1:02x}{sep}{2:02x}{sep}{3:02x}".format( 1149 *numbers, sep=delimiter 1150 ) 1151 1152 1153# ---- Ansible filters ---- 1154class FilterModule(object): 1155 """ IP address and network manipulation filters """ 1156 1157 filter_map = { 1158 # IP addresses and networks 1159 "cidr_merge": cidr_merge, 1160 "ipaddr": ipaddr, 1161 "ipmath": ipmath, 1162 "ipwrap": ipwrap, 1163 "ip4_hex": ip4_hex, 1164 "ipv4": ipv4, 1165 "ipv6": ipv6, 1166 "ipsubnet": ipsubnet, 1167 "next_nth_usable": next_nth_usable, 1168 "network_in_network": network_in_network, 1169 "network_in_usable": network_in_usable, 1170 "reduce_on_network": reduce_on_network, 1171 "nthhost": nthhost, 1172 "previous_nth_usable": previous_nth_usable, 1173 "slaac": slaac, 1174 # MAC / HW addresses 1175 "hwaddr": hwaddr, 1176 "macaddr": macaddr, 1177 } 1178 1179 def filters(self): 1180 if netaddr: 1181 return self.filter_map 1182 else: 1183 # Need to install python's netaddr for these filters to work 1184 return dict( 1185 (f, partial(_need_netaddr, f)) for f in self.filter_map 1186 ) 1187