1 #include "schpriv.h"
2 
3 #ifndef NO_SCHEME_THREADS
4 
/* The `always-evt` value; allocated and registered by scheme_init_sema(). */
READ_ONLY Scheme_Object *scheme_always_ready_evt;
/* Channel created by scheme_init_sema_places(). */
THREAD_LOCAL_DECL(Scheme_Object *scheme_system_idle_channel);

/* Semaphore primitives, installed by scheme_init_sema(): */
static Scheme_Object *make_sema(int n, Scheme_Object **p);
static Scheme_Object *semap(int n, Scheme_Object **p);
static Scheme_Object *hit_sema(int n, Scheme_Object **p);
static Scheme_Object *block_sema_p(int n, Scheme_Object **p);
static Scheme_Object *block_sema(int n, Scheme_Object **p);
static Scheme_Object *block_sema_breakable(int n, Scheme_Object **p);
static Scheme_Object *make_sema_repost(int n, Scheme_Object **p);
static Scheme_Object *is_sema_repost(int n, Scheme_Object **p);

/* Channel primitives, installed by scheme_init_sema(): */
static Scheme_Object *make_channel(int n, Scheme_Object **p);
static Scheme_Object *make_channel_put(int n, Scheme_Object **p);
static Scheme_Object *channel_p(int n, Scheme_Object **p);
static Scheme_Object *channel_put_p(int n, Scheme_Object **p);
static Scheme_Object *chaperone_channel(int argc, Scheme_Object *argv[]);
static Scheme_Object *impersonate_channel(int argc, Scheme_Object *argv[]);

/* Thread-mailbox primitives, installed by scheme_init_sema(): */
static Scheme_Object *thread_send(int n, Scheme_Object **p);
static Scheme_Object *thread_receive(int n, Scheme_Object **p);
static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
static Scheme_Object *thread_rewind_receive(int n, Scheme_Object **p);

/* `alarm-evt` and `system-idle-evt` primitives: */
static Scheme_Object *make_alarm(int n, Scheme_Object **p);
static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);

/* Readiness callbacks handed to scheme_add_evt() in scheme_init_sema(): */
static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
static int channel_put_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
static int channel_syncer_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
static int alarm_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
static int always_ready(Scheme_Object *w);
static int never_ready(Scheme_Object *w);
static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);

/* Reports whether a waiting thread should be skipped when handing out
   a post or channel value (killed, user-suspended, or breakable with a
   break pending). */
static int pending_break(Scheme_Thread *p);

/* Checked by out_of_line() and cleared by scheme_wait_semas_chs() just
   before the main thread blocks. */
THREAD_LOCAL_DECL(int scheme_main_was_once_suspended);
THREAD_LOCAL_DECL(static Scheme_Object *system_idle_put_evt);
/* Object of type scheme_thread_recv_evt_type allocated by scheme_init_sema(). */
READ_ONLY static Scheme_Object *thread_recv_evt;

#ifdef MZ_PRECISE_GC
static void register_traversers(void);
#endif
50 
/* Record for an alarm event object.
   NOTE(review): presumably built by make_alarm() and polled by
   alarm_ready(); neither is visible in this part of the file -- confirm. */
typedef struct {
  Scheme_Object so;  /* object header */
  double sleep_end;  /* NOTE(review): looks like the time at which the alarm
                        becomes ready; units are not visible here -- confirm */
} Scheme_Alarm;
55 
56 /* For object-sync: */
/* Readiness callback for plain semaphores: makes a non-blocking
   attempt (just_try = 1) to take a post from `s` and reports whether
   it succeeded. */
static int sema_ready(Scheme_Object *s)
{
  int got_post;

  got_post = scheme_wait_sema(s, 1);

  return got_post;
}
61 
/* Unwraps a semaphore-peek event: returns the semaphore stored in `s`
   and sets `*repost` to 1. */
static Scheme_Object *sema_for_repost(Scheme_Object *s, int *repost)
{
  Scheme_Object *wrapped_sema;

  wrapped_sema = SCHEME_PTR_VAL(s);
  *repost = 1;

  return wrapped_sema;
}
67 
/* Installs the semaphore, channel, thread-mailbox, alarm, and idle
   primitives into `env`, allocates the `always-evt`, `never-evt`, and
   thread-receive event objects, and registers the readiness callbacks
   for every event type handled in this file. */
void scheme_init_sema(Scheme_Startup_Env *env)
{
  Scheme_Object *prim, *evt;

#ifdef MZ_PRECISE_GC
  register_traversers();
#endif

  /* ---- semaphores ---- */
  prim = scheme_make_prim_w_arity(make_sema, "make-semaphore", 0, 1);
  scheme_addto_prim_instance("make-semaphore", prim, env);

  prim = scheme_make_folding_prim(semap, "semaphore?", 1, 1, 1);
  scheme_addto_prim_instance("semaphore?", prim, env);

  prim = scheme_make_prim_w_arity(hit_sema, "semaphore-post", 1, 1);
  scheme_addto_prim_instance("semaphore-post", prim, env);

  prim = scheme_make_prim_w_arity(block_sema_p, "semaphore-try-wait?", 1, 1);
  scheme_addto_prim_instance("semaphore-try-wait?", prim, env);

  prim = scheme_make_prim_w_arity(block_sema, "semaphore-wait", 1, 1);
  scheme_addto_prim_instance("semaphore-wait", prim, env);

  prim = scheme_make_prim_w_arity(block_sema_breakable, "semaphore-wait/enable-break", 1, 1);
  scheme_addto_prim_instance("semaphore-wait/enable-break", prim, env);

  prim = scheme_make_prim_w_arity(make_sema_repost, "semaphore-peek-evt", 1, 1);
  scheme_addto_prim_instance("semaphore-peek-evt", prim, env);

  prim = scheme_make_folding_prim(is_sema_repost, "semaphore-peek-evt?", 1, 1, 1);
  scheme_addto_prim_instance("semaphore-peek-evt?", prim, env);

  /* ---- channels ---- */
  prim = scheme_make_prim_w_arity(make_channel, "make-channel", 0, 0);
  scheme_addto_prim_instance("make-channel", prim, env);

  prim = scheme_make_prim_w_arity(make_channel_put, "channel-put-evt", 2, 2);
  scheme_addto_prim_instance("channel-put-evt", prim, env);

  prim = scheme_make_folding_prim(channel_p, "channel?", 1, 1, 1);
  scheme_addto_prim_instance("channel?", prim, env);

  prim = scheme_make_folding_prim(channel_put_p, "channel-put-evt?", 1, 1, 1);
  scheme_addto_prim_instance("channel-put-evt?", prim, env);

  prim = scheme_make_prim_w_arity(chaperone_channel, "chaperone-channel", 3, -1);
  scheme_addto_prim_instance("chaperone-channel", prim, env);

  prim = scheme_make_prim_w_arity(impersonate_channel, "impersonate-channel", 3, -1);
  scheme_addto_prim_instance("impersonate-channel", prim, env);

  /* ---- thread mailboxes ---- */
  prim = scheme_make_prim_w_arity(thread_send, "thread-send", 2, 3);
  scheme_addto_prim_instance("thread-send", prim, env);

  prim = scheme_make_prim_w_arity(thread_receive, "thread-receive", 0, 0);
  scheme_addto_prim_instance("thread-receive", prim, env);

  prim = scheme_make_prim_w_arity(thread_try_receive, "thread-try-receive", 0, 0);
  scheme_addto_prim_instance("thread-try-receive", prim, env);

  prim = scheme_make_prim_w_arity(thread_receive_evt, "thread-receive-evt", 0, 0);
  scheme_addto_prim_instance("thread-receive-evt", prim, env);

  prim = scheme_make_prim_w_arity(thread_rewind_receive, "thread-rewind-receive", 1, 1);
  scheme_addto_prim_instance("thread-rewind-receive", prim, env);

  /* ---- alarms and idle ---- */
  prim = scheme_make_prim_w_arity(make_alarm, "alarm-evt", 1, 1);
  scheme_addto_prim_instance("alarm-evt", prim, env);

  prim = scheme_make_prim_w_arity(make_sys_idle, "system-idle-evt", 0, 0);
  scheme_addto_prim_instance("system-idle-evt", prim, env);

  /* ---- constant event objects ---- */
  REGISTER_SO(scheme_always_ready_evt);
  evt = scheme_alloc_small_object();
  scheme_always_ready_evt = evt;
  scheme_always_ready_evt->type = scheme_always_evt_type;
  scheme_addto_prim_instance("always-evt", scheme_always_ready_evt, env);

  evt = scheme_alloc_small_object();
  evt->type = scheme_never_evt_type;
  scheme_addto_prim_instance("never-evt", evt, env);

  REGISTER_SO(thread_recv_evt);
  evt = scheme_alloc_small_object();
  evt->type = scheme_thread_recv_evt_type;
  thread_recv_evt = evt;

  /* ---- readiness callbacks, one per event type ---- */
  scheme_add_evt(scheme_sema_type, sema_ready, NULL, NULL, 0);
  scheme_add_evt_through_sema(scheme_semaphore_repost_type, sema_for_repost, NULL);
  scheme_add_evt(scheme_channel_type,
                 (Scheme_Ready_Fun)channel_get_ready, NULL, NULL, 1);
  scheme_add_evt(scheme_channel_put_type,
                 (Scheme_Ready_Fun)channel_put_ready, NULL, NULL, 1);
  scheme_add_evt(scheme_channel_syncer_type,
                 (Scheme_Ready_Fun)channel_syncer_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_alarm_type,
                 (Scheme_Ready_Fun)alarm_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_always_evt_type, always_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_never_evt_type, never_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_thread_recv_evt_type,
                 (Scheme_Ready_Fun)thread_recv_ready, NULL, NULL, 0);
}
211 
scheme_init_sema_places()212 void scheme_init_sema_places() {
213   REGISTER_SO(scheme_system_idle_channel);
214   scheme_system_idle_channel = scheme_make_channel();
215 }
216 
scheme_make_sema(intptr_t v)217 Scheme_Object *scheme_make_sema(intptr_t v)
218 {
219   Scheme_Sema *sema;
220 
221   sema = MALLOC_ONE_TAGGED(Scheme_Sema);
222   sema->value = v;
223 
224   sema->so.type = scheme_sema_type;
225 
226   return (Scheme_Object *)sema;
227 }
228 
/* Extracts the initial count for a semaphore from the argument vector.

   who  - primitive name used in error messages
   n    - argument count; when 0, the initial count defaults to 0
   p    - argument vector; p[0] must be an exact nonnegative integer

   Reports a contract violation when p[0] is not an exact nonnegative
   integer, and raises an exn:fail when the value does not fit in an
   intptr_t. */
