1import queue 2import sched 3import threading 4import time 5import unittest 6from test import support 7from test.support import threading_helper 8 9 10TIMEOUT = support.SHORT_TIMEOUT 11 12 13class Timer: 14 def __init__(self): 15 self._cond = threading.Condition() 16 self._time = 0 17 self._stop = 0 18 19 def time(self): 20 with self._cond: 21 return self._time 22 23 # increase the time but not beyond the established limit 24 def sleep(self, t): 25 assert t >= 0 26 with self._cond: 27 t += self._time 28 while self._stop < t: 29 self._time = self._stop 30 self._cond.wait() 31 self._time = t 32 33 # advance time limit for user code 34 def advance(self, t): 35 assert t >= 0 36 with self._cond: 37 self._stop += t 38 self._cond.notify_all() 39 40 41class TestCase(unittest.TestCase): 42 43 def test_enter(self): 44 l = [] 45 fun = lambda x: l.append(x) 46 scheduler = sched.scheduler(time.time, time.sleep) 47 for x in [0.5, 0.4, 0.3, 0.2, 0.1]: 48 z = scheduler.enter(x, 1, fun, (x,)) 49 scheduler.run() 50 self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5]) 51 52 def test_enterabs(self): 53 l = [] 54 fun = lambda x: l.append(x) 55 scheduler = sched.scheduler(time.time, time.sleep) 56 for x in [0.05, 0.04, 0.03, 0.02, 0.01]: 57 z = scheduler.enterabs(x, 1, fun, (x,)) 58 scheduler.run() 59 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) 60 61 def test_enter_concurrent(self): 62 q = queue.Queue() 63 fun = q.put 64 timer = Timer() 65 scheduler = sched.scheduler(timer.time, timer.sleep) 66 scheduler.enter(1, 1, fun, (1,)) 67 scheduler.enter(3, 1, fun, (3,)) 68 t = threading.Thread(target=scheduler.run) 69 t.start() 70 timer.advance(1) 71 self.assertEqual(q.get(timeout=TIMEOUT), 1) 72 self.assertTrue(q.empty()) 73 for x in [4, 5, 2]: 74 z = scheduler.enter(x - 1, 1, fun, (x,)) 75 timer.advance(2) 76 self.assertEqual(q.get(timeout=TIMEOUT), 2) 77 self.assertEqual(q.get(timeout=TIMEOUT), 3) 78 self.assertTrue(q.empty()) 79 timer.advance(1) 80 self.assertEqual(q.get(timeout=TIMEOUT), 4) 81 self.assertTrue(q.empty()) 82 timer.advance(1) 83 self.assertEqual(q.get(timeout=TIMEOUT), 5) 84 self.assertTrue(q.empty()) 85 timer.advance(1000) 86 threading_helper.join_thread(t) 87 self.assertTrue(q.empty()) 88 self.assertEqual(timer.time(), 5) 89 90 def test_priority(self): 91 l = [] 92 fun = lambda x: l.append(x) 93 scheduler = sched.scheduler(time.time, time.sleep) 94 for priority in [1, 2, 3, 4, 5]: 95 z = scheduler.enterabs(0.01, priority, fun, (priority,)) 96 scheduler.run() 97 self.assertEqual(l, [1, 2, 3, 4, 5]) 98 99 def test_cancel(self): 100 l = [] 101 fun = lambda x: l.append(x) 102 scheduler = sched.scheduler(time.time, time.sleep) 103 now = time.time() 104 event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,)) 105 event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,)) 106 event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,)) 107 event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,)) 108 event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,)) 109 scheduler.cancel(event1) 110 scheduler.cancel(event5) 111 scheduler.run() 112 self.assertEqual(l, [0.02, 0.03, 0.04]) 113 114 def test_cancel_concurrent(self): 115 q = queue.Queue() 116 fun = q.put 117 timer = Timer() 118 scheduler = sched.scheduler(timer.time, timer.sleep) 119 now = timer.time() 120 event1 = scheduler.enterabs(now + 1, 1, fun, (1,)) 121 event2 = scheduler.enterabs(now + 2, 1, fun, (2,)) 122 event4 = scheduler.enterabs(now + 4, 1, fun, (4,)) 123 event5 = scheduler.enterabs(now + 5, 1, fun, (5,)) 124 event3 = scheduler.enterabs(now + 3, 1, fun, (3,)) 125 t = threading.Thread(target=scheduler.run) 126 t.start() 127 timer.advance(1) 128 self.assertEqual(q.get(timeout=TIMEOUT), 1) 129 self.assertTrue(q.empty()) 130 scheduler.cancel(event2) 131 scheduler.cancel(event5) 132 timer.advance(1) 133 self.assertTrue(q.empty()) 134 timer.advance(1) 135 self.assertEqual(q.get(timeout=TIMEOUT), 3) 136 self.assertTrue(q.empty()) 137 timer.advance(1) 138 self.assertEqual(q.get(timeout=TIMEOUT), 4) 139 self.assertTrue(q.empty()) 140 timer.advance(1000) 141 threading_helper.join_thread(t) 142 self.assertTrue(q.empty()) 143 self.assertEqual(timer.time(), 4) 144 145 def test_cancel_correct_event(self): 146 # bpo-19270 147 events = [] 148 scheduler = sched.scheduler() 149 scheduler.enterabs(1, 1, events.append, ("a",)) 150 b = scheduler.enterabs(1, 1, events.append, ("b",)) 151 scheduler.enterabs(1, 1, events.append, ("c",)) 152 scheduler.cancel(b) 153 scheduler.run() 154 self.assertEqual(events, ["a", "c"]) 155 156 def test_empty(self): 157 l = [] 158 fun = lambda x: l.append(x) 159 scheduler = sched.scheduler(time.time, time.sleep) 160 self.assertTrue(scheduler.empty()) 161 for x in [0.05, 0.04, 0.03, 0.02, 0.01]: 162 z = scheduler.enterabs(x, 1, fun, (x,)) 163 self.assertFalse(scheduler.empty()) 164 scheduler.run() 165 self.assertTrue(scheduler.empty()) 166 167 def test_queue(self): 168 l = [] 169 fun = lambda x: l.append(x) 170 scheduler = sched.scheduler(time.time, time.sleep) 171 now = time.time() 172 e5 = scheduler.enterabs(now + 0.05, 1, fun) 173 e1 = scheduler.enterabs(now + 0.01, 1, fun) 174 e2 = scheduler.enterabs(now + 0.02, 1, fun) 175 e4 = scheduler.enterabs(now + 0.04, 1, fun) 176 e3 = scheduler.enterabs(now + 0.03, 1, fun) 177 # queue property is supposed to return an order list of 178 # upcoming events 179 self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5]) 180 181 def test_args_kwargs(self): 182 seq = [] 183 def fun(*a, **b): 184 seq.append((a, b)) 185 186 now = time.time() 187 scheduler = sched.scheduler(time.time, time.sleep) 188 scheduler.enterabs(now, 1, fun) 189 scheduler.enterabs(now, 1, fun, argument=(1, 2)) 190 scheduler.enterabs(now, 1, fun, argument=('a', 'b')) 191 scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3}) 192 scheduler.run() 193 self.assertCountEqual(seq, [ 194 ((), {}), 195 ((1, 2), {}), 196 (('a', 'b'), {}), 197 ((1, 2), {'foo': 3}) 198 ]) 199 200 def test_run_non_blocking(self): 201 l = [] 202 fun = lambda x: l.append(x) 203 scheduler = sched.scheduler(time.time, time.sleep) 204 for x in [10, 9, 8, 7, 6]: 205 scheduler.enter(x, 1, fun, (x,)) 206 scheduler.run(blocking=False) 207 self.assertEqual(l, []) 208 209 210if __name__ == "__main__": 211 unittest.main() 212