1# -*- coding: utf-8 -*-
2#
3# Copyright: (c) 2017, F5 Networks Inc.
4# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
5
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8
9import os
10import json
11import pytest
12import sys
13
14if sys.version_info < (2, 7):
15    pytestmark = pytest.mark.skip("F5 Ansible modules require Python >= 2.7")
16
17from ansible.module_utils.basic import AnsibleModule
18
19from ansible_collections.f5networks.f5_modules.plugins.modules.bigip_data_group import (
20    ModuleParameters, ModuleManager, ExternalManager, InternalManager,
21    ArgumentSpec, RecordsEncoder, RecordsDecoder
22)
23from ansible_collections.f5networks.f5_modules.plugins.module_utils.common import F5ModuleError
24from ansible_collections.f5networks.f5_modules.tests.unit.compat import unittest
25from ansible_collections.f5networks.f5_modules.tests.unit.compat.mock import Mock, patch
26from ansible_collections.f5networks.f5_modules.tests.unit.modules.utils import set_module_args
27
28
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}


def load_fixture(name):
    """Return the contents of a fixture file, decoded as JSON when possible.

    The result is memoized in the module-level ``fixture_data`` dict, keyed by
    the joined path, so a repeated request is served without touching the file.

    :param name: File name, joined onto ``fixture_path``.
    :return: The decoded JSON value when the file holds valid JSON, otherwise
        the raw text exactly as it was read.
    """
    path = os.path.join(fixture_path, name)

    if path in fixture_data:
        return fixture_data[path]

    with open(path) as f:
        data = f.read()

    try:
        data = json.loads(data)
    except ValueError:
        # Invalid JSON surfaces as ValueError (json.JSONDecodeError is a
        # subclass of it); such fixtures are deliberately kept as plain text.
        pass

    fixture_data[path] = data
    return data