intptr_t scheme_get_semaphore_init(const char *who, int n, Scheme_Object **p)
{
  intptr_t count;

  if (!n)
    return 0;

  /* Anything other than a fixnum must at least be a positive bignum. */
  if (!SCHEME_INTP(p[0])
      && (!SCHEME_BIGNUMP(p[0]) || !SCHEME_BIGPOS(p[0]))) {
    scheme_wrong_contract(who, "exact-nonnegative-integer?", 0, n, p);
  }

  if (!scheme_get_int_val(p[0], &count)) {
    scheme_raise_exn(MZEXN_FAIL,
                     "%s: starting value %s is too large",
                     who,
                     scheme_make_provided_string(p[0], 0, NULL));
  } else if (count < 0) {
    scheme_wrong_contract(who, "exact-nonnegative-integer?", 0, n, p);
  }

  return count;
}
251 
/* Primitive `make-semaphore`: validates the optional initial count and
   builds the semaphore. */
static Scheme_Object *make_sema(int n, Scheme_Object **p)
{
  intptr_t init_count = scheme_get_semaphore_init("make-semaphore", n, p);

  return scheme_make_sema(init_count);
}
258 
/* Primitive `semaphore-peek-evt`: checks that the argument is a
   semaphore and wraps it in a peek event. */
static Scheme_Object *make_sema_repost(int n, Scheme_Object **p)
{
  if (SCHEME_SEMAP(p[0])) {
    /* argument is fine */
  } else {
    scheme_wrong_contract("semaphore-peek-evt", "semaphore?", 0, n, p);
  }

  return scheme_make_sema_repost(p[0]);
}
266 
/* Builds a semaphore-peek event: a small object of type
   scheme_semaphore_repost_type whose pointer slot refers to `sema`. */
Scheme_Object *scheme_make_sema_repost(Scheme_Object *sema)
{
  Scheme_Object *peek_evt;

  peek_evt = scheme_alloc_small_object();
  SCHEME_PTR_VAL(peek_evt) = sema;
  peek_evt->type = scheme_semaphore_repost_type;

  return peek_evt;
}
277 
/* Primitive `semaphore-peek-evt?`: type predicate for peek events. */
static Scheme_Object *is_sema_repost(int n, Scheme_Object **p)
{
  if (SAME_TYPE(SCHEME_TYPE(p[0]), scheme_semaphore_repost_type))
    return scheme_true;

  return scheme_false;
}
284 
/* Primitive `semaphore?`: type predicate for semaphores. */
static Scheme_Object *semap(int n, Scheme_Object **p)
{
  if (SCHEME_SEMAP(p[0]))
    return scheme_true;

  return scheme_false;
}
289 
/* Called after the count of `t` was incremented while waiters are
   queued. Pops waiters from the front of the line until one is woken
   that consumes the post, or the line is empty. Waiters whose sync
   already has a result, or whose thread has a pending break/kill/
   suspend (see pending_break()), are dropped from the line without
   being woken. */
void did_post_sema(Scheme_Sema *t)
{
  for (;;) {
    Scheme_Channel_Syncer *waiter;
    int took_post = 0;

    waiter = t->first;
    if (!waiter)
      break;

    /* Unlink the head of the line. */
    t->first = waiter->next;
    if (waiter->next)
      t->first->prev = NULL;
    else
      t->last = NULL;

    if (!(waiter->syncing && waiter->syncing->result)
        && !pending_break(waiter->p)) {
      if (!waiter->syncing) {
        /* In this case, we will remove the syncer from line, but
           someone else might grab the post. This is unfair, but it
           can help improve throughput when multiple threads synchronize
           on a lock. */
        took_post = 1;
      } else {
        waiter->syncing->result = waiter->syncing_i + 1;
        if (waiter->syncing->disable_break)
          waiter->syncing->disable_break->suspend_break++;
        scheme_post_syncing_nacks(waiter->syncing);

        /* A peek ("repost") waiter is woken without using up the post. */
        if (waiter->syncing->reposts && waiter->syncing->reposts[waiter->syncing_i]) {
          took_post = 0;
        } else {
          t->value -= 1;
          took_post = 1;
        }

        if (waiter->syncing->accepts && waiter->syncing->accepts[waiter->syncing_i])
          scheme_accept_sync(waiter->syncing, waiter->syncing_i);
      }
      waiter->picked = 1;
    }

    waiter->next = NULL;
    waiter->prev = NULL;
    waiter->in_line = 0;

    if (waiter->picked) {
      scheme_weak_resume_thread(waiter->p);
      if (took_post)
        return;
    }
    /* otherwise, keep going to find one we can wake up */
  }
}
340 
sema_overflow()341 static void sema_overflow()
342 {
343   scheme_raise_exn(MZEXN_FAIL,
344 		   "semaphore-post: the maximum post count has already been reached");
345 }
346 
/* Posts to the semaphore `o`: increments its count by one and, if
   threads are queued on it, hands the post to a waiter through
   did_post_sema().

   A negative count (as set by scheme_post_sema_all()) makes the post a
   no-op. If incrementing would wrap the counter, the count is left
   unchanged and sema_overflow() raises an exn:fail. */
