#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of Xpra.
# Copyright (C) 2013-2021 Antoine Martin <antoine@xpra.org>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

import re
import os
import sys
import signal
import uuid
import time
import struct
import binascii
import threading

#maps the signal values exposed by the `signal` module to their names:
SIGNAMES = {}
for signame in (sig for sig in dir(signal) if sig.startswith("SIG") and not sig.startswith("SIG_")):
    SIGNAMES[getattr(signal, signame)] = signame


WIN32 = sys.platform.startswith("win")
OSX = sys.platform.startswith("darwin")
LINUX = sys.platform.startswith("linux")
NETBSD = sys.platform.startswith("netbsd")
OPENBSD = sys.platform.startswith("openbsd")
FREEBSD = sys.platform.startswith("freebsd")
DRAGONFLY = sys.platform.startswith("dragonfly")

POSIX = os.name=="posix"

#size of a pointer, in bits:
BITS = struct.calcsize(b"P")*8


#the thread that imported this module is recorded as the main thread:
main_thread = threading.current_thread()
def is_main_thread():
    """ Returns True if the caller runs in the thread that imported this module. """
    return threading.current_thread()==main_thread


def get_frame_info(ignore_threads=()):
    """
    Collects the current stack of every thread, except those in `ignore_threads`.

    Returns a dictionary with a "count" entry and one entry per thread
    (keyed by enumeration index) holding the thread name under the key ""
    and the sanitized stack under "stack".
    Errors are logged and the information gathered so far is returned.
    """
    info = {
        "count" : threading.active_count() - len(ignore_threads),
        }
    try:
        import traceback
        def nn(x):
            if x is None:
                return ""
            return str(x)
        thread_ident = {}
        for t in threading.enumerate():
            #None marks the threads we have been asked to skip:
            thread_ident[t.ident] = None if t in ignore_threads else t.name
        thread_ident.update({
            threading.current_thread().ident : "info",
            main_thread.ident : "main",
            })
        frames = sys._current_frames()  #pylint: disable=protected-access
        stack = None
        for i, (ident, frame) in enumerate(frames.items()):
            stack = traceback.extract_stack(frame)
            tident = thread_ident.get(ident, "unknown")
            if tident is None:
                continue
            #sanitize stack to prevent None values (which cause encoding errors with the bencoder)
            sanestack = [tuple(nn(x) for x in entry) for entry in stack]
            info[i] = {
                "" : tident,
                "stack" : sanestack,
                }
        del frames, stack
    except Exception as e:
        get_util_logger().error("failed to get frame info: %s", e)
    return info

def get_info_env():
    """ Returns a copy of the environment with the xpra secrets masked. """
    filtered_env = os.environ.copy()
    for secret in ('XPRA_PASSWORD', 'XPRA_ENCRYPTION_KEY'):
        if filtered_env.get(secret):
            filtered_env[secret] = "*****"
    return filtered_env

def get_sysconfig_info():
    """
    Queries the `sysconfig` module for the platform, python version,
    config vars and paths, returning whatever could be collected.
    """
    import sysconfig
    sysinfo = {}
    log = get_util_logger()
    for attr in (
        "platform",
        "python-version",
        "config-vars",
        "paths",
        ):
        fn = "get_%s" % attr.replace("-", "_")
        getter = getattr(sysconfig, fn, None)
        if getter:
            try:
                sysinfo[attr] = getter()    #pylint: disable=not-callable
            except ModuleNotFoundError:
                log("sysconfig.%s", fn, exc_info=True)
                if attr=="config-vars" and WIN32:
                    continue
                log.warn("Warning: failed to collect %s sysconfig information", attr)
            except Exception:
                log.error("Error calling sysconfig.%s", fn, exc_info=True)
    return sysinfo

def strtobytes(x) -> bytes:
    """ Returns `x` unchanged if it is bytes, otherwise `str(x)` encoded as latin1. """
    if isinstance(x, bytes):
        return x
    return str(x).encode("latin1")
def bytestostr(x) -> str:
    """ Decodes bytes and bytearrays as latin1, uses `str(x)` for anything else. """
    if isinstance(x, (bytes, bytearray)):
        return x.decode("latin1")
    return str(x)
def hexstr(v) -> str:
    """ Returns the hexadecimal representation of `v` as a string. """
    return bytestostr(binascii.hexlify(strtobytes(v)))


util_logger = None
def get_util_logger():
    """ Returns the "util" logger, creating it on first use. """
    global util_logger
    if not util_logger:
        #imported lazily so that this module can be loaded without xpra.log
        from xpra.log import Logger
        util_logger = Logger("util")
    return util_logger

