1# (c) 2014, Maciej Delmanowski <drybjed@gmail.com>
2#
3# This file is part of Ansible
4#
5# Ansible is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# Ansible is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
17
18# Make coding more python3-ish
19from __future__ import absolute_import, division, print_function
20
21__metaclass__ = type
22
23from functools import partial
24import types
25
# netaddr is an optional third-party dependency; when it cannot be imported
# the module-level name is bound to None instead.
try:
    import netaddr
except ImportError:
    # in this case, we'll make the filters return error messages (see bottom)
    netaddr = None
else:

    # MAC address dialect derived from netaddr's unix dialect. The only
    # override is word_fmt (set just below the class body).
    class mac_linux(netaddr.mac_unix):
        pass

    # "%.2x" formats each word as at least two lowercase hex digits.
    mac_linux.word_fmt = "%.2x"
37
38from ansible import errors
39
40
41# ---- IP address and network query helpers ----
def _empty_ipaddr_query(v, vtype):
    """Render ``v`` when the caller supplied no query.

    :param v: The parsed address/network object (must expose ``ip``).
    :param vtype: ``"address"`` or ``"network"``.

    :return: ``str(v.ip)`` for an address, ``str(v)`` for a network, and
        ``None`` when ``v`` is falsy or ``vtype`` is anything else.
    """
    if not v:
        return None
    if vtype == "address":
        return str(v.ip)
    if vtype == "network":
        return str(v)
    return None
50
51
def _first_last(v):
    """Return the first and last usable addresses of ``v`` as integers.

    :param v: A network object exposing ``size``, ``first`` and ``last``
        (``first``/``last`` are used as integers).

    :return: ``(first, last)`` unchanged for a two-address network,
        ``(first + 1, last - 1)`` for any larger network, and ``None`` for a
        single-address network.
    """
    # ``first`` and ``last`` are already integers, so there is no need to
    # round-trip them through netaddr.IPAddress just to call int() on them.
    if v.size == 2:
        return v.first, v.last
    if v.size > 1:
        return v.first + 1, v.last - 1
    return None
61
62
def _6to4_query(v, vtype, value):
    """Convert a public IPv4 address to 6to4, or pass through 6to4 IPv6 input.

    :param v: The parsed address/network object.
    :param vtype: ``"address"`` or ``"network"`` (only consulted for IPv6).
    :param value: The original input, returned for matching IPv6 values.

    :return: For IPv4, a ``2002:xxxx:xxxx::1/48`` string, or ``False`` when
        ``v`` is the network address of a multi-address network or is not
        accepted by the ``public`` query. For IPv6, ``value`` when the
        address lies in ``2002::/16``, ``False`` for a network address given
        as a network, otherwise ``None``.
    """
    if v.version == 4:
        # The network address of a multi-address network has no 6to4 form.
        if v.size > 1 and v.ip == v.network:
            return False

        ipconv = str(v.ip)
        # Bail out explicitly instead of relying on an unbound local being
        # caught by a blanket ``except`` further down.
        if not ipaddr(ipconv, "public"):
            return False

        octets = [int(part) for part in ipconv.split(".")]
        return "2002:{:02x}{:02x}:{:02x}{:02x}::1/48".format(*octets)

    if v.version == 6:
        if vtype == "address":
            if ipaddr(str(v), "2002::/16"):
                return value
        elif vtype == "network":
            if v.ip == v.network:
                return False
            if ipaddr(str(v.ip), "2002::/16"):
                return value

    return None
92
93
def _ip_query(v):
    """Return the IP of ``v`` as a string unless it is a plain network address.

    A single-address value always yields its IP. For larger networks the IP
    is returned when it differs from the network address, or when the
    network reports no broadcast address (the /31 case); otherwise ``None``.
    """
    if v.size == 1:
        return str(v.ip)
    # /31 networks in netaddr have no broadcast address
    if v.size > 1 and (v.ip != v.network or not v.broadcast):
        return str(v.ip)
    return None
101
102
def _gateway_query(v):
    """Return ``ip/prefixlen`` for a host address inside a multi-address network.

    Returns ``None`` for single-address values and for the network address
    itself.
    """
    if v.size > 1 and v.ip != v.network:
        return "%s/%s" % (v.ip, v.prefixlen)
    return None
107
108
def _address_prefix_query(v):
    """Return ``ip/prefixlen`` when ``v`` is a host inside a larger network.

    Yields ``None`` when ``v`` holds a single address or its IP equals the
    network address.
    """
    is_host_in_network = v.size > 1 and v.ip != v.network
    if not is_host_in_network:
        return None
    return "/".join((str(v.ip), str(v.prefixlen)))
113
114
def _bool_ipaddr_query(v):
    """Return ``True`` when ``v`` is truthy, otherwise ``None``."""
    return True if v else None
118
119
def _broadcast_query(v):
    """Return the broadcast address as a string for networks larger than two
    addresses; ``None`` otherwise."""
    if v.size <= 2:
        return None
    return str(v.broadcast)
123
124
def _cidr_query(v):
    """Return the string form of ``v``."""
    cidr_text = str(v)
    return cidr_text
127
128
def _cidr_lookup_query(v, iplist, value):
    """Return ``value`` when ``v`` is contained in ``iplist``.

    :return: ``value`` on a match, ``None`` when ``v`` is not a member, and
        ``False`` when the membership test itself raises.
    """
    try:
        is_member = v in iplist
    except Exception:
        return False
    return value if is_member else None
135
136
def _first_usable_query(v, vtype):
    """Return the first usable address of network ``v`` as a string.

    :raises errors.AnsibleFilterError: when ``vtype`` is ``"address"``.
    :return: The network address itself for a two-address network, the
        network address plus one for larger networks, and ``None`` for a
        single-address network or any other ``vtype``.
    """
    if vtype == "address":
        # Does it make sense to raise an error
        raise errors.AnsibleFilterError("Not a network address")
    if vtype != "network" or v.size <= 1:
        return None
    # Two-address networks have no reserved network address to skip.
    step = 0 if v.size == 2 else 1
    return str(netaddr.IPAddress(int(v.network) + step))
