1#!/usr/bin/env python3
2'''
3Provides IPython console widget.
4
5@author: Eitan Isaacson
6@organization: IBM Corporation
7@copyright: Copyright (c) 2007 IBM Corporation
8@license: BSD
9
10All rights reserved. This program and the accompanying materials are made
11available under the terms of the BSD which accompanies this distribution, and
12is available at U{https://www.opensource.org/licenses/bsd-license.php}
13'''
14
15# Taken from [1] (rev 36e2742, 2019-07-27) with slight modifications.
16# The exact license [2] is reproduced below (3-clause BSD).
17#
18# [1] https://git.gnome.org/browse/accerciser/tree/plugins/ipython_view.py
19# [2] https://git.gnome.org/browse/accerciser/tree/COPYING
20
21# Copyright (c) 2005, 2007 IBM Corporation
22# All rights reserved.
23#
24# Redistribution and use in source and binary forms, with or without
25# modification, are permitted provided that the following conditions are met:
26#
27#     * Redistributions of source code must retain the above copyright notice,
28#     this list of conditions and the following disclaimer.
29#     * Redistributions in binary form must reproduce the above copyright notice,
30#     this list of conditions and the following disclaimer in the documentation
31#     and/or other materials provided with the distribution.
32#     * Neither the name of the IBM Corporation nor the names of its contributors
33#     may be used to endorse or promote products derived from this software
34#     without specific prior written permission.
35#
36# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
37# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
38# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
39# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
40# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
41# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
42# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
43# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
44# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
45# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
46
47
48import re
49import sys
50import os
51import subprocess
52
53from io import StringIO
54from functools import reduce
55
56from gi.repository import Gtk
57from gi.repository import Gdk
58from gi.repository import GLib
59from gi.repository import Pango
60
61import IPython
62
63
# Major version of the installed IPython (text before the first dot); the
# classes below branch on it to pick version-specific APIs.
IPYTHON_VERSION = int(IPython.__version__.partition('.')[0])
65
66
class IterableIPShell:
    """
    Create an IPython instance. Does not start a blocking event loop,
    instead allows single iterations. This allows embedding in GTK+
    without blockage.

    @ivar IP: IPython instance.
    @type IP: IPython.terminal.embed.InteractiveShellEmbed
    @ivar prompt: Prompt string to show for the next input line.
    @type prompt: string
    @ivar iter_more: Indicates if the line executed was a complete command,
    or we should wait for more.
    @type iter_more: boolean (initialized to 0)
    @ivar history_level: The place in history where we currently are
    when pressing up/down (0 or a negative index into IPython's C{In} list).
    @type history_level: integer
    @ivar complete_sep: Separation delimiters for completion function.
    @type complete_sep: compiled regular expression
    @ivar no_input_splitter: True when IP.check_complete() is used instead
    of IP.input_splitter to detect incomplete input.
    @type no_input_splitter: boolean
    @ivar lines: Input lines collected so far for the pending command
    (only used when no_input_splitter is set).
    @type lines: list
    @ivar indent_spaces: Second value returned by IP.check_complete() for
    the pending input.
    """

    def __init__(self, user_ns=None, cin=None, cout=None, cerr=None, input_func=None):
        """
        Create the embedded IPython shell and hook up its I/O.

        @param user_ns: User namespace.
        @type user_ns: dictionary
        @param cin: Console standard input.
        @type cin: IO stream
        @param cout: Console standard output.
        @type cout: IO stream
        @param cerr: Console standard error.
        @type cerr: IO stream
        @param input_func: Replacement for builtin raw_input()
        @type input_func: function
        """

        self.prompt = None

        ip_io = IPython.utils.io
        if input_func:
            IPython.terminal.interactiveshell.raw_input_original = input_func
        if cin:
            ip_io.stdin = ip_io.IOStream(cin)
        if cout:
            ip_io.stdout = ip_io.IOStream(cout)
        if cerr:
            ip_io.stderr = ip_io.IOStream(cerr)

        # This is to get rid of the blockage that occurs during
        # IPython.Shell.InteractiveShell.user_setup()
        ip_io.raw_input = lambda x: None

        os.environ['TERM'] = 'dumb'
        excepthook = sys.excepthook

        if IPYTHON_VERSION >= 4:
            from traitlets.config.loader import Config
        else:
            from IPython.config.loader import Config

        cfg = Config()
        cfg.InteractiveShell.colors = "Linux"
        cfg.Completer.use_jedi = False

        # InteractiveShell's __init__ overwrites ip_io.stdout,ip_io.stderr with
        # sys.stdout, sys.stderr, this makes sure they are right.
        # The real streams are restored even if shell creation fails.
        old_stdout, old_stderr = sys.stdout, sys.stderr
        sys.stdout, sys.stderr = ip_io.stdout.stream, ip_io.stderr.stream
        try:
            # InteractiveShell inherits from SingletonConfigurable, so use
            # instance()
            self.IP = IPython.terminal.embed.InteractiveShellEmbed.instance(
                config=cfg, user_ns=user_ns
            )
        finally:
            sys.stdout, sys.stderr = old_stdout, old_stderr

        # Route IPython's "!cmd" system calls through our own shell() method.
        self.IP.system = lambda cmd: self.shell(
            self.IP.var_expand(cmd), header='IPython system call: '
        )

        self.IP.raw_input = input_func
        # Put back the excepthook that was active before the shell was built.
        sys.excepthook = excepthook
        self.iter_more = 0
        self.history_level = 0
        self.complete_sep = re.compile(r'[\s\{\}\[\]\(\)]')
        # exit()/quit() become no-ops inside the embedded console.
        self.updateNamespace({'exit': lambda: None})
        self.updateNamespace({'quit': lambda: None})
        if IPYTHON_VERSION < 5:  # HACK
            self.IP.readline_startup_hook(self.IP.pre_readline)
        # Workaround for updating namespace with sys.modules
        self.__update_namespace()

        # help() is blocking, which hangs GTK+.
        import pydoc

        self.updateNamespace({'help': pydoc.doc})

        # Avoid using input splitter when not really needed.
        # Perhaps it could work even before 5.8.0
        # But it definitely does not work any more with >= 7.0.0
        self.no_input_splitter = IPYTHON_VERSION >= 5
        self.lines = []
        self.indent_spaces = ''

    def __update_namespace(self):
        """
        Update self.IP namespace for autocompletion with sys.modules

        Only top-level modules (names without a dot) are added.
        """
        # Iterate over a snapshot: sys.modules may change size while we loop
        # (e.g. an import triggered elsewhere), which would raise RuntimeError.
        top_level = {
            name: module
            for name, module in list(sys.modules.items())
            if '.' not in name
        }
        self.IP.user_ns.update(top_level)

    def execute(self):
        """
        Executes the current line provided by the shell object.

        The line is read through self.IP.raw_input(). Incomplete input is
        accumulated (self.iter_more is set) until a complete command is
        available, which is then run with IP.run_cell(). sys.stdout and
        sys.stdin are redirected to IPython's streams for the duration of
        the call and always restored afterwards.
        """
        self.history_level = 0

        # this is needed because some functions in IPython use 'print' to print
        # output (like 'who')
        orig_stdout, orig_stdin = sys.stdout, sys.stdin
        try:
            sys.stdout = IPython.utils.io.stdout
            sys.stdin = IPython.utils.io.stdin
            self.prompt = self.generatePrompt(self.iter_more)

            self.IP.hooks.pre_prompt_hook()
            if self.iter_more:
                try:
                    self.prompt = self.generatePrompt(True)
                except Exception:
                    self.IP.showtraceback()
                if self.IP.autoindent:
                    self.IP.rl_do_indent = True

            try:
                line = self.IP.raw_input(self.prompt)
            except KeyboardInterrupt:
                self.IP.write('\nKeyboardInterrupt\n')
                if self.no_input_splitter:
                    self.lines = []
                else:
                    self.IP.input_splitter.reset()
                # The pending input was discarded, so nothing is left to
                # continue: go back to a primary prompt without indentation.
                self.iter_more = False
                self.prompt = self.generatePrompt(False)
                self.IP.rl_do_indent = False
            except Exception:
                self.IP.showtraceback()
            else:
                if self.no_input_splitter:
                    self.lines.append(line)
                    (status, self.indent_spaces) = self.IP.check_complete(
                        '\n'.join(self.lines)
                    )
                    self.iter_more = status == 'incomplete'
                else:
                    self.IP.input_splitter.push(line)
                    self.iter_more = self.IP.input_splitter.push_accepts_more()
                self.prompt = self.generatePrompt(self.iter_more)
                if not self.iter_more:
                    if self.no_input_splitter:
                        source_raw = '\n'.join(self.lines)
                        self.lines = []
                    else:
                        source_raw = self.IP.input_splitter.raw_reset()
                    self.IP.run_cell(source_raw, store_history=True)
                    self.IP.rl_do_indent = False
                else:
                    # TODO: Auto-indent
                    self.IP.rl_do_indent = True
        finally:
            sys.stdout = orig_stdout
            sys.stdin = orig_stdin

    def generatePrompt(self, is_continuation):
        """
        Generate prompt depending on is_continuation value

        @param is_continuation: True for a continuation-line prompt.
        @type is_continuation: boolean

        @return: The prompt string representation
        @rtype: string

        """
        if IPYTHON_VERSION >= 5:  # HACK
            return '... ' if is_continuation else '>>> '
        if is_continuation:
            prompt = self.IP.prompt_manager.render('in2')
        else:
            prompt = self.IP.prompt_manager.render('in')

        return prompt

    def historyBack(self):
        """
        Provides one history command back.

        If there is no (non-empty) entry one step further back, the history
        position is left where it was.

        @return: The command string.
        @rtype: string
        """
        self.history_level -= 1
        entry = self._getHistory()
        if not entry:
            # Nothing there: undo the step and report the previous entry.
            self.history_level += 1
            entry = self._getHistory()
        return entry

    def historyForward(self):
        """
        Provides one history command forward.

        The history position never moves past 0.

        @return: The command string.
        @rtype: string
        """
        if self.history_level < 0:
            self.history_level += 1
        return self._getHistory()

    def _getHistory(self):
        """
        Gets the command string of the current history level.

        @return: Historic command string with surrounding newlines removed,
        or an empty string if the level is out of range.
        @rtype: string
        """
        try:
            return self.IP.user_ns['In'][self.history_level].strip('\n')
        except IndexError:
            return ''

    def updateNamespace(self, ns_dict):
        """
        Add the current dictionary to the shell namespace.

        @param ns_dict: A dictionary of symbol-values.
        @type ns_dict: dictionary
        """
        self.IP.user_ns.update(ns_dict)

    def complete(self, line):
        """
        Returns an auto completed line and/or possibilities for completion.

        Only the last fragment of the line (after the last delimiter matched
        by complete_sep) is handed to IPython's completer.

        @param line: Given line so far.
        @type line: string

        @return: Line completed as far as possible,
        and possible further completions.
        @rtype: tuple
        """
        fragment = self.complete_sep.split(line)[-1]
        if not fragment:
            # Nothing to complete after the last delimiter.
            return line, []

        candidates = self.IP.complete(fragment)[1]
        if not candidates:
            return line, candidates

        # Extend the fragment to the longest prefix shared by all candidates;
        # keep the fragment itself if the candidates share nothing.
        common_prefix = os.path.commonprefix(candidates) or fragment
        return line[: -len(fragment)] + common_prefix, candidates

    def shell(self, cmd, verbose=0, debug=0, header=''):
        """
        Replacement method to allow shell commands without them blocking.

        The command's combined stdout/stderr is printed once it has finished.
        Nothing is executed when debug is set.

        @param cmd: Shell command to execute.
        @type cmd: string
        @param verbose: Verbosity
        @type verbose: integer
        @param debug: Debug level
        @type debug: integer
        @param header: Header to be printed before output
        @type header: string
        """
        if verbose or debug:
            print(header + cmd)
        if debug:
            return
        # cmd is a command line typed by the console user ("!cmd"), so it has
        # to be interpreted by the shell; without shell=True a string with
        # arguments is treated as a program name and fails on POSIX.
        with subprocess.Popen(
            cmd,
            shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            errors='replace',
        ) as popen:
            # communicate() closes stdin (the child sees EOF), reads all
            # output as text and waits for the process to exit.
            output, _ = popen.communicate()
        print(output)