void scheme_post_sema(Scheme_Object *o)
{
  /* fast path is designed to avoid need for XFORM */
  Scheme_Sema *t = (Scheme_Sema *)o;
  /* Must be as wide as the counter itself: with a plain `int`, counts
     at or beyond INT_MAX are truncated on LP64 platforms, which either
     reports a bogus overflow or stores a truncated count. */
  intptr_t v;

  if (t->value < 0) return;

  /* Increment through the unsigned type so that wraparound is well
     defined; a wrapped result is not greater than the old count. */
  v = (intptr_t)((uintptr_t)t->value + 1);
  if (v > t->value) {
    t->value = v;

    if (t->first)
      did_post_sema(t);
  } else
    sema_overflow();
}
364 
/* Posts to `o` repeatedly until no waiter is left in its line, then
   sets the count to -1. A negative count makes later posts no-ops and
   makes every later wait succeed without decrementing. */
void scheme_post_sema_all(Scheme_Object *o)
{
  Scheme_Sema *sema = (Scheme_Sema *)o;

  for (; sema->first; ) {
    scheme_post_sema(o);
  }

  sema->value = -1;
}
374 
/* Primitive `semaphore-post`: checks the argument and posts to it. */
static Scheme_Object *hit_sema(int n, Scheme_Object **p)
{
  if (SCHEME_SEMAP(p[0])) {
    /* argument is fine */
  } else {
    scheme_wrong_contract("semaphore-post", "semaphore?", 0, n, p);
  }

  scheme_post_sema(p[0]);

  return scheme_void;
}
384 
/* Blocking predicate used while the main thread waits in
   scheme_wait_semas_chs(). `a` is really a three-element array:
     a[0] - fixnum count of syncers
     a[1] - array of Scheme_Channel_Syncer pointers
     a[2] - the waiting thread
   Returns 1 when the wait should end: a syncer was picked, a break can
   be delivered, or the thread is (or was) suspended. */
static int out_of_line(Scheme_Object *a)
{
  Scheme_Channel_Syncer **syncers;
  Scheme_Thread *thread;
  int count, k;

  /* Out of one line? */
  count = SCHEME_INT_VAL(((Scheme_Object **)a)[0]);
  syncers = ((Scheme_Channel_Syncer ***)a)[1];
  for (k = 0; k < count; k++) {
    if (syncers[k]->picked)
      return 1;
  }

  /* Suspended break? The waiter bumped suspend_break before blocking,
     so temporarily undo that to ask whether a break could go through. */
  thread = ((Scheme_Thread **)a)[2];
  if (thread->external_break) {
    int breakable;

    --thread->suspend_break;
    breakable = scheme_can_break(thread);
    thread->suspend_break++;

    if (breakable)
      return 1;
  }

  /* Suspended by user? */
  if (thread->running & MZTHREAD_USER_SUSPENDED)
    return 1;
  if (scheme_main_was_once_suspended)
    return 1;

  return 0;
}
417 
/* Appends syncer `w` to the waiting line of `sema`, which may really
   be a semaphore, a channel (the "get" line), a channel-put event (the
   "put" line of its channel), or never-evt (no line at all). Marks `w`
   as in line and not yet picked. */
static void get_into_line(Scheme_Sema *sema, Scheme_Channel_Syncer *w)
  /* Can be called multiple times. */
{
  Scheme_Channel_Syncer *head, *tail;
  int is_sema, is_channel;

  w->picked = 0;
  w->in_line = 1;

  /* never-evt has no line; only the flags above are updated */
  if (SAME_TYPE(SCHEME_TYPE(sema), scheme_never_evt_type))
    return;

  is_sema = SCHEME_SEMAP(sema);
  is_channel = (!is_sema && SCHEME_CHANNELP(sema));

  /* Load the ends of the relevant line. */
  if (is_sema) {
    head = sema->first;
    tail = sema->last;
  } else if (is_channel) {
    Scheme_Channel *ch = (Scheme_Channel *)sema;
    head = ch->get_first;
    tail = ch->get_last;
  } else {
    Scheme_Channel_Put *chp = (Scheme_Channel_Put *)sema;
    head = chp->ch->put_first;
    tail = chp->ch->put_last;
  }

  /* Link `w` at the tail. */
  w->prev = tail;
  if (!tail)
    head = w;
  else
    tail->next = w;
  tail = w;
  w->next = NULL;

  /* Store the ends back. */
  if (is_sema) {
    sema->last = tail;
    sema->first = head;
  } else if (is_channel) {
    Scheme_Channel *ch = (Scheme_Channel *)sema;
    ch->get_last = tail;
    ch->get_first = head;
  } else {
    Scheme_Channel_Put *chp = (Scheme_Channel_Put *)sema;
    chp->ch->put_last = tail;
    chp->ch->put_first = head;
  }
}
458 
/* Removes syncer `w` from the waiting line of `sema` (a semaphore, a
   channel's "get" line, a channel-put event's "put" line, or never-evt,
   which has no line). Does nothing when `w` is not marked as in line. */
static void get_outof_line(Scheme_Sema *sema, Scheme_Channel_Syncer *w)
{
  Scheme_Channel_Syncer *head, *tail;
  int is_sema, is_channel;

  if (w->in_line) {
    w->in_line = 0;
  } else {
    return;
  }

  /* never-evt has no line; clearing the flag is all there is to do */
  if (SAME_TYPE(SCHEME_TYPE(sema), scheme_never_evt_type))
    return;

  is_sema = SCHEME_SEMAP(sema);
  is_channel = (!is_sema && SCHEME_CHANNELP(sema));

  /* Load the ends of the relevant line. */
  if (is_sema) {
    head = sema->first;
    tail = sema->last;
  } else if (is_channel) {
    Scheme_Channel *ch = (Scheme_Channel *)sema;
    head = ch->get_first;
    tail = ch->get_last;
  } else {
    Scheme_Channel_Put *chp = (Scheme_Channel_Put *)sema;
    head = chp->ch->put_first;
    tail = chp->ch->put_last;
  }

  /* Splice `w` out; its own prev/next fields are left untouched. */
  if (!w->prev)
    head = w->next;
  else
    w->prev->next = w->next;
  if (!w->next)
    tail = w->prev;
  else
    w->next->prev = w->prev;

  /* Store the ends back. */
  if (is_sema) {
    sema->last = tail;
    sema->first = head;
  } else if (is_channel) {
    Scheme_Channel *ch = (Scheme_Channel *)sema;
    ch->get_last = tail;
    ch->get_first = head;
  } else {
    Scheme_Channel_Put *chp = (Scheme_Channel_Put *)sema;
    chp->ch->put_last = tail;
    chp->ch->put_first = head;
  }
}
500 
/* Creates a syncer for `ch` on behalf of the sync described by `sinfo`,
   puts it into the line of `ch`, and makes the syncer the sync target
   recorded in `sinfo`. The syncer's thread is sinfo->false_positive_ok
   when that is set, and the current thread otherwise. */
static void ext_get_into_line(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  Scheme_Channel_Syncer *syncer;

  syncer = MALLOC_ONE_RT(Scheme_Channel_Syncer);
  syncer->so.type = scheme_channel_syncer_type;

  syncer->p = (sinfo->false_positive_ok
               ? sinfo->false_positive_ok
               : scheme_current_thread);
  syncer->obj = ch;
  syncer->syncing = (Syncing *)sinfo->current_syncing;
  syncer->syncing_i = sinfo->w_i;

  /* Get into line */
  get_into_line((Scheme_Sema *)ch, syncer);

  scheme_set_sync_target(sinfo, (Scheme_Object *)syncer, NULL, NULL, 0, 0, NULL);
}
520 
/* Takes `ch_w` out of the line of the object it is waiting on. */
void scheme_get_outof_line(Scheme_Channel_Syncer *ch_w)
{
  Scheme_Sema *target = (Scheme_Sema *)ch_w->obj;

  get_outof_line(target, ch_w);
}
525 
/* Puts `ch_w` (back) into the line of the object it is waiting on. */
void scheme_get_back_into_line(Scheme_Channel_Syncer *ch_w)
{
  Scheme_Sema *target = (Scheme_Sema *)ch_w->obj;

  get_into_line(target, ch_w);
}
530 
/* Attempts an immediate rendezvous on a channel.

   sema    - really a channel (GET mode: look for a queued putter) or a
             channel-put event (PUT mode: look for a queued getter)
   syncing - the caller's own sync, or NULL; queued syncers belonging
             to the same sync are skipped
   pos     - index of this event within `syncing`; when `syncing` is
             set and pos >= 0, the caller's sync result is recorded too
   result  - GET mode only: if non-NULL, receives the transferred value

   Queued partners that already have a result, or whose thread has a
   pending break/kill/suspend, are removed from the line and skipped.
   Returns 1 if a partner was matched (and its thread resumed), 0
   otherwise. */