def memoryview_to_bytes(v) -> bytes:
    """ Converts bytes, memoryview, bytearray or any other value to bytes. """
    if isinstance(v, bytes):
        return v
    if isinstance(v, memoryview):
        return v.tobytes()
    if isinstance(v, bytearray):
        return bytes(v)
    return strtobytes(v)


def set_proc_title(title):
    """ Sets the process title using `setproctitle`, if it is installed. """
    try:
        import setproctitle     #pylint: disable=import-outside-toplevel
        setproctitle.setproctitle(title)  #@UndefinedVariable pylint: disable=c-extension-no-member
    except ImportError as e:
        get_util_logger().debug("setproctitle is not installed: %s", e)


def getuid() -> int:
    """ Returns the current uid on posix, 0 elsewhere. """
    if POSIX:
        return os.getuid()
    return 0

def getgid() -> int:
    """ Returns the current gid on posix, 0 elsewhere. """
    if POSIX:
        return os.getgid()
    return 0

def get_shell_for_uid(uid) -> str:
    """ Returns the login shell for `uid`, or "" if it cannot be found. """
    if POSIX:
        from pwd import getpwuid
        try:
            return getpwuid(uid).pw_shell
        except KeyError:
            pass
    return ""

def get_username_for_uid(uid) -> str:
    """ Returns the username for `uid`, or "" if it cannot be found. """
    if POSIX:
        from pwd import getpwuid
        try:
            return getpwuid(uid).pw_name
        except KeyError:
            pass
    return ""

def get_home_for_uid(uid) -> str:
    """ Returns the home directory for `uid`, or "" if it cannot be found. """
    if POSIX:
        from pwd import getpwuid
        try:
            return getpwuid(uid).pw_dir
        except KeyError:
            pass
    return ""

def get_groups(username):
    """ Returns the names of the groups listing `username` as a member (posix only). """
    if POSIX:
        import grp      #@UnresolvedImport
        return [gr.gr_name for gr in grp.getgrall() if username in gr.gr_mem]
    return []

def get_group_id(group) -> int:
    """ Returns the gid of the named group, or -1 if it cannot be resolved. """
    try:
        import grp      #@UnresolvedImport
        gr = grp.getgrnam(group)
        return gr.gr_gid
    except (ImportError, KeyError):
        return -1


def platform_release(release):
    """
    On MacOS, returns the user visible version found in SystemVersion.plist,
    otherwise (or if that fails) returns `release` unmodified.
    """
    if OSX:
        SYSTEMVERSION_PLIST = "/System/Library/CoreServices/SystemVersion.plist"
        try:
            import plistlib
            with open(SYSTEMVERSION_PLIST, "rb") as f:
                pl = plistlib.load(f)   #@UndefinedVariable
            return pl['ProductUserVisibleVersion']
        except Exception as e:
            log = get_util_logger()
            log.debug("platform_release(%s)", release, exc_info=True)
            log.warn("Warning: failed to get release information")
            log.warn(" from '%s':", SYSTEMVERSION_PLIST)
            log.warn(" %s", e)
    return release


def platform_name(sys_platform=sys.platform, release=None) -> str:
    """
    Returns a human readable platform name for `sys_platform`,
    followed by the (non-empty) release value(s) if any are given.
    """
    if not sys_platform:
        return "unknown"
    PLATFORMS = {"win32" : "Microsoft Windows",
                 "cygwin" : "Windows/Cygwin",
                 "linux.*" : "Linux",
                 "darwin" : "Mac OS X",
                 "freebsd.*": "FreeBSD",
                 "os2" : "OS/2",
                 }
    def rel(v):
        values = [v]
        if isinstance(release, (tuple, list)):
            values += list(release)
        else:
            values.append(release)
        return " ".join([str(x) for x in values if x])
    for pattern, name in PLATFORMS.items():
        if re.match(pattern, sys_platform):
            return rel(name)
    return rel(sys_platform)


