from datetime import datetime
from contextlib import contextmanager
from Xlib import X, Xutil, Xatom
import Xlib
import ewmh
import os
import os.path
import re
import select
import selectors
import shlex
import subprocess
import sys
import textwrap
import time
import types
import herbstluftwm

import pytest


BINDIR = os.path.join(os.path.abspath(os.environ['PWD']))

# List of environment variables copied during hlwm process creation:
# * LSAN_OPTIONS: needed to suppress warnings about known memory leaks
COPY_ENV_WHITELIST = ['LSAN_OPTIONS']


def extend_env_with_whitelist(environment):
    """Copy some whitelisted environment variables (if set) into an existing environment

    The given dictionary is updated in place and also returned.
    """
    environment.update({e: os.environ[e] for e in COPY_ENV_WHITELIST if e in os.environ})
    return environment


class HlwmBridge(herbstluftwm.Herbstluftwm):
    """Test-side handle for talking to a running hlwm via herbstclient.

    Besides forwarding commands, it keeps track of the client processes it
    launched and of a 'herbstclient --idle' process that reports the
    'here_is_*' hooks used to detect newly appeared windows.
    """

    HC_PATH = os.path.join(BINDIR, 'herbstclient')
    # if there is some HlwmBridge, then it is registered here:
    INSTANCE = None

    def __init__(self, display, hlwm_process):
        """
        - display is the X display name to put into the DISPLAY variable
        - hlwm_process is the HlwmProcess that the commands are sent to
        """
        herbstluftwm.Herbstluftwm.__init__(self, herbstclient=self.HC_PATH)
        HlwmBridge.INSTANCE = self
        self.client_procs = []
        self.next_client_id = 0
        self.env = {
            'DISPLAY': display,
        }
        self.env = extend_env_with_whitelist(self.env)
        self.hlwm_process = hlwm_process
        self.hc_idle = subprocess.Popen(
            [self.HC_PATH, '--idle', 'rule', 'here_is_.*'],
            bufsize=1,  # line buffered
            universal_newlines=True,
            env=self.env,
            stdout=subprocess.PIPE
        )
        # a dictionary mapping wmclasses to window ids as reported
        # by self.hc_idle
        self.wmclass2winid = {}

    def unchecked_call(self, cmd, log_output=True, read_hlwm_output=True):
        """call the command but do not check exit code or stderr

        - log_output: if False, only note that there was output instead of
          printing it
        - read_hlwm_output: if True, echo the output that hlwm itself
          produced in the meantime
        """
        args = self._parse_command(cmd)

        try:
            proc = herbstluftwm.Herbstluftwm.unchecked_call(self, cmd)
        except subprocess.TimeoutExpired:
            # investigate_timeout() always raises, so 'proc' is never used
            # unbound below
            self.hlwm_process.investigate_timeout('calling ' + str(args))

        outcome = 'succeeded' if proc.returncode == 0 else 'failed'
        allout = proc.stdout + proc.stderr
        if allout:
            if log_output:
                print(f'Client command {args} {outcome} with output:\n{allout}')
            else:
                print(f'Client command {args} {outcome} with output', end='')
                print(' (output suppressed).')
        else:
            print(f'Client command {args} {outcome} (no output)')

        # Take this opportunity read and echo any hlwm output captured in the
        # meantime:
        if read_hlwm_output:
            self.hlwm_process.read_and_echo_output()

        return proc

    def call_xfail(self, cmd):
        """ Call the command, expect it to terminate with a non-zero exit code,
        emit no output on stdout but some output on stderr. The returned
        process handle offers an expect_stderr() method that checks the
        stderr against a given regex. """
        proc = self.unchecked_call(cmd)
        assert proc.returncode != 0
        assert proc.stdout == ""
        assert proc.stderr != ""

        def f(self2, reg):
            assert re.search(reg, self2.stderr)

        proc.expect_stderr = types.MethodType(f, proc)
        return proc

    def call_xfail_no_output(self, cmd):
        """ Call the command, expect it to terminate with a non-zero exit code
        and emit no output on either stdout or stderr. """
        proc = self.unchecked_call(cmd)
        assert proc.returncode != 0
        assert proc.stdout == ""
        assert proc.stderr == ""
        return proc

    def get_attr(self, *attribute_path, check=True):
        """get an attribute where the given attribute_path arguments
        are joined with '.', so the following are equivalent:

            get_attr('clients', 'focus', 'title')
            get_attr('clients.focus', 'title')
            get_attr('clients.focus.title')

        The 'check' parameter is accepted but not evaluated here.
        """
        attribute_path = '.'.join([str(x) for x in attribute_path])
        return self.call(['get_attr', attribute_path]).stdout

    def create_client(self, term_command='sleep infinity', position=None, title=None, keep_running=False):
        """
        Launch a client that will be terminated on shutdown.

        Returns the pair (window id, process handle). If keep_running is
        set, the process is not registered for termination on shutdown.
        """
        self.next_client_id += 1
        wmclass = 'client_{}'.format(self.next_client_id)
        title = ['-title', str(title)] if title else []
        geometry = ['-geometry', '50x20+0+0']
        if position is not None:
            x, y = position
            # NOTE(review): the size here is '50x2' whereas the default above
            # is '50x20' -- confirm whether this difference is intended
            geometry[1] = '50x2%+d%+d' % (x, y)
        command = ['xterm'] + title + geometry
        command += ['-class', wmclass, '-e', 'bash', '-c', term_command]

        # enforce a hook when the window appears
        self.call(['rule', 'once', 'class=' + wmclass, 'hook=here_is_' + wmclass])

        proc = subprocess.Popen(command, env=self.env)
        # once the window appears, the hook is fired:
        winid = self.wait_for_window_of(wmclass)

        if not keep_running:
            # Add to list of processes to be killed on shutdown:
            self.client_procs.append(proc)

        return winid, proc

    def complete(self, cmd, partial=False, position=None, evaluate_escapes=False):
        """
        Return a sorted list of all completions for the next argument for the
        given command, if position=None. If position is given, then the
        argument of the given position is completed.

        Set 'partial' if some of the completions for the given command are
        partial. If not in 'partial' mode, trailing spaces are stripped.

        Set 'evaluate_escapes' if the escape sequences of completion items
        should be evaluated. If this is set, one cannot distinguish between
        partial and full completion results anymore.
        """
        args = self._parse_command(cmd)
        if position is None:
            position = len(args)
        proc = self.call(['complete_shell', position] + args)
        items = []
        for i in proc.stdout.splitlines(False):
            if partial:
                items.append(i)
            else:
                # a full completion item is marked by a trailing space
                if not i.endswith(' '):
                    raise Exception(("completion for \"{}\" returned the partial "
                                    + "completion item \"{}\"").format(cmd, i)
                                    ) from None
                else:
                    items.append(i[0:-1])
        # evaluate escape sequences:
        if evaluate_escapes:
            old_items = items
            items = []
            for s in old_items:
                unescaped = shlex.split(s)
                items.append(unescaped[0] if len(unescaped) else '')
        return sorted(items)

    def list_children_via_attr(self, object_path):
        """
        List the names of children of the
        given object, using the attr command internally.
        """
        # regexes for list_children: the child names are printed as indented
        # lines ending with '.', directly following the 'N children:' line.
        # The indentation is matched as 'one or more spaces' such that the
        # parsing does not depend on the exact indentation width.
        children_re = \
            re.compile(r'[0-9]* (child|children)[\\.:]((\n +[^\n]*)*)')
        line_re = re.compile('^ +(.*)\\.$')
        output = self.call(['attr', object_path]).stdout
        section_match = children_re.search(output)
        assert section_match
        children = []
        # the first entry of the split is the empty string before the first
        # newline of the section, so skip it:
        for i in section_match.group(2).split('\n')[1:]:
            child_match = line_re.match(i)
            assert child_match
            children.append(child_match.group(1))
        return sorted(children)

    def list_children(self, object_path):
        """
        List the names of children of the
        given object, using the complete_shell command.
        """
        if not object_path.endswith('.') and object_path != '':
            object_path += '.'
        items = self.complete(['object_tree', object_path],
                              partial=True, position=1)
        # every item ends with '.', so the child name is the second to last
        # component when splitting at '.'
        return sorted(i.split('.')[-2] for i in items)

    def create_clients(self, num):
        """Create 'num' clients and return the list of their window ids"""
        return [self.create_client()[0] for i in range(num)]

    def wait_for_window_of(self, wmclass):
        """Wait for a rule hook of the form "here_is_" + wmclass """
        # We don't need to read the second argument of the hook and don't need
        # to check that is indeed equals "here_is_" + wmclass. But we need to
        # check this once we create clients simultaneously.
        line = self.hc_idle.stdout.readline().rstrip('\n').split('\t')
        # poll whether the idling herbstclient is still alive:
        try:
            self.hc_idle.wait(0)
        except subprocess.TimeoutExpired:
            pass
        if self.hc_idle.returncode is not None:
            self.hlwm_process.investigate_timeout(
                'waiting for hook triggered by client \"{}\"'.format(wmclass))
        return line[-1]

    def shutdown(self):
        """Terminate all registered clients and the idling herbstclient"""
        try:
            for client_proc in self.client_procs:
                client_proc.terminate()
                client_proc.wait(2)
        finally:
            # terminate hc_idle even if a client did not quit in time
            self.hc_idle.terminate()
            self.hc_idle.wait(2)

    def bool(self, python_bool_var):
        """convert a boolean variable into hlwm's string representation"""
        return "true" if python_bool_var else "false"


