1#!/usr/bin/env python
2#
3# Test case for NBD's blockdev-add interface
4#
5# Copyright (C) 2016 Red Hat, Inc.
6#
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19#
20
21import os
22import random
23import socket
24import stat
25import time
26import iotests
27from iotests import cachemode, imgfmt, qemu_img, qemu_nbd, qemu_nbd_early_pipe
28
# Port range [NBD_PORT_START, NBD_PORT_END) from which the 'localhost' inet
# tests draw a random port (random.randrange excludes the end value).
NBD_PORT_START      = 32768
NBD_PORT_END        = NBD_PORT_START + 1024
# The IPv6 test draws from the 1024 ports directly following that range.
NBD_IPV6_PORT_START = NBD_PORT_END
NBD_IPV6_PORT_END   = NBD_IPV6_PORT_START + 1024

# Image file that every setUp() creates and every tearDown() removes.
test_img = os.path.join(iotests.test_dir, 'test.img')
# UNIX socket path used by the unix/fd tests; tearDown() removes it if present.
unix_socket = os.path.join(iotests.sock_dir, 'nbd.socket')
36
37
def flatten_sock_addr(crumpled_address):
    """Return a flat copy of a {'type': ..., 'data': {...}} socket address.

    The result is a new dict holding the 'type' entry followed by every
    entry of the nested 'data' dict.  The argument is left untouched.  An
    entry of 'data' that is itself called 'type' replaces the outer one.
    """
    flat = dict(type=crumpled_address['type'])
    for key, value in crumpled_address['data'].items():
        flat[key] = value
    return flat
42
43
class NBDBlockdevAddBase(iotests.QMPTestCase):
    """Client-side helpers shared by the NBD blockdev-add test classes.

    client_test() uses self.vm, which is not assigned in this class; the
    subclasses in this file create it in their setUp().
    """

    def blockdev_add_options(self, address, export, node_name):
        """Build the blockdev-add arguments for a raw node on top of NBD.

        address   -- stored as the 'server' option of the nbd layer
        export    -- stored as the 'export' option unless it is None
        node_name -- node name given to the top-level raw node

        Returns the options as a dict.
        """
        nbd_layer = {
            'driver': 'nbd',
            'read-only': True,
            'server': address,
        }
        if export is not None:
            nbd_layer['export'] = export

        return {
            'node-name': node_name,
            'driver': 'raw',
            'file': nbd_layer,
        }

    def client_test(self, filename, address, export=None,
                    node_name='nbd-blockdev', delete=True):
        """Add an NBD-backed node to self.vm and check its reported filename.

        filename  -- expected 'image/filename' of the new node; a str is
                     compared through assert_qmp(), anything else through
                     assert_json_filename_equal()
        address   -- server address passed on to blockdev_add_options()
        export    -- export name passed on to blockdev_add_options()
        node_name -- name of the node to create and look up
        delete    -- when true, blockdev-del the node again before returning
        """
        add_args = self.blockdev_add_options(address, export, node_name)
        response = self.vm.qmp('blockdev-add', **add_args)
        self.assert_qmp(response, 'return', {})

        response = self.vm.qmp('query-named-block-nodes')
        # Lazily pick the first node carrying our name; later nodes are not
        # inspected at all.
        node = next((candidate for candidate in response['return']
                     if candidate['node-name'] == node_name), None)
        if node is not None:
            if not isinstance(filename, str):
                self.assert_json_filename_equal(node['image']['filename'],
                                                filename)
            else:
                self.assert_qmp(node, 'image/filename', filename)
        self.assertTrue(node is not None)

        if not delete:
            return

        response = self.vm.qmp('blockdev-del', node_name=node_name)
        self.assert_qmp(response, 'return', {})
