1import asyncio
2import os
3import psutil
4import signal
5import subprocess
6import sys
7import time
8
9from gi.repository import GObject, Gtk, Gio, GLib
10
11from pychess.compat import create_task
12from pychess.Utils import wait_signal
13from pychess.System.Log import log
14from pychess.Players.ProtocolEngine import TIME_OUT_SECOND
15
16
17class SubProcess(GObject.GObject):
18    __gsignals__ = {
19        "line": (GObject.SignalFlags.RUN_FIRST, None, (str, )),
20        "died": (GObject.SignalFlags.RUN_FIRST, None, ())
21    }
22
23    def __init__(self, path, args=[], warnwords=[], env=None, cwd=".", lowPriority=False):
24        GObject.GObject.__init__(self)
25
26        self.path = path
27        self.args = args
28        self.warnwords = warnwords
29        self.env = env or os.environ
30        self.cwd = cwd
31        self.lowPriority = lowPriority
32
33        self.defname = os.path.split(path)[1]
34        self.defname = self.defname[:1].upper() + self.defname[1:].lower()
35        cur_time = time.time()
36        self.defname = (self.defname,
37                        time.strftime("%H:%m:%%.3f", time.localtime(cur_time)) %
38                        (cur_time % 60))
39        log.debug(path + " " + " ".join(self.args), extra={"task": self.defname})
40
41        self.argv = [str(u) for u in [self.path] + self.args]
42        self.terminated = False
43
44    @asyncio.coroutine
45    def start(self):
46        log.debug("SubProcess.start(): create_subprocess_exec...", extra={"task": self.defname})
47        if sys.platform == "win32":
48            # To prevent engines opening console window
49            startupinfo = subprocess.STARTUPINFO()
50            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
51        else:
52            startupinfo = None
53
54        create = asyncio.create_subprocess_exec(* self.argv,
55                                                stdin=asyncio.subprocess.PIPE,
56                                                stdout=asyncio.subprocess.PIPE,
57                                                startupinfo=startupinfo,
58                                                env=self.env,
59                                                cwd=self.cwd)
60        try:
61            self.proc = yield from asyncio.wait_for(create, TIME_OUT_SECOND)
62            self.pid = self.proc.pid
63            # print(self.pid, self.path)
64            if self.lowPriority:
65                proc = psutil.Process(self.pid)
66                if sys.platform == "win32":
67                    niceness = psutil.BELOW_NORMAL_PRIORITY_CLASS
68                else:
69                    niceness = 15  # The higher, the lower the priority
70                if psutil.__version__ >= "2.0.0":
71                    proc.nice(niceness)
72                else:
73                    proc.set_nice(niceness)
74            self.read_stdout_task = create_task(self.read_stdout(self.proc.stdout))
75            self.write_task = None
76        except asyncio.TimeoutError:
77            log.warning("TimeoutError", extra={"task": self.defname})
78            raise
79        except GLib.GError:
80            log.warning("GLib.GError", extra={"task": self.defname})
81            raise
82        except Exception:
83            e = sys.exc_info()[0]
84            log.warning("%s" % e, extra={"task": self.defname})
85            raise
86
87    def write(self, line):
88        self.write_task = create_task(self.write_stdin(self.proc.stdin, line))
89
90    @asyncio.coroutine
91    def write_stdin(self, writer, line):
92        if self.terminated:
93            return
94        try:
95            log.debug(line, extra={"task": self.defname})
96            writer.write(line.encode())
97            yield from writer.drain()
98        except BrokenPipeError:
99            log.debug('SubProcess.write_stdin(): BrokenPipeError', extra={"task": self.defname})
100            self.emit("died")
101            self.terminate()
102        except ConnectionResetError:
103            log.debug('SubProcess.write_stdin(): ConnectionResetError', extra={"task": self.defname})
104            self.emit("died")
105            self.terminate()
106        except GLib.GError:
107            log.debug("SubProcess.write_stdin(): GLib.GError", extra={"task": self.defname})
108            self.emit("died")
109            self.terminate()
110
111    @asyncio.coroutine
112    def read_stdout(self, reader):
113        while True:
114            line = yield from reader.readline()
115            if line:
116                yield
117
118                try:
119                    line = line.decode().rstrip()
120                except UnicodeError:
121                    # Some engines send author names in different encodinds (f.e. spike)
122                    print("UnicodeError while decoding:", line)
123                    continue
124
125                if not line:
126                    continue
127
128                for word in self.warnwords:
129                    if word in line:
130                        log.debug(line, extra={"task": self.defname})
131                        break
132                else:
133                    log.debug(line, extra={"task": self.defname})
134                self.emit("line", line)
135            else:
136                self.emit("died")
137                break
138        self.terminate()
139
140    def terminate(self):
141        if self.write_task is not None:
142            self.write_task.cancel()
143        self.read_stdout_task.cancel()
144        try:
145            self.proc.terminate()
146            log.debug("SubProcess.terminate()", extra={"task": self.defname})
147        except ProcessLookupError:
148            log.debug("SubProcess.terminate(): ProcessLookupError", extra={"task": self.defname})
149
150        self.terminated = True
151
152    def pause(self):
153        if sys.platform != "win32":
154            try:
155                self.proc.send_signal(signal.SIGSTOP)
156            except ProcessLookupError:
157                log.debug("SubProcess.pause(): ProcessLookupError", extra={"task": self.defname})
158
159    def resume(self):
160        if sys.platform != "win32":
161            try:
162                self.proc.send_signal(signal.SIGCONT)
163            except ProcessLookupError:
164                log.debug("SubProcess.pause(): ProcessLookupError", extra={"task": self.defname})
165
166
167MENU_XML = """
168<?xml version="1.0" encoding="UTF-8"?>
169<interface>
170  <menu id="app-menu">
171    <section>
172      <item>
173        <attribute name="action">app.subprocess</attribute>
174        <attribute name="label">Subprocess</attribute>
175      </item>
176      <item>
177        <attribute name="action">app.quit</attribute>
178        <attribute name="label">Quit</attribute>
179    </item>
180    </section>
181  </menu>
182</interface>
183"""
184
185
186class Application(Gtk.Application):
187    def __init__(self):
188        Gtk.Application.__init__(self, application_id="org.subprocess")
189        self.window = None
190
191    def do_startup(self):
192        Gtk.Application.do_startup(self)
193
194        action = Gio.SimpleAction.new("subprocess", None)
195        action.connect("activate", self.on_subprocess)
196        self.add_action(action)
197
198        action = Gio.SimpleAction.new("quit", None)
199        action.connect("activate", self.on_quit)
200        self.add_action(action)
201
202        builder = Gtk.Builder.new_from_string(MENU_XML, -1)
203        self.set_app_menu(builder.get_object("app-menu"))
204
205    def do_activate(self):
206        if not self.window:
207            self.window = Gtk.ApplicationWindow(application=self)
208
209        self.window.present()
210
211    def on_subprocess(self, action, param):
212        proc = SubProcess("python", [os.path.expanduser("~") + "/pychess/lib/pychess/Players/PyChess.py", ])
213        create_task(self.parse_line(proc))
214        print("xboard", file=proc)
215        print("protover 2", file=proc)
216
217    @asyncio.coroutine
218    def parse_line(self, proc):
219        while True:
220            line = yield from wait_signal(proc, 'line')
221            if line:
222                print('  parse_line:', line[1])
223            else:
224                print("no more lines")
225                break
226
227    def on_quit(self, action, param):
228        self.quit()
229
230
231if __name__ == "__main__":
232    from pychess.external import gbulb
233    gbulb.install(gtk=True)
234    app = Application()
235
236    loop = asyncio.get_event_loop()
237    loop.set_debug(enabled=True)
238    loop.run_forever(application=app)
239