146
147
def _host_query(v):
    """Return ``v`` in host form.

    A single-address value is returned as ``str(v)``. A host inside a larger
    network is returned as ``ip/prefixlen``. The network address of a larger
    network yields ``None``.
    """
    if v.size == 1:
        return str(v)
    if v.size > 1 and v.ip != v.network:
        return "%s/%s" % (v.ip, v.prefixlen)
    return None
154
155
def _hostmask_query(v):
    """Return the host mask of ``v`` as a string."""
    mask = v.hostmask
    return str(mask)
158
159
def _int_query(v, vtype):
    """Return ``v`` in integer notation.

    :return: ``int(v.ip)`` for an address, the string ``"<int>/<prefixlen>"``
        for a network, and ``None`` for any other ``vtype``.
    """
    if vtype == "address":
        return int(v.ip)
    if vtype == "network":
        return "%s/%s" % (int(v.ip), int(v.prefixlen))
    return None
165
166
def _ip_prefix_query(v):
    """Return ``ip/prefixlen`` for hosts and for two-address networks.

    Two-address networks always qualify. Larger networks qualify only when
    the IP is not the network address. Everything else yields ``None``.
    """
    if v.size == 2 or (v.size > 1 and v.ip != v.network):
        return "%s/%s" % (v.ip, v.prefixlen)
    return None
173
174
def _ip_netmask_query(v):
    """Return ``"<ip> <netmask>"`` for hosts and for two-address networks.

    Two-address networks always qualify. Larger networks qualify only when
    the IP is not the network address. Everything else yields ``None``.
    """
    if v.size == 2 or (v.size > 1 and v.ip != v.network):
        return "%s %s" % (v.ip, v.netmask)
    return None
181
182
183"""
184def _ip_wildcard_query(v):
185    if v.size == 2:
186        return str(v.ip) + ' ' + str(v.hostmask)
187    elif v.size > 1:
188        if v.ip != v.network:
189            return str(v.ip) + ' ' + str(v.hostmask)
190"""
191
192
def _ipv4_query(v, value):
    """Return ``v`` as IPv4.

    Non-IPv6 input is passed through as ``value``. IPv6 input is converted
    with ``v.ipv4()``; if that conversion raises, ``False`` is returned.
    """
    if v.version != 6:
        return value
    try:
        return str(v.ipv4())
    except Exception:
        return False
201
202
def _ipv6_query(v, value):
    """Return ``v`` as IPv6: convert IPv4 input via ``v.ipv6()``, pass
    anything else through as ``value``."""
    return str(v.ipv6()) if v.version == 4 else value
208
209
def _last_usable_query(v, vtype):
    """Return the last usable address of network ``v`` as a string.

    :raises errors.AnsibleFilterError: when ``vtype`` is ``"address"``.
    :return: The upper bound reported by ``_first_last``; ``None`` for a
        single-address network or any other ``vtype``.
    """
    if vtype == "address":
        # Does it make sense to raise an error
        raise errors.AnsibleFilterError("Not a network address")
    if vtype == "network" and v.size > 1:
        _unused_first, upper = _first_last(v)
        return str(netaddr.IPAddress(upper))
    return None
218
219
def _link_local_query(v, value):
    """Return ``value`` when the IP of ``v`` is a link-local address.

    IPv4 link-local is 169.254.0.0/16 (RFC 3927). The previous /24 test only
    matched 169.254.0.x. IPv6 link-local is fe80::/10.

    :return: ``value`` on a match, otherwise ``None``.
    """
    v_ip = netaddr.IPAddress(str(v.ip))
    if v.version == 4:
        link_local_net = "169.254.0.0/16"
    elif v.version == 6:
        link_local_net = "fe80::/10"
    else:
        return None

    if ipaddr(str(v_ip), link_local_net):
        return value
    return None
229
230
def _loopback_query(v, value):
    """Return ``value`` when the IP of ``v`` is a loopback address, else ``None``."""
    address = netaddr.IPAddress(str(v.ip))
    return value if address.is_loopback() else None
235
236
def _multicast_query(v, value):
    """Return ``value`` when ``v.is_multicast()`` is true, else ``None``."""
    return value if v.is_multicast() else None
240
241
def _net_query(v):
    """Return ``network/prefixlen`` when ``v`` is the network address of a
    multi-address network; ``None`` otherwise."""
    if v.size > 1 and v.ip == v.network:
        return "%s/%s" % (v.network, v.prefixlen)
    return None
246
247
def _netmask_query(v):
    """Return the netmask of ``v`` as a string."""
    mask = v.netmask
    return str(mask)
250
251
def _network_query(v):
    """Return the network address of a given IP or subnet as a string."""
    network_address = v.network
    return str(network_address)
255
256
def _network_id_query(v):
    """Return the network address of a given IP or subnet as a string.

    Same result as the ``network`` query, registered under another name.
    """
    network_id = v.network
    return str(network_id)
260
261
def _network_netmask_query(v):
    """Return ``"<network> <netmask>"`` for ``v``."""
    return "%s %s" % (v.network, v.netmask)
264
265
def _network_wildcard_query(v):
    """Return ``"<network> <hostmask>"`` for ``v``."""
    return "%s %s" % (v.network, v.hostmask)
268
269
def _next_usable_query(v, vtype):
    """Return the address following ``v.ip`` if it is usable in the network.

    :raises errors.AnsibleFilterError: when ``vtype`` is ``"address"``.
    :return: The next address as a string when it lies within the bounds
        reported by ``_first_last``; otherwise ``None``.
    """
    if vtype == "address":
        # Does it make sense to raise an error
        raise errors.AnsibleFilterError("Not a network address")
    if vtype != "network" or v.size <= 1:
        return None

    lower, upper = _first_last(v)
    successor = netaddr.IPAddress(int(v.ip) + 1)
    if lower <= int(successor) <= upper:
        return str(successor)
    return None
