1import code
2import os
3import sys
4import tempfile
5import io
6import unittest
7
8from contextlib import contextmanager
9from functools import partial
10from unittest import mock
11
12from bpython.curtsiesfrontend import repl as curtsiesrepl
13from bpython.curtsiesfrontend import interpreter
14from bpython.curtsiesfrontend import events as bpythonevents
15from bpython.curtsiesfrontend.repl import LineType
16from bpython import autocomplete
17from bpython import config
18from bpython import args
19from bpython.test import (
20    FixLanguageTestCase as TestCase,
21    MagicIterMock,
22    TEST_CONFIG,
23)
24
25from curtsies import events
26from importlib import invalidate_caches
27
28
def setup_config(conf):
    """Create a ``Config`` from ``TEST_CONFIG`` and apply *conf* overrides.

    Each key of *conf* must name an attribute that already exists on the
    freshly built config object; its value replaces that attribute.

    Raises:
        ValueError: if a key is not an existing config attribute.
    """
    test_config = config.Config(TEST_CONFIG)
    for name, override in conf.items():
        if hasattr(test_config, name):
            setattr(test_config, name, override)
            continue
        raise ValueError(f"{name!r} is not a valid config attribute")
    return test_config
36
37
class TestCurtsiesRepl(TestCase):
    """Tests for parse detection, external editor calls and last-word recall."""

    def setUp(self):
        self.repl = create_repl()

    def cfwp(self, source):
        """Call ``code_finished_will_parse`` on *source* with the repl's compiler."""
        return interpreter.code_finished_will_parse(
            source, self.repl.interp.compile
        )

    def test_code_finished_will_parse(self):
        """Check the tuple returned for complete, partial and broken input.

        The previous version used ``assertTrue(value, expected)``, which
        treats ``expected`` as the failure message and therefore passes for
        any non-empty tuple. ``assertEqual`` actually compares the result.
        """
        cases = [
            (["1 + 1"], (True, True)),
            (["def foo(x):"], (False, True)),
            (["def foo(x)"], (True, False)),
            (["def foo(x):", "return 1"], (True, False)),
            # NOTE(review): this case was listed as (True, True) but was never
            # really asserted. An indented block without a terminating blank
            # line is incomplete for an interactive ("single" mode) compiler,
            # so (False, True) is expected here - confirm against
            # interpreter.code_finished_will_parse.
            (["def foo(x):", "    return 1"], (False, True)),
            (["def foo(x):", "    return 1", ""], (True, True)),
        ]
        for buffer, expected in cases:
            with self.subTest(buffer=buffer):
                self.repl.buffer = buffer
                self.assertEqual(
                    self.cfwp("\n".join(self.repl.buffer)), expected
                )

    def test_external_communication(self):
        """Both external-editor entry points run without raising."""
        self.repl.send_current_block_to_external_editor()
        self.repl.send_session_to_external_editor()

    @unittest.skipUnless(
        all(map(config.can_encode, "å∂߃")), "Charset can not encode characters"
    )
    def test_external_communication_encoding(self):
        """Sending a session containing non-ASCII text does not raise."""
        with captured_output():
            self.repl.display_lines.append('>>> "åß∂ƒ"')
            self.repl.history.append('"åß∂ƒ"')
            self.repl.all_logical_lines.append(('"åß∂ƒ"', LineType.INPUT))
            self.repl.send_session_to_external_editor()

    def test_get_last_word(self):
        """Repeated ``get_last_word`` calls walk back through the history."""
        self.repl.rl_history.entries = ["1", "2 3", "4 5 6"]
        self.repl._set_current_line("abcde")
        self.repl.get_last_word()
        self.assertEqual(self.repl.current_line, "abcde6")
        self.repl.get_last_word()
        self.assertEqual(self.repl.current_line, "abcde3")

    def test_last_word(self):
        """``_last_word`` returns the final space-separated word or ``""``."""
        self.assertEqual(curtsiesrepl._last_word(""), "")
        self.assertEqual(curtsiesrepl._last_word(" "), "")
        self.assertEqual(curtsiesrepl._last_word("a"), "a")
        self.assertEqual(curtsiesrepl._last_word("a b"), "b")

    @unittest.skip("this is the behavior of bash - not currently implemented")
    def test_get_last_word_with_prev_line(self):
        """Skipped: last-word recall interleaved with history navigation."""
        self.repl.rl_history.entries = ["1", "2 3", "4 5 6"]
        self.repl._set_current_line("abcde")
        self.repl.up_one_line()
        self.assertEqual(self.repl.current_line, "4 5 6")
        self.repl.get_last_word()
        self.assertEqual(self.repl.current_line, "4 5 63")
        self.repl.get_last_word()
        self.assertEqual(self.repl.current_line, "4 5 64")
        self.repl.up_one_line()
        self.assertEqual(self.repl.current_line, "2 3")
