1 
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <pcl.h>
8 
9 /*
10   Implements a simple cooperative multi-threading environment.
11 
12   Copyright 2000 by E. Toernig <froese@gmx.de>.
13 
14   The API is made out of four functions:
15 
16   void cothread_init(void)
17   Initializes the data structures.  Has to be called once
18   at program start.
19 
  coroutine_t cothread_new(void (*func)(), ...)
  Creates a new coroutine.  func is called as its startup
  function and the additional arguments are passed as a va_list.
  Returns the new coroutine, or 0 if it could not be created.
24 
25   int cothread_wait(int mode [, int fd] [, int timeout])
26   mode is the bitwise OR of IOREAD, IOWRITE, and IOEXCEPT which
27   require the fd, and IOTIMEOUT which requires the timeout.
28   If any of the conditions become true, the function returns
29   with the mode-bits that became ready.
30 
31   There are some special combinations:
32   cothread_wait(0)
33   Waits forever.  Coroutine is deleted.	 This is the
34   standard method to delete a coroutine.
35   cothread_wait(IOTIMEOUT, ms)
36   Sleeps for ms milliseconds.
37   cothread_wait(IOTIMEOUT, 0)
38   Return immediately.  Side effect: all other coroutines
39   waiting for an event get a chance to run.
40 
41   int cothread_schedule(void)
42   Give up processing and let other coroutines run.  To restart
43   this one, another coroutine has to co_call back.
44 
  The cothread routines only manage coroutines that are currently
  executing cothread_wait.  You are not required to create them via
  cothread_new.  Any coroutine may use these functions.  cothread_new
  just makes sure that execution comes back to the creating coroutine.
49 
50   [[xref: co_create, co_delete, co_call, co_current]]
51   [[xref: select, gettimeofday, memset]]
52 */
53 
54 
55 
/* Event bits for the mode argument of cothread_wait(); OR them together. */
#define IOREAD		1	// wait for fd to become readable
#define IOWRITE		2	// wait for fd to become writeable
#define IOEXCEPT	4	// wait for an exception condition on fd
#define IOTIMEOUT	8	// wait until the given timeout (in ms) has elapsed
60 
/*
 * One pending wait request of a coroutine.
 *
 * Requests are chained through `next` into the singly linked list of an
 * ioqueue.  `fd` is only filled in when mode contains IOREAD, IOWRITE or
 * IOEXCEPT, and `timeout` only when mode contains IOTIMEOUT (see vadd_req).
 */
struct ioreq {
	struct ioreq *next;	// next request in the same queue
	coroutine_t coro;	// coroutine that is waiting
	int mode;			// the events it is waiting for
	int fd;			// optional file descriptor
	struct timeval timeout[1];	// optional time out, stored as an absolute time
					// (scheduler time at request + delay); one-element
					// array so the member can be used as a pointer
};
68 
/*
 * A list of wait requests together with the select() arguments that
 * enqueue() accumulates from them.
 */
struct ioqueue {
	struct ioreq *req;		// first request in this queue
	int maxfd;			// highest fd used in the requests, plus one
					// (passed as the nfds argument of select)
	struct timeval *mintime;	// earliest timeout in the requests; points at a
					// request's timeout, 0 if no request has one
	fd_set *rp, *wp, *ep;	// pointers to the fd_sets below; each stays 0
					// until a request needs the corresponding set
	fd_set rfds[1], wfds[1], efds[1];	// fd_sets for the select
};
76 
/*
 * Complete scheduler state: two queues that swap roles on every poll.
 * New requests always go to `wait`; cothread_schedule() drains `active`
 * and then exchanges the two.
 */
struct iosched {
	struct timeval ctime[1];	// system time after last select
	struct ioqueue *active;	// requests processed by the last poll
	struct ioqueue *wait;	// requests for the next poll
	struct ioqueue queues[2];	// data area of the queues.
};

/* The single scheduler instance; set up by cothread_init(). */
static struct iosched glbl[1];
85 
86 
87 
88 
/*
 * tvadd - store a + b in *dst and return dst.
 *
 * At most one carry from microseconds into seconds is applied.  dst may
 * alias a or b: both inputs are read before anything is written.
 */
