1"""An interactive shell for the Jupyter kernel."""
2import io
3import sys
4import builtins
5
6from xonsh.base_shell import BaseShell
7
8
class StdJupyterRedirectBuf(io.RawIOBase):
    """Redirects standard I/O buffers to the Jupyter kernel.

    This is the binary, write-only counterpart of a text-level redirect:
    data written here is decoded to ``str`` and handed to the redirect's
    ``write()`` method. Seeking, truncating and reading are unsupported.
    """

    def __init__(self, redirect):
        """
        Parameters
        ----------
        redirect : object
            The text-level redirect this buffer forwards to. It must
            provide ``encoding`` and ``errors`` attributes as well as
            ``write()`` and ``fileno()`` methods.
        """
        super().__init__()
        self.redirect = redirect
        # Snapshot of the redirect's text settings at construction time.
        self.encoding = redirect.encoding
        self.errors = redirect.errors

    def writable(self):
        """Returns True, since this buffer accepts writes."""
        return True

    def fileno(self):
        """Returns the file descriptor reported by the redirect."""
        return self.redirect.fileno()

    def seek(self, offset, whence=io.SEEK_SET):
        """Seeking is not supported; always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation("cannot seek Jupyter redirect")

    def truncate(self, size=None):
        """Truncating is not supported; always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation("cannot truncate Jupyter redirect")

    def readinto(self, b):
        """Reading is not supported; always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation("cannot read into Jupyter redirect")

    def write(self, b):
        """Write bytes to kernel.

        Parameters
        ----------
        b : bytes-like or str
            Data to forward. Bytes-like objects are decoded with this
            buffer's encoding and error handler; ``str`` is passed through
            unchanged.

        Returns
        -------
        int
            The number of bytes consumed (or characters, for ``str`` input).
        """
        if isinstance(b, str):
            text = b
            count = len(b)
        else:
            # Normalise to ``bytes`` first: buffered writers hand raw streams
            # memoryview objects, which have no ``decode`` method.
            data = bytes(b)
            # ``decode`` rejects None for either argument, so fall back to
            # its documented defaults when the redirect supplied none.
            text = data.decode(self.encoding or "utf-8", self.errors or "strict")
            count = len(data)
        self.redirect.write(text)
        return count
37
38
class StdJupyterRedirect(io.TextIOBase):
    """Redirects a standard I/O stream to the Jupyter kernel.

    On construction the instance installs itself as ``sys.<name>`` and
    remembers the stream it replaced; ``close()`` (or leaving a ``with``
    block) puts the remembered stream back. Text written to the redirect
    is passed to the kernel's ``_respond_in_chunks()`` method. Reading,
    seeking, truncating and detaching are unsupported.
    """

    def __init__(self, name, kernel, parent_header=None):
        """
        Parameters
        ----------
        name : str
            The name of the buffer in the sys module, e.g. 'stdout'.
        kernel : XonshKernel
            Instance of a Jupyter kernel
        parent_header : dict or None, optional
            parent header information to pass along with the kernel
        """
        self._name = name
        self.kernel = kernel
        self.parent_header = parent_header

        # Remember the stream being replaced so it can be restored later.
        self.std = getattr(sys, name)
        self.buffer = StdJupyterRedirectBuf(self)
        setattr(sys, name, self)

    def _std_attr(self, attr, env_key):
        """Returns ``attr`` of the wrapped stream, else the env value.

        The xonsh environment is only consulted when the wrapped stream
        does not have the attribute (or has already been restored), so the
        common case does not depend on ``builtins.__xonsh_env__``.
        """
        try:
            return getattr(self.std, attr)
        except AttributeError:
            return builtins.__xonsh_env__.get(env_key)

    @property
    def encoding(self):
        """The encoding of the stream"""
        return self._std_attr("encoding", "XONSH_ENCODING")

    @property
    def errors(self):
        """The encoding errors of the stream"""
        return self._std_attr("errors", "XONSH_ENCODING_ERRORS")

    @property
    def newlines(self):
        """The newlines of the standard buffer, or None if unavailable."""
        # ``std`` is None once the original stream has been restored.
        return getattr(self.std, "newlines", None)

    def _replace_std(self):
        """Puts the remembered stream back into ``sys``; safe to call twice."""
        # ``std`` may be missing entirely if __init__ failed early and this
        # is reached through __del__.
        std = getattr(self, "std", None)
        if std is None:
            return
        setattr(sys, self._name, std)
        self.std = None

    def __del__(self):
        self._replace_std()

    def close(self):
        """Restores the original std stream."""
        self._replace_std()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def write(self, s):
        """Writes data to the original kernel stream.

        Returns
        -------
        int
            The number of characters handed to the kernel.
        """
        self.kernel._respond_in_chunks(self._name, s, parent_header=self.parent_header)
        return len(s)

    def flush(self):
        """Flushes kernel iopub_stream."""
        self.kernel.iopub_stream.flush()

    def fileno(self):
        """Tunnel fileno() calls to the std stream."""
        return self.std.fileno()

    def seek(self, offset, whence=io.SEEK_SET):
        """Seeking is not supported; always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation("cannot seek Jupyter redirect")

    def truncate(self, size=None):
        """Truncating is not supported; always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation("cannot truncate Jupyter redirect")

    def detach(self):
        """This operation is not supported."""
        raise io.UnsupportedOperation("cannot detach a Jupyter redirect")

    def read(self, size=None):
        """Reading is not supported; always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation("cannot read a Jupyter redirect")

    def readline(self, size=-1):
        """Reading is not supported; always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation("cannot read a line from a Jupyter redirect")
129
130
class JupyterShell(BaseShell):
    """Shell implementation used when xonsh runs under a Jupyter kernel."""

    def __init__(self, *args, **kwargs):
        """Forwards all arguments to the base shell; no kernel is attached yet."""
        super().__init__(*args, **kwargs)
        self.kernel = None

    def default(self, line, kernel, parent_header=None):
        """Runs ``line`` with stdout and stderr redirected to the Jupyter client.

        Parameters
        ----------
        line : str
            The code to execute, handed to the base shell's ``default()``.
        kernel : XonshKernel
            Kernel that receives everything written to stdout/stderr.
        parent_header : dict or None, optional
            Parent header information passed along to both redirects.

        Returns
        -------
        Whatever the base shell's ``default()`` returns for ``line``.
        """
        out_redirect = StdJupyterRedirect("stdout", kernel, parent_header)
        err_redirect = StdJupyterRedirect("stderr", kernel, parent_header)
        # Both redirects restore the original streams when the block exits,
        # including when the executed code raises.
        with out_redirect, err_redirect:
            return super().default(line)
145