xref: /qemu/tests/qemu-iotests/testenv.py (revision 19f4ed36)
1# TestEnv class to manage test environment variables.
2#
3# Copyright (c) 2020-2021 Virtuozzo International GmbH
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import os
20import sys
21import tempfile
22from pathlib import Path
23import shutil
24import collections
25import random
26import subprocess
27import glob
28from typing import Dict, Any, Optional, ContextManager
29
30
31def isxfile(path: str) -> bool:
32    return os.path.isfile(path) and os.access(path, os.X_OK)
33
34
35def get_default_machine(qemu_prog: str) -> str:
36    outp = subprocess.run([qemu_prog, '-machine', 'help'], check=True,
37                          universal_newlines=True,
38                          stdout=subprocess.PIPE).stdout
39
40    machines = outp.split('\n')
41    try:
42        default_machine = next(m for m in machines if m.endswith(' (default)'))
43    except StopIteration:
44        return ''
45    default_machine = default_machine.split(' ', 1)[0]
46
47    alias_suf = ' (alias of {})'.format(default_machine)
48    alias = next((m for m in machines if m.endswith(alias_suf)), None)
49    if alias is not None:
50        default_machine = alias.split(' ', 1)[0]
51
52    return default_machine
53
54
55class TestEnv(ContextManager['TestEnv']):
56    """
57    Manage system environment for running tests
58
59    The following variables are supported/provided. They are represented by
60    lower-cased TestEnv attributes.
61    """
62
63    # We store environment variables as instance attributes, and there are a
64    # lot of them. Silence pylint:
65    # pylint: disable=too-many-instance-attributes
66
67    env_variables = ['PYTHONPATH', 'TEST_DIR', 'SOCK_DIR', 'SAMPLE_IMG_DIR',
68                     'OUTPUT_DIR', 'PYTHON', 'QEMU_PROG', 'QEMU_IMG_PROG',
69                     'QEMU_IO_PROG', 'QEMU_NBD_PROG', 'QSD_PROG',
70                     'SOCKET_SCM_HELPER', 'QEMU_OPTIONS', 'QEMU_IMG_OPTIONS',
71                     'QEMU_IO_OPTIONS', 'QEMU_IO_OPTIONS_NO_FMT',
72                     'QEMU_NBD_OPTIONS', 'IMGOPTS', 'IMGFMT', 'IMGPROTO',
73                     'AIOMODE', 'CACHEMODE', 'VALGRIND_QEMU',
74                     'CACHEMODE_IS_DEFAULT', 'IMGFMT_GENERIC', 'IMGOPTSSYNTAX',
75                     'IMGKEYSECRET', 'QEMU_DEFAULT_MACHINE', 'MALLOC_PERTURB_']
76
77    def get_env(self) -> Dict[str, str]:
78        env = {}
79        for v in self.env_variables:
80            val = getattr(self, v.lower(), None)
81            if val is not None:
82                env[v] = val
83
84        return env
85
86    def init_directories(self) -> None:
87        """Init directory variables:
88             PYTHONPATH
89             TEST_DIR
90             SOCK_DIR
91             SAMPLE_IMG_DIR
92             OUTPUT_DIR
93        """
94        self.pythonpath = os.getenv('PYTHONPATH')
95        if self.pythonpath:
96            self.pythonpath = self.source_iotests + os.pathsep + \
97                self.pythonpath
98        else:
99            self.pythonpath = self.source_iotests
100
101        self.test_dir = os.getenv('TEST_DIR',
102                                  os.path.join(os.getcwd(), 'scratch'))
103        Path(self.test_dir).mkdir(parents=True, exist_ok=True)
104
105        try:
106            self.sock_dir = os.environ['SOCK_DIR']
107            self.tmp_sock_dir = False
108            Path(self.test_dir).mkdir(parents=True, exist_ok=True)
109        except KeyError:
110            self.sock_dir = tempfile.mkdtemp()
111            self.tmp_sock_dir = True
112
113        self.sample_img_dir = os.getenv('SAMPLE_IMG_DIR',
114                                        os.path.join(self.source_iotests,
115                                                     'sample_images'))
116
117        self.output_dir = os.getcwd()  # OUTPUT_DIR
118
119    def init_binaries(self) -> None:
120        """Init binary path variables:
121             PYTHON (for bash tests)
122             QEMU_PROG, QEMU_IMG_PROG, QEMU_IO_PROG, QEMU_NBD_PROG, QSD_PROG
123             SOCKET_SCM_HELPER
124        """
125        self.python = sys.executable
126
127        def root(*names: str) -> str:
128            return os.path.join(self.build_root, *names)
129
130        arch = os.uname().machine
131        if 'ppc64' in arch:
132            arch = 'ppc64'
133
134        self.qemu_prog = os.getenv('QEMU_PROG', root(f'qemu-system-{arch}'))
135        if not os.path.exists(self.qemu_prog):
136            pattern = root('qemu-system-*')
137            try:
138                progs = sorted(glob.iglob(pattern))
139                self.qemu_prog = next(p for p in progs if isxfile(p))
140            except StopIteration:
141                sys.exit("Not found any Qemu executable binary by pattern "
142                         f"'{pattern}'")
143
144        self.qemu_img_prog = os.getenv('QEMU_IMG_PROG', root('qemu-img'))
145        self.qemu_io_prog = os.getenv('QEMU_IO_PROG', root('qemu-io'))
146        self.qemu_nbd_prog = os.getenv('QEMU_NBD_PROG', root('qemu-nbd'))
147        self.qsd_prog = os.getenv('QSD_PROG', root('storage-daemon',
148                                                   'qemu-storage-daemon'))
149
150        for b in [self.qemu_img_prog, self.qemu_io_prog, self.qemu_nbd_prog,
151                  self.qemu_prog, self.qsd_prog]:
152            if not os.path.exists(b):
153                sys.exit('No such file: ' + b)
154            if not isxfile(b):
155                sys.exit('Not executable: ' + b)
156
157        helper_path = os.path.join(self.build_iotests, 'socket_scm_helper')
158        if isxfile(helper_path):
159            self.socket_scm_helper = helper_path  # SOCKET_SCM_HELPER
160
161    def __init__(self, imgfmt: str, imgproto: str, aiomode: str,
162                 cachemode: Optional[str] = None,
163                 imgopts: Optional[str] = None,
164                 misalign: bool = False,
165                 debug: bool = False,
166                 valgrind: bool = False) -> None:
167        self.imgfmt = imgfmt
168        self.imgproto = imgproto
169        self.aiomode = aiomode
170        self.imgopts = imgopts
171        self.misalign = misalign
172        self.debug = debug
173
174        if valgrind:
175            self.valgrind_qemu = 'y'
176
177        if cachemode is None:
178            self.cachemode_is_default = 'true'
179            self.cachemode = 'writeback'
180        else:
181            self.cachemode_is_default = 'false'
182            self.cachemode = cachemode
183
184        # Initialize generic paths: build_root, build_iotests, source_iotests,
185        # which are needed to initialize some environment variables. They are
186        # used by init_*() functions as well.
187
188        if os.path.islink(sys.argv[0]):
189            # called from the build tree
190            self.source_iotests = os.path.dirname(os.readlink(sys.argv[0]))
191            self.build_iotests = os.path.dirname(os.path.abspath(sys.argv[0]))
192        else:
193            # called from the source tree
194            self.source_iotests = os.getcwd()
195            self.build_iotests = self.source_iotests
196
197        self.build_root = os.path.join(self.build_iotests, '..', '..')
198
199        self.init_directories()
200        self.init_binaries()
201
202        self.malloc_perturb_ = os.getenv('MALLOC_PERTURB_',
203                                         str(random.randrange(1, 255)))
204
205        # QEMU_OPTIONS
206        self.qemu_options = '-nodefaults -display none -accel qtest'
207        machine_map = (
208            ('arm', 'virt'),
209            ('aarch64', 'virt'),
210            ('avr', 'mega2560'),
211            ('m68k', 'virt'),
212            ('rx', 'gdbsim-r5f562n8'),
213            ('tricore', 'tricore_testboard')
214        )
215        for suffix, machine in machine_map:
216            if self.qemu_prog.endswith(f'qemu-system-{suffix}'):
217                self.qemu_options += f' -machine {machine}'
218
219        # QEMU_DEFAULT_MACHINE
220        self.qemu_default_machine = get_default_machine(self.qemu_prog)
221
222        self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
223        self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')
224
225        is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
226        self.imgfmt_generic = 'true' if is_generic else 'false'
227
228        self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
229        if self.misalign:
230            self.qemu_io_options += ' --misalign'
231
232        self.qemu_io_options_no_fmt = self.qemu_io_options
233
234        if self.imgfmt == 'luks':
235            self.imgoptssyntax = 'true'
236            self.imgkeysecret = '123456'
237            if not self.imgopts:
238                self.imgopts = 'iter-time=10'
239            elif 'iter-time=' not in self.imgopts:
240                self.imgopts += ',iter-time=10'
241        else:
242            self.imgoptssyntax = 'false'
243            self.qemu_io_options += ' -f ' + self.imgfmt
244
245        if self.imgfmt == 'vmdk':
246            if not self.imgopts:
247                self.imgopts = 'zeroed_grain=on'
248            elif 'zeroed_grain=' not in self.imgopts:
249                self.imgopts += ',zeroed_grain=on'
250
251    def close(self) -> None:
252        if self.tmp_sock_dir:
253            shutil.rmtree(self.sock_dir)
254
255    def __enter__(self) -> 'TestEnv':
256        return self
257
258    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
259        self.close()
260
261    def print_env(self) -> None:
262        template = """\
263QEMU          -- "{QEMU_PROG}" {QEMU_OPTIONS}
264QEMU_IMG      -- "{QEMU_IMG_PROG}" {QEMU_IMG_OPTIONS}
265QEMU_IO       -- "{QEMU_IO_PROG}" {QEMU_IO_OPTIONS}
266QEMU_NBD      -- "{QEMU_NBD_PROG}" {QEMU_NBD_OPTIONS}
267IMGFMT        -- {IMGFMT}{imgopts}
268IMGPROTO      -- {IMGPROTO}
269PLATFORM      -- {platform}
270TEST_DIR      -- {TEST_DIR}
271SOCK_DIR      -- {SOCK_DIR}
272SOCKET_SCM_HELPER -- {SOCKET_SCM_HELPER}"""
273
274        args = collections.defaultdict(str, self.get_env())
275
276        if 'IMGOPTS' in args:
277            args['imgopts'] = f" ({args['IMGOPTS']})"
278
279        u = os.uname()
280        args['platform'] = f'{u.sysname}/{u.machine} {u.nodename} {u.release}'
281
282        print(template.format_map(args))
283