49
50
class TestAuxClasses(unittest.TestCase):
    """Checks for the RecordsEncoder / RecordsDecoder helper classes."""

    def setUp(self):
        # One encoder and one decoder per record type, all sharing a separator.
        separator = ':='
        self.ip_encode = RecordsEncoder(record_type='ip', separator=separator)
        self.int_encode = RecordsEncoder(record_type='int', separator=separator)
        self.str_encode = RecordsEncoder(record_type='str', separator=separator)
        self.ip_decode = RecordsDecoder(record_type='ip', separator=separator)
        self.int_decode = RecordsDecoder(record_type='int', separator=separator)
        self.str_decode = RecordsDecoder(record_type='str', separator=separator)

    def test_encode_ipv6_ipv4_cidr_dict(self, *args):
        # key/value dict records whose keys are CIDR strings
        records = (
            {'key': '10.0.0.0/8', 'value': 'Network1'},
            {'key': '2402:6940::/32', 'value': 'Network2'},
            {'key': '192.168.1.1/32', 'value': 'Host1'},
            {'key': '2402:9400:1000::/128', 'value': 'Host2'},
        )

        v4_net, v6_net, v4_host, v6_host = [
            self.ip_encode.encode(record) for record in records
        ]

        assert v4_net == 'network 10.0.0.0 prefixlen 8 := Network1'
        assert v6_net == 'network 2402:6940:: prefixlen 32 := Network2'
        assert v4_host == 'host 192.168.1.1 := Host1'
        assert v6_host == 'host 2402:9400:1000:: := Host2'

    def test_encode_ipv6_ipv4_cidr_rd_dict(self, *args):
        # same as the dict case, but every key carries a %11 suffix
        records = (
            {'key': '10.0.0.0%11/8', 'value': 'Network1'},
            {'key': '2402:6940::%11/32', 'value': 'Network2'},
            {'key': '192.168.1.1%11/32', 'value': 'Host1'},
            {'key': '2402:9400:1000::%11/128', 'value': 'Host2'},
        )

        v4_net, v6_net, v4_host, v6_host = [
            self.ip_encode.encode(record) for record in records
        ]

        assert v4_net == 'network 10.0.0.0%11 prefixlen 8 := Network1'
        assert v6_net == 'network 2402:6940::%11 prefixlen 32 := Network2'
        assert v4_host == 'host 192.168.1.1%11 := Host1'
        assert v6_host == 'host 2402:9400:1000::%11 := Host2'

    def test_encode_ipv6_ipv4_cidr_str(self, *args):
        # plain string records, with and without an explicit value
        records = (
            '10.0.0.0/8',
            '2402:6940::/32 := Network2',
            '192.168.1.1/32 := Host1',
            '2402:9400:1000::/128',
        )

        v4_net, v6_net, v4_host, v6_host = [
            self.ip_encode.encode(record) for record in records
        ]

        assert v4_net == 'network 10.0.0.0 prefixlen 8 := 10.0.0.0'
        assert v6_net == 'network 2402:6940:: prefixlen 32 := Network2'
        assert v4_host == 'host 192.168.1.1 := Host1'
        assert v6_host == 'host 2402:9400:1000:: := 2402:9400:1000::'

    def test_encode_ipv6_ipv4_cidr_rd_str(self, *args):
        # string records carrying a %12 suffix, mixing CIDR and host/network forms
        records = (
            '10.0.0.0%12/8',
            'network 2402:6940::%12 prefixlen 32 := Network2',
            'host 192.168.1.1%12 := Host1',
            '2402:9400:1000::%12/128',
        )

        v4_net, v6_net, v4_host, v6_host = [
            self.ip_encode.encode(record) for record in records
        ]

        assert v4_net == 'network 10.0.0.0%12 prefixlen 8 := 10.0.0.0%12'
        assert v6_net == 'network 2402:6940::%12 prefixlen 32 := Network2'
        assert v4_host == 'host 192.168.1.1%12 := Host1'
        assert v6_host == 'host 2402:9400:1000::%12 := 2402:9400:1000::%12'

    def test_encode_host_net_ptrn_str(self, *args):
        # records already in host/network form are expected back unchanged
        records = (
            'network 10.0.0.0 prefixlen 8 := Network1',
            'network 2402:6940:: prefixlen 32 := Network2',
            'host 192.168.1.1 := Host1',
            'host 2402:9400:1000:: := Host2',
        )

        v4_net, v6_net, v4_host, v6_host = [
            self.ip_encode.encode(record) for record in records
        ]

        assert v4_net == 'network 10.0.0.0 prefixlen 8 := Network1'
        assert v6_net == 'network 2402:6940:: prefixlen 32 := Network2'
        assert v4_host == 'host 192.168.1.1 := Host1'
        assert v6_host == 'host 2402:9400:1000:: := Host2'

    def test_encode_int_and_str(self, *args):
        # integer and string encoders, each fed one dict and one bare string
        int_records = ({'key': 1, 'value': 'one'}, '10')
        str_records = ({'key': 'one', 'value': 'is_not_the_answer'}, 'fifty')

        int_from_dict, int_from_str = [
            self.int_encode.encode(record) for record in int_records
        ]
        str_from_dict, str_from_str = [
            self.str_encode.encode(record) for record in str_records
        ]

        assert int_from_dict == '1 := one'
        assert int_from_str == '10 := ""'
        assert str_from_dict == 'one := is_not_the_answer'
        assert str_from_str == 'fifty := ""'

    def test_decode_ipv6_ipv4_cidr_rd(self, *args):
        # host/network lines with a %11 suffix decode to name/data dicts
        lines = (
            'network 192.168.0.0%11 prefixlen 16 := "Network3"',
            'network 2402:9400:1000:0::%11 prefixlen 64 := "Network4"',
            'host 172.16.1.1%11 := "Host3"',
            'host 2001:0db8:85a3:0000:0000:8a2e:0370:7334%11 := "Host4"',
        )

        v4_net, v6_net, v4_host, v6_host = [
            self.ip_decode.decode(line) for line in lines
        ]

        assert v4_net == {'name': '192.168.0.0%11/16', 'data': 'Network3'}
        assert v6_net == {'name': '2402:9400:1000:0::%11/64', 'data': 'Network4'}
        assert v4_host == {'name': '172.16.1.1%11/32', 'data': 'Host3'}
        assert v6_host == {'name': '2001:0db8:85a3:0000:0000:8a2e:0370:7334%11/128', 'data': 'Host4'}

    def test_decode_int_str(self, *args):
        # a bare integer line decodes with empty data; a quoted value is unquoted
        decoded_int = self.int_decode.decode('10')
        decoded_str = self.str_decode.decode('one := "is_not_the_answer"')

        assert decoded_int == {'name': '10', 'data': ""}
        assert decoded_str == {'name': 'one', 'data': 'is_not_the_answer'}