def get_rand_chars(l=16, chars=b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") -> bytes:
    """
    Returns `l` bytes, each one picked at random from `chars`.
    The `secrets` module is used so that the output is also suitable
    for values which must not be predictable.
    """
    import secrets
    count = len(chars)
    def pick():
        i = secrets.randbelow(count)
        return chars[i:i+1]
    return b"".join(pick() for _ in range(l))

def get_hex_uuid() -> str:
    """ Returns a new random uuid as a hexadecimal string. """
    return uuid.uuid4().hex

def get_int_uuid() -> int:
    """ Returns a new random uuid as an integer. """
    return uuid.uuid4().int

def get_machine_id() -> str:
    """
    Try to get uuid string which uniquely identifies this machine.
    Warning: only works on posix!
    (which is ok since we only used it on posix at present)
    Returns an empty string if no machine id can be found.
    """
    v = ""
    if POSIX:
        for filename in ("/etc/machine-id", "/var/lib/dbus/machine-id"):
            data = load_binary_file(filename)
            if data is not None:
                #only overwrite the default with real data,
                #so we never end up returning the string "None"
                v = data
                break
    elif WIN32:
        v = uuid.getnode()
    return bytestostr(v).strip("\n\r")

def get_user_uuid() -> str:
    """
    Try to generate a uuid string which is unique to this user.
    (relies on get_machine_id to uniquely identify a machine)
    The environment variable XPRA_USER_UUID takes precedence if set.
    """
    user_uuid = os.environ.get("XPRA_USER_UUID")
    if user_uuid:
        return user_uuid
    import hashlib
    u = hashlib.sha256()
    def uupdate(ustr):
        u.update(ustr.encode("utf-8"))
    uupdate(get_machine_id())
    if POSIX:
        uupdate(u"/")
        uupdate(str(os.getuid()))
        uupdate(u"/")
        uupdate(str(os.getgid()))
    uupdate(os.path.expanduser("~/"))
    return u.hexdigest()


def is_X11() -> bool:
    """
    Returns False on MacOS and MS Windows,
    otherwise asks the xpra X11 bindings; if those cannot be imported
    then X11 is assumed.
    """
    if OSX or WIN32:
        return False
    try:
        from xpra.x11.gtk3.gdk_bindings import is_X11_Display   #@UnresolvedImport
        return is_X11_Display()
    except ImportError:
        get_util_logger().debug("failed to load x11 bindings", exc_info=True)
        return True

def restore_script_env(env):
    """
    Reverts the changes recorded in `_PYTHON_WRAPPER_VARS`.

    The dictionary `env` is modified in place and also returned.
    """
    # On OSX PythonExecWrapper sets various env vars to point into the bundle
    # and records the original variable contents. Here we revert them back
    # to their original state in case any of those changes cause problems
    # when running a command.
    if "_PYTHON_WRAPPER_VARS" in env:
        for v in env["_PYTHON_WRAPPER_VARS"].split():
            origv = "_" + v
            if origv in env:
                env[v] = env[origv]
            elif v in env:
                #there was no original value: remove the wrapper's one
                del env[v]
            #the saved copy may not exist, so don't use `del` here:
            env.pop(origv, None)
        del env["_PYTHON_WRAPPER_VARS"]
    return env


#snapshot of the environment taken when this module is first imported:
_saved_env = os.environ.copy()
def get_saved_env():
    """ Returns a copy of the environment as it was when this module was imported. """
    return _saved_env.copy()

def get_saved_env_var(var, default=None):
    """ Returns the value `var` had in the saved environment, or `default`. """
    return _saved_env.get(var, default)

def is_Wayland() -> bool:
    """ Returns True if the saved environment points to a Wayland session. """
    return _is_Wayland(_saved_env)

def _is_Wayland(env : dict) -> bool:
    """
    GDK_BACKEND=wayland always wins, GDK_BACKEND=x11 always disables,
    otherwise WAYLAND_DISPLAY or XDG_SESSION_TYPE=wayland enable it.
    """
    backend = env.get("GDK_BACKEND", "")
    if backend=="wayland":
        return True
    return backend!="x11" and (
        bool(env.get("WAYLAND_DISPLAY")) or env.get("XDG_SESSION_TYPE")=="wayland"
        )


def is_distribution_variant(variant=b"Debian") -> bool:
    """
    Returns True if this posix system identifies itself as `variant`.

    The NAME= lines of /etc/os-release are used when that file is available,
    otherwise we fall back to the distributor id from get_linux_distribution().
    `variant` may be given as bytes or str.
    """
    if not POSIX:
        return False
    variant = strtobytes(variant)
    data = load_os_release_file()
    if data is not None:
        return any(variant in line for line in data.splitlines() if line.startswith(b"NAME="))
    try:
        #the distribution name is a string, the variant is bytes:
        #compare them as strings (latin1 decoding cannot fail)
        name = str(get_linux_distribution()[0])
    except Exception:
        #no distribution information available (ie: not Linux)
        return False
    svariant = bytestostr(variant)
    if name==svariant:
        return True
    return svariant=="RedHat" and name.startswith(svariant)

def get_distribution_version_id() -> str:
    """ Returns the VERSION_ID value from /etc/os-release, or "" if unavailable. """
    if not POSIX:
        return ""
    try:
        v = load_os_release_file()
        for line in v.splitlines():
            l = line.decode()
            if l.startswith("VERSION_ID="):
                return l.split("=", 1)[1].strip('"')
    except Exception:
        pass
    return ""

#False means "not loaded yet", None means "could not be loaded":
os_release_file_data = False
def load_os_release_file() -> bytes:
    """ Returns the (cached) contents of /etc/os-release, or None. """
    global os_release_file_data
    if os_release_file_data is False:
        try:
            os_release_file_data = load_binary_file("/etc/os-release")
        except OSError: # pragma: no cover
            os_release_file_data = None
    return os_release_file_data

def is_Ubuntu() -> bool:
    return is_distribution_variant(b"Ubuntu")

def is_Debian() -> bool:
    return is_distribution_variant(b"Debian")

def is_Raspbian() -> bool:
    return is_distribution_variant(b"Raspbian")

def is_Fedora() -> bool:
    return is_distribution_variant(b"Fedora")

def is_Arch() -> bool:
    return is_distribution_variant(b"Arch")

def is_CentOS() -> bool:
    return is_distribution_variant(b"CentOS")

def is_RedHat() -> bool:
    return is_distribution_variant(b"RedHat")


def is_arm() -> bool:
    """ Returns True if the machine type reported by `platform.uname()` starts with "arm". """
    import platform
    return platform.uname()[4].startswith("arm")


_linux_distribution = None
def get_linux_distribution():
    """
    Returns a (distributor id, release, codename) tuple on Linux, None elsewhere.

    The values come from `lsb_release -a` when it runs successfully and
    prints all three fields, then from `platform.linux_distribution()` on
    the python versions that still have it, and finally default to "unknown".
    The result is cached.
    """
    global _linux_distribution
    if LINUX and not _linux_distribution:
        #linux_distribution is deprecated in Python 3.5 and it causes warnings,
        #so use our own code first:
        import subprocess
        cmd = ["lsb_release", "-a"]
        out = b""
        try:
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out = p.communicate()[0]
            if p.returncode!=0:
                out = b""
        except Exception:
            #ie: lsb_release is not installed
            out = b""
        if out:
            d = {}
            for line in strtobytes(out).splitlines():
                line = bytestostr(line)
                parts = line.rstrip("\n\r").split(":", 1)
                if len(parts)==2:
                    d[parts[0].lower().replace(" ", "_")] = parts[1].strip()
            v = [d.get(x) for x in ("distributor_id", "release", "codename")]
            if None not in v:
                #cache the result so we only run lsb_release once:
                _linux_distribution = tuple(bytestostr(x) for x in v)
        if not _linux_distribution:
            try:
                import platform
                _linux_distribution = platform.linux_distribution()  #@UndefinedVariable, pylint: disable=deprecated-method, no-member
            except Exception:
                _linux_distribution = ("unknown", "unknown", "unknown")
    return _linux_distribution

def is_unity() -> bool:
    d = os.environ.get("XDG_CURRENT_DESKTOP", "").lower()
    return d.find("unity")>=0 or d.find("ubuntu")>=0

def is_gnome() -> bool:
    if os.environ.get("XDG_SESSION_DESKTOP", "").split("-", 1)[0] in ("i3", "ubuntu", ):
        #"i3-gnome" is not really gnome... ie: the systray does work!
        return False
    return os.environ.get("XDG_CURRENT_DESKTOP", "").lower().find("gnome")>=0

def is_kde() -> bool:
    return os.environ.get("XDG_CURRENT_DESKTOP", "").lower().find("kde")>=0


def get_loaded_kernel_modules(*modlist):
    """ Returns the subset of `modlist` which has an entry under /sys/module (Linux only). """
    loaded = []
    if LINUX and os.path.exists("/sys/module"):
        for mod in modlist:
            if os.path.exists("/sys/module/%s" % mod):  # pragma: no cover
                loaded.append(mod)
    return loaded


def is_WSL() -> bool:
    """ Returns True if the kernel release or version information mentions "Microsoft". """
    if not POSIX:
        return False
    r = None
    for f in ("/proc/sys/kernel/osrelease", "/proc/version"):
        r = load_binary_file(f)
        if r:
            break
    return r is not None and r.find(b"Microsoft")>=0


def get_generic_os_name() -> str:
    """ Lowercase version of do_get_generic_os_name() """
    return do_get_generic_os_name().lower()

def do_get_generic_os_name() -> str:
    """ Returns a generic OS name based on the prefix of `sys.platform`. """
    for k,v in {
        "linux" : "Linux",
        "darwin" : "MacOS",
        "win" : "Win32",
        "freebsd" : "FreeBSD",
        }.items():
        if sys.platform.startswith(k):
            return v
    return sys.platform     # pragma: no cover


def filedata_nocrlf(filename) -> bytes:
    """
    Returns the contents of the file as bytes with any leading and
    trailing CR / LF characters removed, or None if it cannot be loaded.
    """
    v = load_binary_file(filename)
    if v is None:
        get_util_logger().error("failed to load '%s'", filename)
        return None
    return v.strip(b"\n\r")

def load_binary_file(filename) -> bytes:
    """
    Returns the contents of the file as bytes,
    or None if the file does not exist or cannot be read.
    """
    if not os.path.exists(filename):
        return None
    try:
        with open(filename, "rb") as f:
            return f.read()
    except Exception as e:  # pragma: no cover
        log = get_util_logger()
        log.debug("load_binary_file(%s)", filename, exc_info=True)
        log.warn("Warning: failed to load '%s':", filename)
        log.warn(" %s", e)
        return None

def parse_encoded_bin_data(data):
    """
    Decodes binary data given in hexadecimal or base64.

    The prefixes "0x", "b64:" and "base64:" select the decoder explicitly,
    without one we try hexadecimal first and then base64.
    Returns None if `data` is empty or cannot be decoded.
    """
    if not data:
        return None
    header = bytestostr(data).lower()[:10]
    if header.startswith("0x"):
        return binascii.unhexlify(data[2:])
    import base64
    if header.startswith("b64:"):
        return base64.b64decode(data[4:])
    if header.startswith("base64:"):
        return base64.b64decode(data[7:])
    try:
        return binascii.unhexlify(data)
    except (TypeError, binascii.Error):
        try:
            return base64.b64decode(data)
        except Exception:
            pass
    return None


#here so we can override it when needed
def force_quit(status=1):
    """ Terminates the process immediately using `os._exit`. """
    os._exit(status)  #pylint: disable=protected-access


def no_idle(fn, *args, **kwargs):
    """ Default `idle_add` implementation: calls the function immediately. """
    fn(*args, **kwargs)
def register_SIGUSR_signals(idle_add=no_idle):
    """
    On posix, installs handlers so that SIGUSR1 dumps all the frames
    and SIGUSR2 dumps the gc frames, scheduling the dump via `idle_add`.
    """
    if not POSIX:
        return
    from xpra.util import dump_all_frames, dump_gc_frames
    def sigusr1(*_args):
        log = get_util_logger().info
        log("SIGUSR1")
        idle_add(dump_all_frames, log)
    def sigusr2(*_args):
        log = get_util_logger().info
        log("SIGUSR2")
        idle_add(dump_gc_frames, log)
    signal.signal(signal.SIGUSR1, sigusr1)
    signal.signal(signal.SIGUSR2, sigusr2)


def livefds():
    """ Returns the set of file descriptors for which `os.fstat` succeeds. """
    live = set()
    try:
        MAXFD = os.sysconf("SC_OPEN_MAX")
    except (ValueError, AttributeError):
        MAXFD = 256
    for fd in range(0, MAXFD):
        try:
            s = os.fstat(fd)
        except Exception:
            continue
        else:
            if s:
                live.add(fd)
    return live

def get_all_fds():
    """
    Returns the file descriptors listed in /dev/fd or /proc/self/fd
    (the first of the two directories that exists).
    The list may include the descriptor used for listing the directory,
    which is already closed by the time this function returns.
    """
    fd_dirs = ["/dev/fd", "/proc/self/fd"]
    fds = []
    for fd_dir in fd_dirs:
        if os.path.exists(fd_dir):
            for fd_str in os.listdir(fd_dir):
                try:
                    fds.append(int(fd_str))
                except ValueError:
                    #not a file descriptor number, skip it
                    continue
            return fds
    sys.stderr.write("Uh-oh, can't close fds, please port me to your system...\n")
    return fds

def close_all_fds(exceptions=()):
    """ Closes all the file descriptors found by get_all_fds(), except `exceptions`. """
    for fd in get_all_fds():
        try:
            if fd not in exceptions:
                os.close(fd)
        except OSError:
            # This exception happens inevitably, because the fd used
            # by listdir() is already closed.
            pass

def use_tty():
    """ Returns False if XPRA_NOTTY is set, otherwise defers to the platform's use_stdin(). """
    from xpra.util import envbool
    if envbool("XPRA_NOTTY", False):
        return False
    from xpra.platform.gui import use_stdin
    return use_stdin()

def use_gui_prompt():
    return WIN32 or OSX or not use_tty()


def shellsub(s, subs=None):
    """ shell style string substitution using the dictionary given """
    if subs:
        for var,value in subs.items():
            try:
                if isinstance(s, bytes):
                    s = s.replace(("$%s" % var).encode(), str(value).encode())
                    s = s.replace(("${%s}" % var).encode(), str(value).encode())
                else:
                    s = s.replace("$%s" % var, str(value))
                    s = s.replace("${%s}" % var, str(value))
            except (TypeError, ValueError):
                raise Exception("failed to substitute '%s' with value '%s' (%s) in '%s'" % (
                    var, value, type(value), s)) from None
    return s


def osexpand(s, actual_username="", uid=0, gid=0, subs=None):
    """
    Expands "~", environment variables and the substitutions from `subs`
    in the string `s`.

    PID and HOME are always defined, UID and GID on posix (using the
    effective ids unless `uid` / `gid` are given), USER and USERNAME
    when `actual_username` is specified.
    """
    if not s:
        return s
    def expanduser(s):
        if actual_username and s.startswith("~/"):
            #replace "~/" with "~$actual_username/"
            return os.path.expanduser("~%s/%s" % (actual_username, s[2:]))
        return os.path.expanduser(s)
    d = dict(subs or {})
    d.update({
        "PID" : os.getpid(),
        "HOME" : expanduser("~/"),
        })
    if POSIX:
        d.update({
            "UID" : uid or os.geteuid(),
            "GID" : gid or os.getegid(),
            })
        if not OSX:
            from xpra.platform.xposix.paths import get_runtime_dir
            rd = get_runtime_dir()
            if rd and "XDG_RUNTIME_DIR" not in os.environ:
                d["XDG_RUNTIME_DIR"] = rd
    if actual_username:
        d["USERNAME"] = actual_username
        d["USER"] = actual_username
    #first, expand the substitutions themselves,
    #as they may contain references to other variables:
    ssub = {}
    for k,v in d.items():
        ssub[k] = expanduser(shellsub(str(v), d))
    return os.path.expandvars(expanduser(shellsub(expanduser(s), ssub)))


def path_permission_info(filename, ftype=None):
    """
    Returns a list of strings describing the permissions and ownership
    of `filename` (posix only), or the reason why they cannot be queried.
    """
    if not POSIX:
        return []
    info = []
    try:
        import stat
        stat_info = os.stat(filename)
        if not ftype:
            ftype = "file"
            if os.path.isdir(filename):
                ftype = "directory"
        info.append("permissions on %s %s: %s" % (ftype, filename, oct(stat.S_IMODE(stat_info.st_mode))))
        import pwd
        import grp      #@UnresolvedImport
        user = pwd.getpwuid(stat_info.st_uid)[0]
        group = grp.getgrgid(stat_info.st_gid)[0]
        info.append("ownership %s:%s" % (user, group))
    except Exception as e:
        info.append("failed to query path information for '%s': %s" % (filename, e))
    return info


#code to temporarily redirect stderr and restore it afterwards, adapted from:
#http://stackoverflow.com/questions/5081657/how-do-i-prevent-a-c-shared-library-to-print-on-stdout-in-python
#used by the sound code to get rid of the stupid gst warning below:
#"** Message: pygobject_register_sinkfunc is deprecated (GstObject)"
#ideally we would redirect to a buffer so we could still capture and show these messages in debug out
class HideStdErr:
    """
    Context manager which points file descriptor 2 to the null device
    and restores it on exit.
    `sys.stderr` is re-opened on a duplicate of the original descriptor.
    """
    __slots__ = ("savedstderr", )
    def __init__(self, *_args):
        self.savedstderr = None

    def __enter__(self):
        if POSIX and os.getppid()==1:
            #this interferes with server daemonizing?
            return
        sys.stderr.flush() # <--- important when redirecting to files
        self.savedstderr = os.dup(2)
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, 2)
        os.close(devnull)
        #NOTE(review): sys.stderr is not switched back in __exit__,
        #it keeps using the duplicated descriptor
        sys.stderr = os.fdopen(self.savedstderr, 'w')

    def __exit__(self, *_args):
        if self.savedstderr is not None:
            os.dup2(self.savedstderr, 2)