101
102
def mock_next(obj, return_value):
    """Set the value that ``obj.__next__`` returns when called (for mocks)."""
    next_method = obj.__next__
    next_method.return_value = return_value
106
class TestCurtsiesReplTab(TestCase):
    """Exercise ``on_tab`` with a mocked match iterator and ``complete``."""

    def setUp(self):
        self.repl = create_repl()
        self.repl.matches_iter = MagicIterMock()

        def fill_matches(*_args, **_kwargs):
            # Stand-in for a completion run: only populates the match list.
            self.repl.matches_iter.matches = ["aaa", "aab", "aac"]

        self.repl.complete = mock.Mock(
            return_value=True, side_effect=fill_matches
        )

    def test_tab_with_no_matches_triggers_completion(self):
        """With an empty match list, tab calls ``complete(tab=True)`` once."""
        self.repl._cursor_offset = 5
        self.repl._current_line = " asdf"
        matches_iter = self.repl.matches_iter
        matches_iter.matches = []
        matches_iter.is_cseq.return_value = False
        matches_iter.cur_line.return_value = (None, None)
        self.repl.on_tab()
        self.repl.complete.assert_called_once_with(tab=True)

    def test_tab_after_indentation_adds_space(self):
        """Tab on a whitespace-only line extends it from 4 to 8 spaces."""
        self.repl._cursor_offset = 4
        self.repl._current_line = "    "
        self.repl.on_tab()
        self.assertEqual(self.repl._current_line, "        ")
        self.assertEqual(self.repl._cursor_offset, 8)

    def test_tab_at_beginning_of_line_adds_space(self):
        """Tab on an empty line inserts four spaces."""
        self.repl._cursor_offset = 0
        self.repl._current_line = ""
        self.repl.on_tab()
        self.assertEqual(self.repl._current_line, "    ")
        self.assertEqual(self.repl._cursor_offset, 4)

    def test_tab_with_no_matches_selects_first(self):
        """Tab completes, then asks the iterator for its current line once."""
        self.repl._cursor_offset = 3
        self.repl._current_line = " aa"
        matches_iter = self.repl.matches_iter
        matches_iter.matches = []
        matches_iter.is_cseq.return_value = False

        mock_next(matches_iter, None)
        matches_iter.cur_line.return_value = (None, None)
        self.repl.on_tab()
        self.repl.complete.assert_called_once_with(tab=True)
        self.repl.matches_iter.cur_line.assert_called_once_with()

    def test_tab_with_matches_selects_next_match(self):
        """With matches already present, tab queries ``cur_line`` once."""
        self.repl._cursor_offset = 3
        self.repl._current_line = " aa"
        self.repl.complete()
        matches_iter = self.repl.matches_iter
        matches_iter.is_cseq.return_value = False
        mock_next(matches_iter, None)
        matches_iter.cur_line.return_value = (None, None)
        self.repl.on_tab()
        self.repl.matches_iter.cur_line.assert_called_once_with()

    def test_tab_completes_common_sequence(self):
        """When ``is_cseq`` is true, tab calls ``substitute_cseq`` once."""
        self.repl._cursor_offset = 2
        self.repl._current_line = " a"
        matches_iter = self.repl.matches_iter
        matches_iter.matches = ["aaa", "aab", "aac"]
        matches_iter.is_cseq.return_value = True
        matches_iter.substitute_cseq.return_value = (None, None)
        self.repl.on_tab()
        self.repl.matches_iter.substitute_cseq.assert_called_once_with()
