"""Generic wrapper for read-eval-print-loops, a.k.a. interactive shells.

Provides :class:`REPLWrapper`, which drives an interactive child process
through :mod:`pexpect`, plus the :func:`python` and :func:`bash` convenience
constructors.
"""
import os.path
import signal
import sys

import pexpect

PY3 = (sys.version_info[0] >= 3)

if PY3:
    # Python 3 has no ``basestring``; alias it so the isinstance check in
    # REPLWrapper.__init__ works on both major versions.
    basestring = str

# Default replacement prompts installed in the wrapped REPL.
PEXPECT_PROMPT = u'[PEXPECT_PROMPT>'
PEXPECT_CONTINUATION_PROMPT = u'[PEXPECT_PROMPT+'


class REPLWrapper(object):
    """Wrapper for a REPL.

    :param cmd_or_spawn: Either an instance of :class:`pexpect.spawn` in
      which a REPL has already been started, or a str command used to start
      a new REPL process.
    :param str orig_prompt: The prompt to expect at first.
    :param str prompt_change: A command that changes the prompt to something
      more unique. If this is ``None``, the prompt will not be changed.
      Otherwise it is formatted with the new prompt and the continuation
      prompt as positional parameters, so ``{}`` style formatting can be
      used to insert them into the command.
    :param str new_prompt: The more unique prompt to expect after the change.
    :param str continuation_prompt: The prompt that signals the REPL is
      waiting for more input to complete the current command.
    :param str extra_init_cmd: Commands to do extra initialisation, such as
      disabling pagers.
    """

    def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
                 new_prompt=PEXPECT_PROMPT,
                 continuation_prompt=PEXPECT_CONTINUATION_PROMPT,
                 extra_init_cmd=None):
        if not isinstance(cmd_or_spawn, basestring):
            # Caller handed us an already-running child.
            self.child = cmd_or_spawn
        else:
            self.child = pexpect.spawn(cmd_or_spawn, echo=False,
                                       encoding='utf-8')

        if self.child.echo:
            # Echo is still enabled on the child (e.g. a pre-existing spawn
            # instance); turn it off so that our own input is not repeated
            # back to us in the output.
            self.child.setecho(False)
            self.child.waitnoecho()

        if prompt_change is not None:
            change_cmd = prompt_change.format(new_prompt, continuation_prompt)
            self.set_prompt(orig_prompt, change_cmd)
            self.prompt = new_prompt
        else:
            self.prompt = orig_prompt
        self.continuation_prompt = continuation_prompt

        # Consume the first prompt so the REPL is ready for run_command().
        self._expect_prompt()

        if extra_init_cmd is not None:
            self.run_command(extra_init_cmd)

    def set_prompt(self, orig_prompt, prompt_change):
        """Wait for *orig_prompt*, then send the *prompt_change* command.

        :param str orig_prompt: Pattern passed to the child's ``expect``.
        :param str prompt_change: Fully formatted command line to send.
        """
        repl = self.child
        repl.expect(orig_prompt)
        repl.sendline(prompt_change)

    def _expect_prompt(self, timeout=-1, async_=False):
        """Wait for either the main or the continuation prompt.

        Returns whatever the child's ``expect_exact`` returns for the list
        ``[prompt, continuation_prompt]``; :meth:`run_command` treats a
        result of ``1`` as "continuation prompt seen".
        """
        candidates = [self.prompt, self.continuation_prompt]
        return self.child.expect_exact(candidates, timeout=timeout,
                                       async_=async_)

    def run_command(self, command, timeout=-1, async_=False):
        """Send a command to the REPL, wait for and return output.

        :param str command: The command to send. Trailing newlines are not
          needed. This should be a complete block of input that will trigger
          execution; if a continuation prompt is found after sending input,
          :exc:`ValueError` will be raised.
        :param int timeout: How long to wait for the next prompt. -1 means
          the default from the :class:`pexpect.spawn` object (default 30
          seconds). None means to wait indefinitely.
        :param bool async_: On Python 3.4, or Python 3.3 with asyncio
          installed, passing ``async_=True`` will make this return an
          :mod:`asyncio` Future, which you can yield from to get the same
          result that this method would normally give directly.
        :raises ValueError: If *command* contains no lines at all, or if the
          REPL answers the complete input with the continuation prompt.
        """
        # Multi-line commands are fed to the REPL one line at a time.
        lines = command.splitlines()
        if command.endswith('\n'):
            # splitlines() drops the trailing newline; restore it as an
            # empty final line so it is still sent.
            lines.append('')
        if not lines:
            raise ValueError("No command was given")

        if async_:
            from ._async import repl_run_command_async
            return repl_run_command_async(self, lines, timeout)

        first_line, remaining = lines[0], lines[1:]
        chunks = []
        self.child.sendline(first_line)
        for next_line in remaining:
            # Each intermediate line must be acknowledged by a prompt before
            # the next one is sent; keep the output produced in between.
            self._expect_prompt(timeout=timeout)
            chunks.append(self.child.before)
            self.child.sendline(next_line)

        # Everything has been submitted; wait for the prompt that follows.
        matched = self._expect_prompt(timeout=timeout)
        if matched != 1:
            chunks.append(self.child.before)
            return u''.join(chunks)

        # Index 1 is the continuation prompt: the input was incomplete.
        # Interrupt the REPL and wait briefly for a prompt before reporting.
        self.child.kill(signal.SIGINT)
        self._expect_prompt(timeout=1)
        raise ValueError("Continuation prompt found - input was incomplete:\n"
                         + command)


def python(command="python"):
    """Start a Python shell and return a :class:`REPLWrapper` object."""
    prompt_setup = u"import sys; sys.ps1={0!r}; sys.ps2={1!r}"
    return REPLWrapper(command, u">>> ", prompt_setup)


def bash(command="bash"):
    """Start a bash shell and return a :class:`REPLWrapper` object."""
    # bashrc.sh is looked up next to this module and passed via --rcfile.
    rcfile = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
    shell = pexpect.spawn(command, ['--rcfile', rcfile], echo=False,
                          encoding='utf-8')

    # If the user runs 'env', the value of PS1 will be in the output. To keep
    # replwrap from mistaking that for the next prompt, the marker characters
    # for invisible characters are embedded in the prompt; they show up when
    # inspecting the environment variable, but not when bash displays the
    # prompt.
    invisible_marker = u'\\[\\]'
    ps1 = PEXPECT_PROMPT[:5] + invisible_marker + PEXPECT_PROMPT[5:]
    ps2 = (PEXPECT_CONTINUATION_PROMPT[:5] + invisible_marker
           + PEXPECT_CONTINUATION_PROMPT[5:])
    prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)

    return REPLWrapper(shell, u'\\$', prompt_change,
                       extra_init_cmd="export PAGER=cat")