xref: /qemu/tests/qemu-iotests/testenv.py (revision c64430d2)
1# TestEnv class to manage test environment variables.
2#
3# Copyright (c) 2020-2021 Virtuozzo International GmbH
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import os
20import sys
21import tempfile
22from pathlib import Path
23import shutil
24import collections
25import random
26import subprocess
27import glob
28from typing import List, Dict, Any, Optional, ContextManager
29
30
def isxfile(path: str) -> bool:
    """Tell whether *path* is a regular file the current user may execute."""
    if not os.path.isfile(path):
        return False
    return os.access(path, os.X_OK)
33
34
def get_default_machine(qemu_prog: str) -> str:
    """Return the default machine type reported by a QEMU binary.

    Runs ``qemu_prog -machine help`` and scans its standard output for the
    first line ending in ' (default)'; the first space-separated word of
    that line is the machine name.  If another line ends in
    ' (alias of <name>)', the first word of the first such line is returned
    instead.  An empty string is returned when no line is marked as default.

    Raises subprocess.CalledProcessError if the command exits with a
    non-zero status (it is run with check=True).
    """
    completed = subprocess.run([qemu_prog, '-machine', 'help'], check=True,
                               universal_newlines=True,
                               stdout=subprocess.PIPE)
    lines = completed.stdout.split('\n')

    # Locate the machine flagged as the default one.
    for line in lines:
        if line.endswith(' (default)'):
            name = line.split(' ', 1)[0]
            break
    else:
        return ''

    # Prefer an alias of the default machine when one is listed.
    alias_suffix = ' (alias of {})'.format(name)
    for line in lines:
        if line.endswith(alias_suffix):
            return line.split(' ', 1)[0]

    return name