static int try_channel(Scheme_Sema *sema, Syncing *syncing, int pos, Scheme_Object **result)
{
  Scheme_Channel_Syncer *partner, *after;
  int matched;

  if (SCHEME_CHANNELP(sema)) {
    /* GET mode: walk the putters waiting on this channel */
    Scheme_Channel *ch = (Scheme_Channel *)sema;

    partner = ch->put_first;
    while (partner) {
      Scheme_Channel_Put *chp;

      if (partner->syncing == syncing) {
        /* can't synchronize with self */
        partner = partner->next;
        continue;
      }

      chp = (Scheme_Channel_Put *)partner->obj;
      matched = 0;

      if (!partner->syncing->result && !pending_break(partner->p)) {
        /* Complete the putter's side... */
        partner->picked = 1;
        partner->syncing->result = partner->syncing_i + 1;
        if (partner->syncing->disable_break)
          partner->syncing->disable_break->suspend_break++;
        scheme_post_syncing_nacks(partner->syncing);
        if (result)
          *result = chp->val;
        /* ... and our own side, when we are part of a sync. */
        if (syncing && (pos >= 0)) {
          syncing->result = pos + 1;
          if (syncing->disable_break)
            syncing->disable_break->suspend_break++;
          scheme_post_syncing_nacks(syncing);
          syncing->set->argv[pos] = chp->val;
        }
        matched = 1;
        scheme_weak_resume_thread(partner->p);
      }

      /* Matched or stale, the putter leaves the line either way. */
      after = partner->next;
      get_outof_line((Scheme_Sema *)chp, partner);
      partner = after;

      if (matched)
        return 1;
    }

    return 0;
  } else {
    /* PUT mode: walk the getters waiting on the target channel */
    Scheme_Channel_Put *chp = (Scheme_Channel_Put *)sema;

    partner = chp->ch->get_first;
    while (partner) {
      if (partner->syncing == syncing) {
        /* can't synchronize with self */
        partner = partner->next;
        continue;
      }

      matched = 0;

      if (!partner->syncing->result && !pending_break(partner->p)) {
        /* Deliver the value and complete the getter's side... */
        partner->picked = 1;
        partner->syncing->set->argv[partner->syncing_i] = chp->val;
        partner->syncing->result = partner->syncing_i + 1;
        if (partner->syncing->disable_break)
          partner->syncing->disable_break->suspend_break++;
        scheme_post_syncing_nacks(partner->syncing);
        /* ... and our own side, when we are part of a sync. */
        if (syncing && (pos >= 0)) {
          syncing->result = pos + 1;
          if (syncing->disable_break)
            syncing->disable_break->suspend_break++;
          scheme_post_syncing_nacks(syncing);
        }
        matched = 1;
        scheme_weak_resume_thread(partner->p);
      }

      /* Matched or stale, the getter leaves the line either way. */
      after = partner->next;
      get_outof_line((Scheme_Sema *)chp->ch, partner);
      partner = after;

      if (matched)
        return 1;
    }

    return 0;
  }
}
615 
scheme_try_plain_sema(Scheme_Object * o)616 XFORM_NONGCING int scheme_try_plain_sema(Scheme_Object *o)
617 {
618   Scheme_Sema *sema = (Scheme_Sema *)o;
619 
620   if (sema->value) {
621     if (sema->value > 0)
622       --sema->value;
623     return 1;
624   } else
625     return 0;
626 }
627 
/* Waits on, or polls, the `n` semaphores / channels / channel-put
   events in `o`.

   just_try > 0 : make one non-blocking attempt on o[0]; the result is
                  that of scheme_try_plain_sema() or try_channel().
   just_try < 0 : wait on o[0] with breaks enabled; returns 1.
   just_try == 0: block until one of the entries is ready. The result
                  is the 1-based index of the selected entry, or 0 if
                  the thread turned out to be killed or user-suspended
                  after it was woken.

   `syncing`, when non-NULL, is the sync record on whose behalf the
   wait happens; its `result` field is filled in by whoever completes
   the sync. When it is NULL and a post is stolen after wakeup, a
   Syncing record is allocated locally so that later rounds are fair. */
