1"""
2    Support remote access to a Python interpreter.
3"""
4
5from guppy.etc import cmd
6from guppy import hpy
7from guppy.heapy import heapyc, Target
8from guppy.heapy.RemoteConstants import *
9from guppy.heapy.Console import Console
10from guppy.sets import mutbitset
11
12import atexit
13import io
14import queue
15import select
16import socket
17import sys
18import time
19import _thread
20import threading
21import traceback
22
23
class SocketClosed(Exception):
    """Raised in the annex thread when the connection's reader thread ends.

    The reader thread started by ``Annex.start_ki_thread`` injects this
    exception asynchronously, and ``Annex.run`` catches it to leave the
    command loop of the current connection.
    """
26
27
class IsolatedCaller:
    """Callable wrapper that forwards every call to the wrapped function.

    Isolates the target interpreter from us
    when the _hiding_tag_ is set to the _hiding_tag_ of our hp.
    A solution of a problem discussed in notes Nov 8-9 2005.
    Note feb 3 2006: The class in the Target instance must be used.
    """

    def __init__(self, func):
        # Keep the callable; instances get extra attributes (such as
        # _hiding_tag_) assigned from outside, so no __slots__ here.
        self.func = func

    def __call__(self, *args, **kwds):
        """Call the wrapped function with the given arguments."""
        wrapped = self.func
        return wrapped(*args, **kwds)
39
40
class QueueWithReadline(queue.Queue):
    """A queue that can stand in for a file object's ``readline``."""

    def readline(self, size=-1):
        """Block until an item is available and return it.

        ``size`` is accepted for compatibility with file objects and is
        ignored.
        """
        # Wait in short slices rather than blocking indefinitely, to make
        # sure we are interruptible in case we get a keyboard interrupt.
        # Not a very elegant way, but the 'only' one there is?
        while True:
            try:
                item = self.get(timeout=0.5)
            except queue.Empty:
                pass
            else:
                return item
51
52
class InterruptableSocket:
    """File-like wrapper whose ``read`` polls for data in 0.5 second steps.

    ``backing`` is the wrapped file object.  Its ``fileno``, ``close``,
    ``readable``, ``writable`` and ``seekable`` methods are exposed
    unchanged on the wrapper, and ``closed`` mirrors the backing object.
    """

    def __init__(self, backing):
        self._backing = backing
        # Re-export the bound methods of the backing object as-is.
        for name in ('fileno', 'close', 'readable', 'writable', 'seekable'):
            setattr(self, name, getattr(self._backing, name))

    @property
    def closed(self):
        """True when the backing file object reports itself closed."""
        return self._backing.closed

    def read(self, size=-1):
        """Wait until the descriptor is readable, then read from the backing.

        The wait is a select() loop with a 0.5 second timeout instead of a
        single blocking read (presumably so that an exception raised
        asynchronously in this thread is noticed between polls).
        """
        ready = []
        while not ready:
            ready = select.select([self], [], [], 0.5)[0]

        return self._backing.read(size)
71
72
class NotiInput:
    """Input stream that announces every read on an output stream.

    Before each line is read from ``input``, the READLINE marker is written
    to ``output`` (presumably to tell the peer that input is expected).
    """

    def __init__(self, input, output):
        self.input = input
        self.output = output

    def read(self, size=-1):
        """Read via readline().

        This may return less data than what was requested.
        """
        return self.readline(size)

    def readline(self, size=-1):
        """Write the READLINE marker to output, then read one line of input."""
        self.output.write(READLINE)
        line = self.input.readline(size)
        return line
