1 #include "threadimpl.h"
2 
3 /*
4  * One can go through a lot of effort to avoid this global lock.
5  * You have to put locks in all the channels and all the Alt
6  * structures.  At the beginning of an alt you have to lock all
7  * the channels, but then to try to actually exec an op you
8  * have to lock the other guy's alt structure, so that other
9  * people aren't trying to use him in some other op at the
10  * same time.
11  *
12  * For Plan 9 apps, it's just not worth the extra effort.
13  */
14 static QLock chanlock;
15 
16 Channel*
chancreate(int elemsize,int bufsize)17 chancreate(int elemsize, int bufsize)
18 {
19 	Channel *c;
20 
21 	c = malloc(sizeof *c+bufsize*elemsize);
22 	if(c == nil)
23 		sysfatal("chancreate malloc: %r");
24 	memset(c, 0, sizeof *c);
25 	c->elemsize = elemsize;
26 	c->bufsize = bufsize;
27 	c->nbuf = 0;
28 	c->buf = (uchar*)(c+1);
29 	return c;
30 }
31 
32 void
chansetname(Channel * c,char * fmt,...)33 chansetname(Channel *c, char *fmt, ...)
34 {
35 	char *name;
36 	va_list arg;
37 
38 	va_start(arg, fmt);
39 	name = vsmprint(fmt, arg);
40 	va_end(arg);
41 	free(c->name);
42 	c->name = name;
43 }
44 
45 /* bug - work out races */
46 void
chanfree(Channel * c)47 chanfree(Channel *c)
48 {
49 	if(c == nil)
50 		return;
51 	free(c->name);
52 	free(c->arecv.a);
53 	free(c->asend.a);
54 	free(c);
55 }
56 
57 static void
addarray(_Altarray * a,Alt * alt)58 addarray(_Altarray *a, Alt *alt)
59 {
60 	if(a->n == a->m){
61 		a->m += 16;
62 		a->a = realloc(a->a, a->m*sizeof a->a[0]);
63 	}
64 	a->a[a->n++] = alt;
65 }
66 
67 static void
delarray(_Altarray * a,int i)68 delarray(_Altarray *a, int i)
69 {
70 	--a->n;
71 	a->a[i] = a->a[a->n];
72 }
73 
/*
 * Turn CHANSND into CHANRCV and CHANRCV into CHANSND.
 * For any other op the result is neither of the two, which is
 * harmless: the value is only passed to chanarray, and chanarray
 * copes with ops it does not know.
 */