280
281
def _peer_query(v, vtype):
    """Return the other endpoint of a point-to-point network.

    Two-address networks flip the lowest bit of the IP. Four-address
    networks flip the two lowest bits, which is only valid for the two
    middle addresses.

    :raises errors.AnsibleFilterError: when ``vtype`` is ``"address"``, when
        the network has neither two nor four addresses, or when the IP is
        the first or last address of a four-address network.
    :return: The peer address as a string; ``None`` for any other ``vtype``.
    """
    if vtype == "address":
        raise errors.AnsibleFilterError("Not a network address")
    if vtype != "network":
        return None

    if v.size == 2:
        return str(netaddr.IPAddress(int(v.ip) ^ 1))
    if v.size != 4:
        raise errors.AnsibleFilterError("Not a point-to-point network")

    position = int(v.ip) % 4
    if position == 0:
        raise errors.AnsibleFilterError(
            "Network address of /30 has no peer"
        )
    if position == 3:
        raise errors.AnsibleFilterError(
            "Broadcast address of /30 has no peer"
        )
    return str(netaddr.IPAddress(int(v.ip) ^ 3))
299
300
def _prefix_query(v):
    """Return the prefix length of ``v`` as an integer."""
    prefix_length = int(v.prefixlen)
    return prefix_length
303
304
def _previous_usable_query(v, vtype):
    """Return the address preceding ``v.ip`` if it is usable in the network.

    :raises errors.AnsibleFilterError: when ``vtype`` is ``"address"``.
    :return: The previous address as a string when it lies within the bounds
        reported by ``_first_last``; otherwise ``None``.
    """
    if vtype == "address":
        # Does it make sense to raise an error
        raise errors.AnsibleFilterError("Not a network address")
    if vtype != "network" or v.size <= 1:
        return None

    lower, upper = _first_last(v)
    predecessor = netaddr.IPAddress(int(v.ip) - 1)
    if lower <= int(predecessor) <= upper:
        return str(predecessor)
    return None
315
316
def _private_query(v, value):
    """Return ``value`` when ``v.is_private()`` is true, else ``None``."""
    return value if v.is_private() else None
320
321
def _public_query(v, value):
    """Return ``value`` when the IP of ``v`` passes the "public" checks.

    The address must be unicast and must not be private, loopback, a netmask
    or a hostmask, as reported by netaddr. Otherwise ``None`` is returned.
    """
    v_ip = netaddr.IPAddress(str(v.ip))
    if not v_ip.is_unicast():
        return None
    if v_ip.is_private() or v_ip.is_loopback():
        return None
    if v_ip.is_netmask() or v_ip.is_hostmask():
        return None
    return value
332
333
def _range_usable_query(v, vtype):
    """Return the usable range of network ``v`` as ``"<first>-<last>"``.

    :raises errors.AnsibleFilterError: when ``vtype`` is ``"address"``.
    :return: The range string; ``None`` for a single-address network or any
        other ``vtype``.
    """
    if vtype == "address":
        # Does it make sense to raise an error
        raise errors.AnsibleFilterError("Not a network address")
    if vtype == "network" and v.size > 1:
        lower, upper = _first_last(v)
        return "-".join(
            str(netaddr.IPAddress(bound)) for bound in (lower, upper)
        )
    return None
344
345
def _revdns_query(v):
    """Return netaddr's ``reverse_dns`` value for the IP of ``v``."""
    return netaddr.IPAddress(str(v.ip)).reverse_dns
349
350
def _size_query(v):
    """Return the number of addresses covered by ``v``."""
    address_count = v.size
    return address_count
353
354
def _size_usable_query(v):
    """Return the number of usable addresses in ``v``.

    A single address counts as 0 usable, a two-address network as 2, and
    anything else as its size minus two.
    """
    special_sizes = {1: 0, 2: 2}
    return special_sizes.get(v.size, v.size - 2)
361
362
def _subnet_query(v):
    """Return ``v.cidr`` as a string."""
    subnet = v.cidr
    return str(subnet)
365
366
def _type_query(v):
    """Classify ``v`` as ``"address"`` or ``"network"``.

    Single-address values and hosts inside a larger network are addresses.
    A value whose IP equals its network address is a network. Sizes below
    one yield ``None``.
    """
    if v.size < 1:
        return None
    if v.size == 1 or v.ip != v.network:
        return "address"
    return "network"
375
376
def _unicast_query(v, value):
    """Return ``value`` when ``v.is_unicast()`` is true, else ``None``."""
    return value if v.is_unicast() else None
380
381
def _version_query(v):
    """Return the IP version reported by ``v``."""
    ip_version = v.version
    return ip_version
384
385
def _wrap_query(v, vtype, value):
    """Wrap an IPv6 address in square brackets.

    :return: ``"[ip]"`` for an IPv6 address, ``"[ip]/prefixlen"`` for an IPv6
        network, ``None`` for IPv6 with any other ``vtype``, and ``value``
        unchanged for non-IPv6 input.
    """
    if v.version != 6:
        return value
    if vtype == "address":
        return "[%s]" % (v.ip,)
    if vtype == "network":
        return "[%s]/%s" % (v.ip, v.prefixlen)
    return None
394
395
396# ---- HWaddr query helpers ----
def _bare_query(v):
    """Format ``v`` using netaddr's ``mac_bare`` dialect.

    The dialect is assigned on ``v`` itself before it is converted to a string.
    """
    v.dialect = netaddr.mac_bare
    rendered = str(v)
    return rendered
400
401
def _bool_hwaddr_query(v):
    """Return ``True`` when ``v`` is truthy, otherwise ``None``."""
    return True if v else None
405
406
def _int_hwaddr_query(v):
    """Return ``v`` converted with ``int()``."""
    as_integer = int(v)
    return as_integer
409
410
def _cisco_query(v):
    """Format ``v`` using netaddr's ``mac_cisco`` dialect.

    The dialect is assigned on ``v`` itself before it is converted to a string.
    """
    v.dialect = netaddr.mac_cisco
    rendered = str(v)
    return rendered
