1# (c) 2018, NetApp, Inc
2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
3
4''' unit tests ONTAP Ansible module: na_ontap_user '''
5
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8import json
9import pytest
10
11from ansible.module_utils import basic
12from ansible.module_utils._text import to_bytes
13from ansible_collections.netapp.ontap.tests.unit.compat import unittest
14from ansible_collections.netapp.ontap.tests.unit.compat.mock import patch
15import ansible_collections.netapp.ontap.plugins.module_utils.netapp as netapp_utils
16
17from ansible_collections.netapp.ontap.plugins.modules.na_ontap_user \
18    import NetAppOntapUser as my_module  # module under test
19
20if not netapp_utils.has_netapp_lib():
21    pytestmark = pytest.mark.skip('skipping as missing required netapp_lib')
22
23# REST API canned responses when mocking send_request
24SRR = {
25    # common responses
26    'is_rest': (200, {}, None),
27    'is_zapi': (400, {}, "Unreachable"),
28    'empty_good': (200, {}, None),
29    'zero_records': (200, {'num_records': 0}, None),
30    'end_of_sequence': (500, None, "Ooops, the UT needs one more SRR response"),
31    'generic_error': (400, None, "Expected error"),
32    'invalid_value_error': (400, None, {'message': "invalid value service_processor"}),
33    'get_uuid': (200, {'owner': {'uuid': 'ansible'}}, None),
34    'get_user_rest': (200,
35                      {'num_records': 1,
36                       'records': [{'owner': {'uuid': 'ansible_vserver'},
37                                    'name': 'abcd'}]}, None),
38    'get_user_details_rest': (200,
39                              {'role': {'name': 'vsadmin'},
40                               'applications': [{'application': 'http', 'second_authentication_method': 'none'}, ],
41                               'locked': False}, None)
42}
43
44
45def set_module_args(args):
46    """prepare arguments so that they will be picked up during module creation"""
47    args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
48    basic._ANSIBLE_ARGS = to_bytes(args)  # pylint: disable=protected-access
49
50
51class AnsibleExitJson(Exception):
52    """Exception class to be raised by module.exit_json and caught by the test case"""
53
54
55class AnsibleFailJson(Exception):
56    """Exception class to be raised by module.fail_json and caught by the test case"""
57
58
59def exit_json(*args, **kwargs):  # pylint: disable=unused-argument
60    """function to patch over exit_json; package return data into an exception"""
61    if 'changed' not in kwargs:
62        kwargs['changed'] = False
63    raise AnsibleExitJson(kwargs)
64
65
66def fail_json(*args, **kwargs):  # pylint: disable=unused-argument
67    """function to patch over fail_json; package return data into an exception"""
68    kwargs['failed'] = True
69    raise AnsibleFailJson(kwargs)
70
71
72def set_default_args_rest():
73    return dict({
74        'hostname': 'hostname',
75        'username': 'username',
76        'password': 'password',
77        'name': 'user_name',
78        'vserver': 'vserver',
79        'application_dicts':
80            [dict(application='http', authentication_methods=['password']),
81             dict(application='ontapi', authentication_methods=['password'])],
82        'role_name': 'vsadmin',
83        'lock_user': 'True',
84    })
85
86
87class MockONTAPConnection(object):
88    ''' mock server connection to ONTAP host '''
89
90    def __init__(self, kind=None, parm1=None, parm2=None):
91        ''' save arguments '''
92        self.type = kind
93        self.parm1 = parm1
94        self.parm2 = parm2
95        self.xml_in = None
96        self.xml_out = None
97
98    def invoke_successfully(self, xml, enable_tunneling):  # pylint: disable=unused-argument
99        ''' mock invoke_successfully returning xml data '''
100        self.xml_in = xml
101        if self.type == 'user':
102            xml = self.build_user_info(self.parm1, self.parm2)
103        elif self.type == 'user_fail':
104            raise netapp_utils.zapi.NaApiError(code='TEST', message="This exception is from the unit test")
105        self.xml_out = xml
106        return xml
107
108    @staticmethod
109    def set_vserver(vserver):
110        '''mock set vserver'''
111
112    @staticmethod
113    def build_user_info(locked, role_name):
114        ''' build xml data for user-info '''
115        xml = netapp_utils.zapi.NaElement('xml')
116        data = {'num-records': 1,
117                'attributes-list': {
118                    'security-login-account-info': {
119                        'is-locked': locked, 'role-name': role_name, 'application': 'console', 'authentication-method': 'password'}}}
120
121        xml.translate_struct(data)
122        print(xml.to_string())
123        return xml
124
125
126class TestMyModule(unittest.TestCase):
127    ''' a group of related Unit Tests '''
128
129    def setUp(self):
130        self.mock_module_helper = patch.multiple(basic.AnsibleModule,
131                                                 exit_json=exit_json,
132                                                 fail_json=fail_json)
133        self.mock_module_helper.start()
134        self.addCleanup(self.mock_module_helper.stop)
135        self.server = MockONTAPConnection()
136        self.onbox = False
137
138    def set_default_args(self, rest=False):
139        if self.onbox:
140            hostname = '10.10.10.10'
141            username = 'username'
142            password = 'password'
143            user_name = 'test'
144            vserver = 'ansible_test'
145            application = 'console'
146            authentication_method = 'password'
147        else:
148            hostname = 'hostname'
149            username = 'username'
150            password = 'password'
151            user_name = 'name'
152            vserver = 'vserver'
153            application = 'console'
154            authentication_method = 'password'
155        if rest:
156            use_rest = 'auto'
157        else:
158            use_rest = 'never'
159
160        return dict({
161            'hostname': hostname,
162            'username': username,
163            'password': password,
164            'use_rest': use_rest,
165            'name': user_name,
166            'vserver': vserver,
167            'applications': application,
168            'authentication_method': authentication_method
169        })
170
171    def test_module_fail_when_required_args_missing(self):
172        ''' required arguments are reported as errors '''
173        with pytest.raises(AnsibleFailJson) as exc:
174            set_module_args({})
175            my_module()
176        print('Info: %s' % exc.value.args[0]['msg'])
177
178    def test_ensure_user_get_called(self):
179        ''' a more interesting test '''
180        module_args = {}
181        module_args.update(self.set_default_args())
182        module_args.update({'role_name': 'test'})
183        set_module_args(module_args)
184        my_obj = my_module()
185        my_obj.server = self.server
186        user_info = my_obj.get_user()
187        print('Info: test_user_get: %s' % repr(user_info))
188        assert user_info is None
189
190    def test_ensure_user_apply_called(self):
191        ''' creating user and checking idempotency '''
192        module_args = {}
193        module_args.update(self.set_default_args())
194        module_args.update({'name': 'create'})
195        module_args.update({'role_name': 'test'})
196        set_module_args(module_args)
197        my_obj = my_module()
198        if not self.onbox:
199            my_obj.server = self.server
200        with pytest.raises(AnsibleExitJson) as exc:
201            my_obj.apply()
202        print('Info: test_user_apply: %s' % repr(exc.value))
203        assert exc.value.args[0]['changed']
204        if not self.onbox:
205            my_obj.server = MockONTAPConnection('user', 'false')
206        with pytest.raises(AnsibleExitJson) as exc:
207            my_obj.apply()
208        print('Info: test_user_apply: %s' % repr(exc.value))
209        assert exc.value.args[0]['changed']
210
211    def test_ensure_user_sp_apply_called(self):
212        ''' creating user with service_processor application and idempotency '''
213        module_args = {}
214        module_args.update(self.set_default_args())
215        module_args.update({'name': 'create'})
216        module_args.update({'role_name': 'test'})
217        module_args.update({'application': 'service-processor'})
218        set_module_args(module_args)
219        my_obj = my_module()
220        if not self.onbox:
221            my_obj.server = self.server
222        with pytest.raises(AnsibleExitJson) as exc:
223            my_obj.apply()
224        print('Info: test_user_sp: %s' % repr(exc.value))
225        assert exc.value.args[0]['changed']
226        if not self.onbox:
227            my_obj.server = MockONTAPConnection('user', 'false')
228        with pytest.raises(AnsibleExitJson) as exc:
229            my_obj.apply()
230        print('Info: test_user_sp: %s' % repr(exc.value))
231        assert exc.value.args[0]['changed']
232        # creating user with service_processor application and idempotency
233        module_args.update({'application': 'service_processor'})
234        set_module_args(module_args)
235        my_obj = my_module()
236        if not self.onbox:
237            my_obj.server = self.server
238        with pytest.raises(AnsibleExitJson) as exc:
239            my_obj.apply()
240        print('Info: test_user_sp: %s' % repr(exc.value))
241        assert exc.value.args[0]['changed']
242        if not self.onbox:
243            my_obj.server = MockONTAPConnection('user', 'false')
244        with pytest.raises(AnsibleExitJson) as exc:
245            my_obj.apply()
246        print('Info: test_user_sp: %s' % repr(exc.value))
247        assert exc.value.args[0]['changed']
248
249    def test_ensure_user_apply_for_delete_called(self):
250        ''' deleting user and checking idempotency '''
251        module_args = {}
252        module_args.update(self.set_default_args())
253        module_args.update({'name': 'create'})
254        module_args.update({'role_name': 'test'})
255        set_module_args(module_args)
256        my_obj = my_module()
257        if not self.onbox:
258            my_obj.server = MockONTAPConnection('user', 'false', 'test')
259        with pytest.raises(AnsibleExitJson) as exc:
260            my_obj.apply()
261        print('Info: test_user_apply: %s' % repr(exc.value))
262        assert not exc.value.args[0]['changed']
263        module_args.update({'state': 'absent'})
264        set_module_args(module_args)
265        my_obj = my_module()
266        if not self.onbox:
267            my_obj.server = MockONTAPConnection('user', 'false', 'test')
268        with pytest.raises(AnsibleExitJson) as exc:
269            my_obj.apply()
270        print('Info: test_user_delete: %s' % repr(exc.value))
271        assert exc.value.args[0]['changed']
272
273    def test_ensure_user_lock_called(self):
274        ''' changing user_lock to True and checking idempotency'''
275        module_args = {}
276        module_args.update(self.set_default_args())
277        module_args.update({'name': 'create'})
278        module_args.update({'role_name': 'test'})
279        module_args.update({'lock_user': 'false'})
280        set_module_args(module_args)
281        my_obj = my_module()
282        if not self.onbox:
283            my_obj.server = MockONTAPConnection('user', 'false', 'test')
284        with pytest.raises(AnsibleExitJson) as exc:
285            my_obj.apply()
286        print('Info: test_user_apply: %s' % repr(exc.value))
287        assert not exc.value.args[0]['changed']
288        module_args.update({'lock_user': 'true'})
289        set_module_args(module_args)
290        my_obj = my_module()
291        if not self.onbox:
292            my_obj.server = MockONTAPConnection('user', 'false')
293        with pytest.raises(AnsibleExitJson) as exc:
294            my_obj.apply()
295        print('Info: test_user_lock: %s' % repr(exc.value))
296        assert exc.value.args[0]['changed']
297
298    def test_ensure_user_unlock_called(self):
299        ''' changing user_lock to False and checking idempotency'''
300        module_args = {}
301        module_args.update(self.set_default_args())
302        module_args.update({'name': 'create'})
303        module_args.update({'role_name': 'test'})
304        module_args.update({'lock_user': 'false'})
305        set_module_args(module_args)
306        my_obj = my_module()
307        if not self.onbox:
308            my_obj.server = MockONTAPConnection('user', 'false', 'test')
309        with pytest.raises(AnsibleExitJson) as exc:
310            my_obj.apply()
311        print('Info: test_user_apply: %s' % repr(exc.value))
312        assert not exc.value.args[0]['changed']
313        module_args.update({'lock_user': 'false'})
314        set_module_args(module_args)
315        my_obj = my_module()
316        if not self.onbox:
317            my_obj.server = MockONTAPConnection('user', 'true', 'test')
318        with pytest.raises(AnsibleExitJson) as exc:
319            my_obj.apply()
320        print('Info: test_user_unlock: %s' % repr(exc.value))
321        assert exc.value.args[0]['changed']
322
323    def test_ensure_user_set_password_called(self):
324        ''' set password '''
325        module_args = {}
326        module_args.update(self.set_default_args())
327        module_args.update({'name': 'create'})
328        module_args.update({'role_name': 'test'})
329        module_args.update({'set_password': '123456'})
330        set_module_args(module_args)
331        my_obj = my_module()
332        if not self.onbox:
333            my_obj.server = MockONTAPConnection('user', 'true')
334        with pytest.raises(AnsibleExitJson) as exc:
335            my_obj.apply()
336        print('Info: test_user_apply: %s' % repr(exc.value))
337        assert exc.value.args[0]['changed']
338
339    def test_ensure_user_role_update_called(self):
340        ''' set password '''
341        module_args = {}
342        module_args.update(self.set_default_args())
343        module_args.update({'name': 'create'})
344        module_args.update({'role_name': 'test123'})
345        module_args.update({'set_password': '123456'})
346        set_module_args(module_args)
347        my_obj = my_module()
348        if not self.onbox:
349            my_obj.server = MockONTAPConnection('user', 'true')
350        with pytest.raises(AnsibleExitJson) as exc:
351            my_obj.apply()
352        print('Info: test_user_apply: %s' % repr(exc.value))
353        assert exc.value.args[0]['changed']
354
355    def test_ensure_user_role_update_additional_application_called(self):
356        ''' set password '''
357        module_args = {}
358        module_args.update(self.set_default_args())
359        module_args.update({'name': 'create'})
360        module_args.update({'role_name': 'test123'})
361        module_args.update({'application': 'http'})
362        module_args.update({'set_password': '123456'})
363        set_module_args(module_args)
364        my_obj = my_module()
365        if not self.onbox:
366            my_obj.server = MockONTAPConnection('user', 'true')
367        with pytest.raises(AnsibleExitJson) as exc:
368            my_obj.apply()
369        print('Info: test_user_apply: %s' % repr(exc.value))
370        assert exc.value.args[0]['changed']
371
372    def test_if_all_methods_catch_exception(self):
373        data = self.set_default_args()
374        data.update({'role_name': 'test'})
375        set_module_args(data)
376        my_obj = my_module()
377        app = dict(application='console', authentication_methods=['password'])
378        if not self.onbox:
379            my_obj.server = MockONTAPConnection('user_fail')
380        with pytest.raises(AnsibleFailJson) as exc:
381            my_obj.get_user()
382        assert 'Error getting user ' in exc.value.args[0]['msg']
383        with pytest.raises(AnsibleFailJson) as exc:
384            my_obj.create_user(app)
385        assert 'Error creating user ' in exc.value.args[0]['msg']
386        with pytest.raises(AnsibleFailJson) as exc:
387            my_obj.lock_given_user()
388        assert 'Error locking user ' in exc.value.args[0]['msg']
389        with pytest.raises(AnsibleFailJson) as exc:
390            my_obj.unlock_given_user()
391        assert 'Error unlocking user ' in exc.value.args[0]['msg']
392        with pytest.raises(AnsibleFailJson) as exc:
393            my_obj.delete_user(app)
394        assert 'Error removing user ' in exc.value.args[0]['msg']
395        with pytest.raises(AnsibleFailJson) as exc:
396            my_obj.change_password()
397        assert 'Error setting password for user ' in exc.value.args[0]['msg']
398        with pytest.raises(AnsibleFailJson) as exc:
399            my_obj.modify_user(app, ['password'])
400        assert 'Error modifying user ' in exc.value.args[0]['msg']
401
402    @patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
403    def test_rest_error_applications_snmp(self, mock_request):
404        data = self.set_default_args(rest=True)
405        data.update({'applications': 'snmp'})
406        data.update({'name': 'create'})
407        data.update({'role_name': 'test123'})
408        data.update({'set_password': '123456'})
409        set_module_args(data)
410        mock_request.side_effect = [
411            SRR['is_rest'],
412            SRR['end_of_sequence']
413        ]
414        with pytest.raises(AnsibleFailJson) as exc:
415            my_module()
416        assert exc.value.args[0]['msg'] == "snmp as application is not supported in REST."
417
418
419@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
420@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
421def test_ensure_user_get_rest_called(mock_request, mock_fail):
422    mock_fail.side_effect = fail_json
423    mock_request.side_effect = [
424        SRR['is_rest'],
425        SRR['get_user_rest'],
426        SRR['end_of_sequence']
427    ]
428    set_module_args(set_default_args_rest())
429    my_obj = my_module()
430    assert my_obj.get_user_rest() is not None
431
432
433@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
434@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
435@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
436def test_ensure_create_user_rest_called(mock_request, mock_fail, mock_exit):
437    mock_fail.side_effect = fail_json
438    mock_exit.side_effect = exit_json
439    mock_request.side_effect = [
440        SRR['is_rest'],
441        SRR['zero_records'],            # get
442        SRR['empty_good'],              # create
443        SRR['end_of_sequence']
444    ]
445    set_module_args(set_default_args_rest())
446    my_obj = my_module()
447    with pytest.raises(AnsibleExitJson) as exc:
448        my_obj.apply()
449    assert exc.value.args[0]['changed']
450
451
452@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
453@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
454@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
455def test_ensure_delete_user_rest_called(mock_request, mock_fail, mock_exit):
456    mock_fail.side_effect = fail_json
457    mock_exit.side_effect = exit_json
458    mock_request.side_effect = [
459        SRR['is_rest'],
460        SRR['get_user_rest'],
461        SRR['get_user_details_rest'],
462        SRR['get_user_rest'],
463        SRR['empty_good'],
464        SRR['end_of_sequence']
465    ]
466    data = {
467        'state': 'absent',
468    }
469    data.update(set_default_args_rest())
470    set_module_args(data)
471    my_obj = my_module()
472    with pytest.raises(AnsibleExitJson) as exc:
473        my_obj.apply()
474    assert exc.value.args[0]['changed']
475
476
477@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
478@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
479@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
480def test_ensure_modify_user_rest_called(mock_request, mock_fail, mock_exit):
481    mock_fail.side_effect = fail_json
482    mock_exit.side_effect = exit_json
483    mock_request.side_effect = [
484        SRR['is_rest'],
485        SRR['get_user_rest'],
486        SRR['get_user_details_rest'],
487        SRR['get_user_rest'],
488        SRR['empty_good'],
489        SRR['end_of_sequence']
490    ]
491    app = dict(application='service_processor', authentication_methods=['usm'])
492    data = set_default_args_rest()
493    data.update({
494        'application_dicts': [app],
495    })
496    set_module_args(data)
497    my_obj = my_module()
498    with pytest.raises(AnsibleExitJson) as exc:
499        my_obj.apply()
500    assert exc.value.args[0]['changed']
501
502
503@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
504@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
505@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
506def test_ensure_lock_unlock_user_rest_called(mock_request, mock_fail, mock_exit):
507    mock_fail.side_effect = fail_json
508    mock_exit.side_effect = exit_json
509    mock_request.side_effect = [
510        SRR['is_rest'],
511        SRR['get_user_rest'],
512        SRR['get_user_details_rest'],
513        SRR['get_user_rest'],
514        SRR['empty_good'],
515        SRR['end_of_sequence']
516    ]
517    data = {
518        'lock_user': 'newvalue',
519    }
520    data.update(set_default_args_rest())
521    set_module_args(data)
522    my_obj = my_module()
523    with pytest.raises(AnsibleExitJson) as exc:
524        my_obj.apply()
525    assert exc.value.args[0]['changed']
526
527
528@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
529@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
530@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
531def test_ensure_change_password_user_rest_called(mock_request, mock_fail, mock_exit):
532    mock_fail.side_effect = fail_json
533    mock_exit.side_effect = exit_json
534    mock_request.side_effect = [
535        SRR['is_rest'],
536        SRR['get_user_rest'],
537        SRR['get_user_details_rest'],
538        SRR['get_user_rest'],
539        SRR['empty_good'],
540        SRR['end_of_sequence']
541    ]
542    data = {
543        'password': 'newvalue',
544    }
545    data.update(set_default_args_rest())
546    set_module_args(data)
547    my_obj = my_module()
548    with pytest.raises(AnsibleExitJson) as exc:
549        my_obj.apply()
550    assert exc.value.args[0]['changed']
551
552
553@patch('ansible.module_utils.basic.AnsibleModule.exit_json')
554@patch('ansible.module_utils.basic.AnsibleModule.fail_json')
555@patch('ansible_collections.netapp.ontap.plugins.module_utils.netapp.OntapRestAPI.send_request')
556def test_sp_retry(mock_request, mock_fail, mock_exit):
557    """simulate error in create_user_rest and retry"""
558    mock_fail.side_effect = fail_json
559    mock_exit.side_effect = exit_json
560    mock_request.side_effect = [
561        SRR['is_rest'],
562        SRR['zero_records'],            # get
563        SRR['invalid_value_error'],    # create
564        SRR['generic_error'],           # create
565        SRR['end_of_sequence']
566    ]
567    app = dict(application='service_processor', authentication_methods=['usm'])
568    data = dict(set_default_args_rest())
569    data.update({
570        'application_dicts': [app],
571    })
572    set_module_args(data)
573    my_obj = my_module()
574    with pytest.raises(AnsibleFailJson) as exc:
575        my_obj.apply()
576    print(mock_request.mock_calls)
577    assert 'invalid value' in exc.value.args[0]['msg']
578    assert 'service-processor' in repr(mock_request.mock_calls[-1])
579
580    mock_request.side_effect = [
581        SRR['is_rest'],
582        SRR['zero_records'],            # get
583        SRR['invalid_value_error'],     # create
584        SRR['empty_good'],              # create
585        SRR['end_of_sequence']
586    ]
587    app = dict(application='service-processor', authentication_methods=['usm'])
588    data.update({
589        'application_dicts': [app],
590    })
591    print(data)
592    set_module_args(data)
593    my_obj = my_module()
594    with pytest.raises(AnsibleExitJson) as exc:
595        my_obj.apply()
596    print(mock_request.mock_calls)
597    assert 'service_processor' in repr(mock_request.mock_calls[-1])
598    assert exc.value.args[0]['changed']
599