387
388
class ConsoleView(Gtk.TextView):
    """
    Specialized text view for console-like workflow.

    Several methods read attributes that this class does not define itself
    (self.prompt, self.IP, self.no_input_splitter, self.indent_spaces); they
    are expected to be provided by a subclass that also mixes in the shell.

    @cvar ANSI_COLORS: Mapping of terminal colors to X11 names.
    @type ANSI_COLORS: dictionary

    @ivar text_buffer: Widget's text buffer.
    @type text_buffer: gtk.TextBuffer
    @ivar color_pat: Regex of terminal color pattern
    @type color_pat: compiled regular expression
    @ivar mark: Scroll mark for automatic scrolling on input.
    @type mark: gtk.TextMark
    @ivar line_start: Start of command line mark.
    @type line_start: gtk.TextMark
    """

    ANSI_COLORS = {
        '0;30': 'Black',
        '0;31': 'Red',
        '0;32': 'Green',
        '0;33': 'Brown',
        '0;34': 'Blue',
        '0;35': 'Purple',
        '0;36': 'Cyan',
        '0;37': 'LightGray',
        '1;30': 'DarkGray',
        '1;31': 'DarkRed',
        '1;32': 'SeaGreen',
        '1;33': 'Yellow',
        '1;34': 'LightBlue',
        '1;35': 'MediumPurple',
        '1;36': 'LightCyan',
        '1;37': 'White',
    }

    def __init__(self):
        """
        Initialize console view.

        Picks a font, creates one text tag per ANSI color plus the 'notouch'
        (read-only) tag, and installs the key press handler.
        """
        Gtk.TextView.__init__(self)
        pango_ctx = self.get_pango_context()
        # Fall back to the generic "Mono" family unless one of the preferred
        # fonts is installed.
        chosen = "Mono"
        for family in pango_ctx.list_families():
            name = family.get_name()
            # These are known to show e.g U+FFFC
            if name in ("Courier New", "Courier Mono"):
                chosen = name
                break
        self.modify_font(Pango.FontDescription(chosen))
        self.set_cursor_visible(True)
        self.text_buffer = self.get_buffer()
        self.mark = self.text_buffer.create_mark(
            'scroll_mark', self.text_buffer.get_end_iter(), False
        )
        for code, color in self.ANSI_COLORS.items():
            self.text_buffer.create_tag(code, foreground=color, weight=700)
        # '0' is the ANSI reset code: a tag without any attributes.
        self.text_buffer.create_tag('0')
        self.text_buffer.create_tag('notouch', editable=False)
        self.color_pat = re.compile(r'\x01?\x1b\[(.*?)m\x02?')
        self.line_start = self.text_buffer.create_mark(
            'line_start', self.text_buffer.get_end_iter(), True
        )
        self.connect('key-press-event', self.onKeyPress)

    def write(self, text, editable=False):
        """
        Schedule L{_write} to run from the GLib main loop when idle.

        @param text: Text to append.
        @type text: string
        @param editable: If true, added text is editable.
        @type editable: boolean
        """
        GLib.idle_add(self._write, text, editable)

    def _write(self, text, editable=False):
        """
        Write given text to buffer.

        ANSI color escape sequences in the text are removed; the text that
        follows each sequence is inserted with the tag named after the
        sequence's code. Codes for which no tag exists are inserted untagged.

        @param text: Text to append.
        @type text: string
        @param editable: If true, added text is editable.
        @type editable: boolean
        """
        buf = self.text_buffer
        # With one capture group, split() yields
        # [text0, code1, text1, code2, text2, ...].
        segments = self.color_pat.split(text)
        start_mark = buf.create_mark(None, buf.get_end_iter(), True)
        buf.insert(buf.get_end_iter(), segments[0])

        tag_table = buf.get_tag_table()
        for code, chunk in zip(segments[1::2], segments[2::2]):
            if tag_table.lookup(code) is not None:
                buf.insert_with_tags_by_name(buf.get_end_iter(), chunk, code)
            else:
                # Unknown escape code: keep the text, drop the styling.
                buf.insert(buf.get_end_iter(), chunk)

        if not editable:
            buf.apply_tag_by_name(
                'notouch', buf.get_iter_at_mark(start_mark), buf.get_end_iter()
            )
        buf.delete_mark(start_mark)
        self.scroll_mark_onscreen(self.mark)

    def showPrompt(self, prompt):
        """
        Schedule L{_showPrompt} to run from the GLib main loop when idle.

        @param prompt: Prompt to print.
        @type prompt: string
        """
        GLib.idle_add(self._showPrompt, prompt)

    def _showPrompt(self, prompt):
        """
        Prints prompt at start of line.

        The line_start mark is moved behind the prompt, so the prompt itself
        is not part of the command line.

        @param prompt: Prompt to print.
        @type prompt: string
        """
        self._write(prompt)
        self.text_buffer.move_mark(self.line_start, self.text_buffer.get_end_iter())

    def changeLine(self, text):
        """
        Schedule L{_changeLine} to run from the GLib main loop when idle.

        @param text: Text to use as replacement.
        @type text: string
        """
        GLib.idle_add(self._changeLine, text)

    def _changeLine(self, text):
        """
        Replace currently entered command line with given text.

        @param text: Text to use as replacement.
        @type text: string
        """
        # The command line is everything from line_start to the end of the
        # buffer (same range as getCurrentLine), which may span several lines
        # after a multi-line paste.
        self.text_buffer.delete(
            self.text_buffer.get_iter_at_mark(self.line_start),
            self.text_buffer.get_end_iter(),
        )
        self._write(text, True)

    def getCurrentLine(self):
        """
        Get text in current command line.

        @return: Text between the line_start mark and the end of the buffer.
        @rtype: string
        """
        line_begin = self.text_buffer.get_iter_at_mark(self.line_start)
        buffer_end = self.text_buffer.get_end_iter()
        return self.text_buffer.get_slice(line_begin, buffer_end, False)

    def showReturned(self, text):
        """
        Schedule L{_showReturned} to run from the GLib main loop when idle.

        @param text: Text to show.
        @type text: string
        """
        GLib.idle_add(self._showReturned, text)

    def _showReturned(self, text):
        """
        Show returned text from last command and print new prompt.

        @param text: Text to show.
        @type text: string
        """
        # Freeze the command that was just entered (the whole input range,
        # consistent with getCurrentLine).
        self.text_buffer.apply_tag_by_name(
            'notouch',
            self.text_buffer.get_iter_at_mark(self.line_start),
            self.text_buffer.get_end_iter(),
        )
        self._write('\n' + text)
        if text:
            self._write('\n')
        self._write('\n')  # Add extra line, like normal IPython
        self._showPrompt(self.prompt)
        self.text_buffer.move_mark(self.line_start, self.text_buffer.get_end_iter())
        self.text_buffer.place_cursor(self.text_buffer.get_end_iter())

        # Pre-fill the indentation when a continuation line is expected.
        if self.IP.rl_do_indent:
            if self.no_input_splitter:
                indentation = self.indent_spaces
            else:
                indentation = self.IP.input_splitter.indent_spaces * ' '
            self.text_buffer.insert_at_cursor(indentation)

    def onKeyPress(self, _widget, event):
        """
        Key press callback used for correcting behavior for console-like
        interfaces. For example 'home' should go to prompt, not to beginning of
        line.

        @param _widget: Widget that key press occurred in (unused).
        @type _widget: gtk.Widget
        @param event: Event object
        @type event: gtk.gdk.Event

        @return: Return True if event should not trickle.
        @rtype: boolean
        """
        insert_mark = self.text_buffer.get_insert()
        insert_iter = self.text_buffer.get_iter_at_mark(insert_mark)
        selection_mark = self.text_buffer.get_selection_bound()
        selection_iter = self.text_buffer.get_iter_at_mark(selection_mark)
        start_iter = self.text_buffer.get_iter_at_mark(self.line_start)
        if event.keyval == Gdk.KEY_Home:
            if (
                event.state & Gdk.ModifierType.CONTROL_MASK
                or event.state & Gdk.ModifierType.MOD1_MASK
            ):
                pass
            elif event.state & Gdk.ModifierType.SHIFT_MASK:
                # Shift+Home: extend the selection back to the prompt.
                self.text_buffer.move_mark(insert_mark, start_iter)
                return True
            else:
                self.text_buffer.place_cursor(start_iter)
                return True
        elif event.keyval == Gdk.KEY_Left:
            # Swallow the key if moving left would land on non-editable text.
            insert_iter.backward_cursor_position()
            if not insert_iter.editable(True):
                return True
        elif not event.string:
            pass
        elif (
            start_iter.compare(insert_iter) <= 0
            and start_iter.compare(selection_iter) <= 0
        ):
            # Cursor and selection are both inside the command line.
            pass
        elif (
            start_iter.compare(insert_iter) > 0
            and start_iter.compare(selection_iter) > 0
        ):
            # Both are before the command line: jump to its start.
            self.text_buffer.place_cursor(start_iter)
        elif insert_iter.compare(selection_iter) < 0:
            # Selection straddles the prompt: clip it to the command line.
            self.text_buffer.move_mark(insert_mark, start_iter)
        elif insert_iter.compare(selection_iter) > 0:
            self.text_buffer.move_mark(selection_mark, start_iter)

        return self.onKeyPressExtend(event)

    def onKeyPressExtend(self, event):
        """
        Hook called at the end of L{onKeyPress}; does nothing here.

        For some reason we can't extend onKeyPress directly (bug #500900).

        @param event: Event object.
        @type event: gtk.gdk.Event
        """
        pass