414
415
def _empty_hwaddr_query(v, value):
    """Return ``value`` when ``v`` is truthy, otherwise ``None``."""
    return value if v else None
419
420
def _linux_query(v):
    """Format ``v`` using this module's ``mac_linux`` dialect.

    The dialect is assigned on ``v`` itself before it is converted to a string.
    """
    v.dialect = mac_linux
    rendered = str(v)
    return rendered
424
425
def _postgresql_query(v):
    """Format ``v`` using netaddr's ``mac_pgsql`` dialect.

    The dialect is assigned on ``v`` itself before it is converted to a string.
    """
    v.dialect = netaddr.mac_pgsql
    rendered = str(v)
    return rendered
429
430
def _unix_query(v):
    """Format ``v`` using netaddr's ``mac_unix`` dialect.

    The dialect is assigned on ``v`` itself before it is converted to a string.
    """
    v.dialect = netaddr.mac_unix
    rendered = str(v)
    return rendered
434
435
def _win_query(v):
    """Format ``v`` using netaddr's ``mac_eui48`` dialect.

    The dialect is assigned on ``v`` itself before it is converted to a string.
    """
    v.dialect = netaddr.mac_eui48
    rendered = str(v)
    return rendered
439
440
441# ---- IP address and network filters ----
442
443# Returns a minified list of subnets or a single subnet that spans all of
444# the inputs.
def cidr_merge(value, action="merge"):
    """Merge a collection of subnets, or find the subnet spanning all of them.

    :param value: An iterable of addresses/networks.
    :param action: ``"merge"`` returns the list produced by
        ``netaddr.cidr_merge``; ``"span"`` returns a single subnet that
        covers every input.

    :return: A list of strings for ``"merge"``. For ``"span"``, a string, or
        ``None`` when ``value`` is empty.
    :raises errors.AnsibleFilterError: when ``value`` is not iterable, when
        ``action`` is unknown, or when netaddr rejects the input.
    """
    if not hasattr(value, "__iter__"):
        raise errors.AnsibleFilterError(
            "cidr_merge: expected iterable, got " + repr(value)
        )

    if action == "merge":
        try:
            return [str(ip) for ip in netaddr.cidr_merge(value)]
        except Exception as e:
            raise errors.AnsibleFilterError(
                "cidr_merge: error in netaddr:\n%s" % e
            )

    elif action == "span":
        # Materialize the input once: generators and sets support neither
        # len() nor indexing, which the size checks below need.
        candidates = list(value)

        # spanning_cidr needs at least two values
        if not candidates:
            return None

        try:
            if len(candidates) == 1:
                return str(netaddr.IPNetwork(candidates[0]))
            return str(netaddr.spanning_cidr(candidates))
        except Exception as e:
            raise errors.AnsibleFilterError(
                "cidr_merge: error in netaddr:\n%s" % e
            )

    else:
        raise errors.AnsibleFilterError(
            "cidr_merge: invalid action '%s'" % action
        )