int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *syncing)
     /* When syncing is supplied, o can contain Scheme_Channel_Syncer
        and never-evt values, and just_try must be 0. */
{
  Scheme_Sema **semas = (Scheme_Sema **)o;
  int v, i, ii;

  if (just_try) {
    /* assert: n == 1, !syncing */
    Scheme_Sema *sema = semas[0];
    if (just_try > 0) {
      /* Single non-blocking attempt. */
      if (sema->so.type == scheme_sema_type) {
        v = scheme_try_plain_sema((Scheme_Object *)sema);
      } else {
        v = try_channel(sema, syncing, 0, NULL);
      }
    } else {
      /* Blocking wait with breaks enabled for its duration. */
      Scheme_Cont_Frame_Data cframe;

      scheme_push_break_enable(&cframe, 1, 1);

      scheme_wait_sema((Scheme_Object *)sema, 0);

      scheme_pop_break_enable(&cframe, 0);

      return 1;
    }
  } else {
    int start_pos;

#if 0
    /* Use the "immutable" flag bit on a semaphore to check for
       inconsistent use in atomic and non-atomic modes, which
       can lead to an attempt to suspend in atomic mode. */
    if ((n == 1) && SCHEME_SEMAP(o[0])) {
      if (!do_atomic) {
        SCHEME_SET_IMMUTABLE(o[0]);
      } else if (SCHEME_IMMUTABLEP(o[0])) {
        if (!on_atomic_timeout
            || (do_atomic > atomic_timeout_atomic_level)) {
          scheme_signal_error("using a seaphore in both atomic and non-atomic mode");
        }
      }
    }
#endif

    /* With several entries, polling starts at a position taken from
       the sync record or from the scheduler's random state. */
    if (n > 1) {
      if (syncing)
        start_pos = syncing->start_pos;
      else {
        Scheme_Object *rand_state;
        rand_state = scheme_get_param(scheme_current_config(), MZCONFIG_SCHEDULER_RANDOM_STATE);
        start_pos = scheme_rand((Scheme_Random_State *)rand_state);
      }
    } else
      start_pos = 0;

    /* Initial poll */
    while (1) {
      i = 0;
      for (ii = 0; ii < n; ii++) {
        /* Randomized start position for poll ensures fairness: */
        i = (start_pos + ii) % n;

        if (semas[i]->so.type == scheme_sema_type) {
          if (semas[i]->value) {
            /* Ready. The count is decremented only if it is positive
               and this entry is not a peek ("repost"). */
            if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
              --semas[i]->value;
            if (syncing) {
              syncing->result = i + 1;
              if (syncing->accepts && syncing->accepts[i])
                scheme_accept_sync(syncing, i);
            }
            break;
          }
        } else if (semas[i]->so.type == scheme_never_evt_type) {
          /* Never ready. */
        } else if (semas[i]->so.type == scheme_channel_syncer_type) {
          if (((Scheme_Channel_Syncer *)semas[i])->picked)
            break;
        } else if (try_channel(semas[i], syncing, i, NULL))
          break;
      }

      /* ii >= n means that nothing was ready in this pass. */
      if (ii >= n) {
        if (!scheme_wait_until_suspend_ok()) {
          break;
        } else {
          /* there may have been some action on one of the waitables;
             try again, if no result, yet */
          if (syncing && syncing->result) {
            i = syncing->result - 1;
            ii = 0;
            break;
          }
        }
      } else
        break;
    }

    /* In the following, syncers get changed back to channels,
       and channel puts */
    if (ii >= n) {
      Scheme_Channel_Syncer **ws, *w;

      /* One syncer per entry: reuse the ones passed in, create the rest. */
      ws = MALLOC_N(Scheme_Channel_Syncer*, n);
      for (i = 0; i < n; i++) {
        if (semas[i]->so.type == scheme_channel_syncer_type) {
          ws[i] = (Scheme_Channel_Syncer *)semas[i];
          semas[i] = (Scheme_Sema *)ws[i]->obj;
        } else {
          w = MALLOC_ONE_RT(Scheme_Channel_Syncer);
          ws[i] = w;
          w->so.type = scheme_channel_syncer_type;
          w->p = scheme_current_thread;
          w->syncing = syncing;
          w->obj = (Scheme_Object *)semas[i];
          w->syncing_i = i;
        }
      }

      while (1) {
        int out_of_a_line;

        /* Get into line */
        for (i = 0; i < n; i++) {
          if (!ws[i]->in_line) {
            get_into_line(semas[i], ws[i]);
          }
        }

        if (!scheme_current_thread->next) {
          void **a;

          /* We're not allowed to suspend the main thread. Delay
             breaks so we get a chance to clean up. */
          scheme_current_thread->suspend_break++;

          /* Argument bundle in the layout that out_of_line() expects. */
          a = MALLOC_N(void*, 3);
          a[0] = scheme_make_integer(n);
          a[1] = ws;
          a[2] = scheme_current_thread;

          scheme_main_was_once_suspended = 0;

          scheme_block_until(out_of_line, NULL, (Scheme_Object *)a, (float)0.0);

          --scheme_current_thread->suspend_break;
        } else {
          /* Mark the thread to indicate that we need to clean up
             if the thread is killed. */
          int old_nkc;
          old_nkc = (scheme_current_thread->running & MZTHREAD_NEED_KILL_CLEANUP);
          if (!old_nkc)
            scheme_current_thread->running += MZTHREAD_NEED_KILL_CLEANUP;
          scheme_weak_suspend_thread(scheme_current_thread);
          if (!old_nkc && (scheme_current_thread->running & MZTHREAD_NEED_KILL_CLEANUP))
            scheme_current_thread->running -= MZTHREAD_NEED_KILL_CLEANUP;
        }

        /* We've been resumed. But was it for the semaphore, or a signal? */
        out_of_a_line = 0;

        /* If we get the post, we must return WITHOUT BLOCKING.
           GRacket, for example, depends on this special property, which
           ensures that the thread can't be broken or killed between
           receiving the post and returning. */

        if (!syncing) {
          /* Poster can't be sure that we really will get it,
             so we have to decrement the sema count here. */
          i = 0;
          for (ii = 0; ii < n; ii++) {
            i = (start_pos + ii) % n;
            if (ws[i]->picked) {
              out_of_a_line = 1;
              if (semas[i]->value) {
                if (semas[i]->value > 0)
                  --(semas[i]->value);
                break;
              }
            }
          }
          /* i == n records that no post was actually obtained. */
          if (ii >= n)
            i = n;
        } else {
          if (syncing->result) {
            out_of_a_line = 1;
            i = syncing->result - 1;
          } else {
            out_of_a_line = 0;
            i = n;
          }
        }

        if (!out_of_a_line) {
          /* We weren't woken by any semaphore/channel. Get out of line, block once
             (to handle breaks/kills) and then loop to get back into line. */
          for (i = 0; i < n; i++) {
            if (ws[i]->in_line)
              get_outof_line(semas[i], ws[i]);
          }

          scheme_thread_block(0); /* ok if it returns multiple times */
          scheme_current_thread->ran_some = 1;
          /* [but why would it return multiple times?! there must have been a reason...] */
        } else {

          if ((scheme_current_thread->running & MZTHREAD_KILLED)
              || ((scheme_current_thread->running & MZTHREAD_USER_SUSPENDED)
                  && !(scheme_current_thread->running & MZTHREAD_NEED_SUSPEND_CLEANUP))) {
            /* We've been killed or suspended! */
            i = -1;
          }

          /* We got a post from semas[i], or we were killed.
             Did any (other) semaphore pick us?
             (This only happens when syncing == NULL.) */
          if (!syncing) {
            int j;

            for (j = 0; j < n; j++) {
              if (j != i) {
                if (ws[j]->picked) {
                  if (semas[j]->value) {
                    /* Consume the value and repost, because no one else
                       has been told to go, and we're accepting a different post. */
                    if (semas[j]->value > 0)
                      --semas[j]->value;
                    scheme_post_sema((Scheme_Object *)semas[j]);
                  }
                }
              }
            }
          }

          /* If we're done, get out of all lines that we're still in. */
          if (i < n) {
            int j;
            for (j = 0; j < n; j++) {
              if (ws[j]->in_line)
                get_outof_line(semas[j], ws[j]);
            }
          }

          if (i == -1) {
            scheme_thread_block(0); /* dies or suspends */
            scheme_current_thread->ran_some = 1;
          }

          if (i < n)
            break;
        }

        /* Otherwise: !syncing and someone stole the post, or we were
           suspended and we have to start over. Either way, poll then
           loop to get back in line and try again. */
        for (ii = 0; ii < n; ii++) {
          i = (start_pos + ii) % n;

          if (semas[i]->so.type == scheme_sema_type) {
            if (semas[i]->value) {
              if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
                --semas[i]->value;
              if (syncing) {
                syncing->result = i + 1;
                if (syncing->accepts && syncing->accepts[i])
                  scheme_accept_sync(syncing, i);
              }
              break;
            }
          }  else if (semas[i]->so.type == scheme_never_evt_type) {
            /* Never ready. */
          } else if (try_channel(semas[i], syncing, i, NULL))
            break;
        }

        if (ii < n) {
          /* Get out of any line that we still might be in: */
          int j;
          for (j = 0; j < n; j++) {
            if (ws[j]->in_line)
              get_outof_line(semas[j], ws[j]);
          }

          break;
        }

        if (!syncing) {
          /* Looks like this thread is a victim of unfair semaphore access.
             Go into fair mode by allocating a syncing: */
          syncing = MALLOC_ONE_RT(Syncing);
#ifdef MZTAG_REQUIRED
          syncing->type = scheme_rt_syncing;
#endif
          syncing->start_pos = start_pos;

          /* Get out of all lines, and set syncing field before we get back in line: */
          {
            int j;
            for (j = 0; j < n; j++) {
              if (ws[j]->in_line)
                get_outof_line(semas[j], ws[j]);
              ws[j]->syncing = syncing;
            }
          }
        }
        /* Back to top of loop to sync again */
      }
    }
    /* 1-based index of the selected entry; 0 when i was set to -1 above. */
    v = i + 1;
  }

  return v;
}
943 
/* General path for scheme_wait_sema(): wraps `o` in a one-element
   array and defers to scheme_wait_semas_chs() with no sync record. */
static int slow_wait_sema(Scheme_Object *o, int just_try)
{
  Scheme_Object *single[1];
  int outcome;

  single[0] = o;
  outcome = scheme_wait_semas_chs(1, single, just_try, NULL);

  return outcome;
}
952 
/* Waits on semaphore `o`.
     just_try > 0 : non-blocking attempt
     just_try == 0: blocking wait
     just_try < 0 : blocking wait with breaks enabled
   Returns 1 when the fast path takes a post; otherwise returns the
   result of slow_wait_sema(). */
int scheme_wait_sema(Scheme_Object *o, int just_try)
{
  /* fast path is designed to avoid need for XFORM */

  /* A break-enabled wait with a break already pending must take the
     general path. */
  if ((just_try < 0) && scheme_current_thread->external_break)
    return slow_wait_sema(o, just_try);

  if (scheme_try_plain_sema(o))
    return 1;

  return slow_wait_sema(o, just_try);
}
962 
/* Primitive `semaphore-try-wait?`: non-blocking wait that reports
   whether a post was taken. */