79
80
class QemuNBD(NBDBlockdevAddBase):
    """blockdev-add tests against a server started via qemu_nbd_early_pipe().

    NOTE(review): qemu_nbd_early_pipe is an iotests helper; presumably it
    launches the qemu-nbd binary -- confirm against iotests.py.
    """

    def setUp(self):
        """Create the 64k test image and launch the client VM."""
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and remove the image and any leftover socket."""
        self.vm.shutdown()
        os.remove(test_img)
        # Only test_unix creates the socket file, so a failure to remove
        # it is deliberately ignored.
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    def _try_server_up(self, *args):
        """Try to start the server with the extra command line *args*.

        Returns True on exit status 0 and False when the output contains
        'Address already in use'; any other failure fails the test with
        the helper's output as message.
        """
        exit_status, output = qemu_nbd_early_pipe('-f', imgfmt, test_img,
                                                  *args)
        if exit_status != 0:
            if 'Address already in use' not in output:
                self.fail(output)
            return False
        return True

    def _server_up(self, *args):
        """Start the server and assert that this succeeded."""
        started = self._try_server_up(*args)
        self.assertTrue(started)

    def test_inet(self):
        """Connect over TCP to a server on a random localhost port."""
        # Keep drawing ports until the server manages to bind one.
        nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
        while not self._try_server_up('-b', 'localhost',
                                      '-p', str(nbd_port)):
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)

        server = flatten_sock_addr({
            'type': 'inet',
            'data': {
                'host': 'localhost',
                'port': str(nbd_port),
            },
        })
        self.client_test('nbd://localhost:%i' % nbd_port, server)

    def test_unix(self):
        """Connect to a server listening on the UNIX socket."""
        self._server_up('-k', unix_socket)
        server = flatten_sock_addr({
            'type': 'unix',
            'data': {'path': unix_socket},
        })
        self.client_test('nbd+unix://?socket=' + unix_socket, server)
126
127
class BuiltinNBD(NBDBlockdevAddBase):
    """blockdev-add tests against the NBD server of a second VM.

    self.server is a separate VM instance that has the test image attached
    as drive 'nbd-export' and serves it through the nbd-server-start /
    nbd-server-add QMP commands; self.vm acts as the client.
    """

    def setUp(self):
        """Create the 64k test image and launch the client and server VMs."""
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()
        self.server = iotests.VM('.server')
        self.server.add_drive_raw('if=none,id=nbd-export,' +
                                  'file=%s,' % test_img +
                                  'format=%s,' % imgfmt +
                                  'cache=%s' % cachemode)
        self.server.launch()

    def tearDown(self):
        """Shut both VMs down and remove the image and any leftover socket."""
        self.vm.shutdown()
        self.server.shutdown()
        os.remove(test_img)
        # Only the unix/fd tests create the socket file, so a failure to
        # remove it is deliberately ignored.
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    def _try_server_up(self, address, export_name=None, export_name2=None):
        """Start the NBD server on *address* and add the export(s).

        address      -- socket address in {'type': ..., 'data': {...}} form
        export_name  -- name for the export of device 'nbd-export'; when
                        None, nbd-server-add is issued without a name
        export_name2 -- optional name of a second export of the same device

        Returns False on EADDRINUSE (the error description contains
        'Address already in use'), fails an assertion on any other error,
        and returns True on success.
        """
        result = self.server.qmp('nbd-server-start', addr=address)
        if 'error' in result and \
           'Address already in use' in result['error']['desc']:
            return False
        self.assert_qmp(result, 'return', {})

        if export_name is None:
            result = self.server.qmp('nbd-server-add', device='nbd-export')
        else:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name)
        self.assert_qmp(result, 'return', {})

        if export_name2 is not None:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name2)
            self.assert_qmp(result, 'return', {})

        return True

    def _server_up(self, address, export_name=None, export_name2=None):
        """Like _try_server_up(), but assert that the server came up."""
        self.assertTrue(self._try_server_up(address, export_name, export_name2))

    def _server_down(self):
        """Stop the NBD server and assert that this succeeded."""
        result = self.server.qmp('nbd-server-stop')
        self.assert_qmp(result, 'return', {})

    def _inet_server_up(self, port_start, port_end, host,
                        export_name=None, export_name2=None,
                        extra_options=None):
        """Start the server on a random port in [port_start, port_end).

        Ports are drawn again for as long as _try_server_up() reports the
        address as being in use.

        host          -- value of the 'host' address field
        extra_options -- optional dict of further fields appended to the
                         address data (e.g. 'ipv4'/'ipv6')

        Returns a (nbd_port, address) tuple, where address is the
        {'type': 'inet', 'data': {...}} dict the server was started with.
        """
        while True:
            nbd_port = random.randrange(port_start, port_end)
            data = {'host': host, 'port': str(nbd_port)}
            if extra_options:
                data.update(extra_options)
            address = {'type': 'inet', 'data': data}
            if self._try_server_up(address, export_name, export_name2):
                return nbd_port, address

    def do_test_inet(self, export_name=None):
        """Run a TCP client test against an export called *export_name*.

        When export_name is None the export is added without a name and
        the client connects to 'nbd-export', the id of the exported device.
        """
        nbd_port, address = self._inet_server_up(NBD_PORT_START,
                                                 NBD_PORT_END,
                                                 'localhost', export_name)

        export_name = export_name or 'nbd-export'
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, export_name),
                         flatten_sock_addr(address), export_name)
        self._server_down()

    def test_inet_default_export_name(self):
        self.do_test_inet()

    def test_inet_same_export_name(self):
        self.do_test_inet('nbd-export')

    def test_inet_different_export_name(self):
        self.do_test_inet('shadow')

    def test_inet_two_exports(self):
        """Attach two differently named exports of one server at once."""
        nbd_port, address = self._inet_server_up(NBD_PORT_START,
                                                 NBD_PORT_END,
                                                 'localhost', 'exp1', 'exp2')

        # Keep both nodes around (delete=False) so that they exist
        # simultaneously; they are removed explicitly below.
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp1'),
                         flatten_sock_addr(address), 'exp1', 'node1', False)
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp2'),
                         flatten_sock_addr(address), 'exp2', 'node2', False)
        result = self.vm.qmp('blockdev-del', node_name='node1')
        self.assert_qmp(result, 'return', {})
        result = self.vm.qmp('blockdev-del', node_name='node2')
        self.assert_qmp(result, 'return', {})
        self._server_down()

    def test_inet6(self):
        """Run a TCP client test against '::1' with IPv4 disabled.

        Returns silently, without testing anything, when the getaddrinfo()
        probe for an IPv6 address fails.
        """
        try:
            socket.getaddrinfo("::0", "0", socket.AF_INET6,
                               socket.SOCK_STREAM, socket.IPPROTO_TCP,
                               socket.AI_ADDRCONFIG | socket.AI_CANONNAME)
        except socket.gaierror:
            # IPv6 not available, skip
            return

        _, address = self._inet_server_up(NBD_IPV6_PORT_START,
                                          NBD_IPV6_PORT_END, '::1',
                                          extra_options={'ipv4': False,
                                                         'ipv6': True})

        # The expected filename is given in JSON (dict) form here, so
        # client_test() compares it with assert_json_filename_equal().
        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_unix(self):
        """Run a client test over the UNIX socket."""
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self._server_up(address)
        self.client_test('nbd+unix:///nbd-export?socket=' + unix_socket,
                         flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_fd(self):
        """Run a client test over an already connected socket passed as FD.

        The test connects to the server's UNIX socket itself, hands the
        descriptor to the client VM (send_fd_scm + getfd under the name
        'nbd-fifo') and then opens the NBD node with an 'fd' type address.
        """
        self._server_up({ 'type': 'unix',
                          'data': { 'path': unix_socket } })

        sockfd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sockfd.connect(unix_socket)

            result = self.vm.send_fd_scm(fd=sockfd.fileno())
            self.assertEqual(result, 0, 'Failed to send socket FD')

            result = self.vm.qmp('getfd', fdname='nbd-fifo')
            self.assert_qmp(result, 'return', {})

            address = { 'type': 'fd',
                        'data': { 'str': 'nbd-fifo' } }
            filename = { 'driver': 'raw',
                         'file': {
                             'driver': 'nbd',
                             'export': 'nbd-export',
                             'server': flatten_sock_addr(address)
                         } }
            self.client_test(filename, flatten_sock_addr(address),
                             'nbd-export')

            self._server_down()
        finally:
            # Release our own copy of the descriptor even when an assertion
            # above fails; previously the socket was never closed.
            sockfd.close()
287
288
if __name__ == '__main__':
    # Declare 'nbd' as the only supported protocol and 'raw' as the only
    # supported image format when handing control to the iotests runner.
    iotests.main(supported_protocols=['nbd'],
                 supported_fmts=['raw'])
292