1# Copyright (C) 2002-2006 Stephen Kennedy <stevek@gnome.org>
2# Copyright (C) 2012-2013 Kai Willadsen <kai.willadsen@gmail.com>
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation, either version 2 of the License, or (at
7# your option) any later version.
8#
9# This program is distributed in the hope that it will be useful, but
10# WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12# General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17"""Classes to implement scheduling for cooperative threads."""
18
19import traceback
20
21
class SchedulerBase:
    """Base class with common functionality for schedulers

    A scheduler keeps an ordered list of tasks and runs a single step
    of one task per iteration. A task is either an iterator (typically
    a generator), which is advanced with next(), or a plain callable,
    which includes another scheduler. A task stays scheduled for as
    long as its steps return a true value.

    Derived classes must implement get_current_task.
    """

    def __init__(self):
        # Pending tasks, in scheduling order.
        self.tasks = []
        # Callables invoked with this scheduler whenever a task is added.
        self.callbacks = []

    def __repr__(self):
        return "%s" % (self.tasks,)

    def connect(self, signal, action):
        """Register a callback for when this scheduler gains a task

        Only the "runnable" signal is supported. The action is called
        with this scheduler as its only argument every time add_task
        is called. Connecting an action that is already connected has
        no effect.
        """
        assert signal == "runnable"
        if action not in self.callbacks:
            self.callbacks.append(action)

    def disconnect(self, signal, action):
        """Unregister a callback previously passed to connect

        Only the "runnable" signal is supported. Actions that are not
        currently connected are silently ignored.
        """
        assert signal == "runnable"
        try:
            self.callbacks.remove(action)
        except ValueError:
            pass

    def add_task(self, task, atfront=False):
        """Add a task to the scheduler's task list

        The task may be a function, generator or scheduler, and is
        deemed to have finished when it returns a false value or raises
        StopIteration.

        A task that is already scheduled is moved rather than added a
        second time. If atfront is true the task is placed at the start
        of the task list, otherwise at the end. All connected
        "runnable" callbacks are then called with this scheduler.
        """
        self.remove_task(task)

        if atfront:
            self.tasks.insert(0, task)
        else:
            self.tasks.append(task)

        # Notify over a snapshot so that a callback may connect or
        # disconnect callbacks without disturbing this loop.
        for callback in list(self.callbacks):
            callback(self)

    def remove_task(self, task):
        """Remove a single task from the scheduler

        Removing a task that is not scheduled is not an error.
        """
        try:
            self.tasks.remove(task)
        except ValueError:
            pass

    def remove_all_tasks(self):
        """Remove all tasks from the scheduler"""
        self.tasks = []

    def add_scheduler(self, sched):
        """Attach a sub-scheduler to this scheduler

        The sub-scheduler is not scheduled immediately; it is added as
        a task of this scheduler each time a task is added to it.
        """
        # The bound method compares equal across accesses, so repeated
        # calls do not stack up duplicate callbacks and
        # remove_scheduler can find the callback again to disconnect it.
        sched.connect("runnable", self.add_task)

    def remove_scheduler(self, sched):
        """Remove a sub-scheduler from this scheduler

        The sub-scheduler is unscheduled and detached, so that adding
        tasks to it later no longer schedules it here.
        """
        self.remove_task(sched)
        sched.disconnect("runnable", self.add_task)

    def get_current_task(self):
        """Overridden function returning the next task to run

        Implementations raise StopIteration when there is no task.
        """
        raise NotImplementedError

    def __call__(self):
        """Run an iteration of the current task

        Returns the true value produced by the task step if there was
        one, and otherwise whether any tasks are still pending. This
        lets a scheduler be used as a task of another scheduler.
        """
        if self.tasks:
            r = self.iteration()
            if r:
                return r
        return self.tasks_pending()

    def complete_tasks(self):
        """Run all of the scheduler's current tasks to completion"""
        while self.tasks_pending():
            self.iteration()

    def tasks_pending(self):
        """Return whether any tasks are still scheduled"""
        return len(self.tasks) != 0

    def iteration(self):
        """Perform one iteration of the current task

        Returns the value produced by the task step if it is true. In
        every other case (no current task, a false result, the task
        raising StopIteration or any other Exception) the task is
        dropped from the scheduler and 0 is returned. Exceptions other
        than StopIteration are printed as a traceback, not propagated.
        """
        try:
            task = self.get_current_task()
        except StopIteration:
            return 0
        try:
            if hasattr(task, "__iter__"):
                ret = next(task)
            else:
                ret = task()
        except StopIteration:
            pass
        except Exception:
            traceback.print_exc()
        else:
            if ret:
                return ret
        # The task may have unscheduled itself (or cleared the whole
        # task list) while it ran, so tolerate it already being gone.
        self.remove_task(task)
        return 0
120
121
class LifoScheduler(SchedulerBase):
    """Scheduler that always runs the most recently added task next"""

    def get_current_task(self):
        """Return the task at the end of the task list

        Raises StopIteration when no tasks are scheduled.
        """
        if not self.tasks:
            raise StopIteration
        return self.tasks[-1]
130
131
class FifoScheduler(SchedulerBase):
    """Scheduler that runs tasks in the order in which they were added"""

    def get_current_task(self):
        """Return the task at the start of the task list

        Raises StopIteration when no tasks are scheduled.
        """
        if not self.tasks:
            raise StopIteration
        return self.tasks[0]
140
141
if __name__ == "__main__":
    # Manual demonstration: three generator tasks are queued on a FIFO
    # scheduler, which is itself queued on a LIFO scheduler, and the
    # FIFO scheduler is then driven until it has no tasks left.
    import random
    import time

    outer = LifoScheduler()

    def timetask(t):
        """Print a marker every 0.1s until one second after time t"""
        while True:
            if time.time() - t >= 1:
                break
            print("***")
            time.sleep(0.1)
        print("!!!")

    def sayhello(x):
        """Generator task greeting x between two and eight times"""
        remaining = random.randint(2, 8)
        while remaining > 0:
            print("hello", x)
            time.sleep(0.1)
            yield 1
            remaining -= 1
        print("end", x)

    inner = FifoScheduler()
    outer.add_task(inner)
    for tag in (10, 20, 30):
        inner.add_task(sayhello(tag))
    while inner.tasks_pending():
        inner.iteration()
    time.sleep(2)
    print("***")
169