53
54
class TestEnv(ContextManager['TestEnv']):
    """
    Manage system environment for running tests

    The variables listed in env_variables are supported/provided. Each one
    is represented by a TestEnv attribute with the lower-cased name;
    attributes that are unset or None are not exported (see get_env()).

    The object is a context manager: leaving the context calls close().
    """

    # We store environment variables as instance attributes, and there are a
    # lot of them. Silence pylint:
    # pylint: disable=too-many-instance-attributes

    env_variables = ['PYTHONPATH', 'TEST_DIR', 'SOCK_DIR', 'SAMPLE_IMG_DIR',
                     'OUTPUT_DIR', 'PYTHON', 'QEMU_PROG', 'QEMU_IMG_PROG',
                     'QEMU_IO_PROG', 'QEMU_NBD_PROG', 'QSD_PROG',
                     'SOCKET_SCM_HELPER', 'QEMU_OPTIONS', 'QEMU_IMG_OPTIONS',
                     'QEMU_IO_OPTIONS', 'QEMU_IO_OPTIONS_NO_FMT',
                     'QEMU_NBD_OPTIONS', 'IMGOPTS', 'IMGFMT', 'IMGPROTO',
                     'AIOMODE', 'CACHEMODE', 'VALGRIND_QEMU',
                     'CACHEMODE_IS_DEFAULT', 'IMGFMT_GENERIC', 'IMGOPTSSYNTAX',
                     'IMGKEYSECRET', 'QEMU_DEFAULT_MACHINE', 'MALLOC_PERTURB_']

    def prepare_subprocess(self, args: List[str]) -> Dict[str, str]:
        """Adjust a test command line and build its process environment.

        *args* is modified in place: '-d' is appended when debug is enabled,
        and the Python interpreter (self.python) is inserted in front when
        the first line of the file args[0] is exactly
        '#!/usr/bin/env python3'.

        Returns a copy of os.environ updated with get_env().
        """
        if self.debug:
            args.append('-d')

        with open(args[0], encoding="utf-8") as f:
            try:
                if f.readline().rstrip() == '#!/usr/bin/env python3':
                    args.insert(0, self.python)
            except UnicodeDecodeError:  # binary test? for future.
                pass

        os_env = os.environ.copy()
        os_env.update(self.get_env())
        return os_env

    def get_env(self) -> Dict[str, str]:
        """Return the supported variables that are currently set.

        For every name in env_variables the lower-cased attribute is looked
        up; missing attributes and attributes equal to None are skipped.
        """
        env = {}
        for v in self.env_variables:
            val = getattr(self, v.lower(), None)
            if val is not None:
                env[v] = val

        return env

    def init_directories(self) -> None:
        """Init directory variables:
             PYTHONPATH
             TEST_DIR
             SOCK_DIR
             SAMPLE_IMG_DIR
             OUTPUT_DIR

        TEST_DIR and a SOCK_DIR taken from the environment are created if
        they do not exist. When SOCK_DIR is not set, a temporary directory
        is created and tmp_sock_dir is set so that close() removes it.
        """
        # Put the iotests source directory in front of any existing
        # PYTHONPATH.
        self.pythonpath = os.getenv('PYTHONPATH')
        if self.pythonpath:
            self.pythonpath = self.source_iotests + os.pathsep + \
                self.pythonpath
        else:
            self.pythonpath = self.source_iotests

        self.test_dir = os.getenv('TEST_DIR',
                                  os.path.join(os.getcwd(), 'scratch'))
        Path(self.test_dir).mkdir(parents=True, exist_ok=True)

        try:
            self.sock_dir = os.environ['SOCK_DIR']
            self.tmp_sock_dir = False
            # Create the socket directory itself; TEST_DIR has already been
            # created above.
            Path(self.sock_dir).mkdir(parents=True, exist_ok=True)
        except KeyError:
            self.sock_dir = tempfile.mkdtemp()
            self.tmp_sock_dir = True

        self.sample_img_dir = os.getenv('SAMPLE_IMG_DIR',
                                        os.path.join(self.source_iotests,
                                                     'sample_images'))

        self.output_dir = os.getcwd()  # OUTPUT_DIR

    def init_binaries(self) -> None:
        """Init binary path variables:
             PYTHON (for bash tests)
             QEMU_PROG, QEMU_IMG_PROG, QEMU_IO_PROG, QEMU_NBD_PROG, QSD_PROG
             SOCKET_SCM_HELPER

        Each *_PROG value comes from the environment variable of the same
        name or defaults to a path under build_root. The process exits via
        sys.exit() when a required binary is missing or not executable.
        """
        self.python = sys.executable

        def root(*names: str) -> str:
            return os.path.join(self.build_root, *names)

        arch = os.uname().machine
        if 'ppc64' in arch:
            arch = 'ppc64'

        self.qemu_prog = os.getenv('QEMU_PROG', root(f'qemu-system-{arch}'))
        if not os.path.exists(self.qemu_prog):
            # Fall back to the first (in sorted order) executable
            # qemu-system-* binary found in the build root.
            pattern = root('qemu-system-*')
            try:
                progs = sorted(glob.iglob(pattern))
                self.qemu_prog = next(p for p in progs if isxfile(p))
            except StopIteration:
                sys.exit("Not found any Qemu executable binary by pattern "
                         f"'{pattern}'")

        self.qemu_img_prog = os.getenv('QEMU_IMG_PROG', root('qemu-img'))
        self.qemu_io_prog = os.getenv('QEMU_IO_PROG', root('qemu-io'))
        self.qemu_nbd_prog = os.getenv('QEMU_NBD_PROG', root('qemu-nbd'))
        self.qsd_prog = os.getenv('QSD_PROG', root('storage-daemon',
                                                   'qemu-storage-daemon'))

        for b in [self.qemu_img_prog, self.qemu_io_prog, self.qemu_nbd_prog,
                  self.qemu_prog, self.qsd_prog]:
            if not os.path.exists(b):
                sys.exit('No such file: ' + b)
            if not isxfile(b):
                sys.exit('Not executable: ' + b)

        # The helper is optional: the attribute is only set when the
        # executable exists in the iotests build directory.
        helper_path = os.path.join(self.build_iotests, 'socket_scm_helper')
        if isxfile(helper_path):
            self.socket_scm_helper = helper_path  # SOCKET_SCM_HELPER

    def __init__(self, imgfmt: str, imgproto: str, aiomode: str,
                 cachemode: Optional[str] = None,
                 imgopts: Optional[str] = None,
                 misalign: bool = False,
                 debug: bool = False,
                 valgrind: bool = False) -> None:
        """Compute all test environment variables.

        imgfmt, imgproto, aiomode: stored as IMGFMT, IMGPROTO and AIOMODE.
        cachemode: CACHEMODE; when None, 'writeback' is used and
            CACHEMODE_IS_DEFAULT is 'true'.
        imgopts: IMGOPTS; extended with defaults for luks and vmdk.
        misalign: add ' --misalign' to the qemu-io options.
        debug: make prepare_subprocess() append '-d' to test arguments.
        valgrind: set VALGRIND_QEMU to 'y'.
        """
        self.imgfmt = imgfmt
        self.imgproto = imgproto
        self.aiomode = aiomode
        self.imgopts = imgopts
        self.misalign = misalign
        self.debug = debug

        if valgrind:
            self.valgrind_qemu = 'y'

        if cachemode is None:
            self.cachemode_is_default = 'true'
            self.cachemode = 'writeback'
        else:
            self.cachemode_is_default = 'false'
            self.cachemode = cachemode

        # Initialize generic paths: build_root, build_iotests, source_iotests,
        # which are needed to initialize some environment variables. They are
        # used by init_*() functions as well.

        if os.path.islink(sys.argv[0]):
            # called from the build tree
            self.source_iotests = os.path.dirname(os.readlink(sys.argv[0]))
            self.build_iotests = os.path.dirname(os.path.abspath(sys.argv[0]))
        else:
            # called from the source tree
            self.source_iotests = os.getcwd()
            self.build_iotests = self.source_iotests

        self.build_root = os.path.join(self.build_iotests, '..', '..')

        self.init_directories()
        self.init_binaries()

        self.malloc_perturb_ = os.getenv('MALLOC_PERTURB_',
                                         str(random.randrange(1, 255)))

        # QEMU_OPTIONS
        self.qemu_options = '-nodefaults -display none -accel qtest'
        # An explicit -machine option is added for these system emulators.
        machine_map = (
            ('arm', 'virt'),
            ('aarch64', 'virt'),
            ('avr', 'mega2560'),
            ('m68k', 'virt'),
            ('rx', 'gdbsim-r5f562n8'),
            ('tricore', 'tricore_testboard')
        )
        for suffix, machine in machine_map:
            if self.qemu_prog.endswith(f'qemu-system-{suffix}'):
                self.qemu_options += f' -machine {machine}'

        # QEMU_DEFAULT_MACHINE
        self.qemu_default_machine = get_default_machine(self.qemu_prog)

        self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
        self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')

        is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
        self.imgfmt_generic = 'true' if is_generic else 'false'

        self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
        if self.misalign:
            self.qemu_io_options += ' --misalign'

        # Snapshot of the options taken before any ' -f <format>' is added.
        self.qemu_io_options_no_fmt = self.qemu_io_options

        if self.imgfmt == 'luks':
            self.imgoptssyntax = 'true'
            self.imgkeysecret = '123456'
            if not self.imgopts:
                self.imgopts = 'iter-time=10'
            elif 'iter-time=' not in self.imgopts:
                self.imgopts += ',iter-time=10'
        else:
            self.imgoptssyntax = 'false'
            self.qemu_io_options += ' -f ' + self.imgfmt

        if self.imgfmt == 'vmdk':
            if not self.imgopts:
                self.imgopts = 'zeroed_grain=on'
            elif 'zeroed_grain=' not in self.imgopts:
                self.imgopts += ',zeroed_grain=on'

    def close(self) -> None:
        """Remove the socket directory if it was created as a temporary."""
        if self.tmp_sock_dir:
            shutil.rmtree(self.sock_dir)

    def __enter__(self) -> 'TestEnv':
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.close()

    def print_env(self) -> None:
        """Print a summary of the environment to standard output.

        Variables that are not set are printed as empty strings.
        """
        template = """\
QEMU          -- "{QEMU_PROG}" {QEMU_OPTIONS}
QEMU_IMG      -- "{QEMU_IMG_PROG}" {QEMU_IMG_OPTIONS}
QEMU_IO       -- "{QEMU_IO_PROG}" {QEMU_IO_OPTIONS}
QEMU_NBD      -- "{QEMU_NBD_PROG}" {QEMU_NBD_OPTIONS}
IMGFMT        -- {IMGFMT}{imgopts}
IMGPROTO      -- {IMGPROTO}
PLATFORM      -- {platform}
TEST_DIR      -- {TEST_DIR}
SOCK_DIR      -- {SOCK_DIR}
SOCKET_SCM_HELPER -- {SOCKET_SCM_HELPER}"""

        # defaultdict(str) makes every missing key format as ''.
        args = collections.defaultdict(str, self.get_env())

        if 'IMGOPTS' in args:
            args['imgopts'] = f" ({args['IMGOPTS']})"

        u = os.uname()
        args['platform'] = f'{u.sysname}/{u.machine} {u.nodename} {u.release}'

        print(template.format_map(args))
298