482
483
def ipaddr(value, query="", version=False, alias="ipaddr"):
    """ Check if string is an IP address or network and filter it

    :param value: An address/network string, an integer, a numeric
        ``"<int>/<prefix>"`` string, or a list/tuple/generator of those.
    :param query: Name of a query in the table below, a subnet to test
        membership against, or a number used as an index into the network.
    :param version: ``4`` or ``6`` to accept only that IP version; falsy to
        accept both.
    :param alias: Filter name used in the "unknown filter type" error message.

    :return: The result of the selected query. ``False`` when ``value``
        cannot be parsed or does not match ``version``. For sequence input,
        a list holding the truthy per-element results.
    :raises errors.AnsibleFilterError: when ``query`` is neither a known
        query, a subnet, nor a number.
    """

    # Names of the extra positional arguments (beyond ``v``) each query
    # helper expects; resolved against local state further down.
    query_func_extra_args = {
        "": ("vtype",),
        "6to4": ("vtype", "value"),
        "cidr_lookup": ("iplist", "value"),
        "first_usable": ("vtype",),
        "int": ("vtype",),
        "ipv4": ("value",),
        "ipv6": ("value",),
        "last_usable": ("vtype",),
        "link-local": ("value",),
        "loopback": ("value",),
        "lo": ("value",),
        "multicast": ("value",),
        "next_usable": ("vtype",),
        "peer": ("vtype",),
        "previous_usable": ("vtype",),
        "private": ("value",),
        "public": ("value",),
        "unicast": ("value",),
        "range_usable": ("vtype",),
        "wrap": ("vtype", "value"),
    }

    query_func_map = {
        "": _empty_ipaddr_query,
        "6to4": _6to4_query,
        "address": _ip_query,
        "address/prefix": _address_prefix_query,  # deprecate
        "bool": _bool_ipaddr_query,
        "broadcast": _broadcast_query,
        "cidr": _cidr_query,
        "cidr_lookup": _cidr_lookup_query,
        "first_usable": _first_usable_query,
        "gateway": _gateway_query,  # deprecate
        "gw": _gateway_query,  # deprecate
        "host": _host_query,
        "host/prefix": _address_prefix_query,  # deprecate
        "hostmask": _hostmask_query,
        "hostnet": _gateway_query,  # deprecate
        "int": _int_query,
        "ip": _ip_query,
        "ip/prefix": _ip_prefix_query,
        "ip_netmask": _ip_netmask_query,
        # 'ip_wildcard': _ip_wildcard_query, built then could not think of use case
        "ipv4": _ipv4_query,
        "ipv6": _ipv6_query,
        "last_usable": _last_usable_query,
        "link-local": _link_local_query,
        "lo": _loopback_query,
        "loopback": _loopback_query,
        "multicast": _multicast_query,
        "net": _net_query,
        "next_usable": _next_usable_query,
        "netmask": _netmask_query,
        "network": _network_query,
        "network_id": _network_id_query,
        "network/prefix": _subnet_query,
        "network_netmask": _network_netmask_query,
        "network_wildcard": _network_wildcard_query,
        "peer": _peer_query,
        "prefix": _prefix_query,
        "previous_usable": _previous_usable_query,
        "private": _private_query,
        "public": _public_query,
        "range_usable": _range_usable_query,
        "revdns": _revdns_query,
        "router": _gateway_query,  # deprecate
        "size": _size_query,
        "size_usable": _size_usable_query,
        "subnet": _subnet_query,
        "type": _type_query,
        "unicast": _unicast_query,
        "v4": _ipv4_query,
        "v6": _ipv6_query,
        "version": _version_query,
        "wildcard": _hostmask_query,
        "wrap": _wrap_query,
    }

    vtype = None
    # Only set when ``query`` turns out to be a subnet to look ``value`` up in.
    iplist = None

    if not value:
        return False

    elif value is True:
        return False

    # Check if value is a list and parse each element
    elif isinstance(value, (list, tuple, types.GeneratorType)):

        # Filter each element exactly once (the result is both the test and
        # the value kept) and keep the caller's alias for error messages.
        _ret = []
        for element in value:
            filtered = ipaddr(element, str(query), version, alias)
            if filtered:
                _ret.append(filtered)

        return _ret

    # Check if value is a number and convert it to an IP address
    elif str(value).isdigit():

        # We don't know what IP version to assume, so let's check IPv4 first,
        # then IPv6
        try:
            if (not version) or (version and version == 4):
                v = netaddr.IPNetwork("0.0.0.0/0")
                v.value = int(value)
                v.prefixlen = 32
            elif version and version == 6:
                v = netaddr.IPNetwork("::/0")
                v.value = int(value)
                v.prefixlen = 128

        # IPv4 didn't work the first time, so it definitely has to be IPv6
        except Exception:
            try:
                v = netaddr.IPNetwork("::/0")
                v.value = int(value)
                v.prefixlen = 128

            # The value is too big for IPv6. Are you a nanobot?
            except Exception:
                return False

        # We got an IP address, let's mark it as such
        value = str(v)
        vtype = "address"

    # value has not been recognized, check if it's a valid IP string
    else:
        try:
            v = netaddr.IPNetwork(value)

            # value is a valid IP string, check if user specified
            # CIDR prefix or just an IP address, this will indicate default
            # output format
            try:
                address, prefix = value.split("/")
                vtype = "network"
            except Exception:
                vtype = "address"

        # value hasn't been recognized, maybe it's a numerical CIDR?
        except Exception:
            try:
                address, prefix = value.split("/")
                address.isdigit()
                address = int(address)
                prefix.isdigit()
                prefix = int(prefix)

            # It's not numerical CIDR, give up
            except Exception:
                return False

            # It is something, so let's try and build a CIDR from the parts
            try:
                v = netaddr.IPNetwork("0.0.0.0/0")
                v.value = address
                v.prefixlen = prefix

            # It's not a valid IPv4 CIDR
            except Exception:
                try:
                    v = netaddr.IPNetwork("::/0")
                    v.value = address
                    v.prefixlen = prefix

                # It's not a valid IPv6 CIDR. Give up.
                except Exception:
                    return False

            # We have a valid CIDR, so let's write it in correct format
            value = str(v)
            vtype = "network"

    # We have a query string but it's not in the known query types. Check if
    # that string is a valid subnet, if so, we can check later if given IP
    # address/network is inside that specific subnet
    try:
        # ?? 6to4 and link-local were True here before.  Should they still?
        if (
            query
            and (query not in query_func_map or query == "cidr_lookup")
            and not str(query).isdigit()
            and ipaddr(query, "network")
        ):
            iplist = netaddr.IPSet([netaddr.IPNetwork(query)])
            query = "cidr_lookup"
    except Exception:
        pass

    # This code checks if value matches the IP version the user wants, ie. if
    # it's any version ("ipaddr()"), IPv4 ("ipv4()") or IPv6 ("ipv6()")
    # If version does not match, return False
    if version and v.version != version:
        return False

    # Resolve the helper's extra arguments from an explicit table rather than
    # locals(): a literal "cidr_lookup" query without a subnet now passes
    # iplist=None (which the lookup helper turns into False) instead of
    # raising a raw KeyError.
    available_args = {"vtype": vtype, "value": value, "iplist": iplist}
    extras = [
        available_args[arg]
        for arg in query_func_extra_args.get(query, tuple())
    ]
    try:
        return query_func_map[query](v, *extras)
    except KeyError:
        # Not a named query: treat a numeric query as an index into the
        # network, anything else as an unknown filter type.
        try:
            float(query)
            if v.size == 1:
                if vtype == "address":
                    return str(v.ip)
                elif vtype == "network":
                    return str(v)

            elif v.size > 1:
                try:
                    return str(v[query]) + "/" + str(v.prefixlen)
                except Exception:
                    return False

            else:
                return value

        except Exception:
            raise errors.AnsibleFilterError(
                alias + ": unknown filter type: %s" % query
            )

    return False
716
717
def ipmath(value, amount):
    """Add ``amount`` to an IP address and return the result as a string.

    :param value: An IP address string, optionally with a ``/prefix`` (only
        the address part is used).
    :param amount: Integer offset to add; may be negative.

    :return: The resulting address as a string.
    :raises errors.AnsibleFilterError: when ``value`` is not a valid IP
        address or ``amount`` is not an integer.
    """
    try:
        if "/" in value:
            ip = netaddr.IPNetwork(value).ip
        else:
            ip = netaddr.IPAddress(value)
    # TypeError covers non-string input (e.g. None or an int), for which the
    # "/" membership test itself fails.
    except (netaddr.AddrFormatError, ValueError, TypeError):
        msg = "You must pass a valid IP address; {0} is invalid".format(value)
        raise errors.AnsibleFilterError(msg)

    if not isinstance(amount, int):
        msg = (
            "You must pass an integer for arithmetic; "
            "{0} is not a valid integer"
        ).format(amount)
        raise errors.AnsibleFilterError(msg)

    return str(ip + amount)
