1# Copyright (c) 2014 VMware, Inc.
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15from unittest import mock
16
17from oslo_utils import units
18import urllib.parse as urlparse
19
20from oslo_vmware import constants
21from oslo_vmware.objects import datastore
22from oslo_vmware.tests import base
23from oslo_vmware import vim_util
24
25
class HostMount:
    """Fake entry of a datastore's ``DatastoreHostMount`` list.

    Only the two attributes the tests populate are modelled.

    :param key: identifier of the host the mount belongs to
    :param mountInfo: mount details, a :class:`MountInfo` in these tests
    """

    def __init__(self, key, mountInfo):
        self.key, self.mountInfo = key, mountInfo
31
32
class MountInfo:
    """Fake mount description carried by a :class:`HostMount`.

    :param accessMode: access mode string, e.g. ``'readWrite'`` or ``'read'``
    :param mounted: whether the datastore is mounted on the host
    :param accessible: whether the datastore is accessible from the host
    """

    def __init__(self, accessMode, mounted, accessible):
        self.accessMode, self.mounted, self.accessible = (
            accessMode, mounted, accessible)
39
40
class DatastoreTestCase(base.TestCase):

    """Test the Datastore object."""

    def test_ds(self):
        """Constructor arguments are exposed as attributes."""
        ds = datastore.Datastore(
            "fake_ref", "ds_name", 2 * units.Gi, 1 * units.Gi, 1 * units.Gi)
        self.assertEqual('ds_name', ds.name)
        self.assertEqual('fake_ref', ds.ref)
        self.assertEqual(2 * units.Gi, ds.capacity)
        self.assertEqual(1 * units.Gi, ds.freespace)
        self.assertEqual(1 * units.Gi, ds.uncommitted)

    def test_ds_invalid_space(self):
        """Inconsistent capacity/freespace pairs raise ValueError."""
        # Free space larger than the capacity.
        self.assertRaises(ValueError, datastore.Datastore,
                          "fake_ref", "ds_name", 1 * units.Gi, 2 * units.Gi)
        # Free space supplied although the capacity is None.
        self.assertRaises(ValueError, datastore.Datastore,
                          "fake_ref", "ds_name", None, 2 * units.Gi)

    def test_ds_no_capacity_no_freespace(self):
        """Capacity and freespace are None when not supplied."""
        ds = datastore.Datastore("fake_ref", "ds_name")
        self.assertIsNone(ds.capacity)
        self.assertIsNone(ds.freespace)

    def test_ds_invalid(self):
        """A missing reference or a missing name raises ValueError."""
        self.assertRaises(ValueError, datastore.Datastore, None, "ds_name")
        self.assertRaises(ValueError, datastore.Datastore, "fake_ref", None)

    def test_build_path(self):
        """build_path prefixes the joined components with '[<name>] '."""
        ds = datastore.Datastore("fake_ref", "ds_name")
        ds_path = ds.build_path("some_dir", "foo.vmdk")
        self.assertEqual('[ds_name] some_dir/foo.vmdk', str(ds_path))

    def test_build_url(self):
        """build_url needs a datacenter and then reflects its inputs."""
        ds = datastore.Datastore("fake_ref", "ds_name")
        path = 'images/ubuntu.vmdk'
        # No datacenter has been attached to the datastore yet.
        self.assertRaises(ValueError, ds.build_url, 'https', '10.0.0.2', path)
        ds.datacenter = mock.Mock()
        ds.datacenter.name = "dc_path"
        ds_url = ds.build_url('https', '10.0.0.2', path)
        # Expected value first, like every other assertion in this module,
        # so that failure output labels the two values correctly.
        self.assertEqual("ds_name", ds_url.datastore_name)
        self.assertEqual("dc_path", ds_url.datacenter_path)
        self.assertEqual(path, ds_url.path)

    def test_get_summary(self):
        """get_summary returns the 'summary' property fetched via the API."""
        ds_ref = vim_util.get_moref('ds-0', 'Datastore')
        ds = datastore.Datastore(ds_ref, 'ds-name')
        summary = mock.sentinel.summary
        session = mock.Mock()
        session.invoke_api.return_value = summary
        ret = ds.get_summary(session)
        self.assertEqual(summary, ret)
        session.invoke_api.assert_called_once_with(vim_util,
                                                   'get_object_property',
                                                   session.vim,
                                                   ds.ref, 'summary')

    def _test_get_connected_hosts(self, in_maintenance_mode,
                                  m1_accessible=True):
        """Drive Datastore.get_connected_hosts against faked vSphere data.

        First checks that an inaccessible datastore summary yields no
        hosts. Then replays a host-mount list followed by a host runtime
        lookup through ``session.invoke_api`` and verifies the exact API
        calls that were made.

        :param in_maintenance_mode: value reported as ``inMaintenanceMode``
                                    in the runtime of host "m1"
        :param m1_accessible: ``accessible`` flag of m1's mount; when False
                              no runtime lookup is expected
        :returns: the result of ``get_connected_hosts`` for the accessible
                  datastore
        """
        session = mock.Mock()
        ds_ref = vim_util.get_moref('ds-0', 'Datastore')
        ds = datastore.Datastore(ds_ref, 'ds-name')
        ds.get_summary = mock.Mock()
        ds.get_summary.return_value.accessible = False
        self.assertEqual([], ds.get_connected_hosts(session))
        ds.get_summary.return_value.accessible = True
        # Only m1 can be usable: m2 is read-only, m3 is not mounted and
        # m4 is not accessible.
        m1 = HostMount("m1", MountInfo('readWrite', True, m1_accessible))
        m2 = HostMount("m2", MountInfo('read', True, True))
        m3 = HostMount("m3", MountInfo('readWrite', False, True))
        m4 = HostMount("m4", MountInfo('readWrite', True, False))
        ds.get_summary.assert_called_once_with(session)

        class Prop(object):
            DatastoreHostMount = [m1, m2, m3, m4]

        class HostRuntime(object):
            inMaintenanceMode = in_maintenance_mode

        class HostProp(object):
            name = 'runtime'
            val = HostRuntime()

        class Object(object):
            obj = "m1"
            propSet = [HostProp()]

        class Runtime(object):
            objects = [Object()]

        # A fresh mock is installed so that mock_calls only records the
        # calls made by the second get_connected_hosts invocation.
        session.invoke_api = mock.Mock(side_effect=[Prop(), Runtime()])
        hosts = ds.get_connected_hosts(session)
        calls = [mock.call(vim_util, 'get_object_property',
                           session.vim, ds_ref, 'host')]
        if m1_accessible:
            calls.append(
                mock.call(vim_util,
                          'get_properties_for_a_collection_of_objects',
                          session.vim, 'HostSystem', ["m1"], ['runtime']))
        self.assertEqual(calls, session.invoke_api.mock_calls)
        return hosts

    def test_get_connected_hosts(self):
        """The single usable host outside maintenance mode is returned."""
        hosts = self._test_get_connected_hosts(False)
        self.assertEqual(1, len(hosts))
        self.assertEqual("m1", hosts.pop())

    def test_get_connected_hosts_in_maintenance(self):
        """A host in maintenance mode is not reported as connected."""
        hosts = self._test_get_connected_hosts(True)
        self.assertEqual(0, len(hosts))

    def test_get_connected_hosts_ho_hosts(self):
        """No host is returned when m1's mount is inaccessible.

        NOTE(review): "ho_hosts" is presumably a typo for "no_hosts"; the
        name is kept so that selecting this test by name keeps working.
        """
        hosts = self._test_get_connected_hosts(False, False)
        self.assertEqual(0, len(hosts))

    def test_is_datastore_mount_usable(self):
        """Only a mounted, accessible, read-write mount is usable."""
        is_usable = datastore.Datastore.is_datastore_mount_usable
        self.assertTrue(is_usable(MountInfo('readWrite', True, True)))

        # (accessMode, mounted, accessible) combinations that must be
        # rejected.
        unusable = [
            ('read', True, True),
            ('readWrite', False, True),
            ('readWrite', True, False),
            ('readWrite', False, False),
            ('readWrite', None, None),
            ('readWrite', None, True)]
        for mount_args in unusable:
            self.assertFalse(
                is_usable(MountInfo(*mount_args)),
                'mount %r must not be usable' % (mount_args,))