#define otherop(op)	((CHANSND+CHANRCV)-(op))
79 
80 static _Altarray*
chanarray(Channel * c,uint op)81 chanarray(Channel *c, uint op)
82 {
83 	switch(op){
84 	default:
85 		return nil;
86 	case CHANSND:
87 		return &c->asend;
88 	case CHANRCV:
89 		return &c->arecv;
90 	}
91 }
92 
93 static int
altcanexec(Alt * a)94 altcanexec(Alt *a)
95 {
96 	_Altarray *ar;
97 	Channel *c;
98 
99 	if(a->op == CHANNOP || (c=a->c) == nil)
100 		return 0;
101 	if(c->bufsize == 0){
102 		ar = chanarray(c, otherop(a->op));
103 		return ar && ar->n;
104 	}else{
105 		switch(a->op){
106 		default:
107 			return 0;
108 		case CHANSND:
109 			return c->nbuf < c->bufsize;
110 		case CHANRCV:
111 			return c->nbuf > 0;
112 		}
113 	}
114 }
115 
116 static void
altqueue(Alt * a)117 altqueue(Alt *a)
118 {
119 	_Altarray *ar;
120 
121 	if(a->c == nil)
122 		return;
123 	ar = chanarray(a->c, a->op);
124 	addarray(ar, a);
125 }
126 
127 static void
altdequeue(Alt * a)128 altdequeue(Alt *a)
129 {
130 	int i;
131 	_Altarray *ar;
132 
133 	ar = chanarray(a->c, a->op);
134 	if(ar == nil){
135 		fprint(2, "bad use of altdequeue op=%d\n", a->op);
136 		abort();
137 	}
138 
139 	for(i=0; i<ar->n; i++)
140 		if(ar->a[i] == a){
141 			delarray(ar, i);
142 			return;
143 		}
144 	fprint(2, "cannot find self in altdequeue\n");
145 	abort();
146 }
147 
148 static void
altalldequeue(Alt * a)149 altalldequeue(Alt *a)
150 {
151 	int i;
152 
153 	for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++)
154 		if(a[i].op != CHANNOP)
155 			altdequeue(&a[i]);
156 }
157 
158 static void
amove(void * dst,void * src,uint n)159 amove(void *dst, void *src, uint n)
160 {
161 	if(dst){
162 		if(src == nil)
163 			memset(dst, 0, n);
164 		else
165 			memmove(dst, src, n);
166 	}
167 }
168 
169 /*
170  * Actually move the data around.  There are up to three
171  * players: the sender, the receiver, and the channel itself.
172  * If the channel is unbuffered or the buffer is empty,
173  * data goes from sender to receiver.  If the channel is full,
174  * the receiver removes some from the channel and the sender
175  * gets to put some in.
176  */
177 static void
altcopy(Alt * s,Alt * r)178 altcopy(Alt *s, Alt *r)
179 {
180 	Alt *t;
181 	Channel *c;
182 	uchar *cp;
183 
184 	/*
185 	 * Work out who is sender and who is receiver
186 	 */
187 	if(s == nil && r == nil)
188 		return;
189 	assert(s != nil);
190 	c = s->c;
191 	if(s->op == CHANRCV){
192 		t = s;
193 		s = r;
194 		r = t;
195 	}
196 	assert(s==nil || s->op == CHANSND);
197 	assert(r==nil || r->op == CHANRCV);
198 
199 	/*
200 	 * Channel is empty (or unbuffered) - copy directly.
201 	 */
202 	if(s && r && c->nbuf == 0){
203 		amove(r->v, s->v, c->elemsize);
204 		return;
205 	}
206 
207 	/*
208 	 * Otherwise it's always okay to receive and then send.
209 	 */
210 	if(r){
211 		cp = c->buf + c->off*c->elemsize;
212 		amove(r->v, cp, c->elemsize);
213 		--c->nbuf;
214 		if(++c->off == c->bufsize)
215 			c->off = 0;
216 	}
217 	if(s){
218 		cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
219 		amove(cp, s->v, c->elemsize);
220 		++c->nbuf;
221 	}
222 }
223 
224 static void
altexec(Alt * a)225 altexec(Alt *a)
226 {
227 	int i;
228 	_Altarray *ar;
229 	Alt *other;
230 	Channel *c;
231 
232 	c = a->c;
233 	ar = chanarray(c, otherop(a->op));
234 	if(ar && ar->n){
235 		i = rand()%ar->n;
236 		other = ar->a[i];
237 		altcopy(a, other);
238 		altalldequeue(other->thread->alt);
239 		other->thread->alt = other;
240 		_threadready(other->thread);
241 	}else
242 		altcopy(a, nil);
243 }
244 
245 #define dbgalt 0
246 int
chanalt(Alt * a)247 chanalt(Alt *a)
248 {
249 	int i, j, ncan, n, canblock;
250 	Channel *c;
251 	_Thread *t;
252 
253 	needstack(512);
254 	for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
255 		;
256 	n = i;
257 	canblock = a[i].op == CHANEND;
258 
259 	t = proc()->thread;
260 	for(i=0; i<n; i++)
261 		a[i].thread = t;
262 	t->alt = a;
263 	qlock(&chanlock);
264 if(dbgalt) print("alt ");
265 	ncan = 0;
266 	for(i=0; i<n; i++){
267 		c = a[i].c;
268 if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
269 if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
270 		if(altcanexec(&a[i])){
271 if(dbgalt) print("*");
272 			ncan++;
273 		}
274 	}
275 	if(ncan){
276 		j = rand()%ncan;
277 		for(i=0; i<n; i++){
278 			if(altcanexec(&a[i])){
279 				if(j-- == 0){
280 if(dbgalt){
281 c = a[i].c;
282 print(" => %c:", "esrnb"[a[i].op]);
283 if(c->name) print("%s", c->name); else print("%p", c);
284 print("\n");
285 }
286 					altexec(&a[i]);
287 					qunlock(&chanlock);
288 					return i;
289 				}
290 			}
291 		}
292 	}
293 if(dbgalt)print("\n");
294 
295 	if(!canblock){
296 		qunlock(&chanlock);
297 		return -1;
298 	}
299 
300 	for(i=0; i<n; i++){
301 		if(a[i].op != CHANNOP)
302 			altqueue(&a[i]);
303 	}
304 	qunlock(&chanlock);
305 
306 	_threadswitch();
307 
308 	/*
309 	 * the guy who ran the op took care of dequeueing us
310 	 * and then set t->alt to the one that was executed.
311 	 */
312 	if(t->alt < a || t->alt >= a+n)
313 		sysfatal("channel bad alt");
314 	return t->alt - a;
315 }
316 
317 static int
_chanop(Channel * c,int op,void * p,int canblock)318 _chanop(Channel *c, int op, void *p, int canblock)
319 {
320 	Alt a[2];
321 
322 	a[0].c = c;
323 	a[0].op = op;
324 	a[0].v = p;
325 	a[1].op = canblock ? CHANEND : CHANNOBLK;
326 	if(chanalt(a) < 0)
327 		return -1;
328 	return 1;
329 }
330 
331 int
chansend(Channel * c,void * v)332 chansend(Channel *c, void *v)
333 {
334 	return _chanop(c, CHANSND, v, 1);
335 }
336 
337 int
channbsend(Channel * c,void * v)338 channbsend(Channel *c, void *v)
339 {
340 	return _chanop(c, CHANSND, v, 0);
341 }
342 
343 int
chanrecv(Channel * c,void * v)344 chanrecv(Channel *c, void *v)
345 {
346 	return _chanop(c, CHANRCV, v, 1);
347 }
348 
349 int
channbrecv(Channel * c,void * v)350 channbrecv(Channel *c, void *v)
351 {
352 	return _chanop(c, CHANRCV, v, 0);
353 }
354 
355 int
chansendp(Channel * c,void * v)356 chansendp(Channel *c, void *v)
357 {
358 	return _chanop(c, CHANSND, (void*)&v, 1);
359 }
360 
361 void*
chanrecvp(Channel * c)362 chanrecvp(Channel *c)
363 {
364 	void *v;
365 
366 	if(_chanop(c, CHANRCV, (void*)&v, 1) > 0)
367 		return v;
368 	return nil;
369 }
370 
371 int
channbsendp(Channel * c,void * v)372 channbsendp(Channel *c, void *v)
373 {
374 	return _chanop(c, CHANSND, (void*)&v, 0);
375 }
376 
377 void*
channbrecvp(Channel * c)378 channbrecvp(Channel *c)
379 {
380 	void *v;
381 
382 	if(_chanop(c, CHANRCV, (void*)&v, 0) > 0)
383 		return v;
384 	return nil;
385 }
386 
387 int
chansendul(Channel * c,ulong val)388 chansendul(Channel *c, ulong val)
389 {
390 	return _chanop(c, CHANSND, &val, 1);
391 }
392 
393 ulong
chanrecvul(Channel * c)394 chanrecvul(Channel *c)
395 {
396 	ulong val;
397 
398 	if(_chanop(c, CHANRCV, &val, 1) > 0)
399 		return val;
400 	return 0;
401 }
402 
403 int
channbsendul(Channel * c,ulong val)404 channbsendul(Channel *c, ulong val)
405 {
406 	return _chanop(c, CHANSND, &val, 0);
407 }
408 
409 ulong
channbrecvul(Channel * c)410 channbrecvul(Channel *c)
411 {
412 	ulong val;
413 
414 	if(_chanop(c, CHANRCV, &val, 0) > 0)
415 		return val;
416 	return 0;
417 }
418