1# (c) 2018, NetApp, Inc
2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
3
4''' unit tests ONTAP Ansible module: na_ontap_user '''
5
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8import json
9import pytest
10
11from ansible.module_utils import basic
12from ansible.module_utils._text import to_bytes
13from ansible_collections.netapp.ontap.tests.unit.compat import unittest
14from ansible_collections.netapp.ontap.tests.unit.compat.mock import patch
15import ansible_collections.netapp.ontap.plugins.module_utils.netapp as netapp_utils
16
17from ansible_collections.netapp.ontap.plugins.modules.na_ontap_user \
18    import NetAppOntapUser as my_module  # module under test
19
20if not netapp_utils.has_netapp_lib():
21    pytestmark = pytest.mark.skip('skipping as missing required netapp_lib')
22
23# REST API canned responses when mocking send_request
24SRR = {
25    # common responses
26    'is_rest': (200, {}, None),
27    'is_zapi': (400, {}, "Unreachable"),
28    'empty_good': (200, {}, None),
29    'zero_records': (200, {'num_records': 0}, None),
30    'end_of_sequence': (500, None, "Ooops, the UT needs one more SRR response"),
31    'generic_error': (400, None, "Expected error"),
32    'get_uuid': (200, {'owner': {'uuid': 'ansible'}}, None),
33    'get_user_rest': (200,
34                      {'num_records': 1,
35                       'records': [{'owner': {'uuid': 'ansible_vserver'},
36                                    'name': 'abcd'}]}, None),
37    'get_user_details_rest': (200,
38                              {'role': {'name': 'vsadmin'},
39                               'applications': [{'application': 'http'}],
40                               'locked': False}, None)
41}
42
43
44def set_module_args(args):
45    """prepare arguments so that they will be picked up during module creation"""
46    args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
47    basic._ANSIBLE_ARGS = to_bytes(args)  # pylint: disable=protected-access
48
49
50class AnsibleExitJson(Exception):
51    """Exception class to be raised by module.exit_json and caught by the test case"""
52
53
54class AnsibleFailJson(Exception):
55    """Exception class to be raised by module.fail_json and caught by the test case"""
56
57
58def exit_json(*args, **kwargs):  # pylint: disable=unused-argument
59    """function to patch over exit_json; package return data into an exception"""
60    if 'changed' not in kwargs:
61        kwargs['changed'] = False
62    raise AnsibleExitJson(kwargs)
63
64
65def fail_json(*args, **kwargs):  # pylint: disable=unused-argument
66    """function to patch over fail_json; package return data into an exception"""
67    kwargs['failed'] = True
68    raise AnsibleFailJson(kwargs)
69
70
71def set_default_args_rest():
72    return dict({
73        'hostname': 'hostname',
74        'username': 'username',
75        'password': 'password',
76        'name': 'user_name',
77        'vserver': 'vserver',
78        'applications': 'http',
79        'authentication_method': 'password',
80        'role_name': 'vsadmin',
81        'lock_user': 'True',
82    })
83
84
85class MockONTAPConnection(object):
86    ''' mock server connection to ONTAP host '''
87
88    def __init__(self, kind=None, parm1=None, parm2=None):
89        ''' save arguments '''
90        self.type = kind
91        self.parm1 = parm1
92        self.parm2 = parm2
93        self.xml_in = None
94        self.xml_out = None
95
96    def invoke_successfully(self, xml, enable_tunneling):  # pylint: disable=unused-argument
97        ''' mock invoke_successfully returning xml data '''
98        self.xml_in = xml
99        if self.type == 'user':
100            xml = self.build_user_info(self.parm1, self.parm2)
101        elif self.type == 'user_fail':
102            raise netapp_utils.zapi.NaApiError(code='TEST', message="This exception is from the unit test")
103        self.xml_out = xml
104        return xml
105
106    @staticmethod
107    def set_vserver(vserver):
108        '''mock set vserver'''
109
110    @staticmethod
111    def build_user_info(locked, role_name):
112        ''' build xml data for user-info '''
113        xml = netapp_utils.zapi.NaElement('xml')
114        data = {'num-records': 1,
115                'attributes-list': {
116                    'security-login-account-info': {
117                        'is-locked': locked, 'role-name': role_name, 'application': 'console', 'authentication-method': 'password'}}}
118
119        xml.translate_struct(data)
120        print(xml.to_string())
121        return xml
122
123
124class TestMyModule(unittest.TestCase):
125    ''' a group of related Unit Tests '''
126
127    def setUp(self):
128        self.mock_module_helper = patch.multiple(basic.AnsibleModule,
129                                                 exit_json=exit_json,
130                                                 fail_json=fail_json)
131        self.mock_module_helper.start()
132        self.addCleanup(self.mock_module_helper.stop)
133        self.server = MockONTAPConnection()
134        self.onbox = False
135
136    def set_default_args(self, rest=False):
137        if self.onbox:
138            hostname = '10.10.10.10'
139            username = 'username'
140            password = 'password'
141            user_name = 'test'
142            vserver = 'ansible_test'
143            application = 'console'
144            authentication_method = 'password'
145        else:
146            hostname = 'hostname'
147            username = 'username'
148            password = 'password'
149            user_name = 'name'
150            vserver = 'vserver'
151            application = 'console'
152            authentication_method = 'password'
153        if rest:
154            use_rest = 'auto'
155        else:
156            use_rest = 'never'
157
158        return dict({
159            'hostname': hostname,
160            'username': username,
161            'password': password,
162            'use_rest': use_rest,
163            'name': user_name,
164            'vserver': vserver,
165            'applications': application,
166            'authentication_method': authentication_method
167        })
168
169    def test_module_fail_when_required_args_missing(self):
170        ''' required arguments are reported as errors '''
171        with pytest.raises(AnsibleFailJson) as exc:
172            set_module_args({})
173            my_module()
174        print('Info: %s' % exc.value.args[0]['msg'])
175
176    def test_ensure_user_get_called(self):
177        ''' a more interesting test '''
178        module_args = {}
179        module_args.update(self.set_default_args())
180        module_args.update({'role_name': 'test'})
181        set_module_args(module_args)
182        my_obj = my_module()
183        my_obj.server = self.server
184        # app = dict(application='testapp', authentication_methods=['testam'])
185        user_info = my_obj.get_user()
186        print('Info: test_user_get: %s' % repr(user_info))
187        assert user_info is None
188
189    def test_ensure_user_apply_called(self):
190        ''' creating user and checking idempotency '''
191        module_args = {}
192        module_args.update(self.set_default_args())
193        module_args.update({'name': 'create'})
194        module_args.update({'role_name': 'test'})
195        set_module_args(module_args)
196        my_obj = my_module()
197        if not self.onbox:
198            my_obj.server = self.server
199        with pytest.raises(AnsibleExitJson) as exc:
200            my_obj.apply()
201        print('Info: test_user_apply: %s' % repr(exc.value))
202        assert exc.value.args[0]['changed']
203        if not self.onbox:
204            my_obj.server = MockONTAPConnection('user', 'false')
205        with pytest.raises(AnsibleExitJson) as exc:
206            my_obj.apply()
207        print('Info: test_user_apply: %s' % repr(exc.value))
208        assert exc.value.args[0]['changed']
209
210    def test_ensure_user_sp_apply_called(self):
211        ''' creating user with service_processor application and idempotency '''
212        module_args = {}
213        module_args.update(self.set_default_args())
214        module_args.update({'name': 'create'})
215        module_args.update({'role_name': 'test'})
216        module_args.update({'application': 'service-processor'})
217        set_module_args(module_args)
218        my_obj = my_module()
219        if not self.onbox:
220            my_obj.server = self.server
221        with pytest.raises(AnsibleExitJson) as exc:
222            my_obj.apply()
223        print('Info: test_user_sp: %s' % repr(exc.value))
224        assert exc.value.args[0]['changed']
225        if not self.onbox:
226            my_obj.server = MockONTAPConnection('user', 'false')
227        with pytest.raises(AnsibleExitJson) as exc:
228            my_obj.apply()
229        print('Info: test_user_sp: %s' % repr(exc.value))
230        assert exc.value.args[0]['changed']
231        # creating user with service_processor application and idempotency
232        module_args.update({'application': 'service_processor'})
233        set_module_args(module_args)
234        my_obj = my_module()
235        if not self.onbox:
236            my_obj.server = self.server
237        with pytest.raises(AnsibleExitJson) as exc:
238            my_obj.apply()
239        print('Info: test_user_sp: %s' % repr(exc.value))
240        assert exc.value.args[0]['changed']
241        if not self.onbox:
242            my_obj.server = MockONTAPConnection('user', 'false')
243        with pytest.raises(AnsibleExitJson) as exc:
244            my_obj.apply()
245        print('Info: test_user_sp: %s' % repr(exc.value))
246        assert exc.value.args[0]['changed']
247
248    def test_ensure_user_apply_for_delete_called(self):
249        ''' deleting user and checking idempotency '''
250        module_args = {}
251        module_args.update(self.set_default_args())
252        module_args.update({'name': 'create'})
253        module_args.update({'role_name': 'test'})
254        set_module_args(module_args)
255        my_obj = my_module()
256        if not self.onbox:
257            my_obj.server = MockONTAPConnection('user', 'false', 'test')
258        with pytest.raises(AnsibleExitJson) as exc:
259            my_obj.apply()
260        print('Info: test_user_apply: %s' % repr(exc.value))
261        assert not exc.value.args[0]['changed']
262        module_args.update({'state': 'absent'})
263        set_module_args(module_args)
264        my_obj = my_module()
265        if not self.onbox:
266            my_obj.server = MockONTAPConnection('user', 'false', 'test')
267        with pytest.raises(AnsibleExitJson) as exc:
268            my_obj.apply()
269        print('Info: test_user_delete: %s' % repr(exc.value))
270        assert exc.value.args[0]['changed']
271
272    def test_ensure_user_lock_called(self):
273        ''' changing user_lock to True and checking idempotency'''
274        module_args = {}
275        module_args.update(self.set_default_args())
276        module_args.update({'name': 'create'})
277        module_args.update({'role_name': 'test'})
278        module_args.update({'lock_user': 'false'})
279        set_module_args(module_args)
280        my_obj = my_module()
281        if not self.onbox:
282            my_obj.server = MockONTAPConnection('user', 'false', 'test')
283        with pytest.raises(AnsibleExitJson) as exc:
284            my_obj.apply()
285        print('Info: test_user_apply: %s' % repr(exc.value))
286        assert not exc.value.args[0]['changed']
287        module_args.update({'lock_user': 'true'})
288        set_module_args(module_args)
289        my_obj = my_module()
290        if not self.onbox:
291            my_obj.server = MockONTAPConnection('user', 'false')
292        with pytest.raises(AnsibleExitJson) as exc:
293            my_obj.apply()
294        print('Info: test_user_lock: %s' % repr(exc.value))
295        assert exc.value.args[0]['changed']
296
297    def test_ensure_user_unlock_called(self):
298        ''' changing user_lock to False and checking idempotency'''
299        module_args = {}
300        module_args.update(self.set_default_args())
301        module_args.update({'name': 'create'})
302        module_args.update({'role_name': 'test'})
303        module_args.update({'lock_user': 'false'})
304        set_module_args(module_args)
305        my_obj = my_module()
306        if not self.onbox:
307            my_obj.server = MockONTAPConnection('user', 'false', 'test')
308        with pytest.raises(AnsibleExitJson) as exc:
309            my_obj.apply()
310        print('Info: test_user_apply: %s' % repr(exc.value))
311        assert not exc.value.args[0]['changed']
312        module_args.update({'lock_user': 'false'})
313        set_module_args(module_args)
314        my_obj = my_module()
315        if not self.onbox:
316            my_obj.server = MockONTAPConnection('user', 'true', 'test')
317        with pytest.raises(AnsibleExitJson) as exc:
318            my_obj.apply()
319        print('Info: test_user_unlock: %s' % repr(exc.value))
320        assert exc.value.args[0]['changed']
321
322    def test_ensure_user_set_password_called(self):
323        ''' set password '''
324        module_args = {}
325        module_args.update(self.set_default_args())
326        module_args.update({'name': 'create'})
327        module_args.update({'role_name': 'test'})
328        module_args.update({'set_password': '123456'})
329        set_module_args(module_args)
330        my_obj = my_module()
331        if not self.onbox:
332            my_obj.server = MockONTAPConnection('user', 'true')
333        with pytest.raises(AnsibleExitJson) as exc:
334            my_obj.apply()
335        print('Info: test_user_apply: %s' % repr(exc.value))
336        assert exc.value.args[0]['changed']
337
338    def test_ensure_user_role_update_called(self):
339        ''' set password '''
340        module_args = {}
341        module_args.update(self.set_default_args())
342        module_args.update({'name': 'create'})
343        module_args.update({'role_name': 'test123'})
344        module_args.update({'set_password': '123456'})
345        set_module_args(module_args)
346        my_obj = my_module()
347        if not self.onbox:
348            my_obj.server = MockONTAPConnection('user', 'true')
349        with pytest.raises(AnsibleExitJson) as exc:
350            my_obj.apply()
351        print('Info: test_user_apply: %s' % repr(exc.value))
352        assert exc.value.args[0]['changed']
353
354    def test_ensure_user_role_update_additional_application_called(self):
355        ''' set password '''
356        module_args = {}
357        module_args.update(self.set_default_args())
358        module_args.update({'name': 'create'})
359        module_args.update({'role_name': 'test123'})
360        module_args.update({'application': 'http'})
361        module_args.update({'set_password': '123456'})
362        set_module_args(module_args)
363        my_obj = my_module()
364        if not self.onbox:
365            my_obj.server = MockONTAPConnection('user', 'true')
366        with pytest.raises(AnsibleExitJson) as exc:
367            my_obj.apply()
368        print('Info: test_user_apply: %s' % repr(exc.value))
369        assert exc.value.args[0]['changed']
370
371    def test_if_all_methods_catch_exception(self):
372        data = self.set_default_args()
373        data.update({'role_name': 'test'})
374        set_module_args(data)
375        my_obj = my_module()
376        app = dict(application='console', authentication_methods=['password'])
377        if not self.onbox:
378            my_obj.server = MockONTAPConnection('user_fail')
379        with pytest.raises(AnsibleFailJson) as exc:
380            my_obj.get_user()
381        assert 'Error getting user ' in exc.value.args[0]['msg']
382        with pytest.raises(AnsibleFailJson) as exc:
383            my_obj.create_user(app)
384        assert 'Error creating user ' in exc.value.args[0]['msg']
385        with pytest.raises(AnsibleFailJson) as exc:
386            my_obj.lock_given_user()
387        assert 'Error locking user ' in exc.value.args[0]['msg']
388        with pytest.raises(AnsibleFailJson) as exc:
389            my_obj.unlock_given_user()
390        assert 'Error unlocking user ' in exc.value.args[0]['msg']
391        with pytest.raises(AnsibleFailJson) as exc:
392            my_obj.delete_user(app)
393        assert 'Error removing user ' in exc.value.args[0]['msg']
394        with pytest.raises(AnsibleFailJson) as exc:
395            my_obj.change_password()
396        assert 'Error setting password for user ' in exc.value.args[0]['msg']
397        with pytest.raises(AnsibleFailJson) as exc:
398            my_obj.modify_user(app, ['password'])
399        assert 'Error modifying user ' in exc.value.args[0]['msg']
400
401    @patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
402    def test_rest_error_applications_snmp(self, mock_request):
403        data = self.set_default_args(rest=True)
404        data.update({'applications': 'snmp'})
405        data.update({'name': 'create'})
406        data.update({'role_name': 'test123'})
407        data.update({'set_password': '123456'})
408        set_module_args(data)
409        mock_request.side_effect = [
410            SRR['is_rest'],
411            SRR['end_of_sequence']
412        ]
413        with pytest.raises(AnsibleFailJson) as exc:
414            my_module()
415        assert exc.value.args[0]['msg'] == "snmp as application is not supported in REST."
416
417
418@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
419@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
420def test_ensure_user_get_rest_called(mock_request, mock_fail):
421    mock_fail.side_effect = fail_json
422    mock_request.side_effect = [
423        SRR['is_rest'],
424        SRR['get_user_rest'],
425        SRR['end_of_sequence']
426    ]
427    set_module_args(set_default_args_rest())
428    my_obj = my_module()
429    assert my_obj.get_user_rest() is not None
430
431
432@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
433@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
434@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
435def test_ensure_create_user_rest_called(mock_request, mock_fail, mock_exit):
436    mock_fail.side_effect = fail_json
437    mock_exit.side_effect = exit_json
438    mock_request.side_effect = [
439        SRR['is_rest'],
440        SRR['zero_records'],            # get
441        SRR['empty_good'],              # create
442        SRR['end_of_sequence']
443    ]
444    set_module_args(set_default_args_rest())
445    my_obj = my_module()
446    with pytest.raises(AnsibleExitJson) as exc:
447        my_obj.apply()
448    assert exc.value.args[0]['changed']
449
450
451@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
452@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
453@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
454def test_ensure_delete_user_rest_called(mock_request, mock_fail, mock_exit):
455    mock_fail.side_effect = fail_json
456    mock_exit.side_effect = exit_json
457    mock_request.side_effect = [
458        SRR['is_rest'],
459        SRR['get_user_rest'],
460        SRR['get_user_details_rest'],
461        SRR['get_user_rest'],
462        SRR['empty_good'],
463        SRR['end_of_sequence']
464    ]
465    data = {
466        'state': 'absent',
467    }
468    data.update(set_default_args_rest())
469    set_module_args(data)
470    my_obj = my_module()
471    with pytest.raises(AnsibleExitJson) as exc:
472        my_obj.apply()
473    assert exc.value.args[0]['changed']
474
475
476@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
477@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
478@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
479def test_ensure_modify_user_rest_called(mock_request, mock_fail, mock_exit):
480    mock_fail.side_effect = fail_json
481    mock_exit.side_effect = exit_json
482    mock_request.side_effect = [
483        SRR['is_rest'],
484        SRR['get_user_rest'],
485        SRR['get_user_details_rest'],
486        SRR['get_user_rest'],
487        SRR['empty_good'],
488        SRR['end_of_sequence']
489    ]
490    data = {
491        'application': 'ssh',
492    }
493    data.update(set_default_args_rest())
494    set_module_args(data)
495    my_obj = my_module()
496    with pytest.raises(AnsibleExitJson) as exc:
497        my_obj.apply()
498    assert exc.value.args[0]['changed']
499
500
501@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
502@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
503@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
504def test_ensure_lock_unlock_user_rest_called(mock_request, mock_fail, mock_exit):
505    mock_fail.side_effect = fail_json
506    mock_exit.side_effect = exit_json
507    mock_request.side_effect = [
508        SRR['is_rest'],
509        SRR['get_user_rest'],
510        SRR['get_user_details_rest'],
511        SRR['get_user_rest'],
512        SRR['empty_good'],
513        SRR['end_of_sequence']
514    ]
515    data = {
516        'lock_user': 'newvalue',
517    }
518    data.update(set_default_args_rest())
519    set_module_args(data)
520    my_obj = my_module()
521    with pytest.raises(AnsibleExitJson) as exc:
522        my_obj.apply()
523    assert exc.value.args[0]['changed']
524
525
526@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
527@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
528@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
529def test_ensure_change_password_user_rest_called(mock_request, mock_fail, mock_exit):
530    mock_fail.side_effect = fail_json
531    mock_exit.side_effect = exit_json
532    mock_request.side_effect = [
533        SRR['is_rest'],
534        SRR['get_user_rest'],
535        SRR['get_user_details_rest'],
536        SRR['get_user_rest'],
537        SRR['empty_good'],
538        SRR['end_of_sequence']
539    ]
540    data = {
541        'password': 'newvalue',
542    }
543    data.update(set_default_args_rest())
544    set_module_args(data)
545    my_obj = my_module()
546    with pytest.raises(AnsibleExitJson) as exc:
547        my_obj.apply()
548    assert exc.value.args[0]['changed']
549