172
173
class TestCurtsiesReplFilenameCompletion(TestCase):
    """Tab behaviour when ``get_completer`` yields filename matches."""

    def setUp(self):
        self.repl = create_repl()

    @staticmethod
    def _fake_completer(matches):
        """Patch ``get_completer`` to return *matches* with a filename completer."""
        return mock.patch(
            "bpython.autocomplete.get_completer",
            return_value=(matches, autocomplete.FilenameCompletion()),
        )

    def test_list_win_visible_match_selected_on_tab_multiple_options(self):
        """Several options: tab selects the first and shows the list window."""
        self.repl._cursor_offset = 2
        self.repl._current_line = " './'"
        with self._fake_completer(["./abc", "./abcd", "./bcd"]):
            self.repl.update_completion()
            self.assertEqual(self.repl.list_win_visible, False)
            self.repl.on_tab()
        self.assertEqual(self.repl.current_match, "./abc")
        self.assertEqual(self.repl.list_win_visible, True)

    def test_list_win_not_visible_and_cseq_if_cseq(self):
        """Shared prefix: tab inserts it, selects nothing, keeps list hidden."""
        self.repl._cursor_offset = 5
        self.repl._current_line = " './a'"
        with self._fake_completer(["./abcd", "./abce"]):
            self.repl.update_completion()
            self.assertEqual(self.repl.list_win_visible, False)
        self.repl.on_tab()
        self.assertEqual(self.repl._current_line, " './abc'")
        self.assertEqual(self.repl.current_match, None)
        self.assertEqual(self.repl.list_win_visible, False)

    def test_list_win_not_visible_and_match_selected_if_one_option(self):
        """Single option: tab inserts it completely, list stays hidden."""
        self.repl._cursor_offset = 5
        self.repl._current_line = " './a'"
        with self._fake_completer(["./abcd"]):
            self.repl.update_completion()
            self.assertEqual(self.repl.list_win_visible, False)
        self.repl.on_tab()
        self.assertEqual(self.repl._current_line, " './abcd'")
        self.assertEqual(self.repl.current_match, None)
        self.assertEqual(self.repl.list_win_visible, False)
218
219
220# from http://stackoverflow.com/a/17981937/398212 - thanks @rkennedy
@contextmanager
def captured_output():
    """Swap ``sys.stdout`` and ``sys.stderr`` for fresh ``StringIO`` buffers.

    Yields the ``(stdout, stderr)`` replacements. The streams that were
    installed on entry are put back on exit, even if the body raises.
    """
    saved_streams = (sys.stdout, sys.stderr)
    fake_out = io.StringIO()
    fake_err = io.StringIO()
    try:
        sys.stdout = fake_out
        sys.stderr = fake_err
        yield sys.stdout, sys.stderr
    finally:
        sys.stdout, sys.stderr = saved_streams
230
231
def create_repl(**kwargs):
    """Return a 50x20 ``BaseRepl`` built on the test config.

    Extra keyword arguments are forwarded to ``BaseRepl``. As a side effect,
    ``PAGER`` is set to ``"true"`` and ``PYTHONSTARTUP`` is removed from
    ``os.environ``; neither change is undone here.
    """
    test_config = setup_config({"editor": "true"})
    new_repl = curtsiesrepl.BaseRepl(test_config, **kwargs)
    os.environ.pop("PYTHONSTARTUP", None)
    os.environ["PAGER"] = "true"
    new_repl.width, new_repl.height = 50, 20
    return new_repl