static Scheme_Object *block_sema_p(int n, Scheme_Object **p)
{
  int got_post;

  if (SCHEME_SEMAP(p[0])) {
    /* argument is fine */
  } else {
    scheme_wrong_contract("semaphore-try-wait?", "semaphore?", 0, n, p);
  }

  got_post = scheme_wait_sema(p[0], 1);
  if (got_post)
    return scheme_true;

  return scheme_false;
}
970 
/* Primitive `semaphore-wait`: blocks until a post is available. */
static Scheme_Object *block_sema(int n, Scheme_Object **p)
{
  if (SCHEME_SEMAP(p[0])) {
    /* argument is fine */
  } else {
    scheme_wrong_contract("semaphore-wait", "semaphore?", 0, n, p);
  }

  (void)scheme_wait_sema(p[0], 0);

  /* scheme_wait_sema() does not check for a break that arrived after
     the post was received, so check explicitly here. */
  scheme_check_break_now();

  return scheme_void;
}
984 
/* Primitive behind `semaphore-wait/enable-break`.

   Raises a contract error unless p[0] is a semaphore, then waits on
   it with just_try = -1 (the negative value is what makes
   scheme_wait_sema() skip its fast path when a break is pending).
   Always returns scheme_void. */
static Scheme_Object *block_sema_breakable(int n, Scheme_Object **p)
{
  if (!SCHEME_SEMAP(p[0])) {
    scheme_wrong_contract("semaphore-wait/enable-break", "semaphore?", 0, n, p);
  }

  scheme_wait_sema(p[0], -1);

  return scheme_void;
}
994 
/* Reports whether thread `p` has something pending that should
   interrupt a blocking wait.

   Returns 1 when `p->running` has MZTHREAD_KILLED or
   MZTHREAD_USER_SUSPENDED set. Otherwise, when `p->external_break` is
   set, returns the result of scheme_can_break(p). In every other case
   returns 0. */
static int pending_break(Scheme_Thread *p)
{
  int can_break;

  if (p->running & (MZTHREAD_KILLED | MZTHREAD_USER_SUSPENDED))
    return 1;

  if (!p->external_break)
    return 0;

  /* If p is the main thread (no `next`), it must have a suspension to
     block on a channel or semaphore; discount that suspension while
     asking whether a break is allowed, then put it back. */
  if (!p->next)
    --p->suspend_break;

  can_break = scheme_can_break(p);

  if (!p->next)
    p->suspend_break++;

  return can_break;
}
1019 
1020 /**********************************************************************/
1021 /*                            Channels                                */
1022 /**********************************************************************/
1023 
scheme_make_channel()1024 Scheme_Object *scheme_make_channel()
1025 {
1026   Scheme_Channel *c;
1027 
1028   c = MALLOC_ONE_TAGGED(Scheme_Channel);
1029   c->so.type = scheme_channel_type;
1030 
1031   return (Scheme_Object *)c;
1032 }
1033 
/* Primitive wrapper around scheme_make_channel(); the argument count
   and argument vector are not consulted. */
static Scheme_Object *make_channel(int n, Scheme_Object **p)
{
  Scheme_Object *fresh;

  fresh = scheme_make_channel();

  return fresh;
}
1038 
/* Builds a channel-put event object that records the channel `ch`
   (stored as a Scheme_Channel pointer, without any type check here)
   and the value `v`. The result is tagged scheme_channel_put_type. */
Scheme_Object *scheme_make_channel_put_evt(Scheme_Object *ch, Scheme_Object *v)
{
  Scheme_Channel_Put *put;

  put = MALLOC_ONE_TAGGED(Scheme_Channel_Put);
  put->so.type = scheme_channel_put_type;
  put->val = v;
  put->ch = (Scheme_Channel *)ch;

  return (Scheme_Object *)put;
}
1050 
/* Attempts to put `v` on channel `ch`.

   Returns 0 at once when the channel's `get_first` field is NULL.
   Otherwise wraps (ch, v) in a channel-put event and syncs on it via
   scheme_sync_timeout(), passing the fixnum 0 as the first element
   (presumably a zero timeout, i.e. a poll -- confirm against
   scheme_sync_timeout). The return value is whether that sync
   produced a true (non-#f) result. */
int scheme_try_channel_put(Scheme_Object *ch, Scheme_Object *v)
{
  Scheme_Object *args[2];
  Scheme_Object *put_evt, *outcome;

  if (!((Scheme_Channel *)ch)->get_first)
    return 0;

  put_evt = scheme_make_channel_put_evt(ch, v);
  args[0] = scheme_make_integer(0);
  args[1] = put_evt;
  outcome = scheme_sync_timeout(2, args);

  return SCHEME_TRUEP(outcome);
}
1063 
/* Pushes a value that is about to be put on a chaperoned channel
   through the put-side redirect procedures of the chaperone chain.

   `obj` is the outermost chaperone and `orig` is the value being put.
   The chain is walked through `prev` links until an actual channel is
   reached, at which point the (possibly replaced) value is returned.
   For each layer whose `redirects` is not of type
   scheme_nack_guard_evt_type, `redirects` is applied to (prev, value)
   and its result becomes the new value; unless the layer is flagged
   as an impersonator, that result must be a chaperone of `orig`, else
   scheme_wrong_chaperoned() is called. */
Scheme_Object *chaperone_put(Scheme_Object *obj, Scheme_Object *orig)
{
  Scheme_Chaperone *layer = (Scheme_Chaperone *)obj;
  Scheme_Object *current = orig;
  Scheme_Object *args[2];
  Scheme_Object *put_proc;

  while (1) {
    if (SCHEME_CHANNELP(layer))
      return current;

    if (!(SAME_TYPE(SCHEME_TYPE(layer->redirects), scheme_nack_guard_evt_type))) {
      args[0] = layer->prev;
      args[1] = current;
      put_proc = layer->redirects;
      current = _scheme_apply(put_proc, 2, args);

      if (!(SCHEME_CHAPERONE_FLAGS(layer) & SCHEME_CHAPERONE_IS_IMPERSONATOR)) {
        if (!scheme_chaperone_of(current, orig))
          scheme_wrong_chaperoned("channel-put", "result", orig, current);
      }
    }
    /* Otherwise this layer is actually an evt chaperone, which is not
       handled here because we are doing a "put". */

    /* Either way, step inward to the next layer. */
    layer = (Scheme_Chaperone *)layer->prev;
  }

  return obj;
}
1094 
/* Primitive behind `channel-put-evt`.

   argv[0] must be a channel or a non-procedure chaperone whose
   underlying value is a channel; anything else raises a contract
   error. For a chaperoned channel, the value argv[1] is first passed
   through chaperone_put() and the event is built on the underlying
   channel; for a plain channel the value is used as given. */
static Scheme_Object *make_channel_put(int argc, Scheme_Object **argv)
{
  Scheme_Object *ch = argv[0];
  Scheme_Object *val = argv[1];
  Scheme_Object *wrapper;

  if (SCHEME_NP_CHAPERONEP(ch)
      && SCHEME_CHANNELP(SCHEME_CHAPERONE_VAL(ch))) {
    wrapper = ch;
    ch = SCHEME_CHAPERONE_VAL(wrapper);
    val = chaperone_put(wrapper, argv[1]);
  } else if (!SCHEME_CHANNELP(argv[0])) {
    scheme_wrong_contract("channel-put-evt", "channel?", 0, argc, argv);
  }

  return scheme_make_channel_put_evt(ch, val);
}
1114 
/* Channel predicate: scheme_true when p[0] is a channel, or a
   non-procedure chaperone wrapping a channel; scheme_false
   otherwise. */
static Scheme_Object *channel_p(int n, Scheme_Object **p)
{
  if (SCHEME_CHANNELP(p[0]))
    return scheme_true;

  if (SCHEME_NP_CHAPERONEP(p[0])
      && SCHEME_CHANNELP(SCHEME_CHAPERONE_VAL(p[0])))
    return scheme_true;

  return scheme_false;
}
1123 
/* Predicate for channel-put events: scheme_true exactly when the type
   tag of p[0] is scheme_channel_put_type, scheme_false otherwise. */
static Scheme_Object *channel_put_p(int n, Scheme_Object **p)
{
  if (SAME_TYPE(SCHEME_TYPE(p[0]), scheme_channel_put_type))
    return scheme_true;

  return scheme_false;
}
1130 
/* Readiness callback for receiving from a channel.

   Tries the channel via try_channel() using the syncing record taken
   from `sinfo->current_syncing`. On success, the received value is
   installed as the sync target and 1 is returned. On failure,
   ext_get_into_line() is called for the channel and 0 is returned. */