171
172
class DatastoreClusterTestCase(base.TestCase):

    """Test datastore cluster lookup through get_dsc_ref_and_name."""

    def test_get_dsc_with_moid(self):
        """A 'group-p<N>' value is used as the moid of a StoragePod."""
        session = mock.Mock()
        session.invoke_api.return_value = 'ds-cluster'
        dsc_moid = 'group-p123'
        dsc_ref, dsc_name = datastore.get_dsc_ref_and_name(session, dsc_moid)
        self.assertEqual((dsc_moid, 'StoragePod'),
                         (vim_util.get_moref_value(dsc_ref),
                          vim_util.get_moref_type(dsc_ref)))
        self.assertEqual('ds-cluster', dsc_name)
        session.invoke_api.assert_called_once_with(vim_util,
                                                   'get_object_property',
                                                   session.vim,
                                                   mock.ANY,
                                                   'name')

    # NOTE: patch decorators are applied bottom-up, so the mock for
    # cancel_retrieval is the first injected argument.
    @mock.patch('oslo_vmware.vim_util.continue_retrieval')
    @mock.patch('oslo_vmware.vim_util.cancel_retrieval')
    def test_get_dsc_by_name(self, cancel_retrieval, continue_retrieval):
        """A plain name is resolved to the StoragePod carrying that name."""
        name = 'ds-cluster'

        # One retrieved StoragePod whose only property holds the name.
        pod_prop = mock.Mock()
        pod_prop.val = name
        pod_ref = vim_util.get_moref('group-p456', 'StoragePod')
        pod = mock.Mock()
        pod.propSet = [pod_prop]
        pod.obj = pod_ref

        retrieve_result = mock.Mock()
        retrieve_result.objects = [pod]

        session = mock.Mock()
        session.invoke_api.return_value = retrieve_result
        dsc_ref, dsc_name = datastore.get_dsc_ref_and_name(session, name)
        self.assertEqual((vim_util.get_moref_value(pod_ref),
                          vim_util.get_moref_type(pod_ref)),
                         (vim_util.get_moref_value(dsc_ref),
                          vim_util.get_moref_type(dsc_ref)))
        # The name half of the returned pair was previously left
        # unverified.
        self.assertEqual(name, dsc_name)
