1# -*- coding: utf-8 -*-
2# This file is part of Xpra.
3# Copyright (C) 2020 Antoine Martin <antoine@xpra.org>
4# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
5# later version. See the file COPYING for details.
6
7import io
8from contextlib import redirect_stdout, redirect_stderr
9
10from xpra.util import typedict
11from xpra.scripts.config import TRUE_OPTIONS
12from xpra.server.source.stub_source_mixin import StubSourceMixin
13from xpra.log import Logger
14
# module-level logger shared by everything in this file, created with the name "exec"
log = Logger("exec")
16
17
class ShellMixin(StubSourceMixin):
    """
    Connection source mixin which lets python code be executed
    in the server process, with the output sent back via `self.send`.

    Execution is only permitted when `shell_enabled` is set,
    which `init_from` derives from the connection's "shell" option.
    `self.send` is not defined here: it is presumably provided by the class
    this mixin is combined with (verify against the concrete source class).
    """

    @classmethod
    def is_needed(cls, caps : typedict) -> bool:
        """ Returns the boolean "shell" entry of `caps`, defaulting to False. """
        return caps.boolget("shell", False)

    def __init__(self, *_args):
        """ Starts with no server reference and with shell access disabled. """
        self._server = None
        self.shell_enabled = False
        # the attributes below are initialized here but not used by any method of this class:
        self.saved_logging_handler = None
        self.log_records = []
        self.log_thread = None

    def init_from(self, protocol, server):
        """
        Records the server and reads the "shell" option of the protocol's connection.

        Shell access is enabled only if the lowercased option value is in `TRUE_OPTIONS`.
        Any AttributeError raised on the way (ie: `protocol` has no `_conn`,
        the connection has no `options`, or the value has no `lower()` method)
        leaves shell access disabled.
        """
        self._server = server
        try:
            options = protocol._conn.options
            shell = options.get("shell", "")
            self.shell_enabled = shell.lower() in TRUE_OPTIONS
        except AttributeError:
            # fail closed: no usable option means no shell
            options = {}
            self.shell_enabled = False
        log("init_from(%s, %s) shell_enabled(%s)=%s", protocol, server, options, self.shell_enabled)

    def get_caps(self) -> dict:
        """ Returns the "shell" capability: whether shell access is enabled. """
        return {"shell" : self.shell_enabled}

    def get_info(self) -> dict:
        """ Returns the "shell" info entry: whether shell access is enabled. """
        return {"shell" : self.shell_enabled}

    def shell_exec(self, code):
        """
        Runs `code` using `do_shell_exec` and forwards the captured output.

        stdout is sent as a "shell-reply" packet with stream number 1
        whenever it is not None (so an empty string is still sent),
        stderr is sent with stream number 2 only if it is non-empty.
        Returns the `(stdout, stderr)` pair.
        """
        stdout, stderr = self.do_shell_exec(code)
        log("shell_exec(%s) stdout=%r", code, stdout)
        log("shell_exec(%s) stderr=%r", code, stderr)
        if stdout is not None:
            self.send("shell-reply", 1, stdout)
        if stderr:
            self.send("shell-reply", 2, stderr)
        return stdout, stderr

    def do_shell_exec(self, code):
        """
        Executes `code` with `exec`, capturing what it writes to stdout and stderr.

        The code runs with `connection` (this object), `server` and `log`
        as its globals, and a fresh empty dict as its locals.

        Returns `(stdout, stderr)` as strings on success.
        If shell access is disabled, or if the code raises an `Exception`,
        the error is logged and `(None, str(exception))` is returned instead.

        SECURITY: this runs arbitrary python code inside the server process,
        the `shell_enabled` flag is the only safeguard visible here.
        """
        log("shell_exec(%r)", code)
        try:
            # this must be an explicit check and not an `assert`:
            # assertions are stripped when python runs with `-O`,
            # which would silently remove the only access control.
            if not self.shell_enabled:
                raise RuntimeError("shell support is not available with this connection")
            _globals = {
                "connection" : self,
                "server"    : self._server,
                "log"       : log,
                }
            stdout = io.StringIO()
            stderr = io.StringIO()
            with redirect_stdout(stdout), redirect_stderr(stderr):
                exec(code, _globals, {})  #pylint: disable=exec-used
            return stdout.getvalue(), stderr.getvalue()
        except Exception as e:
            # the output captured so far is discarded, only the error message is returned
            log("shell_exec(..)", exc_info=True)
            log.error("Error running %r:", code)
            log.error(" %s", e)
            return None, str(e)
78