736
737
def ipwrap(value, query=""):
    """Wrap IPv6 addresses in square brackets, leaving other input untouched.

    For a list, tuple or generator each element is handled separately:
    elements accepted by ``ipaddr`` with ``query`` are replaced by their
    ``wrap`` form, the others are kept as they are. Any exception makes the
    function return ``value`` unchanged.
    """
    try:
        if isinstance(value, (list, tuple, types.GeneratorType)):
            return [
                ipaddr(element, "wrap")
                if ipaddr(element, query, version=False, alias="ipwrap")
                else element
                for element in value
            ]

        matched = ipaddr(value, query, version=False, alias="ipwrap")
        return ipaddr(matched, "wrap") if matched else value

    except Exception:
        return value
758
759
def ipv4(value, query=""):
    """Run ``ipaddr`` restricted to IP version 4, reporting errors as ``ipv4``."""
    return ipaddr(value, query, 4, "ipv4")
762
763
def ipv6(value, query=""):
    """Run ``ipaddr`` restricted to IP version 6, reporting errors as ``ipv6``."""
    return ipaddr(value, query, 6, "ipv6")
766
767
768# Split given subnet into smaller subnets or find out the biggest subnet of
769# a given IP address with given CIDR prefix
770# Usage:
771#
772#  - address or address/prefix | ipsubnet
773#      returns CIDR subnet of a given input
774#
775#  - address/prefix | ipsubnet(cidr)
776#      returns number of possible subnets for given CIDR prefix
777#
778#  - address/prefix | ipsubnet(cidr, index)
779#      returns new subnet with given CIDR prefix
780#
781#  - address | ipsubnet(cidr)
782#      returns biggest subnet with given CIDR prefix that address belongs to
783#
784#  - address | ipsubnet(cidr, index)
785#      returns next indexed subnet which contains given address
786#
787#  - address/prefix | ipsubnet(subnet/prefix)
788#      return the index of the subnet in the subnet
def ipsubnet(value, query="", index="x"):
    """ Manipulate IPv4/IPv6 subnets

    :param value: An address or network.
    :param query: Empty to return the normalized subnet; a prefix length to
        split a network (or find the supernet of an address); or a
        subnet/address string to find the position of ``value`` inside it.
    :param index: With a numeric ``query``, the subnet/supernet to select.
        Anything that is not a number selects the "count" behaviour instead.

    :return: A string result, or ``False`` when the input cannot be parsed
        or the requested subnet does not exist.
    :raises errors.AnsibleFilterError: when a non-numeric ``query`` is not a
        valid subnet/address, or ``value`` is not inside it.
    """
    from itertools import islice

    try:
        vtype = ipaddr(value, "type")
        if vtype == "address":
            v = ipaddr(value, "cidr")
        elif vtype == "network":
            v = ipaddr(value, "subnet")

        value = netaddr.IPNetwork(v)
    except Exception:
        return False
    query_string = str(query)
    if not query:
        return str(value)

    elif query_string.isdigit():
        vsize = ipaddr(v, "size")
        query = int(query)

        # A numeric index selects one subnet; anything else means "count".
        try:
            float(index)
            index = int(index)
        except Exception:
            has_index = False
        else:
            has_index = True

        if has_index:
            if vsize > 1:
                try:
                    if index >= 0:
                        # Walk the lazy generator up to the requested item
                        # instead of materializing every subnet; an IPv6
                        # split can yield billions of them.
                        return str(
                            next(islice(value.subnet(query), index, None))
                        )
                    # Negative indexes count from the end and need the full
                    # list.
                    return str(list(value.subnet(query))[index])
                except Exception:
                    return False

            elif vsize == 1:
                try:
                    return str(value.supernet(query)[index])
                except Exception:
                    return False

        else:
            if vsize > 1:
                try:
                    width = 32 if value.version == 4 else 128
                    if value.prefixlen <= query <= width:
                        # Each extra prefix bit doubles the subnet count, so
                        # there is no need to enumerate them.
                        return str(2 ** (query - value.prefixlen))
                    # Out-of-range prefixes keep netaddr's own behaviour.
                    return str(len(list(value.subnet(query))))
                except Exception:
                    return False

            elif vsize == 1:
                try:
                    return str(value.supernet(query)[0])
                except Exception:
                    return False

    elif query_string:
        vtype = ipaddr(query, "type")
        if vtype == "address":
            v = ipaddr(query, "cidr")
        elif vtype == "network":
            v = ipaddr(query, "subnet")
        else:
            msg = "You must pass a valid subnet or IP address; {0} is invalid".format(
                query_string
            )
            raise errors.AnsibleFilterError(msg)
        query = netaddr.IPNetwork(v)
        # Positions reported here are 1-based.
        for i, subnet in enumerate(query.subnet(value.prefixlen), 1):
            if subnet == value:
                return str(i)
        msg = "{0} is not in the subnet {1}".format(value.cidr, query.cidr)
        raise errors.AnsibleFilterError(msg)
    return False
857
858
859# Returns the nth host within a network described by value.
860# Usage:
861#
862#  - address or address/prefix | nthhost(nth)
863#      returns the nth host within the given network
def nthhost(value, query=""):
    """ Get the nth host within a given network

    :param value: An address or network.
    :param query: The index of the host to return; negative values count
        from the end of the network.

    :return: The selected address as a string, or ``False`` when ``value``
        cannot be parsed, ``query`` is empty/zero or not a number, or the
        index lies outside the network.
    """
    try:
        vtype = ipaddr(value, "type")
        if vtype == "address":
            v = ipaddr(value, "cidr")
        elif vtype == "network":
            v = ipaddr(value, "subnet")

        value = netaddr.IPNetwork(v)
    except Exception:
        return False

    if not query:
        return False

    try:
        nth = int(query)
        if value.size > nth:
            # Return a plain string like the other filters, not a
            # netaddr.IPAddress object.
            return str(value[nth])

    # TypeError: query is not number-like; IndexError: a negative index that
    # reaches before the start of the network.
    except (ValueError, TypeError, IndexError):
        return False

    return False
