1# Copyright 2016 Cloudbase Solutions Srl
2# All Rights Reserved.
3#
4#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5#    not use this file except in compliance with the License. You may obtain
6#    a copy of the License at
7#
8#         http://www.apache.org/licenses/LICENSE-2.0
9#
10#    Unless required by applicable law or agreed to in writing, software
11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13#    License for the specific language governing permissions and limitations
14#    under the License.
15
16from unittest import mock
17
18import ddt
19from os_win import exceptions as os_win_exc
20
21from os_brick import exception
22from os_brick.initiator.windows import fibre_channel as fc
23from os_brick.tests.windows import test_base
24
25
26@ddt.ddt
27class WindowsFCConnectorTestCase(test_base.WindowsConnectorTestBase):
28    def setUp(self):
29        super(WindowsFCConnectorTestCase, self).setUp()
30        self._connector = fc.WindowsFCConnector(
31            device_scan_interval=mock.sentinel.rescan_interval)
32
33        self._diskutils = self._connector._diskutils
34        self._fc_utils = self._connector._fc_utils
35
36    @ddt.data(True, False)
37    @mock.patch.object(fc.utilsfactory, 'get_fc_utils')
38    def test_get_volume_connector_props(self, valid_fc_hba_ports,
39                                        mock_get_fc_utils):
40        fake_fc_hba_ports = [{'node_name': mock.sentinel.node_name,
41                              'port_name': mock.sentinel.port_name},
42                             {'node_name': mock.sentinel.second_node_name,
43                              'port_name': mock.sentinel.second_port_name}]
44        self._fc_utils = mock_get_fc_utils.return_value
45        self._fc_utils.get_fc_hba_ports.return_value = (
46            fake_fc_hba_ports if valid_fc_hba_ports else [])
47
48        props = self._connector.get_connector_properties()
49
50        self._fc_utils.refresh_hba_configuration.assert_called_once_with()
51        self._fc_utils.get_fc_hba_ports.assert_called_once_with()
52
53        if valid_fc_hba_ports:
54            expected_props = {
55                'wwpns': [mock.sentinel.port_name,
56                          mock.sentinel.second_port_name],
57                'wwnns': [mock.sentinel.node_name,
58                          mock.sentinel.second_node_name]
59            }
60        else:
61            expected_props = {}
62
63        self.assertCountEqual(expected_props, props)
64
65    @mock.patch.object(fc.WindowsFCConnector, '_get_scsi_wwn')
66    @mock.patch.object(fc.WindowsFCConnector, 'get_volume_paths')
67    def test_connect_volume(self, mock_get_vol_paths,
68                            mock_get_scsi_wwn):
69        mock_get_vol_paths.return_value = [mock.sentinel.dev_name]
70        mock_get_dev_num = self._diskutils.get_device_number_from_device_name
71        mock_get_dev_num.return_value = mock.sentinel.dev_num
72
73        expected_device_info = dict(type='block',
74                                    path=mock.sentinel.dev_name,
75                                    number=mock.sentinel.dev_num,
76                                    scsi_wwn=mock_get_scsi_wwn.return_value)
77        device_info = self._connector.connect_volume(mock.sentinel.conn_props)
78
79        self.assertEqual(expected_device_info, device_info)
80        mock_get_vol_paths.assert_called_once_with(mock.sentinel.conn_props)
81        mock_get_dev_num.assert_called_once_with(mock.sentinel.dev_name)
82        mock_get_scsi_wwn.assert_called_once_with(mock.sentinel.dev_num)
83
84    @mock.patch.object(fc.WindowsFCConnector, 'get_volume_paths')
85    def test_connect_volume_not_found(self, mock_get_vol_paths):
86        mock_get_vol_paths.return_value = []
87        self.assertRaises(exception.NoFibreChannelVolumeDeviceFound,
88                          self._connector.connect_volume,
89                          mock.sentinel.conn_props)
90
91    @ddt.data({'volume_mappings': [], 'expected_paths': []},
92              {'volume_mappings': [dict(device_name='',
93                                        fcp_lun=mock.sentinel.fcp_lun)] * 3,
94               'scsi_id_side_eff': os_win_exc.OSWinException,
95               'expected_paths': []},
96              {'volume_mappings': [dict(device_name='',
97                                        fcp_lun=mock.sentinel.fcp_lun),
98                                   dict(device_name=mock.sentinel.disk_path)],
99               'expected_paths': [mock.sentinel.disk_path]},
100              {'volume_mappings': [dict(device_name='',
101                                        fcp_lun=mock.sentinel.fcp_lun)],
102               'scsi_id_side_eff': [[mock.sentinel.disk_path]],
103               'expected_paths': [mock.sentinel.disk_path]},
104              {'volume_mappings': [dict(device_name=mock.sentinel.disk_path)],
105               'use_multipath': True,
106               'is_mpio_disk': True,
107               'expected_paths': [mock.sentinel.disk_path]},
108              {'volume_mappings': [dict(device_name=mock.sentinel.disk_path)],
109               'use_multipath': True,
110               'is_mpio_disk': False,
111               'expected_paths': []})
112    @ddt.unpack
113    @mock.patch('time.sleep')
114    @mock.patch.object(fc.WindowsFCConnector, '_get_fc_volume_mappings')
115    @mock.patch.object(fc.WindowsFCConnector, '_get_disk_paths_by_scsi_id')
116    def test_get_volume_paths(self, mock_get_disk_paths_by_scsi_id,
117                              mock_get_fc_mappings,
118                              mock_sleep,
119                              volume_mappings, expected_paths,
120                              scsi_id_side_eff=None,
121                              use_multipath=False,
122                              is_mpio_disk=False):
123        mock_get_dev_num = self._diskutils.get_device_number_from_device_name
124        mock_get_fc_mappings.return_value = volume_mappings
125        mock_get_disk_paths_by_scsi_id.side_effect = scsi_id_side_eff
126        self._diskutils.is_mpio_disk.return_value = is_mpio_disk
127
128        self._connector.use_multipath = use_multipath
129
130        vol_paths = self._connector.get_volume_paths(mock.sentinel.conn_props)
131        self.assertEqual(expected_paths, vol_paths)
132
133        # In this test case, either the volume is found after the first
134        # attempt, either it's not found at all, in which case we'd expect
135        # the number of retries to be the requested maximum number of rescans.
136        expected_try_count = (1 if expected_paths
137                              else self._connector.device_scan_attempts)
138        self._diskutils.rescan_disks.assert_has_calls(
139            [mock.call()] * expected_try_count)
140        mock_get_fc_mappings.assert_has_calls(
141            [mock.call(mock.sentinel.conn_props)] * expected_try_count)
142        mock_sleep.assert_has_calls(
143            [mock.call(mock.sentinel.rescan_interval)] *
144            (expected_try_count - 1))
145
146        dev_names = [mapping['device_name']
147                     for mapping in volume_mappings if mapping['device_name']]
148        if volume_mappings and not dev_names:
149            mock_get_disk_paths_by_scsi_id.assert_any_call(
150                mock.sentinel.conn_props,
151                volume_mappings[0]['fcp_lun'])
152
153        if expected_paths and use_multipath:
154            mock_get_dev_num.assert_called_once_with(expected_paths[0])
155
156            self._diskutils.is_mpio_disk.assert_any_call(
157                mock_get_dev_num.return_value)
158
159    @mock.patch.object(fc.WindowsFCConnector, '_get_fc_hba_mappings')
160    def test_get_fc_volume_mappings(self, mock_get_fc_hba_mappings):
161        fake_target_wwpn = 'FAKE_TARGET_WWPN'
162        fake_conn_props = dict(target_lun=mock.sentinel.target_lun,
163                               target_wwn=[fake_target_wwpn])
164
165        mock_hba_mappings = {mock.sentinel.node_name: mock.sentinel.hba_ports}
166        mock_get_fc_hba_mappings.return_value = mock_hba_mappings
167
168        all_target_mappings = [{'device_name': mock.sentinel.dev_name,
169                                'port_name': fake_target_wwpn,
170                                'lun': mock.sentinel.target_lun},
171                               {'device_name': mock.sentinel.dev_name_1,
172                                'port_name': mock.sentinel.target_port_name_1,
173                                'lun': mock.sentinel.target_lun},
174                               {'device_name': mock.sentinel.dev_name,
175                                'port_name': mock.sentinel.target_port_name,
176                                'lun': mock.sentinel.target_lun_1}]
177        expected_mappings = [all_target_mappings[0]]
178
179        self._fc_utils.get_fc_target_mappings.return_value = (
180            all_target_mappings)
181
182        volume_mappings = self._connector._get_fc_volume_mappings(
183            fake_conn_props)
184        self.assertEqual(expected_mappings, volume_mappings)
185
186    def test_get_fc_hba_mappings(self):
187        fake_fc_hba_ports = [{'node_name': mock.sentinel.node_name,
188                              'port_name': mock.sentinel.port_name}]
189
190        self._fc_utils.get_fc_hba_ports.return_value = fake_fc_hba_ports
191
192        resulted_mappings = self._connector._get_fc_hba_mappings()
193
194        expected_mappings = {
195            mock.sentinel.node_name: [mock.sentinel.port_name]}
196        self.assertEqual(expected_mappings, resulted_mappings)
197
198    @mock.patch.object(fc.WindowsFCConnector, '_get_dev_nums_by_scsi_id')
199    def test_get_disk_paths_by_scsi_id(self, mock_get_dev_nums):
200        remote_wwpns = [mock.sentinel.remote_wwpn_0,
201                        mock.sentinel.remote_wwpn_1]
202        fake_init_target_map = {mock.sentinel.local_wwpn: remote_wwpns}
203        conn_props = dict(initiator_target_map=fake_init_target_map)
204
205        mock_get_dev_nums.side_effect = [os_win_exc.FCException,
206                                         [mock.sentinel.dev_num]]
207        mock_get_dev_name = self._diskutils.get_device_name_by_device_number
208        mock_get_dev_name.return_value = mock.sentinel.dev_name
209
210        disk_paths = self._connector._get_disk_paths_by_scsi_id(
211            conn_props, mock.sentinel.fcp_lun)
212        self.assertEqual([mock.sentinel.dev_name], disk_paths)
213
214        mock_get_dev_nums.assert_has_calls([
215            mock.call(mock.sentinel.local_wwpn,
216                      remote_wwpn,
217                      mock.sentinel.fcp_lun)
218            for remote_wwpn in remote_wwpns])
219        mock_get_dev_name.assert_called_once_with(mock.sentinel.dev_num)
220
221    @mock.patch.object(fc.WindowsFCConnector, '_get_fc_hba_wwn_for_port')
222    def test_get_dev_nums_by_scsi_id(self, mock_get_fc_hba_wwn):
223        fake_identifier = dict(id=mock.sentinel.id,
224                               type=mock.sentinel.type)
225
226        mock_get_fc_hba_wwn.return_value = mock.sentinel.local_wwnn
227        self._fc_utils.get_scsi_device_identifiers.return_value = [
228            fake_identifier]
229        self._diskutils.get_disk_numbers_by_unique_id.return_value = (
230            mock.sentinel.dev_nums)
231
232        dev_nums = self._connector._get_dev_nums_by_scsi_id(
233            mock.sentinel.local_wwpn,
234            mock.sentinel.remote_wwpn,
235            mock.sentinel.fcp_lun)
236        self.assertEqual(mock.sentinel.dev_nums, dev_nums)
237
238        mock_get_fc_hba_wwn.assert_called_once_with(mock.sentinel.local_wwpn)
239        self._fc_utils.get_scsi_device_identifiers.assert_called_once_with(
240            mock.sentinel.local_wwnn, mock.sentinel.local_wwpn,
241            mock.sentinel.remote_wwpn, mock.sentinel.fcp_lun)
242        self._diskutils.get_disk_numbers_by_unique_id.assert_called_once_with(
243            unique_id=mock.sentinel.id,
244            unique_id_format=mock.sentinel.type)
245