1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# This file is part of Xpra.
4# Copyright (C) 2013-2021 Antoine Martin <antoine@xpra.org>
5# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
6# later version. See the file COPYING for details.
8import re
9import os
10import sys
11import signal
12import uuid
13import time
14import struct
15import binascii
16import threading
19for signame in (sig for sig in dir(signal) if sig.startswith("SIG") and not sig.startswith("SIG_")):
20    SIGNAMES[getattr(signal, signame)] = signame
23WIN32 = sys.platform.startswith("win")
24OSX = sys.platform.startswith("darwin")
25LINUX = sys.platform.startswith("linux")
26NETBSD = sys.platform.startswith("netbsd")
27OPENBSD = sys.platform.startswith("openbsd")
28FREEBSD  = sys.platform.startswith("freebsd")
29DRAGONFLY  = sys.platform.startswith("dragonfly")
31POSIX = os.name=="posix"
33BITS = struct.calcsize(b"P")*8
36main_thread = threading.current_thread()
37def is_main_thread():
38    return threading.current_thread()==main_thread
41def get_frame_info(ignore_threads=()):
42    info = {
43        "count"        : threading.active_count() - len(ignore_threads),
44        }
45    try:
46        import traceback
47        def nn(x):
48            if x is None:
49                return ""
50            return str(x)
51        thread_ident = {}
52        for t in threading.enumerate():
53            if t not in ignore_threads:
54                thread_ident[t.ident] = t.getName()
55            else:
56                thread_ident[t.ident] = None
57        thread_ident.update({
58                threading.current_thread().ident    : "info",
59                main_thread.ident                   : "main",
60                })
61        frames = sys._current_frames()  #pylint: disable=protected-access
62        stack = None
63        for i,frame_pair in enumerate(frames.items()):
64            stack = traceback.extract_stack(frame_pair[1])
65            tident = thread_ident.get(frame_pair[0], "unknown")
66            if tident is None:
67                continue
68            #sanitize stack to prevent None values (which cause encoding errors with the bencoder)
69            sanestack = []
70            for e in stack:
71                sanestack.append(tuple([nn(x) for x in e]))
72            info[i] = {
73                ""          : tident,
74                "stack"     : sanestack,
75                }
76        del frames, stack
77    except Exception as e:
78        get_util_logger().error("failed to get frame info: %s", e)
79    return info
81def get_info_env():
82    filtered_env = os.environ.copy()
83    if filtered_env.get('XPRA_PASSWORD'):
84        filtered_env['XPRA_PASSWORD'] = "*****"
85    if filtered_env.get('XPRA_ENCRYPTION_KEY'):
86        filtered_env['XPRA_ENCRYPTION_KEY'] = "*****"
87    return filtered_env
89def get_sysconfig_info():
90    import sysconfig
91    sysinfo = {}
92    log = get_util_logger()
93    for attr in (
94        "platform",
95        "python-version",
96        "config-vars",
97        "paths",
98        ):
99        fn = "get_%s" % attr.replace("-", "_")
100        getter = getattr(sysconfig, fn, None)
101        if getter:
102            try:
103                sysinfo[attr] = getter()  #pylint: disable=not-callable
104            except ModuleNotFoundError:
105                log("sysconfig.%s", fn, exc_info=True)
106                if attr=="config-vars" and WIN32:
107                    continue
108                log.warn("Warning: failed to collect %s sysconfig information", attr)
109            except Exception:
110                log.error("Error calling sysconfig.%s", fn, exc_info=True)
111    return sysinfo
113def strtobytes(x) -> bytes:
114    if isinstance(x, bytes):
115        return x
116    return str(x).encode("latin1")
117def bytestostr(x) -> str:
118    if isinstance(x, (bytes, bytearray)):
119        return x.decode("latin1")
120    return str(x)
121def hexstr(v) -> str:
122    return bytestostr(binascii.hexlify(strtobytes(v)))
125util_logger = None
126def get_util_logger():
127    global util_logger
128    if not util_logger:
129        from xpra.log import Logger
130        util_logger = Logger("util")
131    return util_logger
133def memoryview_to_bytes(v) -> bytes:
134    if isinstance(v, bytes):
135        return v
136    if isinstance(v, memoryview):
137        return v.tobytes()
138    if isinstance(v, bytearray):
139        return bytes(v)
140    return strtobytes(v)
143def set_proc_title(title):
144    try:
145        import setproctitle  #pylint: disable=import-outside-toplevel
146        setproctitle.setproctitle(title)  #@UndefinedVariable pylint: disable=c-extension-no-member
147    except ImportError as e:
148        get_util_logger().debug("setproctitle is not installed: %s", e)
151def getuid() -> int:
152    if POSIX:
153        return os.getuid()
154    return 0
156def getgid() -> int:
157    if POSIX:
158        return os.getgid()
159    return 0
161def get_shell_for_uid(uid) -> str:
162    if POSIX:
163        from pwd import getpwuid
164        try:
165            return getpwuid(uid).pw_shell
166        except KeyError:
167            pass
168    return ""
170def get_username_for_uid(uid) -> str:
171    if POSIX:
172        from pwd import getpwuid
173        try:
174            return getpwuid(uid).pw_name
175        except KeyError:
176            pass
177    return ""
179def get_home_for_uid(uid) -> str:
180    if POSIX:
181        from pwd import getpwuid
182        try:
183            return getpwuid(uid).pw_dir
184        except KeyError:
185            pass
186    return ""
188def get_groups(username):
189    if POSIX:
190        import grp      #@UnresolvedImport
191        return [gr.gr_name for gr in grp.getgrall() if username in gr.gr_mem]
192    return []
194def get_group_id(group) -> int:
195    try:
196        import grp      #@UnresolvedImport
197        gr = grp.getgrnam(group)
198        return gr.gr_gid
199    except (ImportError, KeyError):
200        return -1
203def platform_release(release):
204    if OSX:
205        SYSTEMVERSION_PLIST = "/System/Library/CoreServices/SystemVersion.plist"
206        try:
207            import plistlib
208            with open(SYSTEMVERSION_PLIST, "rb") as f:
209                pl = plistlib.load(f)           #@UndefinedVariable
210            return pl['ProductUserVisibleVersion']
211        except Exception as e:
212            get_util_logger().debug("platform_release(%s)", release, exc_info=True)
213            get_util_logger().warn("Warning: failed to get release information")
214            get_util_logger().warn(" from '%s':", SYSTEMVERSION_PLIST)
215            get_util_logger().warn(" %s", e)
216    return release
219def platform_name(sys_platform=sys.platform, release=None) -> str:
220    if not sys_platform:
221        return "unknown"
222    PLATFORMS = {"win32"    : "Microsoft Windows",
223                 "cygwin"   : "Windows/Cygwin",
224                 "linux.*"  : "Linux",
225                 "darwin"   : "Mac OS X",
226                 "freebsd.*": "FreeBSD",
227                 "os2"      : "OS/2",
228                 }
229    def rel(v):
230        values = [v]
231        if isinstance(release, (tuple, list)):
232            values += list(release)
233        else:
234            values.append(release)
235        return " ".join([str(x) for x in values if x])
236    for k,v in PLATFORMS.items():
237        regexp = re.compile(k)
238        if regexp.match(sys_platform):
239            return rel(v)
240    return rel(sys_platform)
243def get_rand_chars(l=16, chars=b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") -> bytes:
244    import random
245    return b"".join(chars[random.randint(0, len(chars)-1):][:1] for _ in range(l))
247def get_hex_uuid() -> str:
248    return uuid.uuid4().hex
250def get_int_uuid() -> int:
251    return uuid.uuid4().int
253def get_machine_id() -> str:
254    """
255        Try to get uuid string which uniquely identifies this machine.
256        Warning: only works on posix!
257        (which is ok since we only used it on posix at present)
258    """
259    v = ""
260    if POSIX:
261        for filename in ("/etc/machine-id", "/var/lib/dbus/machine-id"):
262            v = load_binary_file(filename)
263            if v is not None:
264                break
265    elif WIN32:
266        v = uuid.getnode()
267    return bytestostr(v).strip("\n\r")
269def get_user_uuid() -> str:
270    """
271        Try to generate a uuid string which is unique to this user.
272        (relies on get_machine_id to uniquely identify a machine)
273    """
274    user_uuid = os.environ.get("XPRA_USER_UUID")
275    if user_uuid:
276        return user_uuid
277    import hashlib
278    u = hashlib.sha256()
279    def uupdate(ustr):
280        u.update(ustr.encode("utf-8"))
281    uupdate(get_machine_id())
282    if POSIX:
283        uupdate(u"/")
284        uupdate(str(os.getuid()))
285        uupdate(u"/")
286        uupdate(str(os.getgid()))
287    uupdate(os.path.expanduser("~/"))
288    return u.hexdigest()
291def is_X11() -> bool:
292    if OSX or WIN32:
293        return False
294    try:
295        from xpra.x11.gtk3.gdk_bindings import is_X11_Display   #@UnresolvedImport
296        return is_X11_Display()
297    except ImportError:
298        get_util_logger().debug("failed to load x11 bindings", exc_info=True)
299        return True
301def restore_script_env(env):
302    # On OSX PythonExecWrapper sets various env vars to point into the bundle
303    # and records the original variable contents. Here we revert them back
304    # to their original state in case any of those changes cause problems
305    # when running a command.
306    if "_PYTHON_WRAPPER_VARS" in env:
307        for v in env["_PYTHON_WRAPPER_VARS"].split():
308            origv = "_" + v
309            if origv in env:
310                env[v] = env[origv]
311            elif v in env:
312                del[v]
313            del env[origv]
314        del env["_PYTHON_WRAPPER_VARS"]
315    return env
318_saved_env = os.environ.copy()
319def get_saved_env():
320    return _saved_env.copy()
322def get_saved_env_var(var, default=None):
323    return _saved_env.get(var, default)
325def is_Wayland() -> bool:
326    return _is_Wayland(_saved_env)
328def _is_Wayland(env : dict) -> bool:
329    backend = env.get("GDK_BACKEND", "")
330    if backend=="wayland":
331        return True
332    return backend!="x11" and (
333        bool(env.get("WAYLAND_DISPLAY")) or env.get("XDG_SESSION_TYPE")=="wayland"
334        )
337def is_distribution_variant(variant=b"Debian") -> bool:
338    if not POSIX:
339        return False
340    try:
341        v = load_os_release_file()
342        return any(l.find(variant)>=0 for l in v.splitlines() if l.startswith(b"NAME="))
343    except Exception:
344        pass
345    try:
346        d = get_linux_distribution()[0]
347        if d==variant:
348            return True
349        if variant==b"RedHat" and d.startswith(variant):
350            return True
351    except Exception:
352        pass
353    return False
355def get_distribution_version_id() -> str:
356    if not POSIX:
357        return ""
358    try:
359        v = load_os_release_file()
360        for line in v.splitlines():
361            l = line.decode()
362            if l.startswith("VERSION_ID="):
363                return l.split("=", 1)[1].strip('"')
364    except Exception:
365        pass
366    return ""
368os_release_file_data = False
369def load_os_release_file() -> bytes:
370    global os_release_file_data
371    if os_release_file_data is False:
372        try:
373            os_release_file_data = load_binary_file("/etc/os-release")
374        except OSError: # pragma: no cover
375            os_release_file_data = None
376    return os_release_file_data
378def is_Ubuntu() -> bool:
379    return is_distribution_variant(b"Ubuntu")
381def is_Debian() -> bool:
382    return is_distribution_variant(b"Debian")
384def is_Raspbian() -> bool:
385    return is_distribution_variant(b"Raspbian")
387def is_Fedora() -> bool:
388    return is_distribution_variant(b"Fedora")
390def is_Arch() -> bool:
391    return is_distribution_variant(b"Arch")
393def is_CentOS() -> bool:
394    return is_distribution_variant(b"CentOS")
396def is_RedHat() -> bool:
397    return is_distribution_variant(b"RedHat")
400def is_arm() -> bool:
401    import platform
402    return platform.uname()[4].startswith("arm")
405_linux_distribution = None
406def get_linux_distribution():
407    global _linux_distribution
408    if LINUX and not _linux_distribution:
409        #linux_distribution is deprecated in Python 3.5 and it causes warnings,
410        #so use our own code first:
411        import subprocess
412        cmd = ["lsb_release", "-a"]
413        try:
414            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
415            out = p.communicate()[0]
416            assert p.returncode==0 and out
417        except Exception:
418            try:
419                import platform
420                _linux_distribution = platform.linux_distribution()  #@UndefinedVariable, pylint: disable=deprecated-method, no-member
421            except Exception:
422                _linux_distribution = ("unknown", "unknown", "unknown")
423        else:
424            d = {}
425            for line in strtobytes(out).splitlines():
426                line = bytestostr(line)
427                parts = line.rstrip("\n\r").split(":", 1)
428                if len(parts)==2:
429                    d[parts[0].lower().replace(" ", "_")] = parts[1].strip()
430            v = [d.get(x) for x in ("distributor_id", "release", "codename")]
431            if None not in v:
432                return tuple([bytestostr(x) for x in v])
433    return _linux_distribution
435def is_unity() -> bool:
436    d = os.environ.get("XDG_CURRENT_DESKTOP", "").lower()
437    return d.find("unity")>=0 or d.find("ubuntu")>=0
439def is_gnome() -> bool:
440    if os.environ.get("XDG_SESSION_DESKTOP", "").split("-", 1)[0] in ("i3", "ubuntu", ):
441        #"i3-gnome" is not really gnome... ie: the systray does work!
442        return False
443    return os.environ.get("XDG_CURRENT_DESKTOP", "").lower().find("gnome")>=0
445def is_kde() -> bool:
446    return os.environ.get("XDG_CURRENT_DESKTOP", "").lower().find("kde")>=0
449def get_loaded_kernel_modules(*modlist):
450    loaded = []
451    if LINUX and os.path.exists("/sys/module"):
452        for mod in modlist:
453            if os.path.exists("/sys/module/%s" % mod):  # pragma: no cover
454                loaded.append(mod)
455    return loaded
458def is_WSL() -> bool:
459    if not POSIX:
460        return False
461    r = None
462    for f in ("/proc/sys/kernel/osrelease", "/proc/version"):
463        r = load_binary_file(f)
464        if r:
465            break
466    return r is not None and r.find(b"Microsoft")>=0
469def get_generic_os_name() -> str:
470    return do_get_generic_os_name().lower()
472def do_get_generic_os_name() -> str:
473    for k,v in {
474            "linux"     : "Linux",
475            "darwin"    : "MacOS",
476            "win"       : "Win32",
477            "freebsd"   : "FreeBSD",
478            }.items():
479        if sys.platform.startswith(k):
480            return v
481    return sys.platform     # pragma: no cover
484def filedata_nocrlf(filename) -> str:
485    v = load_binary_file(filename)
486    if v is None:
487        get_util_logger().error("failed to load '%s'", filename)
488        return None
489    return v.strip(b"\n\r")
491def load_binary_file(filename) -> bytes:
492    if not os.path.exists(filename):
493        return None
494    try:
495        with open(filename, "rb") as f:
496            return f.read()
497    except Exception as e:  # pragma: no cover
498        get_util_logger().debug("load_binary_file(%s)", filename, exc_info=True)
499        get_util_logger().warn("Warning: failed to load '%s':", filename)
500        get_util_logger().warn(" %s", e)
501        return None
503def parse_encoded_bin_data(data):
504    if not data:
505        return None
506    header = bytestostr(data).lower()[:10]
507    if header.startswith("0x"):
508        return binascii.unhexlify(data[2:])
509    import base64
510    if header.startswith("b64:"):
511        return base64.b64decode(data[4:])
512    if header.startswith("base64:"):
513        return base64.b64decode(data[7:])
514    try:
515        return binascii.unhexlify(data)
516    except (TypeError, binascii.Error):
517        try:
518            return base64.b64decode(data)
519        except Exception:
520            pass
521    return None
524#here so we can override it when needed
525def force_quit(status=1):
526    os._exit(status)  #pylint: disable=protected-access
529def no_idle(fn, *args, **kwargs):
530    fn(*args, **kwargs)
531def register_SIGUSR_signals(idle_add=no_idle):
532    if not os.name=="posix":
533        return
534    from xpra.util import dump_all_frames, dump_gc_frames
535    def sigusr1(*_args):
536        log = get_util_logger().info
537        log("SIGUSR1")
538        idle_add(dump_all_frames, log)
539    def sigusr2(*_args):
540        log = get_util_logger().info
541        log("SIGUSR2")
542        idle_add(dump_gc_frames, log)
543    signal.signal(signal.SIGUSR1, sigusr1)
544    signal.signal(signal.SIGUSR2, sigusr2)
547def livefds():
548    live = set()
549    try:
550        MAXFD = os.sysconf("SC_OPEN_MAX")
551    except (ValueError, AttributeError):
552        MAXFD = 256
553    for fd in range(0, MAXFD):
554        try:
555            s = os.fstat(fd)
556        except Exception:
557            continue
558        else:
559            if s:
560                live.add(fd)
561    return live
563def get_all_fds():
564    fd_dirs = ["/dev/fd", "/proc/self/fd"]
565    fds = []
566    for fd_dir in fd_dirs:
567        if os.path.exists(fd_dir):
568            for fd_str in os.listdir(fd_dir):
569                try:
570                    fd = int(fd_str)
571                    fds.append(fd)
572                except OSError:
573                    # This exception happens inevitably, because the fd used
574                    # by listdir() is already closed.
575                    pass
576            return fds
577    sys.stderr.write("Uh-oh, can't close fds, please port me to your system...\n")
578    return fds
580def close_all_fds(exceptions=()):
581    for fd in get_all_fds():
582        try:
583            if fd not in exceptions:
584                os.close(fd)
585        except OSError:
586            # This exception happens inevitably, because the fd used
587            # by listdir() is already closed.
588            pass
590def use_tty():
591    from xpra.util import envbool
592    if envbool("XPRA_NOTTY", False):
593        return False
594    from xpra.platform.gui import use_stdin
595    return use_stdin()
597def use_gui_prompt():
598    return WIN32 or OSX or not use_tty()
601def shellsub(s, subs=None):
602    """ shell style string substitution using the dictionary given """
603    if subs:
604        for var,value in subs.items():
605            try:
606                if isinstance(s, bytes):
607                    s = s.replace(("$%s" % var).encode(), str(value).encode())
608                    s = s.replace(("${%s}" % var).encode(), str(value).encode())
609                else:
610                    s = s.replace("$%s" % var, str(value))
611                    s = s.replace("${%s}" % var, str(value))
612            except (TypeError, ValueError):
613                raise Exception("failed to substitute '%s' with value '%s' (%s) in '%s'" % (
614                    var, value, type(value), s)) from None
615    return s
618def osexpand(s, actual_username="", uid=0, gid=0, subs=None):
619    if not s:
620        return s
621    def expanduser(s):
622        if actual_username and s.startswith("~/"):
623            #replace "~/" with "~$actual_username/"
624            return os.path.expanduser("~%s/%s" % (actual_username, s[2:]))
625        return os.path.expanduser(s)
626    d = dict(subs or {})
627    d.update({
628        "PID"   : os.getpid(),
629        "HOME"  : expanduser("~/"),
630        })
631    if os.name=="posix":
632        d.update({
633            "UID"   : uid or os.geteuid(),
634            "GID"   : gid or os.getegid(),
635            })
636        if not OSX:
637            from xpra.platform.xposix.paths import get_runtime_dir
638            rd = get_runtime_dir()
639            if rd and "XDG_RUNTIME_DIR" not in os.environ:
640                d["XDG_RUNTIME_DIR"] = rd
641    if actual_username:
642        d["USERNAME"] = actual_username
643        d["USER"] = actual_username
644    #first, expand the substitutions themselves,
645    #as they may contain references to other variables:
646    ssub = {}
647    for k,v in d.items():
648        ssub[k] = expanduser(shellsub(str(v), d))
649    return os.path.expandvars(expanduser(shellsub(expanduser(s), ssub)))
652def path_permission_info(filename, ftype=None):
653    if not POSIX:
654        return []
655    info = []
656    try:
657        import stat
658        stat_info = os.stat(filename)
659        if not ftype:
660            ftype = "file"
661            if os.path.isdir(filename):
662                ftype = "directory"
663        info.append("permissions on %s %s: %s" % (ftype, filename, oct(stat.S_IMODE(stat_info.st_mode))))
664        import pwd
665        import grp      #@UnresolvedImport
666        user = pwd.getpwuid(stat_info.st_uid)[0]
667        group = grp.getgrgid(stat_info.st_gid)[0]
668        info.append("ownership %s:%s" % (user, group))
669    except Exception as e:
670        info.append("failed to query path information for '%s': %s" % (filename, e))
671    return info
674#code to temporarily redirect stderr and restore it afterwards, adapted from:
676#used by the sound code to get rid of the stupid gst warning below:
677#"** Message: pygobject_register_sinkfunc is deprecated (GstObject)"
678#ideally we would redirect to a buffer so we could still capture and show these messages in debug out
679class HideStdErr:
680    __slots__ = ("savedstderr")
681    def __init__(self, *_args):
682        self.savedstderr = None
684    def __enter__(self):
685        if POSIX and os.getppid()==1:
686            #this interferes with server daemonizing?
687            return
688        sys.stderr.flush() # <--- important when redirecting to files
689        self.savedstderr = os.dup(2)
690        devnull = os.open(os.devnull, os.O_WRONLY)
691        os.dup2(devnull, 2)
692        os.close(devnull)
693        sys.stderr = os.fdopen(self.savedstderr, 'w')
695    def __exit__(self, *_args):
696        if self.savedstderr is not None:
697            os.dup2(self.savedstderr, 2)
699class HideSysArgv:
700    __slots__ = ("savedsysargv")
701    def __init__(self, *_args):
702        self.savedsysargv = None
704    def __enter__(self):
705        self.savedsysargv = sys.argv
706        sys.argv = sys.argv[:1]
708    def __exit__(self, *_args):
709        if self.savedsysargv is not None:
710            sys.argv = self.savedsysargv
713class OSEnvContext:
714    __slots__ = ("env")
715    def __init__(self):
716        self.env = {}
717    def __enter__(self):
718        self.env = os.environ.copy()
719    def __exit__(self, *_args):
720        os.environ.clear()
721        os.environ.update(self.env)
722    def __repr__(self):
723        return "OSEnvContext"
726class FDChangeCaptureContext:
727    __slots__ = ("enter_fds", "exit_fds")
728    def __init__(self):
729        self.enter_fds = []
730        self.exit_fds = []
731    def __enter__(self):
732        self.enter_fds = get_all_fds()
733    def __exit__(self, *_args):
734        self.exit_fds = get_all_fds()
735    def __repr__(self):
736        return "FDChangeCaptureContext"
737    def get_new_fds(self):
738        return sorted(tuple(set(self.exit_fds)-set(self.enter_fds)))
739    def get_lost_fds(self):
740        return sorted(tuple(set(self.enter_fds)-set(self.exit_fds)))
742class DummyContextManager:
743    __slots__ = ()
744    def __enter__(self):
745        """ do nothing """
746    def __exit__(self, *_args):
747        """ do nothing """
748    def __repr__(self):
749        return "DummyContextManager"
752#workaround incompatibility between paramiko and gssapi:
753class nomodule_context:
754    __slots__ = ("module_name", "saved_module")
755    def __init__(self, module_name):
756        self.module_name = module_name
757    def __enter__(self):
758        self.saved_module = sys.modules.get(self.module_name)
759        sys.modules[self.module_name] = None
760    def __exit__(self, *_args):
761        if sys.modules.get(self.module_name) is None:
762            if self.saved_module is None:
763                sys.modules.pop(self.module_name, None)
764            else:
765                sys.modules[self.module_name] = self.saved_module
766    def __repr__(self):
767        return "nomodule_context(%s)" % self.module_name
769class umask_context:
770    __slots__ = ("umask", "orig_umask")
771    def __init__(self, umask):
772        self.umask = umask
773    def __enter__(self):
774        self.orig_umask = os.umask(self.umask)
775    def __exit__(self, *_args):
776        os.umask(self.orig_umask)
777    def __repr__(self):
778        return "umask_context(%s)" % self.umask
781def disable_stdout_buffering():
782    import gc
783    # Appending to gc.garbage is a way to stop an object from being
784    # destroyed.  If the old sys.stdout is ever collected, it will
785    # close() stdout, which is not good.
786    gc.garbage.append(sys.stdout)
787    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
789def setbinarymode(fd):
790    if WIN32:
791        #turn on binary mode:
792        try:
793            import msvcrt
794            msvcrt.setmode(fd, os.O_BINARY)         #@UndefinedVariable pylint: disable=no-member
795        except OSError:
796            get_util_logger().error("setting stdin to binary mode failed", exc_info=True)
798def find_lib_ldconfig(libname):
799    libname = re.escape(libname)
801    arch_map = {"x86_64": "libc6,x86-64"}
802    arch = arch_map.get(os.uname()[4], "libc6")
804    pattern = r'^\s+lib%s\.[^\s]+ \(%s(?:,.*?)?\) => (.*lib%s[^\s]+)' % (libname, arch, libname)
806    #try to find ldconfig first, which may not be on the $PATH
807    #(it isn't on Debian..)
808    ldconfig = "ldconfig"
809    for d in ("/sbin", "/usr/sbin"):
810        t = os.path.join(d, "ldconfig")
811        if os.path.exists(t):
812            ldconfig = t
813            break
814    import subprocess
815    p = subprocess.Popen("%s -p" % ldconfig, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
816    data = bytestostr(p.communicate()[0])
818    libpath = re.search(pattern, data, re.MULTILINE)        #@UndefinedVariable
819    if libpath:
820        libpath = libpath.group(1)
821    return libpath
823def find_lib(libname):
824    #it would be better to rely on dlopen to find the paths
825    #but I cannot find a way of getting ctypes to tell us the path
826    #it found the library in
827    assert POSIX
828    libpaths = os.environ.get("LD_LIBRARY_PATH", "").split(":")
829    if BITS==64 and os.path.exists("/usr/lib64"):
830        libpaths.append("/usr/lib64")
831    else:
832        libpaths.append("/usr/lib")
833    for libpath in libpaths:
834        if not libpath or not os.path.exists(libpath):
835            continue
836        libname_so = os.path.join(libpath, libname)
837        if os.path.exists(libname_so):
838            return libname_so
839    return None
842def pollwait(process, timeout=5):
843    start = time.monotonic()
844    v = None
845    while time.monotonic()-start<timeout:
846        v = process.poll()
847        if v is not None:
848            break
849        time.sleep(0.1)
850    return v
852def which(command):
853    try:
854        from distutils.spawn import find_executable
855    except ImportError:
856        path = os.environ.get("PATH", None)
857        if not path:
858            return None
859        paths = path.split(os.pathsep)
860        for p in paths:
861            f = os.path.join(p, command)
862            if os.path.isfile(f):
863                return f
864        return None
865    else:
866        try:
867            return find_executable(command)
868        except Exception:
869            get_util_logger().debug("find_executable(%s)", command, exc_info=True)
870            return None
872def get_status_output(*args, **kwargs):
873    from subprocess import PIPE, Popen
874    kwargs.update({
875        "stdout"    : PIPE,
876        "stderr"    : PIPE,
877        "universal_newlines"    : True,
878        })
879    try:
880        p = Popen(*args, **kwargs)
881    except Exception as e:
882        print("error running %s,%s: %s" % (args, kwargs, e))
883        return -1, "", ""
884    stdout, stderr = p.communicate()
885    return p.returncode, stdout, stderr
888def is_systemd_pid1() -> bool:
889    if not POSIX:
890        return False
891    d = load_binary_file("/proc/1/cmdline")
892    return d and d.find(b"/systemd")>=0
895def get_ssh_port() -> int:
896    #on Linux we can run "ssh -T | grep port"
897    #but this usually requires root permissions to access /etc/ssh/sshd_config
898    if WIN32:
899        return 0
900    return 22
903def setuidgid(uid, gid):
904    if not POSIX:
905        return
906    log = get_util_logger()
907    if os.getuid()!=uid or os.getgid()!=gid:
908        #find the username for the given uid:
909        from pwd import getpwuid
910        try:
911            username = getpwuid(uid).pw_name
912        except KeyError:
913            raise Exception("uid %i not found" % uid) from None
914        #set the groups:
915        if hasattr(os, "initgroups"):   # python >= 2.7
916            os.initgroups(username, gid)
917        else:
918            import grp      #@UnresolvedImport
919            groups = [gr.gr_gid for gr in grp.getgrall() if username in gr.gr_mem]
920            os.setgroups(groups)
921    #change uid and gid:
922    try:
923        if os.getgid()!=gid:
924            os.setgid(gid)
925    except OSError as e:
926        log.error("Error: cannot change gid to %i:", gid)
927        if os.getgid()==0:
928            #don't run as root!
929            raise
930        log.error(" %s", e)
931        log.error(" continuing with gid=%i", os.getgid())
932    try:
933        if os.getuid()!=uid:
934            os.setuid(uid)
935    except OSError as e:
936        log.error("Error: cannot change uid to %i:", uid)
937        if os.getuid()==0:
938            #don't run as root!
939            raise
940        log.error(" %s", e)
941        log.error(" continuing with uid=%i", os.getuid())
942    log("new uid=%s, gid=%s", os.getuid(), os.getgid())
944def get_peercred(sock):
945    if LINUX:
946        SO_PEERCRED = 17
947        log = get_util_logger()
948        try:
949            import socket
950            creds = sock.getsockopt(socket.SOL_SOCKET, SO_PEERCRED, struct.calcsize(b'3i'))
951            pid, uid, gid = struct.unpack(b'3i',creds)
952            log("peer: %s", (pid, uid, gid))
953            return pid, uid, gid
954        except IOError as  e:
955            log("getsockopt", exc_info=True)
956            log.error("Error getting peer credentials: %s", e)
957            return None
958    elif FREEBSD:
959        log.warn("Warning: peercred is not yet implemented for FreeBSD")
960        #use getpeereid
961        #then pwd to get the gid?
962    return None