85
86
87class Annex(cmd.Cmd):
88    address_family = socket.AF_INET
89    socket_type = socket.SOCK_STREAM
90    use_rawinput = 0
91    prompt = '<Annex> '
92
93    def __init__(self, target, port=None):
94        cmd.Cmd.__init__(self)
95        if port is None:
96            port = HEAPYPORT
97        self.server_address = (LOCALHOST, port)
98        self.target = target
99        target.close = target.sys.modules['guppy.heapy.Remote'].IsolatedCaller(
100        # target.close = IsolatedCaller(
101            self.asynch_close)
102        self.socket = None
103        self.isclosed = 0
104        self.closelock = _thread.allocate_lock()
105
106        self.intlocals = {
107        }
108        self.do_reset('')
109
110    def asynch_close(self):
111        # This may be called asynchronously
112        # by some other thread than the current (annex) thread.
113        # So I need to protect for a possible race condition.
114        # It is NOT enough with just an atomic test-and-set here
115        # since we need to wait during the time a close initiated
116        # from another thread is in progress, before exiting.
117
118        self.closelock.acquire()
119
120        try:
121            if not self.isclosed:
122                self.isclosed = 1
123                self.disconnect()
124        finally:
125            self.closelock.release()
126
127        while hasattr(self, 'th') and self.th.is_alive():
128            time.sleep(0.5)
129
130    def connect(self):
131        self.socket = socket.socket(self.address_family,
132                                    self.socket_type)
133        while not self.isclosed:
134            try:
135                self.socket.connect(self.server_address)
136            except SystemExit:
137                raise
138            except socket.error:
139                if self.isclosed:
140                    raise
141                time.sleep(2)
142            else:
143                break
144        else:
145            return
146
147        self.stdout = io.TextIOWrapper(
148            self.socket.makefile('wb', buffering=0),
149            encoding='utf-8', write_through=True)
150        self.stdin = NotiInput(io.TextIOWrapper(
151            InterruptableSocket(self.socket.makefile('rb', buffering=0)),
152            encoding='utf-8', write_through=True), self.stdout)
153        self.stderr = sys.stderr
154
155        self.start_ki_thread()
156
157        cmd.Cmd.__init__(self, stdin=self.stdin, stdout=self.stdout)
158
159    def start_ki_thread(self):
160        # Start a thread that can generates keyboard interrupr
161        # Inserts a spy thread between old stdin and a new stdin
162
163        queue = QueueWithReadline()
164        ostdin = self.stdin
165
166        self.stdin = NotiInput(input=queue,
167                               output=ostdin.output)
168
169        socket = self.socket
170
171        def run():
172            try:
173                while socket is self.socket:
174                    line = ostdin.input.readline()
175                    if not line:
176                        break
177                    if line == KEYBOARDINTERRUPT:
178                        if socket is self.socket:
179                            heapyc.set_async_exc(self.target.annex_thread,
180                                                 KeyboardInterrupt)
181                    else:
182                        queue.put(line)
183            finally:
184                if socket is self.socket:
185                    heapyc.set_async_exc(self.target.annex_thread,
186                                         SocketClosed)
187
188        self.th = threading.Thread(target=run,
189                                   args=())
190        self.th._hiding_tag_ = self.intlocals['hp']._hiding_tag_
191
192        self.th.start()
193
194    def disconnect(self):
195        sock = self.socket
196        if sock is None:
197            return
198        self.socket = None
199        try:
200            sock.send(DONE)
201        except Exception:
202            pass
203        try:
204            sock.shutdown(socket.SHUT_RDWR)
205        except Exception:
206            pass
207        try:
208            sock.close()
209        except Exception:
210            pass
211        sys.last_traceback = None
212
213    def do_close(self, arg):
214        self.asynch_close()
215        return 1
216
217    def help_close(self):
218        print("""close
219-----
220Close and disable this remote connection completely.  It can then not
221be reopened other than by some command from within the target process.
222
223Normally you shouldn't need to use this command, because you can
224return to the Monitor via other commands (<Ctrl-C> or .) keeping the
225connection open.
226
227But it might be useful when you want to get rid of the remote control
228interpreter and thread, if it uses too much memory or disturbs the
229target process in some other way.""", file=self.stdout)
230
231    do_h = cmd.Cmd.do_help
232
233    def help_h(self):
234        print("""h(elp)
235-----
236Without argument, print the list of available commands.
237With a command name as argument, print help about that command.""", file=self.stdout)
238
239    help_help = help_h
240
241    def do_int(self, arg):
242        # XXX We should really stop other tasks while we use changed stdio files
243        # but that seems to be hard to do
244        # so this is ok for some practical purposes.
245        # --- new note May 8 2005:
246        # --- and doesn't matter since we are in a different interpreter -
247        # --- so there is no XXX issue ?
248        ostdin = sys.stdin
249        ostdout = sys.stdout
250        ostderr = sys.stderr
251
252        try:
253            sys.stdin = self.stdin
254            sys.stdout = self.stdout
255            sys.stderr = self.stdout
256
257            con = Console(stdin=sys.stdin, stdout=sys.stdout,
258                          locals=self.intlocals)
259            con.interact(
260                "Remote interactive console. To return to Annex, type %r." %
261                con.EOF_key_sequence)
262
263        finally:
264            sys.stdin = ostdin
265            sys.stdout = ostdout
266            sys.stderr = ostderr
267
268    def help_int(self):
269        print("""int
270-----
271Interactive console.
272Bring up a Python console in the Remote Control interpreter.
273
274This console will initially have access to a heapy constructor, named
275hpy, and a ready-made instance, named hp, and the target (see also the
276reset command).  Other things may be imported as needed.
277
278After returning to the Annex (by q) or to the Monitor (by . or
279<Ctrl-C>), the data in the interactive console will remain there - and
280will be available till the next time the console is entered.  But the
281data may be cleared and reset to the initial state - a new heapy
282instance will be created - by the 'reset' command of Annex.
283
284It should be noted that the interpreter thread under investigation is
285executing in parallell with the remote control interpreter. So there
286may be some problems to do with that if both are executing at the same
287time. This has to be dealt with for each case specifically.""", file=self.stdout)
288
289    _bname = 'a1e55f5dc4c9f708311e9f97b8098cd3'
290
291    def do_isolatest(self, arg):
292        hp = self.intlocals['hp']
293
294        a = []
295        self._a = a
296        b = []
297        self.intlocals[self._bname] = b
298        # to make __builtins__ exist if it did not already
299        eval('0', self.intlocals)
300
301        testobjects = [a,
302                       b,
303                       self.intlocals['__builtins__'],
304                       self.intlocals,
305                       hp]
306
307        h = hp.heap()
308        if hp.iso(*testobjects) & h:
309            print('Isolation test failed.', file=self.stdout)
310            for i, v in enumerate(testobjects):
311                if hp.iso(v) & h:
312                    print(
313                        '-- Shortest Path(s) to testobjects[%d] --' % i, file=self.stdout)
314                    print(hp.iso(v).shpaths, file=self.stdout)
315        else:
316            print('Isolation test succeeded.', file=self.stdout)
317
318        del self._a
319        del self.intlocals[self._bname]
320
321    def help_isolatest(self):
322        print("""isolatest
323----------
324Isolation test.
325
326Test that the target interpreter heap view is isolated from the data
327in the remote control interpreter. Data introduced here, eg in the
328interactive console, should not be seen in the heap as reported by
329hp.heap() etc. This is achieved by setting hp to not follow the
330calling interpreter root.  However, this isolation may become broken.
331This test is intended to diagnose this problem. The test checks that
332none of a number of test objects is visible in the target heap
333view. If the test failed, it will show the shortest path(s) to each of
334the test objects that was visible.""", file=self.stdout)
335
336    def do_q(self, arg):
337        print('To return to Monitor, type <Ctrl-C> or .', file=self.stdout)
338        print("To close this connection ('permanently'), type close", file=self.stdout)
339
340    def help_q(self):
341        print("""q
342-----
343Quit.
344
345This doesn't currently do anything except printing a message.  (I
346thought it would be too confusing to have a q (quit) command from the
347Annex, when there was a similarly named command in the Monitor.)""", file=self.stdout)
348
349    def do_reset(self, arg):
350        self.intlocals.clear()
351        self.intlocals.update(
352            {'hpy': self.hpy,
353             'hp': self.hpy(),
354             'target': self.target
355             })
356        # Set shorthand h, it is so commonly used
357        # and the instance name now used in README example etc
358        self.intlocals['h'] = self.intlocals['hp']
359
360    def help_reset(self):
361        print("""reset
362-----
363Reset things to an initial state.
364
365This resets the state of the interactive console data only, for now.
366It is reinitialized to contain the following:
367
368hpy     --- from guppy import hpy
369hp      --- hp = hpy()
370target  --- a reference to some data in the target interpreter
371h       --- h = hp; h is a shorthand for hp
372
373(The hpy function is modified here from the normal one so
374it sets some options to make it be concerned with the target
375interpreter heap under investigation rather than the current one.)
376""", file=self.stdout)
377
378    def do_stat(self, arg):
379        print("Target overview", file=self.stdout)
380        print("------------------------------------", file=self.stdout)
381        print("target.sys.executable   = %s" %
382              self.target.sys.executable, file=self.stdout)
383        print("target.sys.argv         = %s" %
384              self.target.sys.argv, file=self.stdout)
385        print("target.wd               = %s" %
386              self.target.wd, file=self.stdout)
387        print("target.pid              = %d" %
388              self.target.pid, file=self.stdout)
389        print("------------------------------------", file=self.stdout)
390
391    def help_stat(self):
392        print("""stat
393-----
394Print an overview status table, with data from the target interpreter.
395
396In the table, sys.executable and sys.argv means the current values of
397those attributes in the sys module of the target interpreter. The row
398labeled target.wd is the working directory of the target interpreter,
399at the time the Remote Control interpreter was started (the actual
400working directory may have changed since that time). The row labeled
401target.pid is the process id of the target interpreter.
402
403""", file=self.stdout)
404
405    def hpy(self, *args, **kwds):
406        from guppy import hpy
407        hp = hpy(*args, **kwds)
408        hp.View.is_hiding_calling_interpreter = 1
409        hp.View.target = self.target
410        self.target._hiding_tag_ = hp._hiding_tag_
411        self.target.close._hiding_tag_ = hp._hiding_tag_
412        hp.reprefix = 'hp.'
413        return hp
414
415    def run(self):
416        try:
417            while not self.isclosed:
418                self.connect()
419                if not self.isclosed:
420                    self.do_stat('')
421                    while 1:
422                        try:
423                            self.cmdloop()
424                        except SocketClosed:
425                            break
426                        except Exception:
427                            try:
428                                traceback.print_exc(file=self.stdout)
429                            except Exception:
430                                traceback.print_exc(file=sys.stdout)
431                                break
432                            continue
433                self.disconnect()
434        finally:
435            # Make sure the thread/interpreter can't terminate
436            # without the annex being closed,
437            # and that we WAIT if someone else is being closing us.
438            self.asynch_close()
439
440
def on():
    """Start a remote monitoring enabling thread.

    Nothing is done when an annex thread was already started, or when the
    caller is itself running inside the remote interpreter (recognized by
    the sys._is_guppy_heapy_remote_interpreter_ flag).
    """
    global annex_thread, target
    already_on = annex_thread is not None
    if already_on or getattr(sys, '_is_guppy_heapy_remote_interpreter_', 0):
        return

    # Source code executed by the new interpreter.
    annex_source = """\
# Set a flag to stop recursion when importing site
# in case sitecustomize tries to do Remote.on()
import sys
sys._is_guppy_heapy_remote_interpreter_ = 1
import site
from guppy.heapy import Remote
Remote.Annex(target).run()
"""
    target = Target.Target()
    annex_thread = heapyc.interpreter(annex_source, {'target': target})
    target.annex_thread = annex_thread
