1from datetime import datetime
2from contextlib import contextmanager
3from Xlib import X, Xutil, Xatom
4import Xlib
5import ewmh
6import os
7import os.path
8import re
9import select
10import selectors
11import shlex
12import subprocess
13import sys
14import textwrap
15import time
16import types
17import herbstluftwm
18
19import pytest
20
21
22BINDIR = os.path.join(os.path.abspath(os.environ['PWD']))
23
24# List of environment variables copied during hlwm process creation:
25# * LSAN_OPTIONS: needed to suppress warnings about known memory leaks
26COPY_ENV_WHITELIST = ['LSAN_OPTIONS']
27
28
29def extend_env_with_whitelist(environment):
30    """Copy some whitelisted environment variables (if set) into an existing environment"""
31    environment.update({e: os.environ[e] for e in os.environ if e in COPY_ENV_WHITELIST})
32    return environment
33
34
35class HlwmBridge(herbstluftwm.Herbstluftwm):
36
37    HC_PATH = os.path.join(BINDIR, 'herbstclient')
38    # if there is some HlwmBridge, then it is registered here:
39    INSTANCE = None
40
41    def __init__(self, display, hlwm_process):
42        herbstluftwm.Herbstluftwm.__init__(self, herbstclient=self.HC_PATH)
43        HlwmBridge.INSTANCE = self
44        self.client_procs = []
45        self.next_client_id = 0
46        self.env = {
47            'DISPLAY': display,
48        }
49        self.env = extend_env_with_whitelist(self.env)
50        self.hlwm_process = hlwm_process
51        self.hc_idle = subprocess.Popen(
52            [self.HC_PATH, '--idle', 'rule', 'here_is_.*'],
53            bufsize=1,  # line buffered
54            universal_newlines=True,
55            env=self.env,
56            stdout=subprocess.PIPE
57        )
58        # a dictionary mapping wmclasses to window ids as reported
59        # by self.hc_idle
60        self.wmclass2winid = {}
61
62    def unchecked_call(self, cmd, log_output=True, read_hlwm_output=True):
63        """call the command but do not check exit code or stderr"""
64        args = self._parse_command(cmd)
65
66        try:
67            proc = herbstluftwm.Herbstluftwm.unchecked_call(self, cmd)
68        except subprocess.TimeoutExpired:
69            self.hlwm_process.investigate_timeout('calling ' + str(args))
70
71        outcome = 'succeeded' if proc.returncode == 0 else 'failed'
72        allout = proc.stdout + proc.stderr
73        if allout:
74            if log_output:
75                print(f'Client command {args} {outcome} with output:\n{allout}')
76            else:
77                print(f'Client command {args} {outcome} with output', end='')
78                print(' (output suppressed).')
79        else:
80            print(f'Client command {args} {outcome} (no output)')
81
82        # Take this opportunity read and echo any hlwm output captured in the
83        # meantime:
84        if read_hlwm_output:
85            self.hlwm_process.read_and_echo_output()
86
87        return proc
88
89    def call_xfail(self, cmd):
90        """ Call the command, expect it to terminate with a non-zero exit code,
91        emit no output on stdout but some output on stderr. The returned
92        process handle offers a match() method that checks the stderr against a
93        given regex. """
94        proc = self.unchecked_call(cmd)
95        assert proc.returncode != 0
96        assert proc.stdout == ""
97        assert proc.stderr != ""
98
99        def f(self2, reg):
100            assert re.search(reg, self2.stderr)
101
102        proc.expect_stderr = types.MethodType(f, proc)
103        return proc
104
105    def call_xfail_no_output(self, cmd):
106        """ Call the command, expect it to terminate with a non-zero exit code
107        and emit no output on either stdout or stderr. """
108        proc = self.unchecked_call(cmd)
109        assert proc.returncode != 0
110        assert proc.stdout == ""
111        assert proc.stderr == ""
112        return proc
113
114    def get_attr(self, *attribute_path, check=True):
115        """get an attribute where the given attribute_path arguments
116        are joined with '.', so the following are equivalent:
117
118            get_attr('clients', 'focus', 'title')
119            get_attr('clients.focus', 'title')
120            get_attr('clients.focus.title')
121        """
122        attribute_path = '.'.join([str(x) for x in attribute_path])
123        return self.call(['get_attr', attribute_path]).stdout
124
125    def create_client(self, term_command='sleep infinity', position=None, title=None, keep_running=False):
126        """
127        Launch a client that will be terminated on shutdown.
128        """
129        self.next_client_id += 1
130        wmclass = 'client_{}'.format(self.next_client_id)
131        title = ['-title', str(title)] if title else []
132        geometry = ['-geometry', '50x20+0+0']
133        if position is not None:
134            x, y = position
135            geometry[1] = '50x2%+d%+d' % (x, y)
136        command = ['xterm'] + title + geometry
137        command += ['-class', wmclass, '-e', 'bash', '-c', term_command]
138
139        # enforce a hook when the window appears
140        self.call(['rule', 'once', 'class=' + wmclass, 'hook=here_is_' + wmclass])
141
142        proc = subprocess.Popen(command, env=self.env)
143        # once the window appears, the hook is fired:
144        winid = self.wait_for_window_of(wmclass)
145
146        if not keep_running:
147            # Add to list of processes to be killed on shutdown:
148            self.client_procs.append(proc)
149
150        return winid, proc
151
152    def complete(self, cmd, partial=False, position=None, evaluate_escapes=False):
153        """
154        Return a sorted list of all completions for the next argument for the
155        given command, if position=None. If position is given, then the
156        argument of the given position is completed.
157
158        Set 'partial' if some of the completions for the given command are
159        partial. If not in 'partial' mode, trailing spaces are stripped.
160
161        Set 'evaluate_escapes' if the escape sequences of completion items
162        should be evaluated. If this is set, one cannot distinguish between
163        partial and full completion results anymore.
164        """
165        args = self._parse_command(cmd)
166        if position is None:
167            position = len(args)
168        proc = self.call(['complete_shell', position] + args)
169        items = []
170        for i in proc.stdout.splitlines(False):
171            if partial:
172                items.append(i)
173            else:
174                if not i.endswith(' '):
175                    raise Exception(("completion for \"{}\" returned the partial "
176                                    + "completion item \"{}\"").format(cmd, i)
177                                    ) from None
178                else:
179                    items.append(i[0:-1])
180        # evaluate escape sequences:
181        if evaluate_escapes:
182            old_items = items
183            items = []
184            for s in old_items:
185                unescaped = shlex.split(s)
186                items.append(unescaped[0] if len(unescaped) else '')
187        return sorted(items)
188
189    def list_children_via_attr(self, object_path):
190        """
191        List the names of children of the
192        given object, using the attr command internally.
193        """
194        # regexes for list_children:
195
196        children_re = \
197            re.compile(r'[0-9]* (child|children)[\\.:]((\n  [^\n]*)*)')
198        line_re = re.compile('^  (.*)\\.$')
199        output = self.call(['attr', object_path]).stdout
200        section_match = children_re.search(output)
201        assert section_match
202        children = []
203        for i in section_match.group(2).split('\n')[1:]:
204            child_match = line_re.match(i)
205            assert child_match
206            children.append(child_match.group(1))
207        return sorted(children)
208
209    def list_children(self, object_path):
210        """
211        List the names of children of the
212        given object, using the complete_shell command.
213        """
214        if not object_path.endswith('.') and object_path != '':
215            object_path += '.'
216        items = self.complete(['object_tree', object_path],
217                              partial=True, position=1)
218        children = []
219        for i in items:
220            children.append(i.split('.')[-2])
221        return sorted(children)
222
223    def create_clients(self, num):
224        return [self.create_client()[0] for i in range(num)]
225
226    def wait_for_window_of(self, wmclass):
227        """Wait for a rule hook of the form "here_is_" + wmclass """
228        # We don't need to read the second argument of the hook and don't need
229        # to check that is indeed equals "here_is_" + wmclass. But we need to
230        # check this once we create clients simultaneously.
231        line = self.hc_idle.stdout.readline().rstrip('\n').split('\t')
232        try:
233            self.hc_idle.wait(0)
234        except subprocess.TimeoutExpired:
235            pass
236        if self.hc_idle.returncode is not None:
237            self.hlwm_process.investigate_timeout(
238                'waiting for hook triggered by client \"{}\"'.format(wmclass))
239        return line[-1]
240
241    def shutdown(self):
242        for client_proc in self.client_procs:
243            client_proc.terminate()
244            client_proc.wait(2)
245
246        self.hc_idle.terminate()
247        self.hc_idle.wait(2)
248
249    def bool(self, python_bool_var):
250        """convert a boolean variable into hlwm's string representation"""
251        return "true" if python_bool_var else "false"
252
253
254@pytest.fixture()
255def hlwm(hlwm_process, xvfb):
256    # display = ':13'
257    hlwm_bridge = HlwmBridge(xvfb.display, hlwm_process)
258    yield hlwm_bridge
259
260    # Make sure that hlwm survived:
261    hlwm_bridge.call('version')
262
263    hlwm_bridge.shutdown()
264
265
266class HlwmProcess:
267    def __init__(self, autostart_stdout_message, env, args):
268        """create a new hlwm process and wait for booting up.
269        - autostart_stdout_message is the message printed to stdout
270          that indicates that the autostart has been executed entirely
271        - env is the environment dictionary
272        - args is a list of additional command line arguments
273        """
274        self.bin_path = os.path.join(BINDIR, 'herbstluftwm')
275        self.proc = subprocess.Popen(
276            [self.bin_path, '--exit-on-xerror', '--verbose'] + args, env=env,
277            bufsize=0,  # essential for reading output with selectors!
278            stdout=subprocess.PIPE,
279            stderr=subprocess.PIPE
280        )
281
282        sel = selectors.DefaultSelector()
283        sel.register(self.proc.stdout, selectors.EVENT_READ, data=sys.stdout)
284        sel.register(self.proc.stderr, selectors.EVENT_READ, data=sys.stderr)
285        self.output_selector = sel
286
287        # Wait for marker output from wrapper script:
288        self.read_and_echo_output(until_stdout=autostart_stdout_message)
289
290    def read_and_echo_output(self, until_stdout=None, until_stderr=None, until_eof=False):
291        expect_sth = ((until_stdout or until_stderr) is not None)
292        max_wait = 15
293
294        # Track which file objects have EOFed:
295        eof_fileobjs = set()
296        fileobjs = set(k.fileobj for k in self.output_selector.get_map().values())
297
298        stderr = ''
299        stderr_bytes = bytes()
300        stdout = ''
301        stdout_bytes = bytes()
302
303        def match_found():
304            if until_stdout and (until_stdout in stdout):
305                return True
306            if until_stderr and (until_stderr in stderr):
307                return True
308            return False
309
310        started = datetime.now()
311        while (datetime.now() - started).total_seconds() < max_wait:
312            select_timeout = 1
313            # If we're not polling for a matching string (anymore), there is no
314            # need for a dampening timeout:
315            if not expect_sth or match_found():
316                select_timeout = 0
317            selected = self.output_selector.select(timeout=select_timeout)
318            for key, events in selected:
319                # Read only single byte, otherwise we might block:
320                ch = key.fileobj.read(1)
321
322                if ch == b'':
323                    eof_fileobjs.add(key.fileobj)
324
325                # Store in temporary buffer for string matching:
326                if key.fileobj == self.proc.stderr:
327                    stderr_bytes += ch
328                    if ch == b'\n':
329                        stderr += stderr_bytes.decode()
330                        # Pass it through to the real stderr:
331                        key.data.write(stderr_bytes.decode())
332                        key.data.flush()
333                        stderr_bytes = b''
334
335                if key.fileobj == self.proc.stdout:
336                    stdout_bytes += ch
337                    if ch == b'\n':
338                        stdout += stdout_bytes.decode()
339                        # Pass it through to the real stdout:
340                        key.data.write(stdout_bytes.decode())
341                        key.data.flush()
342                        stdout_bytes = b''
343
344            if until_eof:
345                # We are going to the very end, so carry on until all file
346                # objects have returned EOF:
347                if eof_fileobjs == fileobjs:
348                    break
349                else:
350                    continue
351
352            if selected != []:
353                # There is still data available, so keep reading (no matter
354                # what):
355                continue
356
357            # But stop reading if there is nothing to look for or we have
358            # already found it:
359            if not expect_sth or match_found():
360                break
361
362        # decode remaining bytes for the final match_found() check
363        if stderr_bytes != b'':
364            stderr += stderr_bytes.decode()
365            sys.stderr.write(stderr_bytes.decode())
366            sys.stderr.flush()
367        if stdout_bytes != b'':
368            stdout += stdout_bytes.decode()
369            sys.stdout.write(stdout_bytes.decode())
370            sys.stdout.flush()
371        duration = (datetime.now() - started).total_seconds()
372        if expect_sth and not match_found():
373            assert False, f'Expected string not encountered within {duration:.1f} seconds'
374
375    @contextmanager
376    def wait_stdout_match(self, match):
377        """
378        Context manager for wrapping commands that are expected to result in
379        certain output on hlwm's stdout (e.g., input events).
380
381        Warning: do not run call(...) within such a context, but only
382        unchecked_call(..., read_hlwm_output=False) instead
383        """
384        self.read_and_echo_output()
385        yield
386        self.read_and_echo_output(until_stdout=match)
387
388    @contextmanager
389    def wait_stderr_match(self, match):
390        """
391        Context manager for wrapping commands that are expected to result in
392        certain output on hlwm's stderr (e.g., input events).
393
394        Warning: do not run call(...) within such a context, but only
395        unchecked_call(..., read_hlwm_output=False) instead
396        """
397        self.read_and_echo_output()
398        yield
399        self.read_and_echo_output(until_stderr=match)
400
401    def investigate_timeout(self, reason):
402        """if some kind of client request observes a timeout, investigate the
403        herbstluftwm server process. 'reason' is best phrased using present
404        participle"""
405        self.read_and_echo_output()
406        try:
407            self.proc.wait(0)
408        except subprocess.TimeoutExpired:
409            pass
410        self.read_and_echo_output()
411        if self.proc.returncode is None:
412            raise Exception(str(reason) + " took too long"
413                            + " but hlwm still running") from None
414        else:
415            raise Exception("{} made herbstluftwm quit with exit code {}"
416                            .format(str(reason), self.proc.returncode)) from None
417
418    def shutdown(self):
419        self.proc.terminate()
420
421        # Make sure to read and echo all remaining output (esp. ASAN messages):
422        self.read_and_echo_output(until_eof=True)
423
424        if self.proc.returncode is None:
425            # only wait the process if it hasn't been cleaned up
426            # this also avoids the second exception if hlwm crashed
427            try:
428                assert self.proc.wait(2) == 0
429            except subprocess.TimeoutExpired:
430                self.proc.kill()
431                self.proc.wait(2)
432                raise Exception("herbstluftwm did not quit on sigterm"
433                                + " and had to be killed") from None
434
435
436class HcIdle:
437    def __init__(self, hlwm):
438        self.hlwm = hlwm
439        self.proc = subprocess.Popen([hlwm.HC_PATH, '--idle'],
440                                     stdout=subprocess.PIPE,
441                                     bufsize=0)
442        # we don't know how fast self.proc connects to hlwm.
443        # So we keep sending messages via hlwm util we receive some
444        number_sent = 0
445        while [] == select.select([self.proc.stdout], [], [], 0.1)[0]:
446            # while there hasn't been a message received, send something
447            number_sent += 1
448            self.hlwm.call(['emit_hook', 'hc_idle_bootup', number_sent])
449        # now that we know that self.proc is connected, we need to consume
450        # its output up to the last message we have sent
451        assert number_sent > 0
452        number_received = 0
453        while number_received < number_sent:
454            line = self.proc.stdout.readline().decode().rstrip('\n').split('\t')
455            assert line[0] == 'hc_idle_bootup'
456            number_received = int(line[1])
457
458    def hooks(self):
459        """return all hooks since the last call of this function"""
460        # collect all hooks so far, so collect all up to the following hook:
461        self.hlwm.call(['emit_hook', 'hc_idle_logging_marker'])
462        hooks = []
463        while True:
464            line = self.proc.stdout.readline().decode().rstrip('\n').split('\t')
465            if line[0] == 'hc_idle_logging_marker':
466                break
467            else:
468                hooks.append(line)
469        return hooks
470
471    def shutdown(self):
472        self.proc.terminate()
473        try:
474            self.proc.wait(2)
475        except subprocess.TimeoutExpired:
476            self.proc.kill()
477            self.proc.wait(2)
478
479
480@pytest.fixture()
481def hc_idle(hlwm):
482    hc = HcIdle(hlwm)
483
484    yield hc
485
486    hc.shutdown()
487
488
489@pytest.fixture()
490def hlwm_spawner(tmpdir):
491    """yield a function to spawn hlwm"""
492    assert xvfb is not None, 'Refusing to run tests in a non-Xvfb environment (possibly your actual X server?)'
493
494    def spawn(args=[], display=None):
495        if display is None:
496            display = os.environ['DISPLAY']
497        env = {
498            'DISPLAY': display,
499            'XDG_CONFIG_HOME': str(tmpdir),
500        }
501        env = extend_env_with_whitelist(env)
502        autostart = tmpdir / 'herbstluftwm' / 'autostart'
503        autostart.ensure()
504        autostart.write(textwrap.dedent("""
505            #!/usr/bin/env bash
506            echo "hlwm started"
507        """.lstrip('\n')))
508        autostart.chmod(0o755)
509        return HlwmProcess('hlwm started', env, args)
510    return spawn
511
512
513@pytest.fixture()
514def xvfb(request):
515    # start an Xvfb server (don't start Xephyr because
516    # Xephyr requires another runnig xserver already).
517    # also we add '-noreset' such that the server is not reset
518    # when the last client connection is closed.
519    #
520    # the optional parameter is the resolution. If you want another resolution,
521    # then annotate the test case with:
522    #
523    #    @pytest.mark.parametrize("xvfb", [(1280, 1024)], indirect=True)
524    #
525    screens = [(800, 600)]
526    if hasattr(request, 'param'):
527        screens = [request.param]
528    with MultiscreenDisplay(server='Xvfb', screens=screens, extra_args=['-noreset']) as xserver:
529        os.environ['DISPLAY'] = xserver.display
530        yield xserver
531
532
533@pytest.fixture()
534def hlwm_process(hlwm_spawner, xvfb):
535    """Set up hlwm and also check that it shuts down gently afterwards"""
536    hlwm_proc = hlwm_spawner(['--no-tag-import'], display=xvfb.display)
537
538    yield hlwm_proc
539
540    hlwm_proc.shutdown()
541
542
543@pytest.fixture(params=[0])
544def running_clients(hlwm, running_clients_num):
545    """
546    Fixture that provides a number of already running clients, as defined by a
547    "running_clients_num" test parameter.
548    """
549    return hlwm.create_clients(running_clients_num)
550
551
552@pytest.fixture()
553def x11_connection(xvfb):
554    """Connect to the given xvfb display and return a Xlib.display handle"""
555    display = None
556    attempts_left = 10
557    while display is None and attempts_left > 0:
558        try:
559            display = Xlib.display.Display(xvfb.display)
560            # the above call may result in an exception:
561            # ConnectionResetError: [Errno 104] Connection reset by peer
562            # However, the handling of this error in the above function results in a
563            # type error, see
564            # https://github.com/python-xlib/python-xlib/pull/160
565        except TypeError as msg:
566            # hence, just print the type error
567            print("!!! TypeError: %s" % msg, file=sys.stderr)
568            # wait for a moment, and then try again..
569            time.sleep(2)
570            attempts_left -= 1
571        except Xlib.error.ConnectionClosedError as msg:
572            print("!!! Xlib.error.ConnectionClosedError: %s" % msg, file=sys.stderr)
573            # maybe the display was still shutting down
574            # wait for a moment, and then try again..
575            time.sleep(2)
576            attempts_left -= 1
577    yield display
578    display.close()
579
580
581class RawImage:
582    def __init__(self, data, width, height):
583        """
584        A raw image is an array of size width*height containing
585        (r, g, b) triples, line by line
586        """
587        self.data = data
588        self.width = width
589        self.height = height
590        assert len(data) == width * height
591
592    def pixel(self, x, y):
593        """return (r, g, b) triple for a pixel"""
594        return self.data[x + self.width * y]
595
596    def color_count(self, rgb_triple):
597        count = 0
598        for pix in self.data:
599            if pix == rgb_triple:
600                count += 1
601        return count
602
603    @staticmethod
604    def rgb2string(rgb_triple):
605        return '#%02x%02x%02x' % rgb_triple
606
607
608@pytest.fixture()
609def x11(x11_connection):
610    """ Short-lived fixture for interacting with the X11 display and creating
611    clients that are automatically destroyed at the end of each test. """
612    class X11:
613        def __init__(self, x11_connection):
614            self.display = x11_connection
615            self.windows = set()
616            self.screen = self.display.screen()
617            self.root = self.screen.root
618            self.ewmh = ewmh.EWMH(self.display, self.root)
619            self.hlwm = hlwm
620
621        def window(self, winid_string):
622            """return python-xlib window wrapper for a string window id"""
623            winid_int = int(winid_string, 0)
624            return self.display.create_resource_object('window', winid_int)
625
626        def winid_str(self, window_handle):
627            return hex(window_handle.id)
628
629        def make_window_urgent(self, window):
630            """make window urgent"""
631            window.set_wm_hints(flags=Xutil.UrgencyHint)
632            self.display.sync()
633
634        def is_window_urgent(self, window):
635            """check urgency of a given window handle"""
636            hints = window.get_wm_hints()
637            if hints is None:
638                return False
639            return bool(hints.flags & Xutil.UrgencyHint)
640
641        def set_property_textlist(self, property_name, value, utf8=True, window=None):
642            """set a ascii textlist property by its string name on the root window, or any other window"""
643            if window is None:
644                window = self.root
645            prop = self.display.intern_atom(property_name)
646            bvalue = bytearray()
647            isfirst = True
648            for entry in value:
649                if isfirst:
650                    isfirst = False
651                else:
652                    bvalue.append(0)
653                bvalue += entry.encode()
654            proptype = Xatom.STRING
655            if utf8:
656                proptype = self.display.get_atom('UTF8_STRING')
657            window.change_property(prop, proptype, 8, bytes(bvalue))
658
659        def set_property_cardinal(self, property_name, value, window=None):
660            if window is None:
661                window = self.root
662            prop = self.display.intern_atom(property_name)
663            window.change_property(prop, Xatom.CARDINAL, 32, value)
664
665        def get_property(self, property_name, window=None):
666            """get a property by its string name from the root window, or any other window"""
667            if window is None:
668                window = self.root
669            prop = self.display.intern_atom(property_name)
670            resp = window.get_full_property(prop, X.AnyPropertyType)
671            return resp.value if resp is not None else None
672
673        def create_client(self, urgent=False, pid=None,
674                          geometry=(50, 50, 300, 200),  # x, y, width, height
675                          force_unmanage=False,
676                          sync_hlwm=True,
677                          wm_class=None,
678                          window_type=None,
679                          transient_for=None,
680                          pre_map=lambda wh: None,  # called before the window is mapped
681                          ):
682            w = self.root.create_window(
683                geometry[0],
684                geometry[1],
685                geometry[2],
686                geometry[3],
687                2,
688                self.screen.root_depth,
689                X.InputOutput,
690                X.CopyFromParent,
691                background_pixel=self.screen.white_pixel,
692                override_redirect=force_unmanage,
693            )
694            if wm_class is not None:
695                w.set_wm_class(wm_class[0], wm_class[1])
696
697            if transient_for is not None:
698                w.set_wm_transient_for(transient_for)
699
700            # Keep track of window for later removal:
701            self.windows.add(w)
702
703            w.set_wm_name('Some Window')
704            if urgent:
705                w.set_wm_hints(flags=Xutil.UrgencyHint)
706
707            if window_type is not None:
708                w.change_property(self.display.intern_atom('_NET_WM_WINDOW_TYPE'),
709                                  Xatom.ATOM,
710                                  32,
711                                  [self.display.intern_atom(window_type)])
712
713            if pid is not None:
714                w.change_property(self.display.intern_atom('_NET_WM_PID'),
715                                  Xatom.CARDINAL,
716                                  32,
717                                  [pid])
718
719            pre_map(w)
720            w.map()
721            self.display.sync()
722            if sync_hlwm:
723                # wait for hlwm to fully recognize it as a client
724                self.sync_with_hlwm()
725            return w, self.winid_str(w)
726
727        def screenshot(self, win_handle):
728            geom = win_handle.get_geometry()
729            attr = win_handle.get_attributes()
730            # Xlib defines AllPlanes as: ((unsigned long)~0L)
731            all_planes = 0xffffffff
732            # Maybe, the following get_image() works differently
733            # than XGetImage(), because we still need to interpret
734            # the pixel values using the colormap, whereas the man
735            # page for XGetImage() does not mention 'colormap' at all
736            raw = win_handle.get_image(0, 0, geom.width, geom.height,
737                                       X.ZPixmap, all_planes)
738            # raw.data is a array of pixel-values, which need to
739            # be interpreted using the colormap:
740            colorDict = {}
741            for pixel in raw.data:
742                colorDict[pixel] = None
743            colorPixelList = list(colorDict)
744            colorRGBList = attr.colormap.query_colors(colorPixelList)
745            for pixelval, rgbval in zip(colorPixelList, colorRGBList):
746                # Useful debug output if something blows up again (which is likely)
747                # print(f'{pixelval} -> {rgbval.red}  {rgbval.green} {rgbval.blue}')
748                colorDict[pixelval] = (rgbval.red % 256, rgbval.green % 256, rgbval.blue % 256)
749            # the image size is enlarged such that the width
750            # is a multiple of 4. Hence we remove these extra
751            # columns in the end when mapping colorDict[] over the data array
752            width_padded = geom.width
753            while width_padded % 4 != 0:
754                width_padded += 1
755            rgbvals = [colorDict[p] for idx, p in enumerate(raw.data) if idx % width_padded < geom.width]
756            return RawImage(rgbvals, geom.width, geom.height)
757
758        def decoration_screenshot(self, win_handle):
759            decoration = self.get_decoration_window(win_handle)
760            return self.screenshot(decoration)
761
762        def sync_with_hlwm(self):
763            self.display.sync()
764            # wait for hlwm to flush all events:
765            hlwm_bridge = HlwmBridge.INSTANCE
766            assert hlwm_bridge is not None, "hlwm must be running"
767            hlwm_bridge.call('true')
768
769        def get_decoration_window(self, window):
770            tree = window.query_tree()
771            if tree.root == tree.parent:
772                return None
773            else:
774                return tree.parent
775
776        def get_absolute_top_left(self, window):
777            """return the absolute (x,y) coordinate of the given window,
778            i.e. relative to the root window"""
779            x = 0
780            y = 0
781            while True:
782                # the following coordinates are only relative
783                # to the parent of window
784                geom = window.get_geometry()
785                print('Relative geometry of {} is: x={} y={} w={} h={}'.format(
786                      self.winid_str(window), geom.x, geom.y, geom.width, geom.height))
787                x += geom.x
788                y += geom.y
789                # check if the window's parent is already the root window
790                tree = window.query_tree()
791                if tree.root == tree.parent:
792                    break
793                # if it's not, continue at its parent
794                window = tree.parent
795            return (x, y)
796
797        def get_absolute_geometry(self, window):
798            """return the geometry of the window, where the top left
799            coordinate comes from get_absolute_top_left()
800            """
801            x, y = self.get_absolute_top_left(window)
802            geom = window.get_geometry()
803            geom.x = x
804            geom.y = y
805            return geom
806
807        def get_hlwm_frames(self):
808            """get list of window handles of herbstluftwm
809            frame decoration windows"""
810            cmd = ['xdotool', 'search', '--class', '_HERBST_FRAME']
811            frame_wins = subprocess.run(cmd,
812                                        stdout=subprocess.PIPE,
813                                        universal_newlines=True,
814                                        check=True)
815            res = []
816            for winid_decimal_str in frame_wins.stdout.splitlines():
817                res.append(self.window(winid_decimal_str))
818            return res
819
820        def shutdown(self):
821            # Destroy all created windows:
822            for window in self.windows:
823                window.unmap()
824                window.destroy()
825            self.display.sync()
826
827    x11_ = X11(x11_connection)
828    yield x11_
829    x11_.shutdown()
830
831
832class MultiscreenDisplay:
833    """context manager for creating a server display with multiple
834    screens. We support two xserver programs: Xephyr and Xvfb:
835
836      - Xephyr places screens side by side. Since Xephyr opens
837        one window per screen itself, this has to run inside an existing Xvfb.
838      - Xvfb makes all screens have the coordinate (0, 0)
839    """
840    def __init__(self, server='Xephyr', screens=[(800, 600)], extra_args=[]):
841        """ Creates a new x server (where 'server' is 'Xephyr' or 'Xvfb')
842        that has all the specified screens.
843        """
844        # we build up the full command tu run step by step:
845        self.full_cmd = [server, '-nolisten', 'tcp']
846
847        # pass the -screen parameters
848        # ---------------------------
849        self.screen_rects = []
850        current_x_offset = 0
851        for idx, (width, height) in enumerate(screens):
852            self.screen_rects.append([current_x_offset, 0, width, height])
853            geo = '{}x{}x8'.format(width, height)
854            if server == 'Xvfb':
855                self.full_cmd += ['-screen', str(idx), geo]
856            else:
857                self.full_cmd += ['-screen', geo]
858                # xephyr places them side by side:
859                current_x_offset += width
860        self.full_cmd += extra_args
861
862        # let the xserver find a display and write its name to a pipe
863        # -----------------------------------------------------------
864        pipe_read, pipe_write = os.pipe()
865        self.full_cmd += ['-displayfd', str(pipe_write)]
866
867        print("Running: {}".format(' '.join(self.full_cmd)))
868        self.proc = subprocess.Popen(self.full_cmd, pass_fds=[pipe_write])
869
870        # read the display number from the pipe
871        # -------------------------------------
872        display_bytes = bytes()
873        # read bytes until the newline
874        while True:
875            chunk = os.read(pipe_read, 1)
876            display_bytes += chunk
877            if len(chunk) < 1 or chunk == b'\n':
878                break
879        os.close(pipe_read)
880        os.close(pipe_write)
881        self.display = ':' + display_bytes.decode().rstrip()
882        print(server + " is using the display \"{}\"".format(self.display))
883
884    def screens(self):
885        """return a list of screen geometries, where a geometry
886        is a list [x, y, width, height]
887        """
888        return self.screen_rects
889
890    def __enter__(self):
891        return self
892
893    def __exit__(self, type_param, value, traceback):
894        self.proc.terminate()
895        self.proc.wait(5)
896
897
898@pytest.fixture()
899def keyboard():
900    class KeyBoard:
901        def press(self, key_spec):
902            subprocess.check_call(['xdotool', 'key', key_spec])
903
904        def down(self, key_spec):
905            subprocess.check_call(['xdotool', 'keydown', key_spec])
906
907        def up(self, key_spec):
908            subprocess.check_call(['xdotool', 'keyup', key_spec])
909
910    return KeyBoard()
911
912
913@pytest.fixture()
914def mouse(hlwm_process, hlwm):
915    class Mouse:
916        def __init__(self):
917            self.move_to(0, 0, wait=True)
918
919        def move_into(self, win_id, x=1, y=1, wait=True):
920            if wait:
921                with hlwm_process.wait_stderr_match('EnterNotify'):
922                    # no --sync here, because we're waiting for the EnterNotify anyways
923                    self.call_cmd(f'xdotool mousemove --window {win_id} {x} {y}', shell=True)
924                # reaching this line only means that hlwm started processing
925                # the EnterNotify. So we need to wait until the event is fully processed:
926                hlwm.call('true')
927            else:
928                self.call_cmd(f'xdotool mousemove --sync --window {win_id} {x} {y}', shell=True)
929
930        def click(self, button, into_win_id=None, wait=True):
931            if into_win_id:
932                # no need to wait for herbstluftwm to process, we're just positioning the mouse to click
933                self.move_into(into_win_id, wait=False)
934            if wait:
935                with hlwm_process.wait_stderr_match('ButtonPress'):
936                    subprocess.check_call(['xdotool', 'click', button])
937                # reaching this line only means that hlwm started processing
938                # the ButtonPress. So we need to wait until the event is fully processed:
939                hlwm.call('true')
940            else:
941                subprocess.check_call(['xdotool', 'click', button])
942
943        def move_to(self, abs_x, abs_y, wait=True):
944            abs_x = str(int(abs_x))
945            abs_y = str(int(abs_y))
946            self.call_cmd(f'xdotool mousemove --sync {abs_x} {abs_y}', shell=True)
947            if wait:
948                # wait until all the mouse move events that are now in the queue
949                # are fully processed:
950                hlwm.call('true')
951
952        def move_relative(self, delta_x, delta_y, wait=True):
953            self.call_cmd(f'xdotool mousemove_relative --sync {delta_x} {delta_y}', shell=True)
954            if wait:
955                # wait until all the mouse move events that were put in the
956                # queue by the above xdotool invokation are fully processed.
957                # (other than for the button events, the motion notify events
958                # are not printed to stderr, because this would lead to
959                # to much debug output on motions of the physical mouse)
960                hlwm.call('true')
961
962        def mouse_press(self, button, wait=True):
963            cmd = ['xdotool', 'mousedown', button]
964            if wait:
965                with hlwm_process.wait_stderr_match('ButtonPress'):
966                    subprocess.check_call(cmd)
967                # wait for the ButtonPress to be fully processed:
968                hlwm.call('true')
969            else:
970                subprocess.check_call(cmd)
971
972        def mouse_release(self, button, wait=True):
973            cmd = ['xdotool', 'mouseup', button]
974            if wait:
975                with hlwm_process.wait_stderr_match('ButtonRelease'):
976                    subprocess.check_call(cmd)
977                # wait for the ButtonRelease to be processed
978                hlwm.call('true')
979            else:
980                subprocess.check_call(cmd)
981
982        def call_cmd(self, cmd, shell=False):
983            print('calling: {}'.format(cmd), file=sys.stderr)
984            subprocess.check_call(cmd, shell=shell)
985
986    return Mouse()
987