240
241
class TestFutureImports(TestCase):
    """``1 / 2`` must print ``0.5`` in the repl and in an executed file."""

    def test_repl(self):
        """Pushing ``1 / 2`` into a fresh repl prints ``0.5``."""
        fresh_repl = create_repl()
        with captured_output() as (out, _err):
            fresh_repl.push("1 / 2")
        self.assertEqual(out.getvalue(), "0.5\n")

    def test_interactive(self):
        """Same result after running a script in a shared interpreter."""
        interp = code.InteractiveInterpreter(locals={})
        with captured_output() as (out, _err):
            script = tempfile.NamedTemporaryFile(mode="w", suffix=".py")
            with script:
                script.write("print(1/2)\n")
                # Flush so the interpreter sees the content when it reads
                # the file by name.
                script.flush()
                args.exec_code(interp, [script.name])

            shared_repl = create_repl(interp=interp)
            shared_repl.push("1 / 2")

        self.assertEqual(out.getvalue(), "0.5\n0.5\n")
261
262
class TestStdOutErr(TestCase):
    """How ``send_to_stdouterr`` fills display lines and the pending line."""

    def setUp(self):
        self.repl = create_repl()

    def _assert_tail(self, second_last, last, pending):
        """Check the last two display lines and the unterminated remainder."""
        self.assertEqual(self.repl.display_lines[-2], second_last)
        self.assertEqual(self.repl.display_lines[-1], last)
        self.assertEqual(self.repl.current_stdouterr_line, pending)

    def test_newline(self):
        """Two bare newlines produce two empty display lines."""
        self.repl.send_to_stdouterr("\n\n")
        self._assert_tail("", "", "")

    def test_leading_newline(self):
        """A leading newline yields an empty line before the text line."""
        self.repl.send_to_stdouterr("\nfoo\n")
        self._assert_tail("", "foo", "")

    def test_no_trailing_newline(self):
        """Text without a newline stays in ``current_stdouterr_line``."""
        self.repl.send_to_stdouterr("foo")
        self.assertEqual(self.repl.current_stdouterr_line, "foo")

    def test_print_without_newline_then_print_with_leading_newline(self):
        """Pending text is completed by the next write's leading newline."""
        for chunk in ("foo", "\nbar\n"):
            self.repl.send_to_stdouterr(chunk)
        self._assert_tail("foo", "bar", "")
289
290
class TestPredictedIndent(TestCase):
    """Expected results of ``predicted_indent`` for single lines."""

    def setUp(self):
        self.repl = create_repl()

    def test_simple(self):
        """A block opener predicts 4; other lines predict 0."""
        expectations = (
            ("", 0),
            ("class Foo:", 4),
            ("class Foo: pass", 0),
            ("def asdf():", 4),
            ("def asdf(): return 7", 0),
        )
        for line, indent in expectations:
            self.assertEqual(self.repl.predicted_indent(line), indent)

    @unittest.skip("This would be interesting")
    def test_complex(self):
        """Skipped: indentation aligned to an open bracket."""
        expectations = (
            ("[a, ", 1),
            ("reduce(asdfasdf, ", 7),
        )
        for line, indent in expectations:
            self.assertEqual(self.repl.predicted_indent(line), indent)
306
307
class TestCurtsiesReevaluate(TestCase):
    """Undo must drop names defined by the undone line."""

    def setUp(self):
        self.repl = create_repl()

    def _namespace(self):
        # Looked up on every call rather than cached, in case the repl
        # replaces its interpreter or locals between calls.
        return self.repl.interp.locals

    def test_variable_is_cleared(self):
        """``b`` exists after entering the line and is gone after ``undo``."""
        self.repl._current_line = "b = 10"
        self.repl.on_enter()
        self.assertIn("b", self._namespace())
        self.repl.undo()
        self.assertNotIn("b", self._namespace())