static struct timeval *tvadd(struct timeval *dst, struct timeval *a,
			     struct timeval *b) {
	time_t sec = a->tv_sec + b->tv_sec;
	long usec = a->tv_usec + b->tv_usec;

	if (usec >= 1000000) {
		usec -= 1000000;
		sec++;
	}
	dst->tv_sec = sec;
	dst->tv_usec = usec;
	return dst;
}
98 
99 
/*
 * tvsub - store a - b in *dst and return dst.
 *
 * At most one borrow from seconds into microseconds is applied, so the
 * result may carry a negative tv_sec when a is earlier than b.  dst may
 * alias a or b: both inputs are read before anything is written.
 */
static struct timeval *tvsub(struct timeval *dst, struct timeval *a,
			     struct timeval *b) {
	time_t sec = a->tv_sec - b->tv_sec;
	long usec = a->tv_usec - b->tv_usec;

	if (usec < 0) {
		usec += 1000000;
		sec--;
	}
	dst->tv_sec = sec;
	dst->tv_usec = usec;
	return dst;
}
109 
110 
/*
 * tvcmp - three-way comparison of two times.
 *
 * Returns a value < 0, == 0 or > 0 when a is earlier than, equal to or
 * later than b.  The seconds decide; microseconds only break ties.
 */
static long tvcmp(struct timeval *a, struct timeval *b) {
	if (a->tv_sec != b->tv_sec)
		return a->tv_sec - b->tv_sec;

	return a->tv_usec - b->tv_usec;
}
117 
118 
/*
 * to2tv - convert a delay given in milliseconds into a struct timeval.
 *
 * Writes the result to *dst and returns dst.
 */
static struct timeval *to2tv(struct timeval *dst, int timeout) {
	int whole_seconds = timeout / 1000;
	int leftover_ms = timeout % 1000;

	dst->tv_usec = leftover_ms * 1000;
	dst->tv_sec = whole_seconds;
	return dst;
}
125 
126 
set_fds(struct ioreq * r,int mode,fd_set * fds,fd_set ** fp)127 static void set_fds(struct ioreq *r, int mode, fd_set *fds, fd_set **fp) {
128 
129 	if (r->mode & mode) {
130 		FD_SET(r->fd, fds);
131 		*fp = fds;
132 	}
133 }
134 
135 
tst_fds(struct ioreq * r,int mode,fd_set * fds)136 static int tst_fds(struct ioreq *r, int mode, fd_set *fds) {
137 
138 	if (r->mode & mode)
139 		if (FD_ISSET(r->fd, fds)) {
140 			FD_CLR(r->fd, fds);
141 			return mode;
142 		}
143 	return 0;
144 }
145 
146 
check(struct ioqueue * q,struct ioreq * r,struct timeval * ctime)147 static int check(struct ioqueue *q, struct ioreq *r, struct timeval *ctime) {
148 	int res = 0;
149 
150 	if (r->mode & (IOREAD|IOWRITE|IOEXCEPT)) {
151 		res |= tst_fds(r, IOREAD, q->rp);
152 		res |= tst_fds(r, IOWRITE, q->wp);
153 		res |= tst_fds(r, IOEXCEPT, q->ep);
154 	}
155 	if (res == 0) // IOTIMEOUT has lower precedence
156 		if (r->mode & IOTIMEOUT)
157 			if (tvcmp(r->timeout, ctime) <= 0)
158 				res |= IOTIMEOUT;
159 	return res;
160 }
161 
162 
enqueue(struct ioqueue * q,struct ioreq * r)163 static void enqueue(struct ioqueue *q, struct ioreq *r) {
164 
165 	if (r->mode & (IOREAD|IOWRITE|IOEXCEPT)) {
166 		set_fds(r, IOREAD, q->rfds, &q->rp);
167 		set_fds(r, IOWRITE, q->wfds, &q->wp);
168 		set_fds(r, IOEXCEPT, q->efds, &q->ep);
169 		if (r->fd >= q->maxfd)
170 			q->maxfd = r->fd + 1;
171 	}
172 	if (r->mode & IOTIMEOUT)
173 		if (!q->mintime || tvcmp(q->mintime, r->timeout) > 0)
174 			q->mintime = r->timeout;
175 	r->next = q->req;
176 	q->req = r;
177 }
178 
179 
vadd_req(struct ioreq * r,int mode,va_list args)180 static void vadd_req(struct ioreq *r, int mode, va_list args) {
181 
182 	r->coro = co_current();
183 	r->mode = mode;
184 	if (mode & (IOREAD|IOWRITE|IOEXCEPT))
185 		r->fd = va_arg(args, int);
186 	if (mode & IOTIMEOUT)
187 		tvadd(r->timeout, to2tv(r->timeout, va_arg(args, int)), glbl->ctime);
188 
189 	enqueue(glbl->wait, r);
190 }
191 
192 
/*
 * add_req - variadic front end of vadd_req().
 *
 * The optional arguments are the same as for vadd_req: an int fd and/or an
 * int timeout in milliseconds, depending on mode.
 */