889
890
891# Returns the next nth usable ip within a network described by value.
def next_nth_usable(value, offset):
    """Return the usable address ``offset`` positions after ``value``.

    :param value: An address or network.
    :param offset: Number of addresses to move forward; must be an ``int``.

    :return: The resulting address as a string when it lies within the
        usable bounds of the network, ``None`` when it does not (or the
        network holds a single address), and ``False`` when ``value``
        cannot be parsed.
    :raises errors.AnsibleFilterError: when ``offset`` is not an ``int``.
    """
    try:
        kind = ipaddr(value, "type")
        # Any other kind raises here and is reported as False below.
        normalizing_query = {"address": "cidr", "network": "subnet"}[kind]
        v = netaddr.IPNetwork(ipaddr(value, normalizing_query))
    except Exception:
        return False

    if type(offset) is not int:
        raise errors.AnsibleFilterError("Must pass in an integer")
    if v.size <= 1:
        return None

    lower, upper = _first_last(v)
    target = netaddr.IPAddress(int(v.ip) + offset)
    if lower <= int(target) <= upper:
        return str(target)
    return None
911
912
913# Returns the previous nth usable ip within a network described by value.
def previous_nth_usable(value, offset):
    """Return the usable address ``offset`` positions before ``value``.

    :param value: An address or network.
    :param offset: Number of addresses to move backward; must be an ``int``.

    :return: The resulting address as a string when it lies within the
        usable bounds of the network, ``None`` when it does not (or the
        network holds a single address), and ``False`` when ``value``
        cannot be parsed.
    :raises errors.AnsibleFilterError: when ``offset`` is not an ``int``.
    """
    try:
        kind = ipaddr(value, "type")
        # Any other kind raises here and is reported as False below.
        normalizing_query = {"address": "cidr", "network": "subnet"}[kind]
        v = netaddr.IPNetwork(ipaddr(value, normalizing_query))
    except Exception:
        return False

    if type(offset) is not int:
        raise errors.AnsibleFilterError("Must pass in an integer")
    if v.size <= 1:
        return None

    lower, upper = _first_last(v)
    target = netaddr.IPAddress(int(v.ip) - offset)
    if lower <= int(target) <= upper:
        return str(target)
    return None
933
934
def _range_checker(ip_check, first, last):
    """
    Tell whether ``ip_check`` lies between ``first`` and ``last``, both inclusive.

    :param ip_check: The ip to test if it is within first and last.
    :param first: The first IP in the range to test against.
    :param last: The last IP in the range to test against.

    :return: bool
    """
    return bool(ip_check >= first and ip_check <= last)
949
950
def _address_normalizer(value):
    """
    Used to validate an address or network type and return it in a consistent format.
    This is being used for future use cases not currently available such as an address range.

    :param value: The string representation of an address or network.

    :return: The address or network in the normalized form, or ``False`` when
        ``value`` is neither an address nor a network.
    """
    try:
        vtype = ipaddr(value, "type")
        if vtype == "address" or vtype == "network":
            return ipaddr(value, "subnet")
    except Exception:
        return False

    # Unrecognized input used to fall through to ``return v`` with ``v``
    # never assigned, raising UnboundLocalError.
    return False
968
969
def network_in_usable(value, test):
    """
    Checks whether 'test' is a usable address or addresses in 'value'

    :param: value: The string representation of an address or network to test against.
    :param test: The string representation of an address or network to validate if it is within the range of 'value'.

    :return: bool
    """
    # normalize value and test variables into an ipaddr
    v = _address_normalizer(value)
    w = _address_normalizer(test)
    if not v or not w:
        return False

    # usable bounds of 'value' as integers; the "address" fallback catches
    # the single-address (/32) case, where no usable range is reported
    v_first = ipaddr(ipaddr(v, "first_usable") or ipaddr(v, "address"), "int")
    v_last = ipaddr(ipaddr(v, "last_usable") or ipaddr(v, "address"), "int")

    # Take the bounds of 'test' straight from netaddr. The "broadcast" query
    # returns nothing for two-address networks, so the old fallback to the
    # "address" query used the first address as the last one.
    w_net = netaddr.IPNetwork(w)
    w_first, w_last = w_net.first, w_net.last

    return _range_checker(w_first, v_first, v_last) and _range_checker(
        w_last, v_first, v_last
    )
995
996
def network_in_network(value, test):
    """
    Checks whether the 'test' address or addresses are in 'value', including broadcast and network

    :param: value: The network address or range to test against.
    :param test: The address or network to validate if it is within the range of 'value'.

    :return: bool
    """
    # normalize value and test variables into an ipaddr
    v = _address_normalizer(value)
    w = _address_normalizer(test)
    if not v or not w:
        return False

    # Take first/last straight from netaddr. The "broadcast" query returns
    # nothing for two-address networks, so the old fallback to the "address"
    # query treated the first address of a /31 as its last one.
    v_net = netaddr.IPNetwork(v)
    w_net = netaddr.IPNetwork(w)

    return _range_checker(
        w_net.first, v_net.first, v_net.last
    ) and _range_checker(w_net.last, v_net.first, v_net.last)
1022
1023
def reduce_on_network(value, network):
    """
    Reduces a list of addresses to only the addresses that match a given network.

    :param: value: The list of addresses to filter on.
    :param: network: The network to validate against.

    :return: The reduced list of addresses.
    """
    # normalize network variable into an ipaddr
    n = _address_normalizer(network)
    if not n:
        return []

    # Take first/last straight from netaddr. The "broadcast" query returns
    # nothing for two-address networks, so the old fallback to the "address"
    # query treated the first address of a /31 as its last one.
    n_net = netaddr.IPNetwork(n)
    n_first, n_last = n_net.first, n_net.last

    # create an empty list to fill and return
    r = []

    for address in value:
        # normalize address variables into an ipaddr
        a = _address_normalizer(address)
        if not a:
            # entries that are not addresses/networks cannot match
            continue

        a_net = netaddr.IPNetwork(a)
        if _range_checker(a_net.first, n_first, n_last) and _range_checker(
            a_net.last, n_first, n_last
        ):
            r.append(address)

    return r
