"""
Support remote access to a Python interpreter.
"""

from guppy.etc import cmd
from guppy import hpy
from guppy.heapy import heapyc, Target
from guppy.heapy.RemoteConstants import *
from guppy.heapy.Console import Console
from guppy.sets import mutbitset

import atexit
import io
import queue
import select
import socket
import sys
import time
import _thread
import threading
import traceback


class SocketClosed(Exception):
    """Signals the annex thread that the socket reader thread has ended.

    It is injected via ``heapyc.set_async_exc`` from the reader thread
    started in ``Annex.start_ki_thread`` and caught in ``Annex.run``.
    """
    pass


class IsolatedCaller:
    """Callable wrapper that forwards every call to ``func`` unchanged."""
    # Isolates the target interpreter from us
    # when the _hiding_tag_ is set to the _hiding_tag_ of our hp.
    # A solution of a problem discussed in notes Nov 8-9 2005.
    # Note feb 3 2006: The class in the Target instance must be used.

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwds):
        return self.func(*args, **kwds)


class QueueWithReadline(queue.Queue):
    """A ``queue.Queue`` that can stand in for a file's ``readline``."""

    def readline(self, size=-1):
        """Block until an item is available and return it.

        ``size`` is accepted for file-API compatibility and is ignored.
        """
        # Make sure we are interruptible
        # in case we get a keyboard interrupt.
        # Not a very elegant way but the 'only' there is?
        while 1:
            try:
                return self.get(timeout=0.5)
            except queue.Empty:
                continue


class InterruptableSocket:
    """Wrap a raw file object so that ``read`` polls instead of blocking.

    ``fileno``, ``close``, ``readable``, ``writable`` and ``seekable`` are
    forwarded to the backing object; ``closed`` mirrors its ``closed``.
    """

    def __init__(self, backing):
        self._backing = backing
        self.fileno = self._backing.fileno
        self.close = self._backing.close
        self.readable = self._backing.readable
        self.writable = self._backing.writable
        self.seekable = self._backing.seekable

    @property
    def closed(self):
        return self._backing.closed

    def read(self, size=-1):
        """Wait in 0.5 s ``select`` slices until readable, then read."""
        # select() is passed ``self``; it uses the forwarded fileno().
        while not select.select([self], [], [], 0.5)[0]:
            pass

        return self._backing.read(size)


class NotiInput:
    """Input wrapper that writes READLINE to ``output`` before each read."""

    def __init__(self, input, output):
        self.input = input
        self.output = output

    def read(self, size=-1):
        # This may return less data than what was requested
        return self.readline(size)

    def readline(self, size=-1):
        """Write READLINE to ``output``, then return ``input.readline(size)``."""
        self.output.write(READLINE)
        return self.input.readline(size)


