1import queue
2import sched
3import threading
4import time
5import unittest
6from test import support
7from test.support import threading_helper
8
9
# Passed as the timeout= of every blocking q.get() call in the concurrent
# tests, so a scheduler thread that never delivers an event makes the test
# fail instead of hanging forever.
TIMEOUT = support.SHORT_TIMEOUT
11
12
class Timer:
    # Fake clock for driving a scheduler from a test.  time() reports a
    # virtual "now"; sleep() moves it forward but blocks whenever that would
    # go past the limit granted so far through advance().

    def __init__(self):
        self._cond = threading.Condition()
        self._time = 0  # current virtual time
        self._stop = 0  # limit the virtual time may not run past

    def time(self):
        with self._cond:
            now = self._time
        return now

    # move the clock forward by t, waiting for advance() whenever the
    # target lies beyond the current limit
    def sleep(self, t):
        assert t >= 0
        with self._cond:
            wake_at = self._time + t
            while wake_at > self._stop:
                # park the clock on the limit until more time is granted
                self._time = self._stop
                self._cond.wait()
            self._time = wake_at

    # grant t more units of time and wake every waiting sleeper
    def advance(self, t):
        assert t >= 0
        with self._cond:
            self._stop = self._stop + t
            self._cond.notify_all()
39
40
class TestCase(unittest.TestCase):
    # Tests for sched.scheduler.  Most tests use the real clock with delays
    # of a fraction of a second; the *_concurrent tests run the scheduler in
    # a second thread against the fake Timer clock so that the test decides
    # exactly when time moves forward.

    def test_enter(self):
        # enter() takes delays relative to now: events queued with
        # decreasing delays must still fire in increasing-delay order.
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for delay in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(delay, 1, fired.append, (delay,))
        scheduler.run()
        self.assertEqual(fired, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        # enterabs() takes absolute times.  These values are long past for
        # time.time(), so nothing has to be waited for; only the ordering
        # by time is checked.
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, fired.append, (when,))
        scheduler.run()
        self.assertEqual(fired, [0.01, 0.02, 0.03, 0.04, 0.05])

    def test_enter_concurrent(self):
        # Events may be added from another thread while run() is blocked
        # in the delay function.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        # Event 1 has fired, so the fake clock reads 1 and a delay of
        # x - 1 lands the new event on absolute time x.
        for x in [4, 5, 2]:
            scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        # Grant far more time than needed: run() must return once the queue
        # is drained, leaving the clock at the last event's time.
        timer.advance(1000)
        threading_helper.join_thread(t)
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        # Events due at the same time run lowest priority number first.
        # Queuing them in ascending priority order alone would also pass on
        # a scheduler that ignored priority and ran them in insertion
        # order, so several insertion orders (including duplicates) are
        # checked.
        cases = [
            ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
            ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
            ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
            ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
        ]
        for priorities, expected in cases:
            with self.subTest(priorities=priorities):
                # Fresh state per case so that one failing case cannot
                # leak events or results into the next.
                fired = []
                scheduler = sched.scheduler(time.time, time.sleep)
                for priority in priorities:
                    scheduler.enterabs(0.01, priority, fired.append,
                                       (priority,))
                scheduler.run()
                self.assertEqual(fired, expected)
                self.assertTrue(scheduler.empty())

    def test_cancel(self):
        # Cancelled events (here the first and the last) must not run.
        fired = []
        fun = fired.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
        scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
        scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
        scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
        scheduler.cancel(event1)
        scheduler.cancel(event5)
        scheduler.run()
        self.assertEqual(fired, [0.02, 0.03, 0.04])

    def test_cancel_concurrent(self):
        # Events may be cancelled from another thread while run() is
        # blocked in the delay function.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        # Deliberately queued out of time order.
        scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        scheduler.enterabs(now + 3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        scheduler.cancel(event2)
        scheduler.cancel(event5)
        # Time 2 passes with nothing delivered: event 2 was cancelled.
        timer.advance(1)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        # Event 5 was cancelled too, so the clock must stop at 4 even
        # though plenty of time is granted.
        timer.advance(1000)
        threading_helper.join_thread(t)
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 4)

    def test_cancel_correct_event(self):
        # bpo-19270
        # Three events with identical time, priority, action and differing
        # only in argument: cancel() must remove exactly the one it is given.
        events = []
        scheduler = sched.scheduler()
        scheduler.enterabs(1, 1, events.append, ("a",))
        b = scheduler.enterabs(1, 1, events.append, ("b",))
        scheduler.enterabs(1, 1, events.append, ("c",))
        scheduler.cancel(b)
        scheduler.run()
        self.assertEqual(events, ["a", "c"])

    def test_empty(self):
        # empty() is true before anything is queued, false while events
        # are pending, and true again once run() has drained the queue.
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, fired.append, (when,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        # The events are only inspected, never run, so the action is a
        # placeholder.
        def fun():
            pass

        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
        # queue property is supposed to return an ordered list of
        # upcoming events
        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        # argument= and kwargs= are forwarded to the action; both default
        # to "nothing passed".
        seq = []
        def fun(*a, **b):
            seq.append((a, b))

        now = time.time()
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(now, 1, fun)
        scheduler.enterabs(now, 1, fun, argument=(1, 2))
        scheduler.enterabs(now, 1, fun, argument=('a', 'b'))
        scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3})
        scheduler.run()
        # All four events share time and priority; only the set of calls
        # is checked, not their relative order.
        self.assertCountEqual(seq, [
            ((), {}),
            ((1, 2), {}),
            (('a', 'b'), {}),
            ((1, 2), {'foo': 3})
        ])

    def test_run_non_blocking(self):
        # Every event is several seconds in the future, so a non-blocking
        # run() must return without executing any of them.
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for delay in [10, 9, 8, 7, 6]:
            scheduler.enter(delay, 1, fired.append, (delay,))
        scheduler.run(blocking=False)
        self.assertEqual(fired, [])
208
209
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
212