static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  Scheme_Object *received;
  int got;

  got = try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, &received);
  if (!got) {
    ext_get_into_line(ch, sinfo);
    return 0;
  }

  scheme_set_sync_target(sinfo, received, NULL, NULL, 0, 0, NULL);

  return 1;
}
1144 
/* Readiness callback for a channel-put event.

   Tries the object via try_channel() using the syncing record taken
   from `sinfo->current_syncing` and no result slot. Returns 1 on
   success; otherwise calls ext_get_into_line() for it and returns
   0. */
static int channel_put_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  int done;

  done = try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, NULL);
  if (!done) {
    ext_get_into_line(ch, sinfo);
    return 0;
  }

  return 1;
}
1154 
/* Readiness callback for a channel syncer: returns 1 when the
   syncer's `picked` field is set and 0 otherwise. `sinfo` is not
   used. */
static int channel_syncer_ready(Scheme_Object *ch_w, Scheme_Schedule_Info *sinfo)
{
  Scheme_Channel_Syncer *syncer = (Scheme_Channel_Syncer *)ch_w;

  /* When picked, the value (if any) should have been transferred
     already -- in which case we would not actually have made it
     here. */
  return syncer->picked ? 1 : 0;
}
1167 
/* Tries the channel `ch` via try_channel() with no syncing record and
   no result slot (so any received value is not handed back to the
   caller). Returns 1 if try_channel() reports success, 0
   otherwise. */
int scheme_try_channel_get(Scheme_Object *ch)
{
  return try_channel((Scheme_Sema *)ch, NULL, -1, NULL) ? 1 : 0;
}
1175 
1176 /* This chaperone only protects the "put" end of the channel because
1177    chaperone-evt is sufficient to protect the "get" end. Thus, it first
1178    wraps the object in an evt chaperone. */
do_chaperone_channel(const char * name,int is_impersonator,int argc,Scheme_Object ** argv)1179 Scheme_Object *do_chaperone_channel(const char *name, int is_impersonator, int argc, Scheme_Object **argv)
1180 {
1181   Scheme_Chaperone *px;
1182   Scheme_Object *val = argv[0];
1183   Scheme_Object *evt;
1184   Scheme_Object *props;
1185 
1186   if (SCHEME_CHAPERONEP(val))
1187     val = SCHEME_CHAPERONE_VAL(val);
1188 
1189   if (!SCHEME_CHANNELP(val))
1190     scheme_wrong_contract(name, "channel?", 0, argc, argv);
1191   scheme_check_proc_arity(name, 1, 1, argc, argv);
1192   scheme_check_proc_arity(name, 2, 2, argc, argv);
1193 
1194   evt = scheme_do_chaperone_evt(name, is_impersonator, 2, argv);
1195 
1196   props = scheme_parse_chaperone_props(name, 3, argc, argv);
1197 
1198   px = MALLOC_ONE_TAGGED(Scheme_Chaperone);
1199   px->iso.so.type = scheme_chaperone_type;
1200   px->val = val;
1201   px->prev = evt;
1202   px->props = props;
1203   px->redirects = argv[2];
1204 
1205   if (is_impersonator)
1206     SCHEME_CHAPERONE_FLAGS(px) |= SCHEME_CHAPERONE_IS_IMPERSONATOR;
1207 
1208   return (Scheme_Object *)px;
1209 }
1210 
/* Primitive entry point for `chaperone-channel`: delegates to
   do_chaperone_channel() with the impersonator flag off. */
static Scheme_Object *chaperone_channel(int argc, Scheme_Object **argv)
{
  Scheme_Object *wrapped;

  wrapped = do_chaperone_channel("chaperone-channel", 0, argc, argv);

  return wrapped;
}
1215 
/* Primitive entry point for `impersonate-channel`: delegates to
   do_chaperone_channel() with the impersonator flag on.

   The name string is what do_chaperone_channel() uses when reporting
   contract and arity errors, so it must match the primitive's actual
   name (`impersonate-channel`, parallel to `chaperone-channel`). */
static Scheme_Object *impersonate_channel(int argc, Scheme_Object **argv)
{
  return do_chaperone_channel("impersonate-channel", 1, argc, argv);
}
1220 
1221 /**********************************************************************/
1222 /*                           Thread mbox                              */
1223 /**********************************************************************/
1224 
/* Ensures that thread `p` has a mailbox semaphore: if `p->mbox_sema`
   is unset, one is created with scheme_make_sema(0) and stored.
   Does nothing when the semaphore already exists. */
static void make_mbox_sema(Scheme_Thread *p)
{
  Scheme_Object *fresh = NULL;

  if (p->mbox_sema)
    return;

  /* NOTE(review): as in the original, the new semaphore goes through
     a local before being stored in the thread record; presumably
     deliberate for the precise GC -- confirm before collapsing. */
  fresh = scheme_make_sema(0);
  p->mbox_sema = fresh;
}
1233 
/* Appends message `o` to the tail of thread `p`'s mailbox queue (a
   chain of raw pairs tracked by `mbox_first` / `mbox_last`) and then
   posts the thread's mailbox semaphore, creating it first if
   needed. */
static void mbox_push(Scheme_Thread *p, Scheme_Object *o)
{
  Scheme_Object *cell;

  cell = scheme_make_raw_pair(o, NULL);

  if (!p->mbox_first) {
    /* Empty queue: the new cell is also the head. */
    p->mbox_first = cell;
  } else {
    SCHEME_CDR(p->mbox_last) = cell;
  }
  p->mbox_last = cell;

  make_mbox_sema(p);
  /* The post can't overflow the semaphore, because we'd run out of
     memory for the queue first. */
  scheme_post_sema(p->mbox_sema);
}
1253 
/* Pushes every element of the list `lst` onto the FRONT of thread
   `p`'s mailbox queue, one at a time, so the first element of `lst`
   ends up furthest from the head among the pushed elements.

   Semaphore accounting is batched: `cnt` starts at -1 and is
   incremented per pushed element, so after k pushes it holds k-1.
   At the end of the list, or once `cnt` reaches 256, the semaphore's
   `value` is bumped directly by `cnt` and then scheme_post_sema() is
   called once (presumably accounting for the remaining one and waking
   any waiter -- confirm against scheme_post_sema). Nothing is posted
   for an empty list. */
static void mbox_push_front(Scheme_Thread *p, Scheme_Object *lst)
{
  int cnt = -1;
  Scheme_Object *next, *hd;

  make_mbox_sema(p);

  next = lst;
  while (!SCHEME_NULLP(next)) {
    /* Push one: */
    hd = scheme_make_raw_pair(SCHEME_CAR(next), p->mbox_first);
    if (!p->mbox_first)
      p->mbox_last = hd; /* queue was empty, so this cell is also the tail */
    p->mbox_first = hd;

    ++cnt;
    next = SCHEME_CDR(next);

    if (SCHEME_NULLP(next) || (cnt == 256)) {
      /* Either done or need to pause to allow breaks/swaps; */
      /* do a single post for all messages so far. */
      ((Scheme_Sema*)p->mbox_sema)->value += cnt;
      scheme_post_sema(p->mbox_sema);
      SCHEME_USE_FUEL(cnt+1); /* might sleep */
      cnt = -1; /* restart the batch counter for the next batch */
    }
  }
}
1282 
mbox_pop(Scheme_Thread * p,int dec)1283 static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec)
1284 {
1285   /* Assertion: mbox_first != NULL */
1286   Scheme_Object *r = NULL;
1287 
1288   r = SCHEME_CAR(p->mbox_first);
1289   p->mbox_first = SCHEME_CDR(p->mbox_first);
1290   if (!p->mbox_first)
1291     p->mbox_last = NULL;
1292 
1293   if (dec)
1294     scheme_try_plain_sema(p->mbox_sema);
1295 
1296   return r;
1297 }
1298 
/* Primitive behind `thread-send`.

   argv[0] must be a thread (contract error otherwise) and argv[1] is
   the message. An optional argv[2] is either #f or a procedure
   accepting 0 arguments.

   If the target thread is still running, the message is queued on its
   mailbox and scheme_void is returned. Otherwise: with no third
   argument an exn:fail:contract is raised; with #f, scheme_false is
   returned; with a procedure, it is applied in tail position with no
   arguments. */