630
631
class IPythonView(ConsoleView, IterableIPShell):
    """
    Sub-class of both the modified IPython shell and L{ConsoleView}; together
    they make a GTK+ IPython console.

    @ivar cout: Buffer that collects the shell's stdout and stderr.
    @type cout: io.StringIO
    @ivar interrupt: Set when Ctrl+C was pressed; makes the next raw_input()
    call raise KeyboardInterrupt.
    @type interrupt: boolean
    """

    def __init__(self):
        """
        Initialize. Redirect I/O to console.
        """
        ConsoleView.__init__(self)
        self.cout = StringIO()
        # Must exist before the shell can call raw_input().
        self.interrupt = False
        IterableIPShell.__init__(
            self, cout=self.cout, cerr=self.cout, input_func=self.raw_input
        )
        self.execute()
        self.prompt = self.generatePrompt(False)
        # Discard anything written so far. truncate() does not move the
        # stream position, so rewind as well; otherwise the next write would
        # be padded with NUL characters up to the old position.
        self.cout.truncate(0)
        self.cout.seek(0)
        self.showPrompt(self.prompt)

    def raw_input(self, prompt=''):
        """
        Custom raw_input() replacement. Gets current line from console buffer.

        @param prompt: Prompt to print. Here for compatibility as replacement.
        @type prompt: string

        @return: The current command line text.
        @rtype: string
        @raise KeyboardInterrupt: If an interrupt was requested (the request
        is cleared).
        """
        if self.interrupt:
            self.interrupt = False
            raise KeyboardInterrupt
        return self.getCurrentLine()

    def onKeyPressExtend(self, event):
        """
        Key press callback with plenty of shell goodness, like history,
        autocompletions, etc.

        @param event: Event object.
        @type event: gtk.gdk.Event

        @return: True if event should not trickle.
        @rtype: boolean
        """
        if event.state & Gdk.ModifierType.CONTROL_MASK and event.keyval == Gdk.KEY_c:
            # Ctrl+C: make the pending raw_input() raise KeyboardInterrupt.
            self.interrupt = True
            self._processLine()
            return True
        elif event.keyval == Gdk.KEY_Return:
            self._processLine()
            return True
        elif event.keyval == Gdk.KEY_Up:
            self.changeLine(self.historyBack())
            return True
        elif event.keyval == Gdk.KEY_Down:
            self.changeLine(self.historyForward())
            return True
        elif event.keyval == Gdk.KEY_Tab:
            current_line = self.getCurrentLine()
            if not current_line.strip():
                # Nothing typed yet: let Tab through unhandled.
                return False
            completed, possibilities = self.complete(current_line)
            if len(possibilities) > 1:
                # Several candidates: list them, then show a fresh prompt.
                self.write('\n')
                for symbol in possibilities:
                    self.write(symbol + '\n')
                self.showPrompt(self.prompt)
            self.changeLine(completed or current_line)
            return True
        return False

    def _processLine(self):
        """
        Process current command line.

        Runs the line through the shell, shows whatever was written to
        self.cout and empties that buffer again.
        """
        # NOTE(review): history_pos is not read anywhere in this file
        # (execute() resets history_level itself); kept in case external
        # code relies on it.
        self.history_pos = 0
        self.execute()
        returnvalue = self.cout.getvalue().strip('\n')
        self.showReturned(returnvalue)
        self.cout.truncate(0)
        self.cout.seek(0)
720
721
if __name__ == "__main__":
    # Stand-alone demo: a single window hosting the IPython console.
    main_window = Gtk.Window()
    main_window.set_default_size(640, 320)
    main_window.add(IPythonView())
    # Closing the window ends the GTK main loop.
    main_window.connect('delete-event', lambda _widget, _event: Gtk.main_quit())
    main_window.show_all()
    Gtk.main()
729