class HideSysArgv:
    """ Context manager which temporarily truncates `sys.argv` to its first element. """
    __slots__ = ("savedsysargv", )
    def __init__(self, *_args):
        self.savedsysargv = None

    def __enter__(self):
        self.savedsysargv = sys.argv
        sys.argv = sys.argv[:1]

    def __exit__(self, *_args):
        if self.savedsysargv is not None:
            sys.argv = self.savedsysargv


class OSEnvContext:
    """ Context manager which restores `os.environ` to its initial state on exit. """
    __slots__ = ("env", )
    def __init__(self):
        self.env = {}
    def __enter__(self):
        self.env = os.environ.copy()
    def __exit__(self, *_args):
        os.environ.clear()
        os.environ.update(self.env)
    def __repr__(self):
        return "OSEnvContext"


class FDChangeCaptureContext:
    """ Context manager which records the file descriptors found on entry and on exit. """
    __slots__ = ("enter_fds", "exit_fds")
    def __init__(self):
        self.enter_fds = []
        self.exit_fds = []
    def __enter__(self):
        self.enter_fds = get_all_fds()
    def __exit__(self, *_args):
        self.exit_fds = get_all_fds()
    def __repr__(self):
        return "FDChangeCaptureContext"
    def get_new_fds(self):
        """ The file descriptors found on exit but not on entry, sorted. """
        return sorted(tuple(set(self.exit_fds)-set(self.enter_fds)))
    def get_lost_fds(self):
        """ The file descriptors found on entry but not on exit, sorted. """
        return sorted(tuple(set(self.enter_fds)-set(self.exit_fds)))