181
182
class TestParameters(unittest.TestCase):
    """Checks how ModuleParameters exposes raw module arguments."""

    def test_module_parameters(self):
        # Raw arguments, spelled the way a playbook would supply them.
        raw_args = {
            'name': 'foo',
            'type': 'address',
            'delete_data_group_file': False,
            'internal': False,
            'records': [
                {'key': '10.10.10.10/32', 'value': 'bar'},
            ],
            'separator': ':=',
            'state': 'present',
            'partition': 'Common',
        }

        parameters = ModuleParameters(params=raw_args)

        # Scalar options; note 'address' is reported as 'ip'.
        assert parameters.name == 'foo'
        assert parameters.type == 'ip'
        assert parameters.delete_data_group_file is False

        # The single key/value record is exposed under name/data keys.
        assert len(parameters.records) == 1
        assert 'data' in parameters.records[0]
        assert 'name' in parameters.records[0]
        assert parameters.records[0]['data'] == 'bar'
        assert parameters.records[0]['name'] == '10.10.10.10/32'

        assert parameters.separator == ':='
        assert parameters.state == 'present'
        assert parameters.partition == 'Common'
213
214
class TestManager(unittest.TestCase):
    """Drive ModuleManager.exec_module() with mocked data group managers.

    ``tmos_version`` and ``send_teem`` are patched in the module under test
    for the duration of each test, and the device-facing methods of the
    Internal/External managers are replaced with mocks.
    """

    def setUp(self):
        self.spec = ArgumentSpec()
        self.p2 = patch('ansible_collections.f5networks.f5_modules.plugins.modules.bigip_data_group.tmos_version')
        self.p3 = patch('ansible_collections.f5networks.f5_modules.plugins.modules.bigip_data_group.send_teem')
        # Register each stop() right after its start(): unittest skips
        # tearDown when setUp raises, but registered cleanups still run, so a
        # failure while starting the second patch cannot leak the first one.
        self.m2 = self.p2.start()
        self.addCleanup(self.p2.stop)
        self.m2.return_value = '14.1.0'
        self.m3 = self.p3.start()
        self.addCleanup(self.p3.stop)
        self.m3.return_value = True

    def _module(self, **params):
        """Register module arguments and return the resulting AnsibleModule.

        :param params: Per-test arguments, merged over the name, partition and
            provider values that every test in this class shares.
        """
        module_args = dict(
            name='foo',
            partition='Common',
            provider=dict(
                server='localhost',
                password='password',
                user='admin'
            )
        )
        module_args.update(params)
        set_module_args(module_args)

        return AnsibleModule(
            argument_spec=self.spec.argument_spec,
            supports_check_mode=self.spec.supports_check_mode,
            mutually_exclusive=self.spec.mutually_exclusive,
        )

    def _creating_manager(self, manager_cls, module):
        """Return a ``manager_cls`` instance mocked for a create run.

        ``exists`` answers False on its first call and True on its second;
        ``create_on_device`` is replaced by a mock returning True.
        """
        mm1 = manager_cls(module=module, params=module.params)
        mm1.exists = Mock(side_effect=[False, True])
        mm1.create_on_device = Mock(return_value=True)
        return mm1

    def _removing_manager(self, module):
        """Return an ExternalManager mocked for a remove run.

        ``exists`` answers True on its first call and False on its second;
        ``remove_from_device`` and ``external_file_exists`` are replaced by
        mocks returning True.
        """
        mm1 = ExternalManager(module=module, params=module.params)
        mm1.exists = Mock(side_effect=[True, False])
        mm1.remove_from_device = Mock(return_value=True)
        mm1.external_file_exists = Mock(return_value=True)
        return mm1

    def _module_manager(self, module, manager):
        """Return a ModuleManager whose ``get_manager`` hands back ``manager``."""
        mm0 = ModuleManager(module=module)
        mm0.get_manager = Mock(return_value=manager)
        return mm0

    def test_create_external_datagroup_type_string(self, *args):
        module = self._module(
            delete_data_group_file=False,
            internal=False,
            records_src="{0}/data-group-string.txt".format(fixture_path),
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(ExternalManager, module)
        mm0 = self._module_manager(module, mm1)

        results = mm0.exec_module()

        assert results['changed'] is True

    def test_create_external_incorrect_address_data(self, *args):
        # string fixture combined with type='address' must be rejected
        module = self._module(
            delete_data_group_file=False,
            internal=False,
            type='address',
            records_src="{0}/data-group-string.txt".format(fixture_path),
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(ExternalManager, module)
        mm0 = self._module_manager(module, mm1)

        with pytest.raises(F5ModuleError) as ex:
            mm0.exec_module()

        assert "When specifying an 'address' type, the value to the left of the separator must be an IP." == str(ex.value)

    def test_create_external_incorrect_integer_data(self, *args):
        # string fixture combined with type='integer' must be rejected
        module = self._module(
            delete_data_group_file=False,
            internal=False,
            type='integer',
            records_src="{0}/data-group-string.txt".format(fixture_path),
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(ExternalManager, module)
        mm0 = self._module_manager(module, mm1)

        with pytest.raises(F5ModuleError) as ex:
            mm0.exec_module()

        assert "When specifying an 'integer' type, the value to the left of the separator must be a number." == str(ex.value)

    def test_remove_data_group_keep_file(self, *args):
        module = self._module(
            delete_data_group_file=False,
            internal=False,
            state='absent',
        )
        mm1 = self._removing_manager(module)
        mm0 = self._module_manager(module, mm1)

        results = mm0.exec_module()

        assert results['changed'] is True

    def test_remove_data_group_remove_file(self, *args):
        module = self._module(
            delete_data_group_file=True,
            internal=False,
            state='absent',
        )
        mm1 = self._removing_manager(module)
        # Only this variant also mocks removal of the data group file.
        mm1.remove_data_group_file_from_device = Mock(return_value=True)
        mm0 = self._module_manager(module, mm1)

        results = mm0.exec_module()

        assert results['changed'] is True

    def test_create_internal_datagroup_type_string(self, *args):
        module = self._module(
            delete_data_group_file=False,
            internal=True,
            records_src="{0}/data-group-string.txt".format(fixture_path),
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(InternalManager, module)
        mm0 = self._module_manager(module, mm1)

        results = mm0.exec_module()

        assert results['changed'] is True

    def test_create_internal_incorrect_integer_data(self, *args):
        # string fixture combined with type='integer' must be rejected
        module = self._module(
            delete_data_group_file=False,
            internal=True,
            type='integer',
            records_src="{0}/data-group-string.txt".format(fixture_path),
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(InternalManager, module)
        mm0 = self._module_manager(module, mm1)

        with pytest.raises(F5ModuleError) as ex:
            mm0.exec_module()

        assert "When specifying an 'integer' type, the value to the left of the separator must be a number." == str(ex.value)

    def test_create_internal_datagroup_type_integer(self, *args):
        module = self._module(
            delete_data_group_file=False,
            internal=True,
            type='integer',
            records_src="{0}/data-group-integer.txt".format(fixture_path),
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(InternalManager, module)
        mm0 = self._module_manager(module, mm1)

        results = mm0.exec_module()

        assert results['changed'] is True

    def test_create_internal_datagroup_type_address(self, *args):
        module = self._module(
            delete_data_group_file=False,
            internal=True,
            type='address',
            records_src="{0}/data-group-address.txt".format(fixture_path),
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(InternalManager, module)
        mm0 = self._module_manager(module, mm1)

        results = mm0.exec_module()

        assert results['changed'] is True

    def test_create_internal_datagroup_type_address_list(self, *args):
        # records supplied inline as key/value dicts instead of a source file
        module = self._module(
            delete_data_group_file=False,
            internal=True,
            type='address',
            records=[
                dict(
                    key='10.0.0.0/8',
                    value='Network1'
                ),
                dict(
                    key='172.16.0.0/12',
                    value='Network2'
                ),
                dict(
                    key='192.168.20.1/16',
                    value='Network3'
                ),
                dict(
                    key='192.168.20.1',
                    value='Host1'
                ),
                dict(
                    key='172.16.1.1',
                    value='Host2'
                )
            ],
            separator=':=',
            state='present',
        )
        mm1 = self._creating_manager(InternalManager, module)
        mm0 = self._module_manager(module, mm1)

        results = mm0.exec_module()

        assert results['changed'] is True
608