1import unittest
2
3from collections import namedtuple
4from bpython.curtsies import combined_events
5from bpython.test import FixLanguageTestCase as TestCase
6
7import curtsies.events
8
9
10ScheduledEvent = namedtuple("ScheduledEvent", ["when", "event"])
11
12
13class EventGenerator:
14    def __init__(self, initial_events=(), scheduled_events=()):
15        self._events = []
16        self._current_tick = 0
17        for e in initial_events:
18            self.schedule_event(e, 0)
19        for e, w in scheduled_events:
20            self.schedule_event(e, w)
21
22    def schedule_event(self, event, when):
23        self._events.append(ScheduledEvent(when, event))
24        self._events.sort()
25
26    def send(self, timeout=None):
27        if timeout not in [None, 0]:
28            raise ValueError("timeout value %r not supported" % timeout)
29        if not self._events:
30            return None
31        if self._events[0].when <= self._current_tick:
32            return self._events.pop(0).event
33
34        if timeout == 0:
35            return None
36        elif timeout is None:
37            e = self._events.pop(0)
38            self._current_tick = e.when
39            return e.event
40        else:
41            raise ValueError("timeout value %r not supported" % timeout)
42
43    def tick(self, dt=1):
44        self._current_tick += dt
45        return self._current_tick
46
47
48class TestCurtsiesPasteDetection(TestCase):
49    def test_paste_threshold(self):
50        eg = EventGenerator(list("abc"))
51        cb = combined_events(eg, paste_threshold=3)
52        e = next(cb)
53        self.assertIsInstance(e, curtsies.events.PasteEvent)
54        self.assertEqual(e.events, list("abc"))
55        self.assertEqual(next(cb), None)
56
57        eg = EventGenerator(list("abc"))
58        cb = combined_events(eg, paste_threshold=4)
59        self.assertEqual(next(cb), "a")
60        self.assertEqual(next(cb), "b")
61        self.assertEqual(next(cb), "c")
62        self.assertEqual(next(cb), None)
63
64    def test_set_timeout(self):
65        eg = EventGenerator("a", zip("bcdefg", [1, 2, 3, 3, 3, 4]))
66        eg.schedule_event(curtsies.events.SigIntEvent(), 5)
67        eg.schedule_event("h", 6)
68        cb = combined_events(eg, paste_threshold=3)
69        self.assertEqual(next(cb), "a")
70        self.assertEqual(cb.send(0), None)
71        self.assertEqual(next(cb), "b")
72        self.assertEqual(cb.send(0), None)
73        eg.tick()
74        self.assertEqual(cb.send(0), "c")
75        self.assertEqual(cb.send(0), None)
76        eg.tick()
77        self.assertIsInstance(cb.send(0), curtsies.events.PasteEvent)
78        self.assertEqual(cb.send(0), None)
79        self.assertEqual(cb.send(None), "g")
80        self.assertEqual(cb.send(0), None)
81        eg.tick(1)
82        self.assertIsInstance(cb.send(0), curtsies.events.SigIntEvent)
83        self.assertEqual(cb.send(0), None)
84        self.assertEqual(cb.send(None), "h")
85        self.assertEqual(cb.send(None), None)
86
87
88if __name__ == "__main__":
89    unittest.main()
90