class DummyContextManager:
    __slots__ = ()
    def __enter__(self):
        """ do nothing """
    def __exit__(self, *_args):
        """ do nothing """
    def __repr__(self):
        return "DummyContextManager"


#workaround incompatibility between paramiko and gssapi:
class nomodule_context:
    """
    Context manager which makes `module_name` un-importable
    by setting its `sys.modules` entry to None, and restores it on exit.
    """
    __slots__ = ("module_name", "saved_module")
    def __init__(self, module_name):
        self.module_name = module_name
    def __enter__(self):
        self.saved_module = sys.modules.get(self.module_name)
        sys.modules[self.module_name] = None
    def __exit__(self, *_args):
        #only undo our change if nothing else has replaced the entry:
        if sys.modules.get(self.module_name) is None:
            if self.saved_module is None:
                sys.modules.pop(self.module_name, None)
            else:
                sys.modules[self.module_name] = self.saved_module
    def __repr__(self):
        return "nomodule_context(%s)" % self.module_name

class umask_context:
    """ Context manager which applies `umask` and restores the previous value on exit. """
    __slots__ = ("umask", "orig_umask")
    def __init__(self, umask):
        self.umask = umask
    def __enter__(self):
        self.orig_umask = os.umask(self.umask)
    def __exit__(self, *_args):
        os.umask(self.orig_umask)
    def __repr__(self):
        return "umask_context(%s)" % self.umask