213
214
class DatastorePathTestCase(base.TestCase):

    """Test the DatastorePath object."""

    def test_ds_path(self):
        """All accessors of a fully specified path return their part."""
        p = datastore.DatastorePath('dsname', 'a/b/c', 'file.iso')
        self.assertEqual('[dsname] a/b/c/file.iso', str(p))
        self.assertEqual('a/b/c/file.iso', p.rel_path)
        self.assertEqual('a/b/c', p.parent.rel_path)
        self.assertEqual('[dsname] a/b/c', str(p.parent))
        self.assertEqual('dsname', p.datastore)
        self.assertEqual('file.iso', p.basename)
        self.assertEqual('a/b/c', p.dirname)

    def test_ds_path_no_ds_name(self):
        """An empty or None datastore name raises ValueError."""
        bad_args = [
            ('', ['a/b/c', 'file.iso']),
            (None, ['a/b/c', 'file.iso'])]
        for ds_name, components in bad_args:
            self.assertRaises(
                ValueError, datastore.DatastorePath,
                ds_name, *components)

    def test_ds_path_invalid_path_components(self):
        """A None path component at any position raises ValueError."""
        bad_args = [
            ('dsname', [None]),
            ('dsname', ['', None]),
            ('dsname', ['a', None]),
            ('dsname', ['a', None, 'b']),
            ('dsname', [None, '']),
            ('dsname', [None, 'b'])]

        for ds_name, components in bad_args:
            self.assertRaises(
                ValueError, datastore.DatastorePath,
                ds_name, *components)

    def test_ds_path_no_subdir(self):
        """A file directly below the datastore root has an empty dirname."""
        args = [
            ('dsname', ['', 'x.vmdk']),
            ('dsname', ['x.vmdk'])]

        canonical_p = datastore.DatastorePath('dsname', 'x.vmdk')
        self.assertEqual('[dsname] x.vmdk', str(canonical_p))
        self.assertEqual('', canonical_p.dirname)
        self.assertEqual('x.vmdk', canonical_p.basename)
        self.assertEqual('x.vmdk', canonical_p.rel_path)
        for ds_name, components in args:
            p = datastore.DatastorePath(ds_name, *components)
            self.assertEqual(str(canonical_p), str(p))

    def test_ds_path_ds_only(self):
        """Missing or empty components yield the bare '[<name>]' path."""
        args = [
            ('dsname', []),
            ('dsname', ['']),
            ('dsname', ['', ''])]

        canonical_p = datastore.DatastorePath('dsname')
        self.assertEqual('[dsname]', str(canonical_p))
        self.assertEqual('', canonical_p.rel_path)
        self.assertEqual('', canonical_p.basename)
        self.assertEqual('', canonical_p.dirname)
        for ds_name, components in args:
            p = datastore.DatastorePath(ds_name, *components)
            self.assertEqual(str(canonical_p), str(p))
            self.assertEqual(canonical_p.rel_path, p.rel_path)

    def test_ds_path_equivalence(self):
        """Different splits of the same components give the same path."""
        args = [
            ('dsname', ['a/b/c/', 'x.vmdk']),
            ('dsname', ['a/', 'b/c/', 'x.vmdk']),
            ('dsname', ['a', 'b', 'c', 'x.vmdk']),
            ('dsname', ['a/b/c', 'x.vmdk'])]

        canonical_p = datastore.DatastorePath('dsname', 'a/b/c', 'x.vmdk')
        for ds_name, components in args:
            p = datastore.DatastorePath(ds_name, *components)
            self.assertEqual(str(canonical_p), str(p))
            self.assertEqual(canonical_p.datastore, p.datastore)
            self.assertEqual(canonical_p.rel_path, p.rel_path)
            self.assertEqual(str(canonical_p.parent), str(p.parent))

    def test_ds_path_non_equivalence(self):
        """Leading slashes and stray spaces are significant."""
        args = [
            # leading slash
            ('dsname', ['/a', 'b', 'c', 'x.vmdk']),
            ('dsname', ['/a/b/c/', 'x.vmdk']),
            ('dsname', ['a/b/c', '/x.vmdk']),
            # leading space
            ('dsname', ['a/b/c/', ' x.vmdk']),
            ('dsname', ['a/', ' b/c/', 'x.vmdk']),
            ('dsname', [' a', 'b', 'c', 'x.vmdk']),
            # trailing space
            ('dsname', ['/a/b/c/', 'x.vmdk ']),
            ('dsname', ['a/b/c/ ', 'x.vmdk'])]

        canonical_p = datastore.DatastorePath('dsname', 'a/b/c', 'x.vmdk')
        for ds_name, components in args:
            p = datastore.DatastorePath(ds_name, *components)
            self.assertNotEqual(str(canonical_p), str(p))

    def test_equal(self):
        """Two paths built from the same arguments compare equal."""
        a = datastore.DatastorePath('ds_name', 'a')
        b = datastore.DatastorePath('ds_name', 'a')
        self.assertEqual(a, b)

    def test_join(self):
        """join appends components and rejects None components."""
        p = datastore.DatastorePath('ds_name', 'a')
        ds_path = p.join('b')
        self.assertEqual('[ds_name] a/b', str(ds_path))

        # Joining nothing must leave the path as it was; this result was
        # previously computed but never checked.
        p = datastore.DatastorePath('ds_name', 'a')
        ds_path = p.join()
        self.assertEqual('[ds_name] a', str(ds_path))

        bad_args = [
            [None],
            ['', None],
            ['a', None],
            ['a', None, 'b']]
        for components in bad_args:
            self.assertRaises(ValueError, p.join, *components)

    def test_ds_path_parse(self):
        """parse splits '[<name>] <rel_path>' and rejects malformed input."""
        p = datastore.DatastorePath.parse('[dsname]')
        self.assertEqual('dsname', p.datastore)
        self.assertEqual('', p.rel_path)

        p = datastore.DatastorePath.parse('[dsname] folder')
        self.assertEqual('dsname', p.datastore)
        self.assertEqual('folder', p.rel_path)

        p = datastore.DatastorePath.parse('[dsname] folder/file')
        self.assertEqual('dsname', p.datastore)
        self.assertEqual('folder/file', p.rel_path)

        # Empty input is reported as ValueError ...
        for empty in [None, '']:
            self.assertRaises(ValueError, datastore.DatastorePath.parse,
                              empty)

        # ... while text without a '[<name>]' prefix raises IndexError.
        for malformed in ['bad path', '/a/b/c', 'a/b/c']:
            self.assertRaises(IndexError, datastore.DatastorePath.parse,
                              malformed)
