1# -*- coding: utf-8 -*-
2#
3# Copyright: (c) 2019, F5 Networks Inc.
4# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
5
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8
9import os
10import json
11import pytest
12import sys
13
14if sys.version_info < (2, 7):
15    pytestmark = pytest.mark.skip("F5 Ansible modules require Python >= 2.7")
16
17from ansible.module_utils.basic import AnsibleModule
18
19from ansible_collections.f5networks.f5_modules.plugins.modules.bigip_firewall_log_profile_network import (
20    ApiParameters, ModuleParameters, ModuleManager, ArgumentSpec
21)
22from ansible_collections.f5networks.f5_modules.tests.unit.compat import unittest
23from ansible_collections.f5networks.f5_modules.tests.unit.compat.mock import Mock, patch
24from ansible_collections.f5networks.f5_modules.tests.unit.modules.utils import set_module_args
25
26
# Directory that holds the fixture files for these tests; it is the
# ``fixtures`` folder located next to this test module.
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
# Cache of fixtures already read by load_fixture(), keyed by full file path.
fixture_data = {}


def load_fixture(name):
    """Return the contents of the fixture file ``name``, caching the result.

    The file is looked up under ``fixture_path``. Its text is decoded as JSON
    when possible; text that is not valid JSON is returned unchanged as a
    string.

    Args:
        name: File name of the fixture, relative to ``fixture_path``.

    Returns:
        The decoded JSON value, or the raw file text when it is not valid
        JSON. Repeated calls for the same path return the cached object.

    Raises:
        IOError or OSError: If the fixture file cannot be opened or read.
    """
    path = os.path.join(fixture_path, name)

    if path in fixture_data:
        return fixture_data[path]

    with open(path) as f:
        data = f.read()

    try:
        data = json.loads(data)
    except ValueError:
        # Not JSON: keep the raw text. json.JSONDecodeError (Python 3) is a
        # ValueError subclass and Python 2.7 raises ValueError directly, so
        # this covers both without hiding unrelated failures.
        pass

    fixture_data[path] = data
    return data
47
48
class TestParameters(unittest.TestCase):
    """Checks the attribute values exposed by ModuleParameters and ApiParameters."""

    def test_module_parameters(self):
        """Build ModuleParameters from user-style arguments and verify its attributes.

        The expectations below require that ``enabled`` values of 'yes'/'no'
        are reported as 'enabled'/'disabled', and that an 'indefinite' rate
        limit is reported as 4294967295.
        """
        user_args = {
            'profile_name': 'foo',
            'rate_limit': 150000,
            'log_publisher': '/Common/foobar',
            'log_tcp_errors': {'enabled': 'yes', 'rate_limit': 10000},
            'log_tcp_events': {'enabled': 'yes', 'rate_limit': 30000},
            'log_ip_errors': {'enabled': 'yes', 'rate_limit': 60000},
            'log_matches_accept_rule': {'enabled': 'yes', 'rate_limit': 80000},
            'log_matches_drop_rule': {'enabled': 'no', 'rate_limit': 'indefinite'},
            'log_matches_reject_rule': {'enabled': 'no', 'rate_limit': 'indefinite'},
            'log_format_delimiter': '.',
            'log_storage_format': 'field-list',
            'log_message_fields': ['vlan', 'translated_vlan', 'src_ip'],
        }

        # (attribute name, expected value) pairs, verified in this order.
        expectations = [
            ('profile_name', 'foo'),
            ('rate_limit', 150000),
            ('log_publisher', '/Common/foobar'),
            ('log_tcp_events', 'enabled'),
            ('rate_tcp_events', 30000),
            ('log_ip_errors', 'enabled'),
            ('rate_ip_errors', 60000),
            ('log_tcp_errors', 'enabled'),
            ('rate_tcp_errors', 10000),
            ('log_acl_match_accept', 'enabled'),
            ('rate_acl_match_accept', 80000),
            ('log_acl_match_drop', 'disabled'),
            ('rate_acl_match_drop', 4294967295),
            ('log_acl_match_reject', 'disabled'),
            ('rate_acl_match_reject', 4294967295),
            ('log_format_delimiter', '.'),
            ('log_storage_format', 'field-list'),
        ]

        parsed = ModuleParameters(params=user_args)
        for attribute, wanted in expectations:
            # The attribute name is the failure message so a mismatch inside
            # the loop is still attributable to one field.
            assert getattr(parsed, attribute) == wanted, attribute

    def test_api_parameters(self):
        """Build ApiParameters from the recorded fixture and verify its attributes."""
        device_payload = load_fixture('load_afm_global_network_log_network.json')

        # (attribute name, expected value) pairs, verified in this order.
        expectations = [
            ('rate_limit', 4294967295),
            ('log_tcp_events', 'disabled'),
            ('rate_tcp_events', 4294967295),
            ('log_ip_errors', 'disabled'),
            ('rate_ip_errors', 4294967295),
            ('log_tcp_errors', 'disabled'),
            ('rate_tcp_errors', 4294967295),
            ('log_acl_match_accept', 'disabled'),
            ('rate_acl_match_accept', 4294967295),
            ('log_acl_match_drop', 'disabled'),
            ('rate_acl_match_drop', 4294967295),
            ('log_acl_match_reject', 'disabled'),
            ('rate_acl_match_reject', 4294967295),
            ('log_format_delimiter', ','),
            ('log_storage_format', 'none'),
        ]

        parsed = ApiParameters(params=device_payload)
        for attribute, wanted in expectations:
            assert getattr(parsed, attribute) == wanted, attribute
122
123
class TestManager(unittest.TestCase):
    """Runs ModuleManager with its version lookup, telemetry and device calls mocked."""

    def setUp(self):
        """Create the argument spec and patch ``tmos_version`` and ``send_teem``.

        Each patcher's ``stop`` is registered with ``addCleanup`` right after
        the patcher is started. Cleanups run even when a later step of
        ``setUp`` raises, whereas ``tearDown`` would not, so a started patch
        can never leak into other tests.
        """
        self.spec = ArgumentSpec()
        self.p2 = patch('ansible_collections.f5networks.f5_modules.plugins.modules.bigip_firewall_log_profile_network.tmos_version')
        self.p3 = patch('ansible_collections.f5networks.f5_modules.plugins.modules.bigip_firewall_log_profile_network.send_teem')

        self.m2 = self.p2.start()
        self.addCleanup(self.p2.stop)
        self.m2.return_value = '14.1.0'

        self.m3 = self.p3.start()
        self.addCleanup(self.p3.stop)
        self.m3.return_value = True

    def test_create(self, *args):
        """Run the module with a new profile's arguments and check the reported result.

        ``*args`` is not used by the test body.
        """
        set_module_args(dict(
            profile_name='foo',
            rate_limit=150000,
            log_publisher='/Common/foobar',
            provider=dict(
                server='localhost',
                password='password',
                user='admin'
            )
        ))

        module = AnsibleModule(
            argument_spec=self.spec.argument_spec,
            supports_check_mode=self.spec.supports_check_mode
        )
        mm = ModuleManager(module=module)

        # Override methods to force specific logic in the module to happen:
        # exists() answers False on its first call and True on its second,
        # and create_on_device() is stubbed so no device is contacted.
        mm.exists = Mock(side_effect=[False, True])
        mm.create_on_device = Mock(return_value=True)

        results = mm.exec_module()

        assert results['changed'] is True
        assert results['rate_limit'] == 150000
        assert results['log_publisher'] == '/Common/foobar'
166