1057
1058
1059# Returns the SLAAC address within a network for a given HW/MAC address.
1060# Usage:
1061#
1062#  - prefix | slaac(mac)
def slaac(value, query=""):
    """
    Get the SLAAC address within given network

    :param value: IPv6 address or network that provides the prefix.
    :param query: HW/MAC address to combine with the prefix.

    :return: The result of netaddr's ``EUI.ipv6()`` for the prefix's network
        address, or False when 'value' is not IPv6, 'query' is empty, or
        either of them cannot be processed.
    """
    try:
        kind = ipaddr(value, "type")
        if kind == "address":
            prefix = ipaddr(value, "cidr")
        elif kind == "network":
            prefix = ipaddr(value, "subnet")
        # For any other type 'prefix' is left unbound; the error this causes
        # further down is caught by the handler below and reported as False.

        version = ipaddr(value, "version")
        if version != 6:
            return False

        network = netaddr.IPNetwork(prefix)
    except Exception:
        return False

    # without a hardware address there is nothing to derive
    if not query:
        return False

    try:
        # hwaddr() validates the MAC and raises on anything it cannot parse
        eui = netaddr.EUI(hwaddr(query, alias="slaac"))
    except Exception:
        return False

    return eui.ipv6(network.network)
1090
1091
1092# ---- HWaddr / MAC address filters ----
def hwaddr(value, query="", alias="hwaddr"):
    """
    Check if string is a HW/MAC address and filter it

    :param value: The candidate hardware address.
    :param query: Name of the check or output format to apply; must be one of
        the keys of the dispatch table below (the empty string selects the
        default query).
    :param alias: Filter name used as the prefix of error messages.

    :return: Whatever the selected query function returns, or False when
        'value' is not a hardware address and 'query' is empty or "bool".
    :raises AnsibleFilterError: when 'value' is not a hardware address and
        any other query was requested, or when 'query' is not a known
        filter type.
    """

    # names of additional arguments a query function takes after the EUI
    query_func_extra_args = {"": ("value",)}

    query_func_map = {
        "": _empty_hwaddr_query,
        "bare": _bare_query,
        "bool": _bool_hwaddr_query,
        "int": _int_hwaddr_query,
        "cisco": _cisco_query,
        "eui48": _win_query,
        "linux": _linux_query,
        "pgsql": _postgresql_query,
        "postgresql": _postgresql_query,
        "psql": _postgresql_query,
        "unix": _unix_query,
        "win": _win_query,
    }

    try:
        v = netaddr.EUI(value)
    except Exception:
        if query and query != "bool":
            raise errors.AnsibleFilterError(
                alias + ": not a hardware address: %s" % value
            )
        # The empty and "bool" queries only test validity, so report the
        # failure here rather than continuing with 'v' unbound.
        return False

    # Resolve the query function on its own, so that a KeyError raised inside
    # the function itself is not mistaken for an unknown filter type.
    try:
        query_func = query_func_map[query]
    except KeyError:
        raise errors.AnsibleFilterError(
            alias + ": unknown filter type: %s" % query
        )

    # explicit lookup table for the extra arguments instead of locals()
    available_args = {"value": value}
    extras = [
        available_args[arg] for arg in query_func_extra_args.get(query, ())
    ]

    return query_func(v, *extras)
1132
1133
def macaddr(value, query=""):
    """ Alias of hwaddr() that reports errors under the 'macaddr' name """
    return hwaddr(value, query=query, alias="macaddr")
1136
1137
def _need_netaddr(f_name, *args, **kwargs):
    """
    Always raise an error saying that the named filter needs netaddr.

    :param f_name: Name of the filter, interpolated into the message.
    :param args: Ignored.
    :param kwargs: Ignored.

    :raises AnsibleFilterError: unconditionally.
    """
    message = (
        "The %s filter requires python's netaddr be "
        "installed on the ansible controller" % f_name
    )
    raise errors.AnsibleFilterError(message)
1143
1144
def ip4_hex(arg, delimiter=""):
    """
    Convert an IPv4 address to Hexadecimal notation

    :param arg: Dotted-decimal address string.
    :param delimiter: Text placed between the hexadecimal octets.

    :return: The first four dot-separated numbers, each rendered as
        zero-padded two-digit lowercase hex and joined by 'delimiter'.
    """
    octets = [int(part) for part in arg.split(".")]

    # One "{i:02x}" placeholder per octet, with "{sep}" between them.
    template = "{sep}".join("{%d:02x}" % index for index in range(4))

    return template.format(*octets, sep=delimiter)
1151
1152
1153# ---- Ansible filters ----
class FilterModule(object):
    """ IP address and network manipulation filters """

    # filter name -> implementing function
    filter_map = {
        # IP addresses and networks
        "cidr_merge": cidr_merge,
        "ipaddr": ipaddr,
        "ipmath": ipmath,
        "ipwrap": ipwrap,
        "ip4_hex": ip4_hex,
        "ipv4": ipv4,
        "ipv6": ipv6,
        "ipsubnet": ipsubnet,
        "next_nth_usable": next_nth_usable,
        "network_in_network": network_in_network,
        "network_in_usable": network_in_usable,
        "reduce_on_network": reduce_on_network,
        "nthhost": nthhost,
        "previous_nth_usable": previous_nth_usable,
        "slaac": slaac,
        # MAC / HW addresses
        "hwaddr": hwaddr,
        "macaddr": macaddr,
    }

    def filters(self):
        """
        Return the mapping of filter names to callables.

        When netaddr could not be imported, every name maps to a stub that
        raises an error naming the filter instead of the real function.
        """
        if not netaddr:
            # Need to install python's netaddr for these filters to work
            return {
                name: partial(_need_netaddr, name) for name in self.filter_map
            }

        return self.filter_map
1187