318
319
class TestCurtsiesReevaluateWithImport(TestCase):
    """Re-evaluation tests that import modules written at runtime."""

    def setUp(self):
        self.repl = create_repl()
        self.open = partial(io.open, mode="wt", encoding="utf-8")
        self.dont_write_bytecode = sys.dont_write_bytecode
        sys.dont_write_bytecode = True
        # Work on a copy so that anything appended during a test is dropped
        # when tearDown puts the original list back.
        self.sys_path = sys.path
        sys.path = self.sys_path[:]

        # Because these tests create Python source files at runtime,
        # it's possible in Python >=3.3 for the importlib.machinery.FileFinder
        # for a directory to have an outdated cache when
        # * a module in that directory is imported,
        # * then a new module is created in that directory,
        # * then that new module is imported.
        # Automatic cache invalidation is based on the second-resolution mtime
        # of the directory, so we need to manually call invalidate_caches().
        #
        # see https://docs.python.org/3/library/importlib.html
        # sections #importlib.machinery.FileFinder and
        # #importlib.invalidate_caches
        invalidate_caches()

    def tearDown(self):
        sys.dont_write_bytecode = self.dont_write_bytecode
        sys.path = self.sys_path

    def push(self, line):
        """Enter *line* into the repl as if it had been typed."""
        self.repl._current_line = line
        self.repl.on_enter()

    def head(self, path):
        """Make the repl session append *path* to ``sys.path``."""
        self.push("import sys")
        # repr() yields a correctly escaped literal, so paths containing
        # backslashes or quotes still produce valid code.
        self.push(f"sys.path.append({path!r})")

    @staticmethod
    @contextmanager
    def tempfile():
        """Yield ``(full path, directory, module name)`` of a temp ``.py`` file."""
        with tempfile.NamedTemporaryFile(suffix=".py") as temp:
            path, name = os.path.split(temp.name)
            # Strip only the extension, not every ".py" inside the name.
            yield temp.name, path, os.path.splitext(name)[0]

    def _write_module(self, fullpath, source):
        """Overwrite the module file at *fullpath* with *source*."""
        with self.open(fullpath) as module_file:
            module_file.write(source)

    def test_module_content_changed(self):
        """Re-evaluation picks up new module content."""
        with self.tempfile() as (fullpath, path, modname):
            self._write_module(fullpath, "a = 0\n")
            self.head(path)
            self.push(f"import {modname}")
            self.push(f"a = {modname}.a")
            self.assertIn("a", self.repl.interp.locals)
            self.assertEqual(self.repl.interp.locals["a"], 0)
            self._write_module(fullpath, "a = 1\n")
            self.repl.clear_modules_and_reevaluate()
            self.assertIn("a", self.repl.interp.locals)
            self.assertEqual(self.repl.interp.locals["a"], 1)

    def test_import_module_with_rewind(self):
        """An undone import stays undone; a repeated one sees new content."""
        with self.tempfile() as (fullpath, path, modname):
            self._write_module(fullpath, "a = 0\n")
            self.head(path)
            self.push(f"import {modname}")
            self.assertIn(modname, self.repl.interp.locals)
            self.repl.undo()
            self.assertNotIn(modname, self.repl.interp.locals)
            self.repl.clear_modules_and_reevaluate()
            self.assertNotIn(modname, self.repl.interp.locals)
            self.push(f"import {modname}")
            self.push(f"a = {modname}.a")
            self.assertIn("a", self.repl.interp.locals)
            self.assertEqual(self.repl.interp.locals["a"], 0)
            self._write_module(fullpath, "a = 1\n")
            self.repl.clear_modules_and_reevaluate()
            self.assertIn("a", self.repl.interp.locals)
            self.assertEqual(self.repl.interp.locals["a"], 1)