def disable_stdout_buffering():
    """ Replaces `sys.stdout` with an unbuffered stream writing to the same file descriptor. """
    import gc
    import io
    # Appending to gc.garbage is a way to stop an object from being
    # destroyed. If the old sys.stdout is ever collected, it will
    # close() stdout, which is not good.
    old_stdout = sys.stdout
    gc.garbage.append(old_stdout)
    #python3 only allows unbuffered streams in binary mode,
    #so wrap an unbuffered binary stream in a write-through text layer:
    raw = os.fdopen(old_stdout.fileno(), 'wb', 0)
    sys.stdout = io.TextIOWrapper(raw,
                                  encoding=getattr(old_stdout, "encoding", None),
                                  errors=getattr(old_stdout, "errors", None),
                                  write_through=True)

def setbinarymode(fd):
    """ On MS Windows, switches the file descriptor to binary mode. """
    if WIN32:
        #turn on binary mode:
        try:
            import msvcrt
            msvcrt.setmode(fd, os.O_BINARY)  #@UndefinedVariable pylint: disable=no-member
        except OSError:
            get_util_logger().error("setting stdin to binary mode failed", exc_info=True)

def find_lib_ldconfig(libname):
    """
    Searches the output of `ldconfig -p` for the library `libname`
    (without the "lib" prefix) and returns its path, or None.
    """
    libname = re.escape(libname)

    arch_map = {"x86_64": "libc6,x86-64"}
    arch = arch_map.get(os.uname()[4], "libc6")

    pattern = r'^\s+lib%s\.[^\s]+ \(%s(?:,.*?)?\) => (.*lib%s[^\s]+)' % (libname, arch, libname)

    #try to find ldconfig first, which may not be on the $PATH
    #(it isn't on Debian..)
    ldconfig = "ldconfig"
    for d in ("/sbin", "/usr/sbin"):
        t = os.path.join(d, "ldconfig")
        if os.path.exists(t):
            ldconfig = t
            break
    import subprocess
    #no need for a shell: use an argument list
    p = subprocess.Popen([ldconfig, "-p"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    data = bytestostr(p.communicate()[0])

    libpath = re.search(pattern, data, re.MULTILINE)     #@UndefinedVariable
    if libpath:
        libpath = libpath.group(1)
    return libpath

def find_lib(libname):
    """
    Looks for the file `libname` in LD_LIBRARY_PATH and then in
    /usr/lib64 (64-bit systems that have it) or /usr/lib.
    Returns the full path, or None if it is not found.
    """
    #it would be better to rely on dlopen to find the paths
    #but I cannot find a way of getting ctypes to tell us the path
    #it found the library in
    assert POSIX
    libpaths = os.environ.get("LD_LIBRARY_PATH", "").split(":")
    if BITS==64 and os.path.exists("/usr/lib64"):
        libpaths.append("/usr/lib64")
    else:
        libpaths.append("/usr/lib")
    for libpath in libpaths:
        if not libpath or not os.path.exists(libpath):
            continue
        libname_so = os.path.join(libpath, libname)
        if os.path.exists(libname_so):
            return libname_so
    return None


def pollwait(process, timeout=5):
    """
    Polls `process` every 100ms until it terminates or `timeout` seconds elapse.
    Returns the value of the last `poll()` call (None if still running).
    """
    start = time.monotonic()
    v = None
    while time.monotonic()-start<timeout:
        v = process.poll()
        if v is not None:
            break
        time.sleep(0.1)
    return v

def which(command):
    """ Returns the full path of the executable `command` found on the PATH, or None. """
    #distutils.spawn.find_executable is gone from python 3.12 onwards,
    #shutil.which is the standard library replacement:
    from shutil import which as shutil_which
    try:
        return shutil_which(command)
    except Exception:
        get_util_logger().debug("which(%s)", command, exc_info=True)
        return None

def get_status_output(*args, **kwargs):
    """
    Runs a command using `Popen` with the arguments given and returns
    a tuple: (returncode, stdout, stderr), the output being text.
    If the command cannot be started, the tuple (-1, "", "") is returned.
    """
    from subprocess import PIPE, Popen
    kwargs.update({
        "stdout" : PIPE,
        "stderr" : PIPE,
        "universal_newlines" : True,
        })
    try:
        p = Popen(*args, **kwargs)
    except Exception as e:
        print("error running %s,%s: %s" % (args, kwargs, e))
        return -1, "", ""
    stdout, stderr = p.communicate()
    return p.returncode, stdout, stderr


def is_systemd_pid1() -> bool:
    """ Returns True if the command line of process 1 contains "/systemd". """
    if not POSIX:
        return False
    d = load_binary_file("/proc/1/cmdline")
    return bool(d) and d.find(b"/systemd")>=0


def get_ssh_port() -> int:
    #on Linux we can run "ssh -T | grep port"
    #but this usually requires root permissions to access /etc/ssh/sshd_config
    if WIN32:
        return 0
    return 22


def setuidgid(uid, gid):
    """
    Switches the process to the given `uid` and `gid` (posix only),
    initializing the supplementary groups for that user first.

    Raises an Exception if the uid does not exist, and re-raises the
    OSError if changing id fails whilst we are still running as root.
    """
    if not POSIX:
        return
    log = get_util_logger()
    if os.getuid()!=uid or os.getgid()!=gid:
        #find the username for the given uid:
        from pwd import getpwuid
        try:
            username = getpwuid(uid).pw_name
        except KeyError:
            raise Exception("uid %i not found" % uid) from None
        #set the groups:
        if hasattr(os, "initgroups"):   # python >= 2.7
            os.initgroups(username, gid)
        else:
            import grp      #@UnresolvedImport
            groups = [gr.gr_gid for gr in grp.getgrall() if username in gr.gr_mem]
            os.setgroups(groups)
    #change uid and gid:
    try:
        if os.getgid()!=gid:
            os.setgid(gid)
    except OSError as e:
        log.error("Error: cannot change gid to %i:", gid)
        if os.getgid()==0:
            #don't run as root!
            raise
        log.error(" %s", e)
        log.error(" continuing with gid=%i", os.getgid())
    try:
        if os.getuid()!=uid:
            os.setuid(uid)
    except OSError as e:
        log.error("Error: cannot change uid to %i:", uid)
        if os.getuid()==0:
            #don't run as root!
            raise
        log.error(" %s", e)
        log.error(" continuing with uid=%i", os.getuid())
    log("new uid=%s, gid=%s", os.getuid(), os.getgid())

def get_peercred(sock):
    """
    Returns the (pid, uid, gid) of the process at the other end of the
    unix domain socket `sock` on Linux, None if this fails
    or on the platforms where it is not implemented.
    """
    #the logger is needed by both branches:
    log = get_util_logger()
    if LINUX:
        SO_PEERCRED = 17
        try:
            import socket
            creds = sock.getsockopt(socket.SOL_SOCKET, SO_PEERCRED, struct.calcsize(b'3i'))
            pid, uid, gid = struct.unpack(b'3i',creds)
            log("peer: %s", (pid, uid, gid))
            return pid, uid, gid
        except IOError as e:
            log("getsockopt", exc_info=True)
            log.error("Error getting peer credentials: %s", e)
            return None
    elif FREEBSD:
        log.warn("Warning: peercred is not yet implemented for FreeBSD")
        #use getpeereid
        #then pwd to get the gid?
    return None