static void add_req(struct ioreq *r, int mode, ...) {
	va_list ap;

	va_start(ap, mode);
	vadd_req(r, mode, ap);
	va_end(ap);
}
200 
201 
cothread_schedule(void)202 int cothread_schedule(void) {
203 	struct ioqueue *q;
204 	struct ioreq *r;
205 	struct timeval tv[1];
206 	int res;
207 
208 	for (;;) {
209 		q = glbl->active;
210 		while ((r = q->req)) {
211 			q->req = r->next;
212 			if ((res = check(q, r, glbl->ctime))) {
213 				co_call(r->coro);
214 				return -1;
215 			}
216 			if (r->mode == 0 && r->coro != co_current())
217 				co_delete(r->coro);
218 			else
219 				enqueue(glbl->wait, r);
220 		}
221 		q->rp = q->wp = q->ep = 0;
222 		q->mintime = 0;
223 		q->maxfd = 0;
224 		glbl->active = glbl->wait;
225 		glbl->wait = q;
226 
227 		q = glbl->active;
228 		if (q->mintime)
229 			q->mintime = tvsub(tv, q->mintime, glbl->ctime);
230 
231 		while (select(q->maxfd, q->rp, q->wp, q->ep, q->mintime) == -1)
232 			;
233 		gettimeofday(glbl->ctime, 0);
234 	}
235 
236 	return 0;
237 }
238 
239 
cothread_wait(int mode,...)240 int cothread_wait(int mode, ...) {
241 	va_list args;
242 	struct ioreq req[1];
243 
244 	va_start(args, mode);
245 	vadd_req(req, mode, args);
246 	va_end(args);
247 	return cothread_schedule();
248 }
249 
250 
cothread_new(void (* func)(),...)251 coroutine_t cothread_new(void (*func)(), ...) {
252 	coroutine_t co;
253 	va_list args;
254 	struct ioreq req[1];
255 
256 	add_req(req, IOTIMEOUT, 0);
257 	va_start(args, func);
258 
259 	if ((co = co_create(func, args, 0, 32768)))
260 		co_call(co);
261 
262 	va_end(args);
263 	return co;
264 }
265 
266 
cothread_init()267 void cothread_init() {
268 
269 	gettimeofday(glbl->ctime, 0);
270 	glbl->active = glbl->queues;
271 	glbl->wait = glbl->queues + 1;
272 	memset(glbl->queues, 0, sizeof(glbl->queues));
273 }
274 
275 
test1(va_list args)276 static void test1(va_list args) {
277 	char *str = va_arg(args, char *);
278 	int limit = va_arg(args, int);
279 	int i = 0;
280 
281 	printf("%s started\n", str);
282 	while (i < limit) {
283 		cothread_wait(IOTIMEOUT, 1000);
284 		printf("%s: %d\n", str, i++);
285 	}
286 	printf("%s: dying\n", str);
287 	cothread_wait(0);
288 }
289 
290 
test2(va_list args)291 static void test2(va_list args) {
292 	char *str = va_arg(args, char *);
293 	int in = va_arg(args, int);
294 	int out = va_arg(args, int);
295 	char buf[256];
296 	int n;
297 
298 	printf("%s started\n", str);
299 	for (;;) {
300 		cothread_wait(IOREAD, in);
301 		if ((n = read(in, buf, sizeof(buf))) <= 0)
302 			break;
303 		cothread_wait(IOWRITE, out);
304 		write(out, buf, n);
305 	}
306 	printf("%s: dying\n", str);
307 	cothread_wait(0);
308 }
309 
310 
main(int argc,char ** argv)311 int main(int argc, char **argv) {
312 	cothread_init();
313 
314 	cothread_new(test1, "test1a", 10);
315 	cothread_new(test1, "test1b", 12);
316 	cothread_new(test1, "test1c", 14);
317 	cothread_new(test2, "test2", 0, 2);
318 
319 	for (;;) {
320 		printf("main: waiting...\n");
321 		cothread_wait(IOTIMEOUT, 3000);
322 	}
323 }
324 
325