1
2 #include <stdio.h>
3 #include <stdarg.h>
4 #include <string.h>
5 #include <sys/time.h>
6 #include <unistd.h>
7 #include <pcl.h>
8
9 /*
10 Implements a simple cooperative multi-threading environment.
11
12 Copyright 2000 by E. Toernig <froese@gmx.de>.
13
14 The API is made out of four functions:
15
16 void cothread_init(void)
17 Initializes the data structures. Has to be called once
18 at program start.
19
20 coroutine_t cothread_new(void (*func)(), ...)
21 Creates a new coroutine. func is called as it's startup
22 function and the additional arguments are passed as a va_list.
23 Returns 0 on success and -1 on failure.
24
25 int cothread_wait(int mode [, int fd] [, int timeout])
26 mode is the bitwise OR of IOREAD, IOWRITE, and IOEXCEPT which
27 require the fd, and IOTIMEOUT which requires the timeout.
28 If any of the conditions become true, the function returns
29 with the mode-bits that became ready.
30
31 There are some special combinations:
32 cothread_wait(0)
33 Waits forever. Coroutine is deleted. This is the
34 standard method to delete a coroutine.
35 cothread_wait(IOTIMEOUT, ms)
36 Sleeps for ms milliseconds.
37 cothread_wait(IOTIMEOUT, 0)
38 Return immediately. Side effect: all other coroutines
39 waiting for an event get a chance to run.
40
41 int cothread_schedule(void)
42 Give up processing and let other coroutines run. To restart
43 this one, another coroutine has to co_call back.
44
45 The cothread routines only manage coroutines that are currently
46 executing cothread_wait. You are not required to create them via
47 cothread_new. Any coroutine may use these functions. cothread_new
48 just makes sure, that execution comes back to the creating coroutine.
49
50 [[xref: co_create, co_delete, co_call, co_current]]
51 [[xref: select, gettimeofday, memset]]
52 */
53
54
55
56 #define IOREAD 1 // wait for fd to become readable
57 #define IOWRITE 2 // wait for fd to become writeable
58 #define IOEXCEPT 4 // wait for an exception condition on fd
59 #define IOTIMEOUT 8 // time out of specified time
60
61 struct ioreq {
62 struct ioreq *next;
63 coroutine_t coro; // coroutine that is waiting
64 int mode; // the events it is waiting for
65 int fd; // optional file descriptor
66 struct timeval timeout[1]; // optional time out
67 };
68
69 struct ioqueue {
70 struct ioreq *req; // first request in this queue
71 int maxfd; // highest fd used in the requests
72 struct timeval *mintime; // earliest timeout in the requests
73 fd_set *rp, *wp, *ep; // pointers to the fd_sets below.
74 fd_set rfds[1], wfds[1], efds[1]; // fd_sets for the select
75 };
76
77 struct iosched {
78 struct timeval ctime[1]; // system time after last select
79 struct ioqueue *active; // requests processed by the last poll
80 struct ioqueue *wait; // requests for the next poll
81 struct ioqueue queues[2]; // data area of the queues.
82 };
83
84 static struct iosched glbl[1];
85
86
87
88
tvadd(struct timeval * dst,struct timeval * a,struct timeval * b)89 static struct timeval *tvadd(struct timeval *dst, struct timeval *a,
90 struct timeval *b) {
91
92 dst->tv_sec = a->tv_sec + b->tv_sec;
93 dst->tv_usec = a->tv_usec + b->tv_usec;
94 if (dst->tv_usec >= 1000000)
95 dst->tv_sec++, dst->tv_usec -= 1000000;
96 return dst;
97 }
98
99
tvsub(struct timeval * dst,struct timeval * a,struct timeval * b)100 static struct timeval *tvsub(struct timeval *dst, struct timeval *a,
101 struct timeval *b) {
102
103 dst->tv_sec = a->tv_sec - b->tv_sec;
104 dst->tv_usec = a->tv_usec - b->tv_usec;
105 if (dst->tv_usec < 0)
106 dst->tv_sec--, dst->tv_usec += 1000000;
107 return dst;
108 }
109
110
tvcmp(struct timeval * a,struct timeval * b)111 static long tvcmp(struct timeval *a, struct timeval *b) {
112
113 if (a->tv_sec - b->tv_sec)
114 return a->tv_sec - b->tv_sec;
115 return a->tv_usec - b->tv_usec;
116 }
117
118
to2tv(struct timeval * dst,int timeout)119 static struct timeval *to2tv(struct timeval *dst, int timeout) {
120
121 dst->tv_sec = timeout/1000;
122 dst->tv_usec = timeout%1000 * 1000;
123 return dst;
124 }
125
126
set_fds(struct ioreq * r,int mode,fd_set * fds,fd_set ** fp)127 static void set_fds(struct ioreq *r, int mode, fd_set *fds, fd_set **fp) {
128
129 if (r->mode & mode) {
130 FD_SET(r->fd, fds);
131 *fp = fds;
132 }
133 }
134
135
tst_fds(struct ioreq * r,int mode,fd_set * fds)136 static int tst_fds(struct ioreq *r, int mode, fd_set *fds) {
137
138 if (r->mode & mode)
139 if (FD_ISSET(r->fd, fds)) {
140 FD_CLR(r->fd, fds);
141 return mode;
142 }
143 return 0;
144 }
145
146
check(struct ioqueue * q,struct ioreq * r,struct timeval * ctime)147 static int check(struct ioqueue *q, struct ioreq *r, struct timeval *ctime) {
148 int res = 0;
149
150 if (r->mode & (IOREAD|IOWRITE|IOEXCEPT)) {
151 res |= tst_fds(r, IOREAD, q->rp);
152 res |= tst_fds(r, IOWRITE, q->wp);
153 res |= tst_fds(r, IOEXCEPT, q->ep);
154 }
155 if (res == 0) // IOTIMEOUT has lower precedence
156 if (r->mode & IOTIMEOUT)
157 if (tvcmp(r->timeout, ctime) <= 0)
158 res |= IOTIMEOUT;
159 return res;
160 }
161
162
enqueue(struct ioqueue * q,struct ioreq * r)163 static void enqueue(struct ioqueue *q, struct ioreq *r) {
164
165 if (r->mode & (IOREAD|IOWRITE|IOEXCEPT)) {
166 set_fds(r, IOREAD, q->rfds, &q->rp);
167 set_fds(r, IOWRITE, q->wfds, &q->wp);
168 set_fds(r, IOEXCEPT, q->efds, &q->ep);
169 if (r->fd >= q->maxfd)
170 q->maxfd = r->fd + 1;
171 }
172 if (r->mode & IOTIMEOUT)
173 if (!q->mintime || tvcmp(q->mintime, r->timeout) > 0)
174 q->mintime = r->timeout;
175 r->next = q->req;
176 q->req = r;
177 }
178
179
vadd_req(struct ioreq * r,int mode,va_list args)180 static void vadd_req(struct ioreq *r, int mode, va_list args) {
181
182 r->coro = co_current();
183 r->mode = mode;
184 if (mode & (IOREAD|IOWRITE|IOEXCEPT))
185 r->fd = va_arg(args, int);
186 if (mode & IOTIMEOUT)
187 tvadd(r->timeout, to2tv(r->timeout, va_arg(args, int)), glbl->ctime);
188
189 enqueue(glbl->wait, r);
190 }
191
192
add_req(struct ioreq * r,int mode,...)193 static void add_req(struct ioreq *r, int mode, ...) {
194 va_list args;
195
196 va_start(args, mode);
197 vadd_req(r, mode, args);
198 va_end(args);
199 }
200
201
cothread_schedule(void)202 int cothread_schedule(void) {
203 struct ioqueue *q;
204 struct ioreq *r;
205 struct timeval tv[1];
206 int res;
207
208 for (;;) {
209 q = glbl->active;
210 while ((r = q->req)) {
211 q->req = r->next;
212 if ((res = check(q, r, glbl->ctime))) {
213 co_call(r->coro);
214 return -1;
215 }
216 if (r->mode == 0 && r->coro != co_current())
217 co_delete(r->coro);
218 else
219 enqueue(glbl->wait, r);
220 }
221 q->rp = q->wp = q->ep = 0;
222 q->mintime = 0;
223 q->maxfd = 0;
224 glbl->active = glbl->wait;
225 glbl->wait = q;
226
227 q = glbl->active;
228 if (q->mintime)
229 q->mintime = tvsub(tv, q->mintime, glbl->ctime);
230
231 while (select(q->maxfd, q->rp, q->wp, q->ep, q->mintime) == -1)
232 ;
233 gettimeofday(glbl->ctime, 0);
234 }
235
236 return 0;
237 }
238
239
cothread_wait(int mode,...)240 int cothread_wait(int mode, ...) {
241 va_list args;
242 struct ioreq req[1];
243
244 va_start(args, mode);
245 vadd_req(req, mode, args);
246 va_end(args);
247 return cothread_schedule();
248 }
249
250
cothread_new(void (* func)(),...)251 coroutine_t cothread_new(void (*func)(), ...) {
252 coroutine_t co;
253 va_list args;
254 struct ioreq req[1];
255
256 add_req(req, IOTIMEOUT, 0);
257 va_start(args, func);
258
259 if ((co = co_create(func, args, 0, 32768)))
260 co_call(co);
261
262 va_end(args);
263 return co;
264 }
265
266
cothread_init()267 void cothread_init() {
268
269 gettimeofday(glbl->ctime, 0);
270 glbl->active = glbl->queues;
271 glbl->wait = glbl->queues + 1;
272 memset(glbl->queues, 0, sizeof(glbl->queues));
273 }
274
275
test1(va_list args)276 static void test1(va_list args) {
277 char *str = va_arg(args, char *);
278 int limit = va_arg(args, int);
279 int i = 0;
280
281 printf("%s started\n", str);
282 while (i < limit) {
283 cothread_wait(IOTIMEOUT, 1000);
284 printf("%s: %d\n", str, i++);
285 }
286 printf("%s: dying\n", str);
287 cothread_wait(0);
288 }
289
290
test2(va_list args)291 static void test2(va_list args) {
292 char *str = va_arg(args, char *);
293 int in = va_arg(args, int);
294 int out = va_arg(args, int);
295 char buf[256];
296 int n;
297
298 printf("%s started\n", str);
299 for (;;) {
300 cothread_wait(IOREAD, in);
301 if ((n = read(in, buf, sizeof(buf))) <= 0)
302 break;
303 cothread_wait(IOWRITE, out);
304 write(out, buf, n);
305 }
306 printf("%s: dying\n", str);
307 cothread_wait(0);
308 }
309
310
main(int argc,char ** argv)311 int main(int argc, char **argv) {
312 cothread_init();
313
314 cothread_new(test1, "test1a", 10);
315 cothread_new(test1, "test1b", 12);
316 cothread_new(test1, "test1c", 14);
317 cothread_new(test2, "test2", 0, 2);
318
319 for (;;) {
320 printf("main: waiting...\n");
321 cothread_wait(IOTIMEOUT, 3000);
322 }
323 }
324
325