1# (c) 2018, NetApp, Inc
2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
3
4''' unit test for ONTAP Kerberos Realm module '''
5
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8
9import json
10import pytest
11import ansible.module_utils.netapp as netapp_utils
12from ansible.module_utils import basic
13from ansible.module_utils._text import to_bytes
14from ansible.modules.storage.netapp.na_ontap_kerberos_realm \
15    import NetAppOntapKerberosRealm as my_module  # module under test
16from units.compat import unittest
17from units.compat.mock import patch
18from units.compat.mock import Mock
19
20
# Module-wide pytest marker: when netapp_utils reports that netapp_lib is
# missing, every test in this file is skipped instead of failing.
if not netapp_utils.has_netapp_lib():
    pytestmark = pytest.mark.skip(reason='skipping as missing required netapp_lib')
23
24
def set_module_args(args):
    """Stage *args* so that they are picked up when the module object is created.

    The arguments are wrapped under the 'ANSIBLE_MODULE_ARGS' key, serialized
    to JSON, converted to bytes and stored on ``basic._ANSIBLE_ARGS``.
    """
    payload = {'ANSIBLE_MODULE_ARGS': args}
    basic._ANSIBLE_ARGS = to_bytes(json.dumps(payload))  # pylint: disable=protected-access
29
30
class AnsibleExitJson(Exception):
    """Exception class to be raised by module.exit_json and caught by the test case.

    The patched exit_json passes its keyword arguments as a single dict, so a
    test reads the module result from ``exc.value.args[0]``.
    """
    # No class body is needed: the former debug print ran at import time and
    # only added noise to the test output.
35
36
class AnsibleFailJson(Exception):
    """Exception class to be raised by module.fail_json and caught by the test case.

    The patched fail_json passes its keyword arguments as a single dict, so a
    test reads the failure details from ``exc.value.args[0]``.
    """
    # No class body is needed: the former debug print ran at import time and
    # only added noise to the test output.
41
42
def exit_json(*args, **kwargs):  # pylint: disable=unused-argument
    """Replacement for exit_json that raises the result instead of exiting.

    The keyword arguments become the payload of AnsibleExitJson; 'changed'
    is filled in as False when the caller did not provide it.
    """
    kwargs.setdefault('changed', False)
    raise AnsibleExitJson(kwargs)
48
49
def fail_json(*args, **kwargs):  # pylint: disable=unused-argument
    """Replacement for fail_json that raises the failure instead of exiting.

    The keyword arguments, with 'failed' forced to True, become the payload
    of AnsibleFailJson.
    """
    payload = dict(kwargs, failed=True)
    raise AnsibleFailJson(payload)
54
55
class MockONTAPConnection(object):
    ''' stand-in for the server connection to an ONTAP host '''

    def __init__(self, kind=None):
        ''' remember which canned answer to serve; no request seen yet '''
        self.xml_out = None
        self.xml_in = None
        self.type = kind

    def invoke_successfully(self, xml, enable_tunneling):  # pylint: disable=unused-argument
        ''' record the request and answer with canned xml selected by self.type '''
        self.xml_in = xml
        # 'present_realm' yields one realm record, anything else an empty result
        builder = self.build_realm if self.type == 'present_realm' else self.build_empty_realm
        self.xml_out = builder()
        return self.xml_out

    @staticmethod
    def build_realm():
        ''' build an xml reply holding a single kerberos realm record '''
        realm_info = {
            'admin-server-ip': "192.168.0.1",
            'admin-server-port': "749",
            'clock-skew': "5",
            'kdc-ip': "192.168.0.1",
            'kdc-port': "88",
            'kdc-vendor': "other",
            'password-server-ip': "192.168.0.1",
            'password-server-port': "464",
            "permitted-enc-types": {
                "string": ["des", "des3", "aes_128", "aes_256"]
            },
            'realm': "EXAMPLE.COM",
            'vserver-name': "vserver0"
        }
        reply = netapp_utils.zapi.NaElement('xml')
        reply.translate_struct({
            'num-records': "1",
            'attributes-list': {
                'kerberos-realm': realm_info
            }
        })
        return reply

    @staticmethod
    def build_empty_realm():
        ''' build an xml reply that reports zero records '''
        reply = netapp_utils.zapi.NaElement('xml')
        reply.translate_struct({'num-records': "0"})
        return reply
