1# Copyright (C) 2002-2006 Stephen Kennedy <stevek@gnome.org> 2# Copyright (C) 2012-2013 Kai Willadsen <kai.willadsen@gmail.com> 3# 4# This program is free software: you can redistribute it and/or modify 5# it under the terms of the GNU General Public License as published by 6# the Free Software Foundation, either version 2 of the License, or (at 7# your option) any later version. 8# 9# This program is distributed in the hope that it will be useful, but 10# WITHOUT ANY WARRANTY; without even the implied warranty of 11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12# General Public License for more details. 13# 14# You should have received a copy of the GNU General Public License 15# along with this program. If not, see <http://www.gnu.org/licenses/>. 16 17"""Classes to implement scheduling for cooperative threads.""" 18 19import traceback 20 21 22class SchedulerBase: 23 """Base class with common functionality for schedulers 24 25 Derived classes must implement get_current_task. 26 """ 27 28 def __init__(self): 29 self.tasks = [] 30 self.callbacks = [] 31 32 def __repr__(self): 33 return "%s" % self.tasks 34 35 def connect(self, signal, action): 36 assert signal == "runnable" 37 if action not in self.callbacks: 38 self.callbacks.append(action) 39 40 def add_task(self, task, atfront=False): 41 """Add a task to the scheduler's task list 42 43 The task may be a function, generator or scheduler, and is 44 deemed to have finished when it returns a false value or raises 45 StopIteration. 46 """ 47 self.remove_task(task) 48 49 if atfront: 50 self.tasks.insert(0, task) 51 else: 52 self.tasks.append(task) 53 54 for callback in self.callbacks: 55 callback(self) 56 57 def remove_task(self, task): 58 """Remove a single task from the scheduler""" 59 try: 60 self.tasks.remove(task) 61 except ValueError: 62 pass 63 64 def remove_all_tasks(self): 65 """Remove all tasks from the scheduler""" 66 self.tasks = [] 67 68 def add_scheduler(self, sched): 69 """Adds a subscheduler as a child task of this scheduler""" 70 sched.connect("runnable", lambda t: self.add_task(t)) 71 72 def remove_scheduler(self, sched): 73 """Remove a sub-scheduler from this scheduler""" 74 self.remove_task(sched) 75 try: 76 self.callbacks.remove(sched) 77 except ValueError: 78 pass 79 80 def get_current_task(self): 81 """Overridden function returning the next task to run""" 82 raise NotImplementedError 83 84 def __call__(self): 85 """Run an iteration of the current task""" 86 if len(self.tasks): 87 r = self.iteration() 88 if r: 89 return r 90 return self.tasks_pending() 91 92 def complete_tasks(self): 93 """Run all of the scheduler's current tasks to completion""" 94 while self.tasks_pending(): 95 self.iteration() 96 97 def tasks_pending(self): 98 return len(self.tasks) != 0 99 100 def iteration(self): 101 """Perform one iteration of the current task""" 102 try: 103 task = self.get_current_task() 104 except StopIteration: 105 return 0 106 try: 107 if hasattr(task, "__iter__"): 108 ret = next(task) 109 else: 110 ret = task() 111 except StopIteration: 112 pass 113 except Exception: 114 traceback.print_exc() 115 else: 116 if ret: 117 return ret 118 self.tasks.remove(task) 119 return 0 120 121 122class LifoScheduler(SchedulerBase): 123 """Scheduler calling most recently added tasks first""" 124 125 def get_current_task(self): 126 try: 127 return self.tasks[-1] 128 except IndexError: 129 raise StopIteration 130 131 132class FifoScheduler(SchedulerBase): 133 """Scheduler calling tasks in the order they were added""" 134 135 def get_current_task(self): 136 try: 137 return self.tasks[0] 138 except IndexError: 139 raise StopIteration 140 141 142if __name__ == "__main__": 143 import time 144 import random 145 m = LifoScheduler() 146 147 def timetask(t): 148 while time.time() - t < 1: 149 print("***") 150 time.sleep(0.1) 151 print("!!!") 152 153 def sayhello(x): 154 for i in range(random.randint(2, 8)): 155 print("hello", x) 156 time.sleep(0.1) 157 yield 1 158 print("end", x) 159 160 s = FifoScheduler() 161 m.add_task(s) 162 s.add_task(sayhello(10)) 163 s.add_task(sayhello(20)) 164 s.add_task(sayhello(30)) 165 while s.tasks_pending(): 166 s.iteration() 167 time.sleep(2) 168 print("***") 169