@pytest.fixture()
def hlwm(hlwm_process, xvfb):
    """Provide a HlwmBridge connected to the hlwm process of the test"""
    # display = ':13'
    hlwm_bridge = HlwmBridge(xvfb.display, hlwm_process)
    yield hlwm_bridge

    try:
        # Make sure that hlwm survived:
        hlwm_bridge.call('version')
    finally:
        # clean up the clients even if hlwm did not survive
        hlwm_bridge.shutdown()


class HlwmProcess:
    """A running herbstluftwm process whose stdout/stderr are captured"""

    def __init__(self, autostart_stdout_message, env, args):
        """create a new hlwm process and wait for booting up.
        - autostart_stdout_message is the message printed to stdout
          that indicates that the autostart has been executed entirely
        - env is the environment dictionary
        - args is a list of additional command line arguments
        """
        self.bin_path = os.path.join(BINDIR, 'herbstluftwm')
        self.proc = subprocess.Popen(
            [self.bin_path, '--exit-on-xerror', '--verbose'] + args, env=env,
            bufsize=0,  # essential for reading output with selectors!
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # the 'data' of each registered stream is the stream of the test
        # process to which the output is echoed
        sel = selectors.DefaultSelector()
        sel.register(self.proc.stdout, selectors.EVENT_READ, data=sys.stdout)
        sel.register(self.proc.stderr, selectors.EVENT_READ, data=sys.stderr)
        self.output_selector = sel

        # Wait for marker output from wrapper script:
        self.read_and_echo_output(until_stdout=autostart_stdout_message)

    def read_and_echo_output(self, until_stdout=None, until_stderr=None, until_eof=False):
        """Read the pending output of hlwm and echo it to the own stdout/stderr.

        - until_stdout / until_stderr: if given, keep reading until the
          given string showed up in the respective stream
        - until_eof: keep reading until both streams reported EOF

        Reading stops in any case after 'max_wait' seconds. An
        AssertionError is raised if an expected string was not encountered.
        """
        expect_sth = ((until_stdout or until_stderr) is not None)
        max_wait = 15

        # Track which file objects have EOFed:
        eof_fileobjs = set()
        fileobjs = set(k.fileobj for k in self.output_selector.get_map().values())

        stderr = ''
        stderr_bytes = bytes()
        stdout = ''
        stdout_bytes = bytes()

        def match_found():
            if until_stdout and (until_stdout in stdout):
                return True
            if until_stderr and (until_stderr in stderr):
                return True
            return False

        started = datetime.now()
        while (datetime.now() - started).total_seconds() < max_wait:
            select_timeout = 1
            # If we're not polling for a matching string (anymore), there is no
            # need for a dampening timeout:
            if not expect_sth or match_found():
                select_timeout = 0
            selected = self.output_selector.select(timeout=select_timeout)
            for key, events in selected:
                # Read only single byte, otherwise we might block:
                ch = key.fileobj.read(1)

                if ch == b'':
                    eof_fileobjs.add(key.fileobj)

                # Store in temporary buffer for string matching:
                if key.fileobj == self.proc.stderr:
                    stderr_bytes += ch
                    if ch == b'\n':
                        stderr += stderr_bytes.decode()
                        # Pass it through to the real stderr:
                        key.data.write(stderr_bytes.decode())
                        key.data.flush()
                        stderr_bytes = b''

                if key.fileobj == self.proc.stdout:
                    stdout_bytes += ch
                    if ch == b'\n':
                        stdout += stdout_bytes.decode()
                        # Pass it through to the real stdout:
                        key.data.write(stdout_bytes.decode())
                        key.data.flush()
                        stdout_bytes = b''

            if until_eof:
                # We are going to the very end, so carry on until all file
                # objects have returned EOF:
                if eof_fileobjs == fileobjs:
                    break
                else:
                    continue

            if selected != []:
                # There is still data available, so keep reading (no matter
                # what):
                continue

            # But stop reading if there is nothing to look for or we have
            # already found it:
            if not expect_sth or match_found():
                break

        # decode remaining bytes for the final match_found() check
        if stderr_bytes != b'':
            stderr += stderr_bytes.decode()
            sys.stderr.write(stderr_bytes.decode())
            sys.stderr.flush()
        if stdout_bytes != b'':
            stdout += stdout_bytes.decode()
            sys.stdout.write(stdout_bytes.decode())
            sys.stdout.flush()
        duration = (datetime.now() - started).total_seconds()
        if expect_sth and not match_found():
            # raised explicitly such that the check is not stripped when
            # python runs with -O
            raise AssertionError(f'Expected string not encountered within {duration:.1f} seconds')

    @contextmanager
    def wait_stdout_match(self, match):
        """
        Context manager for wrapping commands that are expected to result in
        certain output on hlwm's stdout (e.g., input events).

        Warning: do not run call(...) within such a context, but only
        unchecked_call(..., read_hlwm_output=False) instead
        """
        # consume everything that was printed before entering the context:
        self.read_and_echo_output()
        yield
        self.read_and_echo_output(until_stdout=match)

    @contextmanager
    def wait_stderr_match(self, match):
        """
        Context manager for wrapping commands that are expected to result in
        certain output on hlwm's stderr (e.g., input events).

        Warning: do not run call(...) within such a context, but only
        unchecked_call(..., read_hlwm_output=False) instead
        """
        # consume everything that was printed before entering the context:
        self.read_and_echo_output()
        yield
        self.read_and_echo_output(until_stderr=match)

    def investigate_timeout(self, reason):
        """if some kind of client request observes a timeout, investigate the
        herbstluftwm server process. 'reason' is best phrased using present
        participle

        This function always raises an Exception.
        """
        self.read_and_echo_output()
        # poll whether hlwm is still running:
        try:
            self.proc.wait(0)
        except subprocess.TimeoutExpired:
            pass
        self.read_and_echo_output()
        if self.proc.returncode is None:
            raise Exception(str(reason) + " took too long"
                            + " but hlwm still running") from None
        else:
            raise Exception("{} made herbstluftwm quit with exit code {}"
                            .format(str(reason), self.proc.returncode)) from None

    def shutdown(self):
        """Terminate hlwm and check that it quits with exit code 0"""
        self.proc.terminate()

        # Make sure to read and echo all remaining output (esp. ASAN messages):
        self.read_and_echo_output(until_eof=True)

        if self.proc.returncode is None:
            # only wait the process if it hasn't been cleaned up
            # this also avoids the second exception if hlwm crashed
            try:
                assert self.proc.wait(2) == 0
            except subprocess.TimeoutExpired:
                self.proc.kill()
                self.proc.wait(2)
                raise Exception("herbstluftwm did not quit on sigterm"
                                + " and had to be killed") from None


class HcIdle:
    """Wrapper around a 'herbstclient --idle' process for collecting hooks"""

    def __init__(self, hlwm):
        self.hlwm = hlwm
        self.proc = subprocess.Popen([hlwm.HC_PATH, '--idle'],
                                     stdout=subprocess.PIPE,
                                     bufsize=0)
        # we don't know how fast self.proc connects to hlwm.
        # So we keep sending messages via hlwm until we receive some
        number_sent = 0
        while [] == select.select([self.proc.stdout], [], [], 0.1)[0]:
            # while there hasn't been a message received, send something
            number_sent += 1
            self.hlwm.call(['emit_hook', 'hc_idle_bootup', number_sent])
        # now that we know that self.proc is connected, we need to consume
        # its output up to the last message we have sent
        assert number_sent > 0
        number_received = 0
        while number_received < number_sent:
            line = self.proc.stdout.readline().decode().rstrip('\n').split('\t')
            assert line[0] == 'hc_idle_bootup'
            number_received = int(line[1])

    def hooks(self):
        """return all hooks since the last call of this function

        Each hook is a list of strings (the hook line split at tabs).
        """
        # collect all hooks so far, so collect all up to the following hook:
        self.hlwm.call(['emit_hook', 'hc_idle_logging_marker'])
        hooks = []
        while True:
            line = self.proc.stdout.readline().decode().rstrip('\n').split('\t')
            if line[0] == 'hc_idle_logging_marker':
                break
            else:
                hooks.append(line)
        return hooks

    def shutdown(self):
        """Terminate the idling herbstclient, killing it if necessary"""
        self.proc.terminate()
        try:
            self.proc.wait(2)
        except subprocess.TimeoutExpired:
            self.proc.kill()
            self.proc.wait(2)


@pytest.fixture()
def hc_idle(hlwm):
    """Provide a HcIdle object that is shut down after the test"""
    hc = HcIdle(hlwm)

    yield hc

    hc.shutdown()


@pytest.fixture()
def hlwm_spawner(tmpdir, xvfb):
    """yield a function to spawn hlwm"""
    # Requesting the xvfb fixture guarantees that an Xvfb server is running
    # and that os.environ['DISPLAY'] points to it before anything is spawned.
    assert xvfb is not None, 'Refusing to run tests in a non-Xvfb environment (possibly your actual X server?)'

    def spawn(args=None, display=None):
        """spawn hlwm with the given list of additional command line
        arguments on the given display (default: the DISPLAY variable)"""
        if args is None:
            args = []
        if display is None:
            display = os.environ['DISPLAY']
        env = {
            'DISPLAY': display,
            'XDG_CONFIG_HOME': str(tmpdir),
        }
        env = extend_env_with_whitelist(env)
        autostart = tmpdir / 'herbstluftwm' / 'autostart'
        autostart.ensure()
        autostart.write(textwrap.dedent("""
            #!/usr/bin/env bash
            echo "hlwm started"
        """.lstrip('\n')))
        autostart.chmod(0o755)
        return HlwmProcess('hlwm started', env, args)
    return spawn


@pytest.fixture()
def xvfb(request):
    """Start an Xvfb server and set the DISPLAY variable accordingly"""
    # start an Xvfb server (don't start Xephyr because
    # Xephyr requires another running xserver already).
    # also we add '-noreset' such that the server is not reset
    # when the last client connection is closed.
    #
    # the optional parameter is the resolution. If you want another resolution,
    # then annotate the test case with:
    #
    #    @pytest.mark.parametrize("xvfb", [(1280, 1024)], indirect=True)
    #
    screens = [(800, 600)]
    if hasattr(request, 'param'):
        screens = [request.param]
    with MultiscreenDisplay(server='Xvfb', screens=screens, extra_args=['-noreset']) as xserver:
        os.environ['DISPLAY'] = xserver.display
        yield xserver


@pytest.fixture()
def hlwm_process(hlwm_spawner, xvfb):
    """Set up hlwm and also check that it shuts down gently afterwards"""
    hlwm_proc = hlwm_spawner(['--no-tag-import'], display=xvfb.display)

    yield hlwm_proc

    hlwm_proc.shutdown()


@pytest.fixture(params=[0])
def running_clients(hlwm, running_clients_num):
    """
    Fixture that provides a number of already running clients, as defined by a
    "running_clients_num" test parameter.
    """
    return hlwm.create_clients(running_clients_num)


@pytest.fixture()
def x11_connection(xvfb):
    """Connect to the given xvfb display and return a Xlib.display handle"""
    display = None
    attempts_left = 10
    while display is None and attempts_left > 0:
        try:
            display = Xlib.display.Display(xvfb.display)
            # the above call may result in an exception:
            # ConnectionResetError: [Errno 104] Connection reset by peer
            # However, the handling of this error in the above function results in a
            # type error, see
            # https://github.com/python-xlib/python-xlib/pull/160
        except TypeError as msg:
            # hence, just print the type error
            print("!!! TypeError: %s" % msg, file=sys.stderr)
            # wait for a moment, and then try again..
            time.sleep(2)
            attempts_left -= 1
        except Xlib.error.ConnectionClosedError as msg:
            print("!!! Xlib.error.ConnectionClosedError: %s" % msg, file=sys.stderr)
            # maybe the display was still shutting down
            # wait for a moment, and then try again..
            time.sleep(2)
            attempts_left -= 1
    if display is None:
        # fail with a clear message instead of handing None to the test
        # (and crashing on None.close() afterwards)
        raise Exception('Could not connect to the X display "{}"'.format(xvfb.display))
    yield display
    display.close()


class RawImage:
    def __init__(self, data, width, height):
        """
        A raw image is an array of size width*height containing
        (r, g, b) triples, line by line
        """
        self.data = data
        self.width = width
        self.height = height
        assert len(data) == width * height

    def pixel(self, x, y):
        """return (r, g, b) triple for a pixel"""
        return self.data[x + self.width * y]

    def color_count(self, rgb_triple):
        """return the number of pixels having the given (r, g, b) triple"""
        return sum(1 for pix in self.data if pix == rgb_triple)

    @staticmethod
    def rgb2string(rgb_triple):
        """format a (r, g, b) triple as '#rrggbb'"""
        return '#%02x%02x%02x' % rgb_triple


@pytest.fixture()
def x11(x11_connection):
    """ Short-lived fixture for interacting with the X11 display and creating
    clients that are automatically destroyed at the end of each test. """
    class X11:
        def __init__(self, x11_connection):
            self.display = x11_connection
            self.windows = set()
            self.screen = self.display.screen()
            self.root = self.screen.root
            self.ewmh = ewmh.EWMH(self.display, self.root)
            # NOTE(review): 'hlwm' is not a parameter of the x11 fixture, so
            # this name resolves to the module-level 'hlwm' fixture function
            # and not to a HlwmBridge object -- confirm whether this is
            # intended. sync_with_hlwm() uses HlwmBridge.INSTANCE instead.
            self.hlwm = hlwm

        def window(self, winid_string):
            """return python-xlib window wrapper for a string window id"""
            winid_int = int(winid_string, 0)
            return self.display.create_resource_object('window', winid_int)

        def winid_str(self, window_handle):
            """return the window id of the window handle as a hex string"""
            return hex(window_handle.id)

        def make_window_urgent(self, window):
            """make window urgent"""
            window.set_wm_hints(flags=Xutil.UrgencyHint)
            self.display.sync()

        def is_window_urgent(self, window):
            """check urgency of a given window handle"""
            hints = window.get_wm_hints()
            if hints is None:
                return False
            return bool(hints.flags & Xutil.UrgencyHint)

        def set_property_textlist(self, property_name, value, utf8=True, window=None):
            """set a ascii textlist property by its string name on the root window, or any other window"""
            if window is None:
                window = self.root
            prop = self.display.intern_atom(property_name)
            # the entries are separated by null bytes:
            bvalue = bytearray()
            isfirst = True
            for entry in value:
                if isfirst:
                    isfirst = False
                else:
                    bvalue.append(0)
                bvalue += entry.encode()
            proptype = Xatom.STRING
            if utf8:
                proptype = self.display.get_atom('UTF8_STRING')
            window.change_property(prop, proptype, 8, bytes(bvalue))

        def set_property_cardinal(self, property_name, value, window=None):
            """set a cardinal property by its string name on the root window, or any other window"""
            if window is None:
                window = self.root
            prop = self.display.intern_atom(property_name)
            window.change_property(prop, Xatom.CARDINAL, 32, value)

        def get_property(self, property_name, window=None):
            """get a property by its string name from the root window, or any other window"""
            if window is None:
                window = self.root
            prop = self.display.intern_atom(property_name)
            resp = window.get_full_property(prop, X.AnyPropertyType)
            return resp.value if resp is not None else None

        def create_client(self, urgent=False, pid=None,
                          geometry=(50, 50, 300, 200),  # x, y, width, height
                          force_unmanage=False,
                          sync_hlwm=True,
                          wm_class=None,
                          window_type=None,
                          transient_for=None,
                          pre_map=lambda wh: None,  # called before the window is mapped
                          ):
            """create and map a window with the given properties and return
            the pair (window handle, window id string)"""
            w = self.root.create_window(
                geometry[0],
                geometry[1],
                geometry[2],
                geometry[3],
                2,
                self.screen.root_depth,
                X.InputOutput,
                X.CopyFromParent,
                background_pixel=self.screen.white_pixel,
                override_redirect=force_unmanage,
            )
            if wm_class is not None:
                w.set_wm_class(wm_class[0], wm_class[1])

            if transient_for is not None:
                w.set_wm_transient_for(transient_for)

            # Keep track of window for later removal:
            self.windows.add(w)

            w.set_wm_name('Some Window')
            if urgent:
                w.set_wm_hints(flags=Xutil.UrgencyHint)

            if window_type is not None:
                w.change_property(self.display.intern_atom('_NET_WM_WINDOW_TYPE'),
                                  Xatom.ATOM,
                                  32,
                                  [self.display.intern_atom(window_type)])

            if pid is not None:
                w.change_property(self.display.intern_atom('_NET_WM_PID'),
                                  Xatom.CARDINAL,
                                  32,
                                  [pid])

            pre_map(w)
            w.map()
            self.display.sync()
            if sync_hlwm:
                # wait for hlwm to fully recognize it as a client
                self.sync_with_hlwm()
            return w, self.winid_str(w)

        def screenshot(self, win_handle):
            """return the content of the given window as a RawImage"""
            geom = win_handle.get_geometry()
            attr = win_handle.get_attributes()
            # Xlib defines AllPlanes as: ((unsigned long)~0L)
            all_planes = 0xffffffff
            # Maybe, the following get_image() works differently
            # than XGetImage(), because we still need to interpret
            # the pixel values using the colormap, whereas the man
            # page for XGetImage() does not mention 'colormap' at all
            raw = win_handle.get_image(0, 0, geom.width, geom.height,
                                       X.ZPixmap, all_planes)
            # raw.data is a array of pixel-values, which need to
            # be interpreted using the colormap:
            colorDict = dict.fromkeys(raw.data)
            colorPixelList = list(colorDict)
            colorRGBList = attr.colormap.query_colors(colorPixelList)
            for pixelval, rgbval in zip(colorPixelList, colorRGBList):
                # Useful debug output if something blows up again (which is likely)
                # print(f'{pixelval} -> {rgbval.red} {rgbval.green} {rgbval.blue}')
                colorDict[pixelval] = (rgbval.red % 256, rgbval.green % 256, rgbval.blue % 256)
            # the image size is enlarged such that the width
            # is a multiple of 4. Hence we remove these extra
            # columns in the end when mapping colorDict[] over the data array
            width_padded = (geom.width + 3) // 4 * 4
            rgbvals = [colorDict[p] for idx, p in enumerate(raw.data) if idx % width_padded < geom.width]
            return RawImage(rgbvals, geom.width, geom.height)

        def decoration_screenshot(self, win_handle):
            """return a screenshot of the decoration window of the given window"""
            decoration = self.get_decoration_window(win_handle)
            return self.screenshot(decoration)

        def sync_with_hlwm(self):
            """wait until hlwm has processed all pending X events"""
            self.display.sync()
            # wait for hlwm to flush all events:
            hlwm_bridge = HlwmBridge.INSTANCE
            assert hlwm_bridge is not None, "hlwm must be running"
            hlwm_bridge.call('true')

        def get_decoration_window(self, window):
            """return the parent of the given window, or None if the
            parent is the root window"""
            tree = window.query_tree()
            if tree.root == tree.parent:
                return None
            else:
                return tree.parent

        def get_absolute_top_left(self, window):
            """return the absolute (x,y) coordinate of the given window,
            i.e. relative to the root window"""
            x = 0
            y = 0
            while True:
                # the following coordinates are only relative
                # to the parent of window
                geom = window.get_geometry()
                print('Relative geometry of {} is: x={} y={} w={} h={}'.format(
                    self.winid_str(window), geom.x, geom.y, geom.width, geom.height))
                x += geom.x
                y += geom.y
                # check if the window's parent is already the root window
                tree = window.query_tree()
                if tree.root == tree.parent:
                    break
                # if it's not, continue at its parent
                window = tree.parent
            return (x, y)

        def get_absolute_geometry(self, window):
            """return the geometry of the window, where the top left
            coordinate comes from get_absolute_top_left()
            """
            x, y = self.get_absolute_top_left(window)
            geom = window.get_geometry()
            geom.x = x
            geom.y = y
            return geom

        def get_hlwm_frames(self):
            """get list of window handles of herbstluftwm
            frame decoration windows"""
            cmd = ['xdotool', 'search', '--class', '_HERBST_FRAME']
            frame_wins = subprocess.run(cmd,
                                        stdout=subprocess.PIPE,
                                        universal_newlines=True,
                                        check=True)
            res = []
            for winid_decimal_str in frame_wins.stdout.splitlines():
                res.append(self.window(winid_decimal_str))
            return res

        def shutdown(self):
            """unmap and destroy all windows created via create_client()"""
            # Destroy all created windows:
            for window in self.windows:
                window.unmap()
                window.destroy()
            self.display.sync()

    x11_ = X11(x11_connection)
    yield x11_
    x11_.shutdown()


class MultiscreenDisplay:
    """context manager for creating a server display with multiple
    screens. We support two xserver programs: Xephyr and Xvfb:

      - Xephyr places screens side by side. Since Xephyr opens
        one window per screen itself, this has to run inside an existing Xvfb.
      - Xvfb makes all screens have the coordinate (0, 0)
    """
    def __init__(self, server='Xephyr', screens=((800, 600),), extra_args=()):
        """ Creates a new x server (where 'server' is 'Xephyr' or 'Xvfb')
        that has all the specified screens.

        - screens is a sequence of (width, height) pairs
        - extra_args are additional command line arguments for the server

        An Exception is raised if the server does not report a display.
        """
        # we build up the full command to run step by step:
        self.full_cmd = [server, '-nolisten', 'tcp']

        # pass the -screen parameters
        # ---------------------------
        self.screen_rects = []
        current_x_offset = 0
        for idx, (width, height) in enumerate(screens):
            self.screen_rects.append([current_x_offset, 0, width, height])
            geo = '{}x{}x8'.format(width, height)
            if server == 'Xvfb':
                self.full_cmd += ['-screen', str(idx), geo]
            else:
                self.full_cmd += ['-screen', geo]
                # xephyr places them side by side:
                current_x_offset += width
        self.full_cmd += extra_args

        # let the xserver find a display and write its name to a pipe
        # -----------------------------------------------------------
        pipe_read, pipe_write = os.pipe()
        self.full_cmd += ['-displayfd', str(pipe_write)]

        print("Running: {}".format(' '.join(self.full_cmd)))
        display_bytes = bytes()
        try:
            try:
                self.proc = subprocess.Popen(self.full_cmd, pass_fds=[pipe_write])
            finally:
                # The server has inherited its own copy of the write end. We
                # must close ours before reading: otherwise the read below
                # would never see EOF (and block forever) if the server quits
                # without reporting a display.
                os.close(pipe_write)

            # read the display number from the pipe
            # -------------------------------------
            # read bytes until the newline
            while True:
                chunk = os.read(pipe_read, 1)
                display_bytes += chunk
                if len(chunk) < 1 or chunk == b'\n':
                    break
        finally:
            os.close(pipe_read)

        display_number = display_bytes.decode().rstrip()
        if not display_number:
            # do not leave a half-started server behind
            self.proc.terminate()
            self.proc.wait(5)
            raise Exception(server + " did not report a display number")
        self.display = ':' + display_number
        print(server + " is using the display \"{}\"".format(self.display))

    def screens(self):
        """return a list of screen geometries, where a geometry
        is a list [x, y, width, height]
        """
        return self.screen_rects

    def __enter__(self):
        return self

    def __exit__(self, type_param, value, traceback):
        # shut down the server when leaving the context
        self.proc.terminate()
        self.proc.wait(5)


@pytest.fixture()
def keyboard():
    """Provide an object for sending key events via xdotool"""
    class KeyBoard:
        def press(self, key_spec):
            subprocess.check_call(['xdotool', 'key', key_spec])

        def down(self, key_spec):
            subprocess.check_call(['xdotool', 'keydown', key_spec])

        def up(self, key_spec):
            subprocess.check_call(['xdotool', 'keyup', key_spec])

    return KeyBoard()


@pytest.fixture()
def mouse(hlwm_process, hlwm):
    """Provide an object for moving the mouse and clicking via xdotool.
    On creation, the mouse pointer is moved to the coordinate (0, 0)."""
    class Mouse:
        def __init__(self):
            self.move_to(0, 0, wait=True)

        def move_into(self, win_id, x=1, y=1, wait=True):
            """move the pointer to the coordinate (x, y) relative to the
            given window"""
            if wait:
                with hlwm_process.wait_stderr_match('EnterNotify'):
                    # no --sync here, because we're waiting for the EnterNotify anyways
                    self.call_cmd(f'xdotool mousemove --window {win_id} {x} {y}', shell=True)
                # reaching this line only means that hlwm started processing
                # the EnterNotify. So we need to wait until the event is fully processed:
                hlwm.call('true')
            else:
                self.call_cmd(f'xdotool mousemove --sync --window {win_id} {x} {y}', shell=True)

        def click(self, button, into_win_id=None, wait=True):
            """click the given button, optionally after moving the pointer
            into the given window"""
            if into_win_id:
                # no need to wait for herbstluftwm to process, we're just positioning the mouse to click
                self.move_into(into_win_id, wait=False)
            if wait:
                with hlwm_process.wait_stderr_match('ButtonPress'):
                    subprocess.check_call(['xdotool', 'click', button])
                # reaching this line only means that hlwm started processing
                # the ButtonPress. So we need to wait until the event is fully processed:
                hlwm.call('true')
            else:
                subprocess.check_call(['xdotool', 'click', button])

        def move_to(self, abs_x, abs_y, wait=True):
            """move the pointer to the given absolute coordinate"""
            abs_x = str(int(abs_x))
            abs_y = str(int(abs_y))
            self.call_cmd(f'xdotool mousemove --sync {abs_x} {abs_y}', shell=True)
            if wait:
                # wait until all the mouse move events that are now in the queue
                # are fully processed:
                hlwm.call('true')

        def move_relative(self, delta_x, delta_y, wait=True):
            """move the pointer by the given offset"""
            self.call_cmd(f'xdotool mousemove_relative --sync {delta_x} {delta_y}', shell=True)
            if wait:
                # wait until all the mouse move events that were put in the
                # queue by the above xdotool invocation are fully processed.
                # (other than for the button events, the motion notify events
                # are not printed to stderr, because this would lead to
                # too much debug output on motions of the physical mouse)
                hlwm.call('true')

        def mouse_press(self, button, wait=True):
            """press the given button without releasing it"""
            cmd = ['xdotool', 'mousedown', button]
            if wait:
                with hlwm_process.wait_stderr_match('ButtonPress'):
                    subprocess.check_call(cmd)
                # wait for the ButtonPress to be fully processed:
                hlwm.call('true')
            else:
                subprocess.check_call(cmd)

        def mouse_release(self, button, wait=True):
            """release the given button"""
            cmd = ['xdotool', 'mouseup', button]
            if wait:
                with hlwm_process.wait_stderr_match('ButtonRelease'):
                    subprocess.check_call(cmd)
                # wait for the ButtonRelease to be processed
                hlwm.call('true')
            else:
                subprocess.check_call(cmd)

        def call_cmd(self, cmd, shell=False):
            """log the given command to stderr and run it"""
            print('calling: {}'.format(cmd), file=sys.stderr)
            subprocess.check_call(cmd, shell=shell)

    return Mouse()