113
114
class TestMyModule(unittest.TestCase):
    ''' unit tests for the na_ontap_kerberos_realm module '''

    def setUp(self):
        ''' patch exit_json/fail_json so module results surface as exceptions '''
        self.mock_module_helper = patch.multiple(basic.AnsibleModule,
                                                 exit_json=exit_json,
                                                 fail_json=fail_json)
        self.mock_module_helper.start()
        # undo the patch even when a test fails
        self.addCleanup(self.mock_module_helper.stop)
        self.server = MockONTAPConnection(kind='present_realm')

    @staticmethod
    def get_kerberos_realm_mock_object(kind=None):
        """
        Helper method to return an na_ontap_kerberos_realm object
        :param kind: passes this param to MockONTAPConnection()
        :return: na_ontap_kerberos_realm object wired to mocked connections
        """
        krbrealm_obj = my_module()
        krbrealm_obj.asup_log_for_cserver = Mock(return_value=None)
        krbrealm_obj.cluster = Mock()
        krbrealm_obj.cluster.invoke_successfully = Mock()
        # kind=None is already MockONTAPConnection's default, so it is forwarded as is
        krbrealm_obj.server = MockONTAPConnection(kind=kind)
        return krbrealm_obj

    @staticmethod
    def mock_args():
        '''Return a fresh dict with the default connection arguments'''
        return {
            'hostname': 'test',
            'username': 'test_user',
            'password': 'test_pass!',
            'https': True,
            'validate_certs': False
        }

    @staticmethod
    def test_module_fail_when_required_args_missing():
        ''' required arguments are reported as errors '''
        set_module_args({})
        # only the module constructor is expected to fail
        with pytest.raises(AnsibleFailJson) as exc:
            my_module()
        print('Info: %s' % exc.value.args[0]['msg'])

    def test_module_fail_when_state_present_required_args_missing(self):
        ''' state=present without the additionally required arguments is reported as an error '''
        data = self.mock_args()
        data['state'] = 'present'
        data['vserver'] = 'vserver1'
        data['realm'] = 'NETAPP.COM'
        set_module_args(data)
        # only the module constructor is expected to fail
        with pytest.raises(AnsibleFailJson) as exc:
            my_module()
        print('Info: %s' % exc.value.args[0]['msg'])

    def test_get_nonexistent_realm(self):
        ''' Test if get_krbrealm returns None for non-existent kerberos realm '''
        data = self.mock_args()
        data['vserver'] = 'none'
        data['realm'] = 'none'
        data['state'] = 'present'
        data['kdc_ip'] = 'none'
        data['kdc_vendor'] = 'Other'
        set_module_args(data)
        result = self.get_kerberos_realm_mock_object().get_krbrealm()
        assert result is None

    def test_get_existing_realm(self):
        ''' Test if get_krbrealm returns details for existing kerberos realm '''
        data = self.mock_args()
        data['vserver'] = 'vserver0'
        data['realm'] = 'EXAMPLE.COM'
        data['state'] = 'present'
        data['kdc_ip'] = '10.0.0.1'
        data['kdc_vendor'] = 'Other'
        set_module_args(data)
        result = self.get_kerberos_realm_mock_object('present_realm').get_krbrealm()
        assert result['realm']

    def test_successfully_modify_realm(self):
        ''' Test modify realm successful for modifying kdc_ip. '''
        data = self.mock_args()
        data['vserver'] = 'vserver0'
        data['realm'] = 'EXAMPLE.COM'
        data['state'] = 'present'
        # differs from the kdc-ip "192.168.0.1" served by MockONTAPConnection.build_realm
        data['kdc_ip'] = '10.0.0.1'
        data['kdc_vendor'] = 'Other'
        set_module_args(data)
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_kerberos_realm_mock_object('present_realm').apply()
        assert exc.value.args[0]['changed']

    @patch('ansible.modules.storage.netapp.na_ontap_kerberos_realm.NetAppOntapKerberosRealm.delete_krbrealm')
    def test_successfully_delete_realm(self, delete_krbrealm):
        ''' Test successfully delete realm '''
        data = self.mock_args()
        data['state'] = 'absent'
        data['vserver'] = 'vserver0'
        data['realm'] = 'EXAMPLE.COM'
        set_module_args(data)
        obj = self.get_kerberos_realm_mock_object('present_realm')
        with pytest.raises(AnsibleExitJson) as exc:
            obj.apply()
        assert exc.value.args[0]['changed']
        delete_krbrealm.assert_called_with()

    @patch('ansible.modules.storage.netapp.na_ontap_kerberos_realm.NetAppOntapKerberosRealm.create_krbrealm')
    def test_successfully_create_realm(self, create_krbrealm):
        ''' Test successfully create realm '''
        data = self.mock_args()
        data['state'] = 'present'
        data['vserver'] = 'vserver1'
        data['realm'] = 'NETAPP.COM'
        data['kdc_ip'] = '10.0.0.1'
        data['kdc_vendor'] = 'Other'
        set_module_args(data)
        # the default mock connection reports zero records, i.e. no existing realm
        obj = self.get_kerberos_realm_mock_object()
        with pytest.raises(AnsibleExitJson) as exc:
            obj.apply()
        assert exc.value.args[0]['changed']
        create_krbrealm.assert_called_with()
240