461
462
def off(retries=10, delay=0.05):
    """Shut down the remote monitoring thread started by on().

    Does nothing when no annex thread is running.  Otherwise the annex is
    closed through target.close, SystemExit is raised asynchronously in
    the annex thread, and the call waits until that exception is no longer
    pending before the module state is reset.

    retries -- how many times to look for target.close, which may not
               have been set up yet by the starting annex.
    delay   -- seconds to sleep between those attempts.

    Raises RuntimeError if target.close did not appear in time.
    """
    global annex_thread, target
    if annex_thread is None:
        return
    for _ in range(retries):
        try:
            close = target.close
        except AttributeError:
            # It may not have been initiated yet.
            # wait and repeat
            time.sleep(delay)
        else:
            close()
            break
    else:
        # A bare 'raise' here has no active exception to re-raise and
        # would itself fail with an unrelated RuntimeError message.
        raise RuntimeError(
            'the remote annex did not provide target.close after %d '
            'attempts; it cannot be shut down' % retries)

    heapyc.set_async_exc(annex_thread, SystemExit)

    # Wait until the asynchronous exception is no longer pending.
    while hasattr(heapyc.RootState, 't%d_async_exc' % annex_thread):
        time.sleep(0.05)

    annex_thread = target = None
489
490
# Module state: the value returned by heapyc.interpreter() for the running
# annex and the Target handed to it.  Both are None while monitoring is off.
annex_thread = target = None

# Switch remote monitoring off when the process exits.
atexit.register(off)
495