class Annex(cmd.Cmd):
    """Command interpreter served over a socket to (LOCALHOST, port).

    ``target`` is the object shared with the target interpreter; this
    class sets ``target.close`` and reads ``target.sys``, ``target.wd``,
    ``target.pid`` and ``target.annex_thread``.
    """

    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    use_rawinput = 0
    prompt = '<Annex> '

    def __init__(self, target, port=None):
        """Set up state; ``port`` defaults to HEAPYPORT when None."""
        cmd.Cmd.__init__(self)
        if port is None:
            port = HEAPYPORT
        self.server_address = (LOCALHOST, port)
        self.target = target
        # Deliberately NOT the IsolatedCaller of this module: the class is
        # looked up through the target's sys.modules (see the note on
        # IsolatedCaller: "The class in the Target instance must be used").
        target.close = target.sys.modules['guppy.heapy.Remote'].IsolatedCaller(
            self.asynch_close)
        self.socket = None
        self.isclosed = 0
        self.closelock = _thread.allocate_lock()

        self.intlocals = {
        }
        self.do_reset('')

    def asynch_close(self):
        """Mark the annex closed, disconnect once, and wait for the reader.

        Returns only after the reader thread ``self.th`` (if one was
        started) is no longer alive.
        """
        # This may be called asynchronously
        # by some other thread than the current (annex) thread.
        # So I need to protect for a possible race condition.
        # It is NOT enough with just an atomic test-and-set here
        # since we need to wait during the time a close initiated
        # from another thread is in progress, before exiting.

        with self.closelock:
            if not self.isclosed:
                self.isclosed = 1
                self.disconnect()

        while hasattr(self, 'th') and self.th.is_alive():
            time.sleep(0.5)

    def connect(self):
        """Connect to ``self.server_address``, retrying every 2 seconds.

        Returns without setting up streams if the annex is closed before a
        connection is made. On success, installs socket-backed
        ``stdin``/``stdout``, starts the reader thread and re-initializes
        the ``cmd.Cmd`` base with the new streams.
        """
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        while not self.isclosed:
            try:
                self.socket.connect(self.server_address)
            except SystemExit:
                raise
            except socket.error:
                if self.isclosed:
                    raise
                time.sleep(2)
            else:
                break
        else:
            # Closed while still trying to connect.
            return

        self.stdout = io.TextIOWrapper(
            self.socket.makefile('wb', buffering=0),
            encoding='utf-8', write_through=True)
        self.stdin = NotiInput(io.TextIOWrapper(
            InterruptableSocket(self.socket.makefile('rb', buffering=0)),
            encoding='utf-8', write_through=True), self.stdout)
        self.stderr = sys.stderr

        self.start_ki_thread()

        cmd.Cmd.__init__(self, stdin=self.stdin, stdout=self.stdout)

    def start_ki_thread(self):
        """Start the reader thread and put a queue in front of stdin.

        The thread reads lines from the old stdin. A line equal to
        KEYBOARDINTERRUPT is turned into a KeyboardInterrupt for the annex
        thread (via ``heapyc.set_async_exc``); every other line is queued
        for the new ``self.stdin``.
        """
        # Start a thread that can generate keyboard interrupts.
        # Inserts a spy thread between old stdin and a new stdin.

        line_queue = QueueWithReadline()
        ostdin = self.stdin

        self.stdin = NotiInput(input=line_queue,
                               output=ostdin.output)

        # Remember which socket this thread serves; once self.socket is
        # replaced (disconnect sets it to None) the thread must stop and
        # must no longer signal the annex thread.
        sock = self.socket

        def run():
            try:
                while sock is self.socket:
                    line = ostdin.input.readline()
                    if not line:
                        break
                    if line == KEYBOARDINTERRUPT:
                        if sock is self.socket:
                            heapyc.set_async_exc(self.target.annex_thread,
                                                 KeyboardInterrupt)
                    else:
                        line_queue.put(line)
            finally:
                if sock is self.socket:
                    heapyc.set_async_exc(self.target.annex_thread,
                                         SocketClosed)

        self.th = threading.Thread(target=run,
                                   args=())
        self.th._hiding_tag_ = self.intlocals['hp']._hiding_tag_

        self.th.start()

    def disconnect(self):
        """Send DONE and shut down/close the socket, ignoring errors.

        Does nothing when there is no socket. ``self.socket`` is set to
        None before the socket is touched.
        """
        sock = self.socket
        if sock is None:
            return
        self.socket = None
        # Each step is best-effort: the peer may already be gone.
        try:
            sock.send(DONE)
        except Exception:
            pass
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            pass
        try:
            sock.close()
        except Exception:
            pass
        sys.last_traceback = None

    def do_close(self, arg):
        """Close the annex; returns 1 (presumably stops the command loop)."""
        self.asynch_close()
        return 1

    def help_close(self):
        print("""close
-----
Close and disable this remote connection completely. It can then not
be reopened other than by some command from within the target process.

Normally you shouldn't need to use this command, because you can
return to the Monitor via other commands (<Ctrl-C> or .) keeping the
connection open.

But it might be useful when you want to get rid of the remote control
interpreter and thread, if it uses too much memory or disturbs the
target process in some other way.""", file=self.stdout)

    do_h = cmd.Cmd.do_help

    def help_h(self):
        print("""h(elp)
-----
Without argument, print the list of available commands.
With a command name as argument, print help about that command.""", file=self.stdout)

    help_help = help_h

    def do_int(self, arg):
        """Run an interactive Console on the annex streams.

        ``sys.stdin``/``sys.stdout``/``sys.stderr`` are redirected to the
        annex streams for the duration and always restored afterwards.
        """
        # XXX We should really stop other tasks while we use changed stdio files
        # but that seems to be hard to do
        # so this is ok for some practical purposes.
        # --- new note May 8 2005:
        # --- and doesn't matter since we are in a different interpreter -
        # --- so there is no XXX issue ?
        ostdin = sys.stdin
        ostdout = sys.stdout
        ostderr = sys.stderr

        try:
            sys.stdin = self.stdin
            sys.stdout = self.stdout
            sys.stderr = self.stdout

            con = Console(stdin=sys.stdin, stdout=sys.stdout,
                          locals=self.intlocals)
            con.interact(
                "Remote interactive console. To return to Annex, type %r." %
                con.EOF_key_sequence)

        finally:
            sys.stdin = ostdin
            sys.stdout = ostdout
            sys.stderr = ostderr

    def help_int(self):
        print("""int
-----
Interactive console.
Bring up a Python console in the Remote Control interpreter.

This console will initially have access to a heapy constructor, named
hpy, and a ready-made instance, named hp, and the target (see also the
reset command). Other things may be imported as needed.

After returning to the Annex (by q) or to the Monitor (by . or
<Ctrl-C>), the data in the interactive console will remain there - and
will be available till the next time the console is entered. But the
data may be cleared and reset to the initial state - a new heapy
instance will be created - by the 'reset' command of Annex.

It should be noted that the interpreter thread under investigation is
executing in parallell with the remote control interpreter. So there
may be some problems to do with that if both are executing at the same
time. This has to be dealt with for each case specifically.""", file=self.stdout)

    # Key under which do_isolatest temporarily stores a test object in
    # the console locals.
    _bname = 'a1e55f5dc4c9f708311e9f97b8098cd3'

    def do_isolatest(self, arg):
        """Report whether annex-side test objects show up in ``hp.heap()``."""
        hp = self.intlocals['hp']

        a = []
        self._a = a
        b = []
        self.intlocals[self._bname] = b
        # to make __builtins__ exist if it did not already
        eval('0', self.intlocals)

        testobjects = [a,
                       b,
                       self.intlocals['__builtins__'],
                       self.intlocals,
                       hp]

        h = hp.heap()
        if hp.iso(*testobjects) & h:
            print('Isolation test failed.', file=self.stdout)
            for i, v in enumerate(testobjects):
                if hp.iso(v) & h:
                    print(
                        '-- Shortest Path(s) to testobjects[%d] --' % i, file=self.stdout)
                    print(hp.iso(v).shpaths, file=self.stdout)
        else:
            print('Isolation test succeeded.', file=self.stdout)

        del self._a
        del self.intlocals[self._bname]

    def help_isolatest(self):
        print("""isolatest
----------
Isolation test.

Test that the target interpreter heap view is isolated from the data
in the remote control interpreter. Data introduced here, eg in the
interactive console, should not be seen in the heap as reported by
hp.heap() etc. This is achieved by setting hp to not follow the
calling interpreter root. However, this isolation may become broken.
This test is intended to diagnose this problem. The test checks that
none of a number of test objects is visible in the target heap
view. If the test failed, it will show the shortest path(s) to each of
the test objects that was visible.""", file=self.stdout)

    def do_q(self, arg):
        print('To return to Monitor, type <Ctrl-C> or .', file=self.stdout)
        print("To close this connection ('permanently'), type close", file=self.stdout)

    def help_q(self):
        print("""q
-----
Quit.

This doesn't currently do anything except printing a message. (I
thought it would be too confusing to have a q (quit) command from the
Annex, when there was a similarly named command in the Monitor.)""", file=self.stdout)

    def do_reset(self, arg):
        """Reset the console locals to ``hpy``, ``hp``, ``target`` and ``h``."""
        self.intlocals.clear()
        self.intlocals.update(
            {'hpy': self.hpy,
             'hp': self.hpy(),
             'target': self.target
             })
        # Set shorthand h, it is so commonly used
        # and the instance name now used in README example etc
        self.intlocals['h'] = self.intlocals['hp']

    def help_reset(self):
        print("""reset
-----
Reset things to an initial state.

This resets the state of the interactive console data only, for now.
It is reinitialized to contain the following:

hpy --- from guppy import hpy
hp --- hp = hpy()
target --- a reference to some data in the target interpreter
h --- h = hp; h is a shorthand for hp

(The hpy function is modified here from the normal one so
it sets some options to make it be concerned with the target
interpreter heap under investigation rather than the current one.)
""", file=self.stdout)

    def do_stat(self, arg):
        """Print executable, argv, working directory and pid of the target."""
        print("Target overview", file=self.stdout)
        print("------------------------------------", file=self.stdout)
        print("target.sys.executable = %s" %
              self.target.sys.executable, file=self.stdout)
        print("target.sys.argv = %s" %
              self.target.sys.argv, file=self.stdout)
        print("target.wd = %s" %
              self.target.wd, file=self.stdout)
        print("target.pid = %d" %
              self.target.pid, file=self.stdout)
        print("------------------------------------", file=self.stdout)

    def help_stat(self):
        print("""stat
-----
Print an overview status table, with data from the target interpreter.

In the table, sys.executable and sys.argv means the current values of
those attributes in the sys module of the target interpreter. The row
labeled target.wd is the working directory of the target interpreter,
at the time the Remote Control interpreter was started (the actual
working directory may have changed since that time). The row labeled
target.pid is the process id of the target interpreter.

""", file=self.stdout)

    def hpy(self, *args, **kwds):
        """Create a heapy instance configured to view the target.

        Arguments are passed through to ``guppy.hpy``. As a side effect
        the new instance's ``_hiding_tag_`` is copied onto ``self.target``
        and ``self.target.close``.
        """
        from guppy import hpy
        hp = hpy(*args, **kwds)
        hp.View.is_hiding_calling_interpreter = 1
        hp.View.target = self.target
        self.target._hiding_tag_ = hp._hiding_tag_
        self.target.close._hiding_tag_ = hp._hiding_tag_
        hp.reprefix = 'hp.'
        return hp

    def run(self):
        """Connect and serve command loops until the annex is closed."""
        # NOTE(review): the nesting of break / continue / disconnect()
        # below was reconstructed from a copy of this file whose
        # indentation had been flattened -- confirm against version
        # control.
        try:
            while not self.isclosed:
                self.connect()
                if not self.isclosed:
                    self.do_stat('')
                    while 1:
                        try:
                            self.cmdloop()
                        except SocketClosed:
                            break
                        except Exception:
                            # Report to the remote side if possible,
                            # otherwise locally and give up on this
                            # connection.
                            try:
                                traceback.print_exc(file=self.stdout)
                            except Exception:
                                traceback.print_exc(file=sys.stdout)
                                break
                            continue
                self.disconnect()
        finally:
            # Make sure the thread/interpreter can't terminate
            # without the annex being closed,
            # and that we WAIT if someone else is being closing us.
            self.asynch_close()