354
355
class DatastoreURLTestCase(base.TestCase):

    """Test the DatastoreURL object."""

    # Fixture values shared by every test in this class.
    _SCHEME = 'https'
    _SERVER = '13.37.73.31'
    _DC_PATH = 'datacenter-1'
    _DS_NAME = 'datastore-1'
    _DEFAULT_PATH = 'images/aa.vmdk'

    def _build_query(self):
        """Return the URL-encoded dcPath/dsName query string."""
        return urlparse.urlencode(
            {'dcPath': self._DC_PATH, 'dsName': self._DS_NAME})

    def _build_url(self, path=_DEFAULT_PATH):
        """Return the expected '/folder/' URL text for ``path``."""
        return '%s://%s/folder/%s?%s' % (
            self._SCHEME, self._SERVER, path, self._build_query())

    def _parse_url(self, path=_DEFAULT_PATH):
        """Return a DatastoreURL parsed from the URL text for ``path``."""
        return datastore.DatastoreURL.urlparse(self._build_url(path))

    def _assert_url_path(self, path, expected_path):
        """Assert str() of a URL built from ``path`` uses ``expected_path``.

        :param path: path handed to the DatastoreURL constructor
        :param expected_path: path expected in the rendered URL
        """
        url = datastore.DatastoreURL(self._SCHEME, self._SERVER, path,
                                     self._DC_PATH, self._DS_NAME)
        self.assertEqual(self._build_url(expected_path), str(url))

    def test_path_strip(self):
        """A path without surrounding slashes is rendered unchanged."""
        self._assert_url_path('images/ubuntu-14.04.vmdk',
                              'images/ubuntu-14.04.vmdk')

    def test_path_lstrip(self):
        """A leading slash is dropped from the rendered path."""
        self._assert_url_path('/images/ubuntu-14.04.vmdk',
                              'images/ubuntu-14.04.vmdk')

    def test_path_rstrip(self):
        """A trailing slash is dropped from the rendered path."""
        self._assert_url_path('images/ubuntu-14.04.vmdk/',
                              'images/ubuntu-14.04.vmdk')

    def test_urlparse(self):
        """Parsing URL text and rendering it again is lossless."""
        url = self._build_url()
        ds_url = datastore.DatastoreURL.urlparse(url)
        self.assertEqual(url, str(ds_url))

    def test_datastore_name(self):
        """datastore_name reflects the dsName query parameter."""
        ds_url = self._parse_url()
        self.assertEqual(self._DS_NAME, ds_url.datastore_name)

    def test_datacenter_path(self):
        """datacenter_path reflects the dcPath query parameter."""
        ds_url = self._parse_url()
        self.assertEqual(self._DC_PATH, ds_url.datacenter_path)

    def test_path(self):
        """path is the part of the URL following '/folder/'."""
        path = 'images/aa.vmdk'
        ds_url = self._parse_url(path)
        self.assertEqual(path, ds_url.path)

    @mock.patch('http.client.HTTPSConnection')
    def test_connect(self, mock_conn):
        """connect on an https URL opens an HTTPSConnection to the server."""
        ds_url = self._parse_url()
        cookie = mock.Mock()
        ds_url.connect('PUT', 128, cookie)
        mock_conn.assert_called_once_with(self._SERVER)

    def test_get_transfer_ticket(self):
        """The ticket id is returned formatted as a CGI cookie pair."""
        session = mock.Mock()

        class Ticket(object):
            id = 'fake_id'
        session.invoke_api.return_value = Ticket()
        ds_url = self._parse_url()
        ticket = ds_url.get_transfer_ticket(session, 'PUT')
        self.assertEqual('%s="%s"' % (constants.CGI_COOKIE_KEY, 'fake_id'),
                         ticket)

    def test_get_datastore_by_ref(self):
        """Summary properties from the API populate the Datastore object.

        NOTE(review): this exercises the module-level
        ``datastore.get_datastore_by_ref`` rather than DatastoreURL, so
        it may belong in a different test case.
        """
        session = mock.Mock()
        ds_ref = mock.Mock()
        expected_props = {'summary.name': 'datastore1',
                          'summary.type': 'NFS',
                          'summary.freeSpace': 1000,
                          'summary.capacity': 2000}
        session.invoke_api.return_value = expected_props
        ds_obj = datastore.get_datastore_by_ref(session, ds_ref)
        self.assertEqual(expected_props['summary.name'], ds_obj.name)
        self.assertEqual(expected_props['summary.type'], ds_obj.type)
        self.assertEqual(expected_props['summary.freeSpace'], ds_obj.freespace)
        self.assertEqual(expected_props['summary.capacity'], ds_obj.capacity)
479