1# -*- coding: utf-8 -*- 2# 3# Copyright: (c) 2019, F5 Networks Inc. 4# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 5 6from __future__ import (absolute_import, division, print_function) 7__metaclass__ = type 8 9import os 10import json 11import pytest 12import sys 13 14if sys.version_info < (2, 7): 15 pytestmark = pytest.mark.skip("F5 Ansible modules require Python >= 2.7") 16 17from ansible.module_utils.basic import AnsibleModule 18 19from ansible_collections.f5networks.f5_modules.plugins.modules.bigip_firewall_log_profile_network import ( 20 ApiParameters, ModuleParameters, ModuleManager, ArgumentSpec 21) 22from ansible_collections.f5networks.f5_modules.tests.unit.compat import unittest 23from ansible_collections.f5networks.f5_modules.tests.unit.compat.mock import Mock, patch 24from ansible_collections.f5networks.f5_modules.tests.unit.modules.utils import set_module_args 25 26 27fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures') 28fixture_data = {} 29 30 31def load_fixture(name): 32 path = os.path.join(fixture_path, name) 33 34 if path in fixture_data: 35 return fixture_data[path] 36 37 with open(path) as f: 38 data = f.read() 39 40 try: 41 data = json.loads(data) 42 except Exception: 43 pass 44 45 fixture_data[path] = data 46 return data 47 48 49class TestParameters(unittest.TestCase): 50 def test_module_parameters(self): 51 args = dict( 52 profile_name='foo', 53 rate_limit=150000, 54 log_publisher='/Common/foobar', 55 log_tcp_errors=dict( 56 enabled='yes', 57 rate_limit=10000, 58 ), 59 log_tcp_events=dict( 60 enabled='yes', 61 rate_limit=30000, 62 ), 63 log_ip_errors=dict( 64 enabled='yes', 65 rate_limit=60000, 66 ), 67 log_matches_accept_rule=dict( 68 enabled='yes', 69 rate_limit=80000, 70 ), 71 log_matches_drop_rule=dict( 72 enabled='no', 73 rate_limit='indefinite', 74 ), 75 log_matches_reject_rule=dict( 76 enabled='no', 77 rate_limit='indefinite', 78 ), 79 log_format_delimiter='.', 80 log_storage_format='field-list', 81 log_message_fields=['vlan', 'translated_vlan', 'src_ip'] 82 ) 83 84 p = ModuleParameters(params=args) 85 assert p.profile_name == 'foo' 86 assert p.rate_limit == 150000 87 assert p.log_publisher == '/Common/foobar' 88 assert p.log_tcp_events == 'enabled' 89 assert p.rate_tcp_events == 30000 90 assert p.log_ip_errors == 'enabled' 91 assert p.rate_ip_errors == 60000 92 assert p.log_tcp_errors == 'enabled' 93 assert p.rate_tcp_errors == 10000 94 assert p.log_acl_match_accept == 'enabled' 95 assert p.rate_acl_match_accept == 80000 96 assert p.log_acl_match_drop == 'disabled' 97 assert p.rate_acl_match_drop == 4294967295 98 assert p.log_acl_match_reject == 'disabled' 99 assert p.rate_acl_match_reject == 4294967295 100 assert p.log_format_delimiter == '.' 101 assert p.log_storage_format == 'field-list' 102 103 def test_api_parameters(self): 104 args = load_fixture('load_afm_global_network_log_network.json') 105 106 p = ApiParameters(params=args) 107 assert p.rate_limit == 4294967295 108 assert p.log_tcp_events == 'disabled' 109 assert p.rate_tcp_events == 4294967295 110 assert p.log_ip_errors == 'disabled' 111 assert p.rate_ip_errors == 4294967295 112 assert p.log_tcp_errors == 'disabled' 113 assert p.rate_tcp_errors == 4294967295 114 assert p.log_acl_match_accept == 'disabled' 115 assert p.rate_acl_match_accept == 4294967295 116 assert p.log_acl_match_drop == 'disabled' 117 assert p.rate_acl_match_drop == 4294967295 118 assert p.log_acl_match_reject == 'disabled' 119 assert p.rate_acl_match_reject == 4294967295 120 assert p.log_format_delimiter == ',' 121 assert p.log_storage_format == 'none' 122 123 124class TestManager(unittest.TestCase): 125 126 def setUp(self): 127 self.spec = ArgumentSpec() 128 self.p2 = patch('ansible_collections.f5networks.f5_modules.plugins.modules.bigip_firewall_log_profile_network.tmos_version') 129 self.p3 = patch('ansible_collections.f5networks.f5_modules.plugins.modules.bigip_firewall_log_profile_network.send_teem') 130 self.m2 = self.p2.start() 131 self.m2.return_value = '14.1.0' 132 self.m3 = self.p3.start() 133 self.m3.return_value = True 134 135 def tearDown(self): 136 self.p2.stop() 137 self.p3.stop() 138 139 def test_create(self, *args): 140 set_module_args(dict( 141 profile_name='foo', 142 rate_limit=150000, 143 log_publisher='/Common/foobar', 144 provider=dict( 145 server='localhost', 146 password='password', 147 user='admin' 148 ) 149 )) 150 151 module = AnsibleModule( 152 argument_spec=self.spec.argument_spec, 153 supports_check_mode=self.spec.supports_check_mode 154 ) 155 mm = ModuleManager(module=module) 156 157 # Override methods to force specific logic in the module to happen 158 mm.exists = Mock(side_effect=[False, True]) 159 mm.create_on_device = Mock(return_value=True) 160 161 results = mm.exec_module() 162 163 assert results['changed'] is True 164 assert results['rate_limit'] == 150000 165 assert results['log_publisher'] == '/Common/foobar' 166