static Scheme_Object *thread_send(int argc, Scheme_Object **argv)
{
  int running;

  if (!SCHEME_THREADP(argv[0])) {
    scheme_wrong_contract("thread-send", "thread?", 0, argc, argv);
    return NULL;
  }

  /* The #f test is redundant with the arity check's own handling of
     #f, but keeps the common case as fast as possible. */
  if ((argc > 2) && !SCHEME_FALSEP(argv[2]))
    scheme_check_proc_arity2("thread-send", 0, 2, argc, argv, 1);

  running = ((Scheme_Thread*)argv[0])->running;
  if (MZTHREAD_STILL_RUNNING(running)) {
    mbox_push((Scheme_Thread*)argv[0], argv[1]);
    return scheme_void;
  }

  /* Target is not running: */
  if (argc <= 2) {
    scheme_raise_exn(MZEXN_FAIL_CONTRACT, "thread-send: target thread is not running");
    return NULL;
  }

  if (SCHEME_FALSEP(argv[2]))
    return scheme_false;

  return _scheme_tail_apply(argv[2], 0, NULL);
}
1327 
/* Primitive behind `thread-receive`: returns the next message from
   the current thread's mailbox, blocking on the mailbox semaphore
   when the queue is empty. */
static Scheme_Object *thread_receive(int argc, Scheme_Object **argv)
{
  Scheme_Thread *self;
  Scheme_Object *msg;

  /* Only the current thread can down its own mbox semaphore, so when
     a message is already queued we can dec+pop directly, without
     syncing (mbox_pop with dec=1). */
  if (scheme_current_thread->mbox_first)
    return mbox_pop(scheme_current_thread, 1);

  self = scheme_current_thread;

  make_mbox_sema(self);

  scheme_wait_sema(self->mbox_sema, 0);
  /* The return after a successful wait is relied on to be atomic, so
     that a semaphore wait guarantees a mailbox dequeue; the wait
     already consumed the post, hence dec=0. */
  msg = mbox_pop(self, 0);

  /* Because of that atomicity, we are obliged to check for a break
     here ourselves: */
  scheme_check_break_now();

  return msg;
}
1353 
/* Primitive behind `thread-try-receive`: non-blocking variant of
   thread_receive. Returns scheme_false when the current thread's
   mailbox queue is empty; otherwise pops the head message with
   dec=1. */
static Scheme_Object *thread_try_receive(int argc, Scheme_Object **argv)
{
  if (!scheme_current_thread->mbox_first)
    return scheme_false;

  return mbox_pop(scheme_current_thread, 1);
}
1361 
/* Returns the file-level `thread_recv_evt` object; the arguments are
   not consulted. */
static Scheme_Object *thread_receive_evt(int argc, Scheme_Object **argv)
{
  Scheme_Object *evt;

  evt = thread_recv_evt;

  return evt;
}
1366 
/* Readiness callback for the thread-receive event.

   Never reports ready itself (always returns 0). Instead it redirects
   the sync to the mailbox semaphore of the relevant thread -- the one
   in `sinfo->false_positive_ok` if set, else the current thread --
   creating that semaphore first if needed, with `thread_recv_evt`
   passed along as the third argument of scheme_set_sync_target(). */
static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  Scheme_Thread *receiver;

  receiver = sinfo->false_positive_ok;
  if (!receiver) {
    receiver = scheme_current_thread;
  }

  make_mbox_sema(receiver);

  scheme_set_sync_target(sinfo, receiver->mbox_sema, thread_recv_evt, NULL, 1, 1, NULL);

  return 0;
}
1381 
/* Primitive behind `thread-rewind-receive`.

   argv[0] must be a list (contract error otherwise); its elements are
   pushed back onto the front of the current thread's mailbox via
   mbox_push_front(). Returns scheme_void. */
static Scheme_Object *thread_rewind_receive(int argc, Scheme_Object **argv)
{
  if (!scheme_is_list(argv[0])) {
    scheme_wrong_contract("thread-rewind-receive", "list?", 0, argc, argv);
    return NULL;
  }

  mbox_push_front(scheme_current_thread, argv[0]);

  return scheme_void;
}
1392 
1393 /**********************************************************************/
1394 /*                             alarms                                 */
1395 /**********************************************************************/
1396 
/* Primitive behind `alarm-evt`.

   argv[0] must be a real number (contract error otherwise). It is
   converted to a double and stored as the `sleep_end` of a freshly
   allocated Scheme_Alarm, which alarm_ready() later compares against
   scheme_get_inexact_milliseconds(). */
static Scheme_Object *make_alarm(int argc, Scheme_Object **argv)
{
  Scheme_Alarm *alarm;
  double when;

  if (!SCHEME_REALP(argv[0]))
    scheme_wrong_contract("alarm-evt", "real?", 0, argc, argv);

  when = scheme_get_val_as_double(argv[0]);

  alarm = MALLOC_ONE_TAGGED(Scheme_Alarm);
  alarm->sleep_end = when;
  alarm->so.type = scheme_alarm_type;

  return (Scheme_Object *)alarm;
}
1414 
/* Readiness callback for alarm events.

   First lowers `sinfo->sleep_end` to this alarm's `sleep_end` when
   the former is zero/unset or later than the alarm. Then returns 1 if
   the alarm's `sleep_end` is at or before the current value of
   scheme_get_inexact_milliseconds(), and 0 otherwise. */
static int alarm_ready(Scheme_Object *_a, Scheme_Schedule_Info *sinfo)
{
  Scheme_Alarm *alarm = (Scheme_Alarm *)_a;

  if (!sinfo->sleep_end
      || (sinfo->sleep_end > alarm->sleep_end)) {
    sinfo->sleep_end = alarm->sleep_end;
  }

  return (alarm->sleep_end <= scheme_get_inexact_milliseconds()) ? 1 : 0;
}
1428 
/* Readiness callback that unconditionally reports "ready": ignores
   its argument and returns 1. */
static int always_ready(Scheme_Object *w)
{
  return 1;
}
1433 
/* Readiness callback that unconditionally reports "not ready":
   ignores its argument and returns 0. */
static int never_ready(Scheme_Object *w)
{
  return 0;
}
1438 
/* Returns the shared system-idle event, creating it on first use.

   On the first call, `system_idle_put_evt` is registered with
   REGISTER_SO, set to a channel-put event that puts scheme_void on
   `scheme_system_idle_channel`, and then replaced by that event
   wrapped (via scheme_wrap_evt) with scheme_void_proc. Later calls
   return the cached object. The arguments are not consulted. */
static Scheme_Object *make_sys_idle(int n, Scheme_Object **p)
{
  Scheme_Object *wrap_args[2];

  if (system_idle_put_evt)
    return system_idle_put_evt;

  REGISTER_SO(system_idle_put_evt);
  system_idle_put_evt = scheme_make_channel_put_evt(scheme_system_idle_channel,
                                                    scheme_void);

  wrap_args[0] = system_idle_put_evt;
  wrap_args[1] = scheme_void_proc;
  system_idle_put_evt = scheme_wrap_evt(2, wrap_args);

  return system_idle_put_evt;
}
1453 
1454 /**********************************************************************/
1455 /*                           Precise GC                               */
1456 /**********************************************************************/
1457 
1458 #ifdef MZ_PRECISE_GC
1459 
1460 START_XFORM_SKIP;
1461 
1462 #include "mzmark_sema.inc"
1463 
/* Registers, via GC_REG_TRAV, the traversal routines for the two
   object types handled here: scheme_alarm_type (mark_alarm) and
   scheme_channel_syncer_type (mark_channel_syncer). The routines
   themselves presumably come from the included "mzmark_sema.inc" --
   confirm. Only compiled when MZ_PRECISE_GC is defined. */
static void register_traversers(void)
{
  GC_REG_TRAV(scheme_alarm_type, mark_alarm);
  GC_REG_TRAV(scheme_channel_syncer_type, mark_channel_syncer);
}
1469 
1470 END_XFORM_SKIP;
1471 
1472 #endif
1473 
1474 #endif /* NO_SCHEME_THREADS */
1475