def on():
    """Enable remote monitoring of this process; a no-op if already on.

    Also a no-op when called from inside the remote control interpreter
    itself (detected through ``sys._is_guppy_heapy_remote_interpreter_``).
    """
    # Start a remote monitoring enabling thread,
    # unless I am that thread myself.
    global annex_thread, target
    if annex_thread is not None:
        return
    if getattr(sys, '_is_guppy_heapy_remote_interpreter_', 0):
        return
    start_annex = """\
# Set a flag to stop recursion when importing site
# in case sitecustomize tries to do Remote.on()
import sys
sys._is_guppy_heapy_remote_interpreter_ = 1
import site
from guppy.heapy import Remote
Remote.Annex(target).run()
"""
    target = Target.Target()
    # Presumably runs start_annex in a separate interpreter/thread and
    # returns that thread's id -- TODO confirm against heapyc.
    annex_thread = heapyc.interpreter(start_annex, {'target': target})
    target.annex_thread = annex_thread


def off():
    """Disable remote monitoring started by ``on()``; a no-op if it is off.

    Raises:
        RuntimeError: if ``target.close`` has not appeared after 10
            attempts 0.05 s apart.
    """
    global annex_thread, target
    if annex_thread is None:
        return
    for _ in range(10):
        try:
            close = target.close
        except AttributeError:
            # It may not have been initiated yet.
            # wait and repeat
            time.sleep(0.05)
        else:
            close()
            break
    else:
        # This used to be a bare ``raise`` with no active exception,
        # which itself surfaces as an uninformative RuntimeError.
        raise RuntimeError(
            'remote annex did not finish initializing: '
            'target.close is not available')

    heapyc.set_async_exc(annex_thread, SystemExit)

    # Wait until the pending async exception attribute is gone from
    # heapyc.RootState (presumably meaning the thread has consumed it).
    while hasattr(heapyc.RootState, 't%d_async_exc' % annex_thread):
        time.sleep(0.05)

    annex_thread = target = None


annex_thread = None
target = None

atexit.register(off)