399
400
class TestCurtsiesPagerText(TestCase):
    """The pager must always be handed ``str`` text."""

    def setUp(self):
        self.repl = create_repl()
        # Replace the pager with an assertion on the type of its argument.
        self.repl.pager = self.assert_pager_gets_unicode

    def assert_pager_gets_unicode(self, text):
        """Pager stand-in: fail unless *text* is a ``str``."""
        self.assertIsInstance(text, str)

    def _show_source(self, highlight):
        """Run ``show_source`` on stubbed non-ASCII source text."""
        self.repl.config.highlight_show_source = highlight
        self.repl.get_source_of_current_name = lambda: "source code å∂߃åß∂ƒ"
        self.repl.show_source()

    def test_help(self):
        """The help text is passed to the pager as ``str``."""
        help_text = self.repl.help_text()
        self.repl.pager(help_text)

    @unittest.skipUnless(
        all(map(config.can_encode, "å∂߃")), "Charset can not encode characters"
    )
    def test_show_source_not_formatted(self):
        """``show_source`` with highlighting turned off."""
        self._show_source(False)

    @unittest.skipUnless(
        all(map(config.can_encode, "å∂߃")), "Charset can not encode characters"
    )
    def test_show_source_formatted(self):
        """``show_source`` with highlighting turned on."""
        self._show_source(True)
427
428
class TestCurtsiesStartup(TestCase):
    """``RunStartupFileEvent`` with startup files in different encodings."""

    def setUp(self):
        self.repl = create_repl()

    def write_startup_file(self, fname, encoding):
        """Write a startup file with a coding line and a non-ASCII string."""
        coding_line = f"# coding: {encoding}\n"
        with open(fname, mode="wt", encoding=encoding) as startup:
            startup.write(coding_line)
            startup.write('a = "äöü"\n')

    def _run_startup_file(self, encoding):
        """Process the startup event with ``PYTHONSTARTUP`` set to a temp file."""
        with tempfile.NamedTemporaryFile() as temp:
            self.write_startup_file(temp.name, encoding)
            startup_env = {"PYTHONSTARTUP": temp.name}
            with mock.patch.dict("os.environ", startup_env):
                self.repl.process_event(bpythonevents.RunStartupFileEvent())

    def test_startup_event_utf8(self):
        """A UTF-8 startup file defines ``a`` in the interpreter locals."""
        self._run_startup_file("utf-8")
        self.assertIn("a", self.repl.interp.locals)

    def test_startup_event_latin1(self):
        """A Latin-1 startup file defines ``a`` in the interpreter locals."""
        self._run_startup_file("latin-1")
        self.assertIn("a", self.repl.interp.locals)
453
454
class TestCurtsiesPasteEvents(TestCase):
    """Control keys inside paste events of different sizes."""

    def setUp(self):
        self.repl = create_repl()

    def test_control_events_in_small_paste(self):
        """In a small paste, ``<Ctrl-a>`` still acts as a control key."""
        self.assertGreaterEqual(
            curtsiesrepl.MAX_EVENTS_POSSIBLY_NOT_PASTE,
            6,
            "test assumes UI lag could cause 6 events",
        )
        paste = events.PasteEvent()
        paste.events = ["a", "b", "c", "d", "<Ctrl-a>", "e"]
        self.repl.process_event(paste)
        self.assertEqual(self.repl.current_line, "eabcd")

    def test_control_events_in_large_paste(self):
        """Large paste events should ignore control characters"""
        threshold = curtsiesrepl.MAX_EVENTS_POSSIBLY_NOT_PASTE
        paste = events.PasteEvent()
        paste.events = ["a", "<Ctrl-a>"] + ["e"] * threshold
        self.repl.process_event(paste)
        self.assertEqual(self.repl.current_line, "a" + "e" * threshold)
481
482
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
485