1 #include "schpriv.h"
2
3 #ifndef NO_SCHEME_THREADS
4
5 READ_ONLY Scheme_Object *scheme_always_ready_evt;
6 THREAD_LOCAL_DECL(Scheme_Object *scheme_system_idle_channel);
7
8 static Scheme_Object *make_sema(int n, Scheme_Object **p);
9 static Scheme_Object *semap(int n, Scheme_Object **p);
10 static Scheme_Object *hit_sema(int n, Scheme_Object **p);
11 static Scheme_Object *block_sema_p(int n, Scheme_Object **p);
12 static Scheme_Object *block_sema(int n, Scheme_Object **p);
13 static Scheme_Object *block_sema_breakable(int n, Scheme_Object **p);
14 static Scheme_Object *make_sema_repost(int n, Scheme_Object **p);
15 static Scheme_Object *is_sema_repost(int n, Scheme_Object **p);
16
17 static Scheme_Object *make_channel(int n, Scheme_Object **p);
18 static Scheme_Object *make_channel_put(int n, Scheme_Object **p);
19 static Scheme_Object *channel_p(int n, Scheme_Object **p);
20 static Scheme_Object *channel_put_p(int n, Scheme_Object **p);
21 static Scheme_Object *chaperone_channel(int argc, Scheme_Object *argv[]);
22 static Scheme_Object *impersonate_channel(int argc, Scheme_Object *argv[]);
23
24 static Scheme_Object *thread_send(int n, Scheme_Object **p);
25 static Scheme_Object *thread_receive(int n, Scheme_Object **p);
26 static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
27 static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
28 static Scheme_Object *thread_rewind_receive(int n, Scheme_Object **p);
29
30 static Scheme_Object *make_alarm(int n, Scheme_Object **p);
31 static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);
32
33 static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
34 static int channel_put_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
35 static int channel_syncer_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
36 static int alarm_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
37 static int always_ready(Scheme_Object *w);
38 static int never_ready(Scheme_Object *w);
39 static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
40
41 static int pending_break(Scheme_Thread *p);
42
43 THREAD_LOCAL_DECL(int scheme_main_was_once_suspended);
44 THREAD_LOCAL_DECL(static Scheme_Object *system_idle_put_evt);
45 READ_ONLY static Scheme_Object *thread_recv_evt;
46
47 #ifdef MZ_PRECISE_GC
48 static void register_traversers(void);
49 #endif
50
51 typedef struct {
52 Scheme_Object so;
53 double sleep_end;
54 } Scheme_Alarm;
55
56 /* For object-sync: */
/* Readiness callback registered for scheme_sema_type in scheme_init_sema():
   forwards to scheme_wait_sema() with just_try = 1 and returns its result. */
static int sema_ready(Scheme_Object *s)
{
  int got_it;

  got_it = scheme_wait_sema(s, 1);

  return got_it;
}
61
/* Callback registered for scheme_semaphore_repost_type via
   scheme_add_evt_through_sema(): sets *repost to 1 and returns the
   object stored in s's pointer slot (the semaphore installed by
   scheme_make_sema_repost). */
static Scheme_Object *sema_for_repost(Scheme_Object *s, int *repost)
{
  Scheme_Object *wrapped;

  *repost = 1;
  wrapped = SCHEME_PTR_VAL(s);

  return wrapped;
}
67
/* Installs the semaphore, channel, thread-mailbox, alarm, and idle
   primitives into `env`, creates the always/never/thread-receive event
   objects, and registers a readiness callback for each event type
   handled in this file. */
void scheme_init_sema(Scheme_Startup_Env *env)
{
  Scheme_Object *o;

#ifdef MZ_PRECISE_GC
  register_traversers();
#endif

  /* Semaphores */
  o = scheme_make_prim_w_arity(make_sema, "make-semaphore", 0, 1);
  scheme_addto_prim_instance("make-semaphore", o, env);

  o = scheme_make_folding_prim(semap, "semaphore?", 1, 1, 1);
  scheme_addto_prim_instance("semaphore?", o, env);

  o = scheme_make_prim_w_arity(hit_sema, "semaphore-post", 1, 1);
  scheme_addto_prim_instance("semaphore-post", o, env);

  o = scheme_make_prim_w_arity(block_sema_p, "semaphore-try-wait?", 1, 1);
  scheme_addto_prim_instance("semaphore-try-wait?", o, env);

  o = scheme_make_prim_w_arity(block_sema, "semaphore-wait", 1, 1);
  scheme_addto_prim_instance("semaphore-wait", o, env);

  o = scheme_make_prim_w_arity(block_sema_breakable, "semaphore-wait/enable-break", 1, 1);
  scheme_addto_prim_instance("semaphore-wait/enable-break", o, env);

  /* Semaphore peek events */
  o = scheme_make_prim_w_arity(make_sema_repost, "semaphore-peek-evt", 1, 1);
  scheme_addto_prim_instance("semaphore-peek-evt", o, env);

  o = scheme_make_folding_prim(is_sema_repost, "semaphore-peek-evt?", 1, 1, 1);
  scheme_addto_prim_instance("semaphore-peek-evt?", o, env);

  /* Channels */
  o = scheme_make_prim_w_arity(make_channel, "make-channel", 0, 0);
  scheme_addto_prim_instance("make-channel", o, env);

  o = scheme_make_prim_w_arity(make_channel_put, "channel-put-evt", 2, 2);
  scheme_addto_prim_instance("channel-put-evt", o, env);

  o = scheme_make_folding_prim(channel_p, "channel?", 1, 1, 1);
  scheme_addto_prim_instance("channel?", o, env);

  o = scheme_make_folding_prim(channel_put_p, "channel-put-evt?", 1, 1, 1);
  scheme_addto_prim_instance("channel-put-evt?", o, env);

  o = scheme_make_prim_w_arity(chaperone_channel, "chaperone-channel", 3, -1);
  scheme_addto_prim_instance("chaperone-channel", o, env);

  o = scheme_make_prim_w_arity(impersonate_channel, "impersonate-channel", 3, -1);
  scheme_addto_prim_instance("impersonate-channel", o, env);

  /* Thread mailboxes */
  o = scheme_make_prim_w_arity(thread_send, "thread-send", 2, 3);
  scheme_addto_prim_instance("thread-send", o, env);

  o = scheme_make_prim_w_arity(thread_receive, "thread-receive", 0, 0);
  scheme_addto_prim_instance("thread-receive", o, env);

  o = scheme_make_prim_w_arity(thread_try_receive, "thread-try-receive", 0, 0);
  scheme_addto_prim_instance("thread-try-receive", o, env);

  o = scheme_make_prim_w_arity(thread_receive_evt, "thread-receive-evt", 0, 0);
  scheme_addto_prim_instance("thread-receive-evt", o, env);

  o = scheme_make_prim_w_arity(thread_rewind_receive, "thread-rewind-receive", 1, 1);
  scheme_addto_prim_instance("thread-rewind-receive", o, env);

  /* Alarms and system idle */
  o = scheme_make_prim_w_arity(make_alarm, "alarm-evt", 1, 1);
  scheme_addto_prim_instance("alarm-evt", o, env);

  o = scheme_make_prim_w_arity(make_sys_idle, "system-idle-evt", 0, 0);
  scheme_addto_prim_instance("system-idle-evt", o, env);

  /* always-evt is kept in a global; never-evt only lives in the instance */
  REGISTER_SO(scheme_always_ready_evt);
  scheme_always_ready_evt = scheme_alloc_small_object();
  scheme_always_ready_evt->type = scheme_always_evt_type;
  scheme_addto_prim_instance("always-evt", scheme_always_ready_evt, env);

  o = scheme_alloc_small_object();
  o->type = scheme_never_evt_type;
  scheme_addto_prim_instance("never-evt", o, env);

  /* The thread-receive event object is file-private and not added to env */
  REGISTER_SO(thread_recv_evt);
  o = scheme_alloc_small_object();
  o->type = scheme_thread_recv_evt_type;
  thread_recv_evt = o;

  /* Readiness callbacks, one per event type */
  scheme_add_evt(scheme_sema_type, sema_ready, NULL, NULL, 0);
  scheme_add_evt_through_sema(scheme_semaphore_repost_type, sema_for_repost, NULL);
  scheme_add_evt(scheme_channel_type, (Scheme_Ready_Fun)channel_get_ready, NULL, NULL, 1);
  scheme_add_evt(scheme_channel_put_type, (Scheme_Ready_Fun)channel_put_ready, NULL, NULL, 1);
  scheme_add_evt(scheme_channel_syncer_type, (Scheme_Ready_Fun)channel_syncer_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_alarm_type, (Scheme_Ready_Fun)alarm_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_always_evt_type, always_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_never_evt_type, never_ready, NULL, NULL, 0);
  scheme_add_evt(scheme_thread_recv_evt_type, (Scheme_Ready_Fun)thread_recv_ready, NULL, NULL, 0);
}
211
scheme_init_sema_places()212 void scheme_init_sema_places() {
213 REGISTER_SO(scheme_system_idle_channel);
214 scheme_system_idle_channel = scheme_make_channel();
215 }
216
scheme_make_sema(intptr_t v)217 Scheme_Object *scheme_make_sema(intptr_t v)
218 {
219 Scheme_Sema *sema;
220
221 sema = MALLOC_ONE_TAGGED(Scheme_Sema);
222 sema->value = v;
223
224 sema->so.type = scheme_sema_type;
225
226 return (Scheme_Object *)sema;
227 }
228
scheme_get_semaphore_init(const char * who,int n,Scheme_Object ** p)229 intptr_t scheme_get_semaphore_init(const char *who, int n, Scheme_Object **p)
230 {
231 intptr_t v;
232
233 if (n) {
234 if (!SCHEME_INTP(p[0])) {
235 if (!SCHEME_BIGNUMP(p[0]) || !SCHEME_BIGPOS(p[0]))
236 scheme_wrong_contract(who, "exact-nonnegative-integer?", 0, n, p);
237 }
238
239 if (!scheme_get_int_val(p[0], &v)) {
240 scheme_raise_exn(MZEXN_FAIL,
241 "%s: starting value %s is too large",
242 who,
243 scheme_make_provided_string(p[0], 0, NULL));
244 } else if (v < 0)
245 scheme_wrong_contract(who, "exact-nonnegative-integer?", 0, n, p);
246 } else
247 v = 0;
248
249 return v;
250 }
251
/* Primitive behind "make-semaphore": validates the optional initial
   count with scheme_get_semaphore_init() and builds the semaphore. */
static Scheme_Object *make_sema(int n, Scheme_Object **p)
{
  intptr_t init_count;

  init_count = scheme_get_semaphore_init("make-semaphore", n, p);

  return scheme_make_sema(init_count);
}
258
/* Primitive behind "semaphore-peek-evt": checks that p[0] is a
   semaphore, then wraps it with scheme_make_sema_repost(). */
static Scheme_Object *make_sema_repost(int n, Scheme_Object **p)
{
  if (SCHEME_SEMAP(p[0]))
    return scheme_make_sema_repost(p[0]);

  scheme_wrong_contract("semaphore-peek-evt", "semaphore?", 0, n, p);

  return scheme_make_sema_repost(p[0]);
}
266
/* Allocates a small object tagged scheme_semaphore_repost_type whose
   pointer slot refers to `sema`, and returns it. */
Scheme_Object *scheme_make_sema_repost(Scheme_Object *sema)
{
  Scheme_Object *peek_evt;

  peek_evt = scheme_alloc_small_object();
  peek_evt->type = scheme_semaphore_repost_type;
  SCHEME_PTR_VAL(peek_evt) = sema;

  return peek_evt;
}
277
/* Primitive behind "semaphore-peek-evt?": scheme_true when p[0] is
   tagged scheme_semaphore_repost_type, scheme_false otherwise. */
static Scheme_Object *is_sema_repost(int n, Scheme_Object **p)
{
  if (SAME_TYPE(SCHEME_TYPE(p[0]), scheme_semaphore_repost_type))
    return scheme_true;

  return scheme_false;
}
284
/* Primitive behind "semaphore?": scheme_true when p[0] satisfies
   SCHEME_SEMAP, scheme_false otherwise. */
static Scheme_Object *semap(int n, Scheme_Object **p)
{
  if (SCHEME_SEMAP(p[0]))
    return scheme_true;

  return scheme_false;
}
289
/* Hands a post on semaphore `t` to waiting syncers. Waiters are removed
   from the front of t's line one at a time. A waiter is selected only
   if its syncing (when it has one) has no result yet and its thread has
   no pending break; unselected waiters are simply dropped from the
   line. The loop stops once a selected waiter consumes the post, or
   when the line is empty. */
void did_post_sema(Scheme_Sema *t)
{
  Scheme_Channel_Syncer *waiter;
  int took_post;

  for (waiter = t->first; waiter; waiter = t->first) {
    /* Unlink the waiter from the head of the line */
    t->first = waiter->next;
    if (waiter->next)
      t->first->prev = NULL;
    else
      t->last = NULL;

    took_post = 0;

    if ((!waiter->syncing || !waiter->syncing->result) && !pending_break(waiter->p)) {
      if (!waiter->syncing) {
        /* In this case, we will remove the syncer from line, but
           someone else might grab the post. This is unfair, but it
           can help improve throughput when multiple threads synchronize
           on a lock. */
        took_post = 1;
      } else {
        /* Record the choice in the waiter's syncing */
        waiter->syncing->result = waiter->syncing_i + 1;
        if (waiter->syncing->disable_break)
          waiter->syncing->disable_break->suspend_break++;
        scheme_post_syncing_nacks(waiter->syncing);

        /* A repost (peek) entry leaves the count untouched, so the
           post remains available for the next waiter */
        if (!waiter->syncing->reposts || !waiter->syncing->reposts[waiter->syncing_i]) {
          t->value -= 1;
          took_post = 1;
        }

        if (waiter->syncing->accepts && waiter->syncing->accepts[waiter->syncing_i])
          scheme_accept_sync(waiter->syncing, waiter->syncing_i);
      }
      waiter->picked = 1;
    }

    waiter->in_line = 0;
    waiter->prev = NULL;
    waiter->next = NULL;

    if (waiter->picked) {
      scheme_weak_resume_thread(waiter->p);
      if (took_post)
        break;
    }
    /* otherwise, loop to find one we can wake up */
  }
}
340
sema_overflow()341 static void sema_overflow()
342 {
343 scheme_raise_exn(MZEXN_FAIL,
344 "semaphore-post: the maximum post count has already been reached");
345 }
346
/* Posts to semaphore `o`: increments its count and, if any syncers are
   waiting in line, lets did_post_sema() wake one. A semaphore whose
   count is negative (see scheme_post_sema_all) is left untouched.
   Calls sema_overflow() when the count cannot be incremented. */
void scheme_post_sema(Scheme_Object *o)
{
  /* fast path is designed to avoid need for XFORM */
  Scheme_Sema *t = (Scheme_Sema *)o;
  /* Must be as wide as the value computed below: with a plain `int`,
     the incremented count was truncated on LP64 platforms, so any
     semaphore whose count had reached INT_MAX (for example through a
     large initial value accepted by scheme_get_semaphore_init) falsely
     reported overflow. */
  intptr_t v;

  if (t->value < 0) return;

  /* Increment through the unsigned type so that wraparound is well
     defined; a wrapped result is not greater than the old count. */
  v = (intptr_t)((uintptr_t)t->value + 1);
  if (v > t->value) {
    t->value = v;

    if (t->first)
      did_post_sema(t);
  } else
    sema_overflow();
}
364
/* Posts to `o` repeatedly until no syncer remains in its line, then
   sets the count to -1. With a negative count, scheme_post_sema()
   returns immediately and scheme_try_plain_sema() succeeds without
   decrementing. */
void scheme_post_sema_all(Scheme_Object *o)
{
  Scheme_Sema *sema = (Scheme_Sema *)o;

  for (; sema->first; ) {
    scheme_post_sema(o);
  }

  sema->value = -1;
}
374
/* Primitive behind "semaphore-post": checks that p[0] is a semaphore,
   posts to it, and returns scheme_void. */
static Scheme_Object *hit_sema(int n, Scheme_Object **p)
{
  if (SCHEME_SEMAP(p[0])) {
    scheme_post_sema(p[0]);
    return scheme_void;
  }

  scheme_wrong_contract("semaphore-post", "semaphore?", 0, n, p);

  scheme_post_sema(p[0]);

  return scheme_void;
}
384
/* Predicate passed to scheme_block_until() by scheme_wait_semas_chs().
   `a` is really a 3-slot array: [0] the syncer count as a fixnum,
   [1] the array of syncers, [2] the waiting thread. Returns 1 when any
   syncer has been picked, when the thread has an external break that
   scheme_can_break() would allow with one level of suspension removed,
   or when the thread was suspended by the user (or
   scheme_main_was_once_suspended is set); returns 0 otherwise. */
static int out_of_line(Scheme_Object *a)
{
  Scheme_Thread *thread;
  Scheme_Channel_Syncer *syncer;
  int count, k;

  /* Out of one line? */
  count = SCHEME_INT_VAL(((Scheme_Object **)a)[0]);
  k = 0;
  while (k < count) {
    syncer = (((Scheme_Channel_Syncer ***)a)[1])[k];
    if (syncer->picked)
      return 1;
    k++;
  }

  /* Suspended break? */
  thread = ((Scheme_Thread **)a)[2];
  if (thread->external_break) {
    int breakable;

    /* Test breakability as if our own temporary suspension were absent */
    --thread->suspend_break;
    breakable = scheme_can_break(thread);
    thread->suspend_break++;

    if (breakable)
      return 1;
  }

  /* Suspended by user? */
  if (thread->running & MZTHREAD_USER_SUSPENDED)
    return 1;
  if (scheme_main_was_once_suspended)
    return 1;

  return 0;
}
417
/* Appends syncer `w` to the waiting line of `sema`, which may really be
   a semaphore, a channel (the "get" line is used), a channel-put (the
   channel's "put" line is used), or a never-evt (no line; `w` is only
   marked as in-line). Clears w->picked. */
static void get_into_line(Scheme_Sema *sema, Scheme_Channel_Syncer *w)
/* Can be called multiple times. */
{
  Scheme_Channel_Syncer *head, *tail;
  int is_sema, is_channel;

  w->in_line = 1;
  w->picked = 0;

  if (SAME_TYPE(SCHEME_TYPE(sema), scheme_never_evt_type))
    return; /* !!!! skip everything else */

  is_sema = SCHEME_SEMAP(sema);
  is_channel = (!is_sema && SCHEME_CHANNELP(sema));

  /* Load the ends of whichever line applies */
  if (is_sema) {
    tail = sema->last;
    head = sema->first;
  } else if (is_channel) {
    tail = ((Scheme_Channel *)sema)->get_last;
    head = ((Scheme_Channel *)sema)->get_first;
  } else {
    tail = ((Scheme_Channel_Put *)sema)->ch->put_last;
    head = ((Scheme_Channel_Put *)sema)->ch->put_first;
  }

  /* Link w after the current tail; the statement order is kept as-is */
  w->prev = tail;
  if (tail)
    tail->next = w;
  else
    head = w;
  tail = w;
  w->next = NULL;

  /* Store the updated ends back */
  if (is_sema) {
    sema->last = tail;
    sema->first = head;
  } else if (is_channel) {
    ((Scheme_Channel *)sema)->get_last = tail;
    ((Scheme_Channel *)sema)->get_first = head;
  } else {
    ((Scheme_Channel_Put *)sema)->ch->put_last = tail;
    ((Scheme_Channel_Put *)sema)->ch->put_first = head;
  }
}
458
/* Removes syncer `w` from the waiting line of `sema` (semaphore,
   channel "get" line, or channel-put's "put" line). Does nothing when
   `w` is not marked in-line; for a never-evt only the in-line mark is
   cleared. w's own prev/next fields are left as they were. */
static void get_outof_line(Scheme_Sema *sema, Scheme_Channel_Syncer *w)
{
  Scheme_Channel_Syncer *head, *tail;
  int is_sema, is_channel;

  if (!w->in_line)
    return;
  w->in_line = 0;

  if (SAME_TYPE(SCHEME_TYPE(sema), scheme_never_evt_type))
    return; /* !!!! skip everything else */

  is_sema = SCHEME_SEMAP(sema);
  is_channel = (!is_sema && SCHEME_CHANNELP(sema));

  /* Load the ends of whichever line applies */
  if (is_sema) {
    tail = sema->last;
    head = sema->first;
  } else if (is_channel) {
    tail = ((Scheme_Channel *)sema)->get_last;
    head = ((Scheme_Channel *)sema)->get_first;
  } else {
    tail = ((Scheme_Channel_Put *)sema)->ch->put_last;
    head = ((Scheme_Channel_Put *)sema)->ch->put_first;
  }

  /* Splice w out, fixing up whichever end it occupied */
  if (w->prev)
    w->prev->next = w->next;
  else
    head = w->next;
  if (w->next)
    w->next->prev = w->prev;
  else
    tail = w->prev;

  /* Store the updated ends back */
  if (is_sema) {
    sema->last = tail;
    sema->first = head;
  } else if (is_channel) {
    ((Scheme_Channel *)sema)->get_last = tail;
    ((Scheme_Channel *)sema)->get_first = head;
  } else {
    ((Scheme_Channel_Put *)sema)->ch->put_last = tail;
    ((Scheme_Channel_Put *)sema)->ch->put_first = head;
  }
}
500
/* Creates a syncer for channel (or channel-put) `ch` on behalf of the
   sync described by `sinfo`, puts it in ch's line, and makes the syncer
   the sync target for that position. The syncer's thread is
   sinfo->false_positive_ok when that is set, otherwise the current
   thread. */
static void ext_get_into_line(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  Scheme_Channel_Syncer *syncer;

  /* Get into line */
  syncer = MALLOC_ONE_RT(Scheme_Channel_Syncer);
  syncer->so.type = scheme_channel_syncer_type;

  if (!sinfo->false_positive_ok)
    syncer->p = scheme_current_thread;
  else
    syncer->p = sinfo->false_positive_ok;

  syncer->syncing = (Syncing *)sinfo->current_syncing;
  syncer->obj = ch;
  syncer->syncing_i = sinfo->w_i;

  get_into_line((Scheme_Sema *)ch, syncer);

  scheme_set_sync_target(sinfo, (Scheme_Object *)syncer, NULL, NULL, 0, 0, NULL);
}
520
/* Removes syncer `ch_w` from the line of the object it waits on
   (ch_w->obj). */
void scheme_get_outof_line(Scheme_Channel_Syncer *ch_w)
{
  Scheme_Sema *target;

  target = (Scheme_Sema *)ch_w->obj;
  get_outof_line(target, ch_w);
}
525
/* Puts syncer `ch_w` (back) into the line of the object it waits on
   (ch_w->obj). */
void scheme_get_back_into_line(Scheme_Channel_Syncer *ch_w)
{
  Scheme_Sema *target;

  target = (Scheme_Sema *)ch_w->obj;
  get_into_line(target, ch_w);
}
530
/* Records position `i` as the result of `syncing`, bumps the break
   suspension of its disable_break thread (if any), and posts its
   nacks. */
static void try_channel_commit(Syncing *syncing, int i)
{
  syncing->result = i + 1;
  if (syncing->disable_break)
    syncing->disable_break->suspend_break++;
  scheme_post_syncing_nacks(syncing);
}

/* Attempts an immediate rendezvous on a channel.
     sema    - really a channel (GET mode) or a channel-put (PUT mode)
     syncing - the caller's own syncing, or NULL; waiters that belong to
               the same syncing are skipped
     pos     - caller's position in `syncing`; when syncing is non-NULL
               and pos >= 0, the caller's syncing is committed too
     result  - GET mode only: when non-NULL, receives the transferred
               value
   Walks the opposite side's line. Every waiter visited that does not
   belong to `syncing` is taken out of line; a waiter is selected only
   if its syncing has no result yet and its thread has no pending
   break. Returns 1 as soon as a waiter is selected, 0 when the line is
   exhausted. */
static int try_channel(Scheme_Sema *sema, Syncing *syncing, int pos, Scheme_Object **result)
{
  Scheme_Channel_Syncer *peer, *following;
  int matched = 0;

  if (SCHEME_CHANNELP(sema)) {
    /* GET mode */
    Scheme_Channel *chan = (Scheme_Channel *)sema;
    Scheme_Channel_Put *putter;

    peer = chan->put_first;
    while (peer) {
      if (peer->syncing == syncing) {
        /* can't synchronize with self */
        peer = peer->next;
        continue;
      }

      putter = (Scheme_Channel_Put *)peer->obj;

      if (!peer->syncing->result && !pending_break(peer->p)) {
        peer->picked = 1;
        try_channel_commit(peer->syncing, peer->syncing_i);
        if (result)
          *result = putter->val;
        if (syncing && (pos >= 0)) {
          try_channel_commit(syncing, pos);
          syncing->set->argv[pos] = putter->val;
        }
        matched = 1;
        scheme_weak_resume_thread(peer->p);
      }

      following = peer->next;
      get_outof_line((Scheme_Sema *)putter, peer);
      peer = following;

      if (matched)
        return 1;
    }

    return 0;
  } else {
    /* PUT mode */
    Scheme_Channel_Put *putter = (Scheme_Channel_Put *)sema;

    peer = putter->ch->get_first;
    while (peer) {
      if (peer->syncing == syncing) {
        /* can't synchronize with self */
        peer = peer->next;
        continue;
      }

      if (!peer->syncing->result && !pending_break(peer->p)) {
        peer->picked = 1;
        /* Deliver the value into the getter's slot before committing */
        peer->syncing->set->argv[peer->syncing_i] = putter->val;
        try_channel_commit(peer->syncing, peer->syncing_i);
        if (syncing && (pos >= 0))
          try_channel_commit(syncing, pos);
        matched = 1;
        scheme_weak_resume_thread(peer->p);
      }

      following = peer->next;
      get_outof_line((Scheme_Sema *)putter->ch, peer);
      peer = following;

      if (matched)
        return 1;
    }

    return 0;
  }
}
615
scheme_try_plain_sema(Scheme_Object * o)616 XFORM_NONGCING int scheme_try_plain_sema(Scheme_Object *o)
617 {
618 Scheme_Sema *sema = (Scheme_Sema *)o;
619
620 if (sema->value) {
621 if (sema->value > 0)
622 --sema->value;
623 return 1;
624 } else
625 return 0;
626 }
627
/* Waits on (or polls) the n semaphores/channels in o.
   - just_try > 0: one non-blocking attempt on o[0]; returns the result
     of scheme_try_plain_sema() for a plain semaphore, or of
     try_channel() otherwise.
   - just_try < 0: calls scheme_wait_sema(o[0], 0) between
     scheme_push_break_enable() and scheme_pop_break_enable(), then
     returns 1.
   - just_try == 0: polls all n items beginning at start_pos; if none
     is ready, puts a Scheme_Channel_Syncer in the line of each item and
     suspends until one is selected. Returns the 1-based index of the
     selected item; on the killed/suspended path i is -1, so 0 is
     returned.
   When syncing is non-NULL, the selected position is also recorded in
   syncing->result. */
scheme_wait_semas_chs(int n,Scheme_Object ** o,int just_try,Syncing * syncing)628 int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *syncing)
629 /* When syncing is supplied, o can contain Scheme_Channel_Syncer
630 and never-evt values, and just_try must be 0. */
631 {
632 Scheme_Sema **semas = (Scheme_Sema **)o;
633 int v, i, ii;
634
635 if (just_try) {
636 /* assert: n == 1, !syncing */
637 Scheme_Sema *sema = semas[0];
638 if (just_try > 0) {
/* Non-blocking attempt; v becomes the value returned at the end. */
639 if (sema->so.type == scheme_sema_type) {
640 v = scheme_try_plain_sema((Scheme_Object *)sema);
641 } else {
642 v = try_channel(sema, syncing, 0, NULL);
643 }
644 } else {
/* just_try < 0: blocking wait wrapped in a break-enable frame
   (presumably enabling breaks for the duration -- confirm against
   scheme_push_break_enable). */
645 Scheme_Cont_Frame_Data cframe;
646
647 scheme_push_break_enable(&cframe, 1, 1);
648
649 scheme_wait_sema((Scheme_Object *)sema, 0);
650
651 scheme_pop_break_enable(&cframe, 0);
652
653 return 1;
654 }
655 } else {
656 int start_pos;
657
658 #if 0
659 /* Use the "immutable" flag bit on a semaphore to check for
660 inconsistent use in atomic and non-atomic modes, which
661 can lead to an attempt to suspend in atomic mode. */
662 if ((n == 1) && SCHEME_SEMAP(o[0])) {
663 if (!do_atomic) {
664 SCHEME_SET_IMMUTABLE(o[0]);
665 } else if (SCHEME_IMMUTABLEP(o[0])) {
666 if (!on_atomic_timeout
667 || (do_atomic > atomic_timeout_atomic_level)) {
668 scheme_signal_error("using a seaphore in both atomic and non-atomic mode");
669 }
670 }
671 }
672 #endif
673
/* Choose where polling starts: the syncing's start_pos when there is a
   syncing, a random value when n > 1 without one, and 0 for n == 1. */
674 if (n > 1) {
675 if (syncing)
676 start_pos = syncing->start_pos;
677 else {
678 Scheme_Object *rand_state;
679 rand_state = scheme_get_param(scheme_current_config(), MZCONFIG_SCHEDULER_RANDOM_STATE);
680 start_pos = scheme_rand((Scheme_Random_State *)rand_state);
681 }
682 } else
683 start_pos = 0;
684
685 /* Initial poll */
686 while (1) {
687 i = 0;
688 for (ii = 0; ii < n; ii++) {
689 /* Randomized start position for poll ensures fairness: */
690 i = (start_pos + ii) % n;
691
692 if (semas[i]->so.type == scheme_sema_type) {
693 if (semas[i]->value) {
/* A repost (peek) entry, or a negative count, is not decremented. */
694 if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
695 --semas[i]->value;
696 if (syncing) {
697 syncing->result = i + 1;
698 if (syncing->accepts && syncing->accepts[i])
699 scheme_accept_sync(syncing, i);
700 }
701 break;
702 }
703 } else if (semas[i]->so.type == scheme_never_evt_type) {
704 /* Never ready. */
705 } else if (semas[i]->so.type == scheme_channel_syncer_type) {
706 if (((Scheme_Channel_Syncer *)semas[i])->picked)
707 break;
708 } else if (try_channel(semas[i], syncing, i, NULL))
709 break;
710 }
711
/* ii >= n means that the poll found nothing ready. */
712 if (ii >= n) {
713 if (!scheme_wait_until_suspend_ok()) {
714 break;
715 } else {
716 /* there may have been some action on one of the waitables;
717 try again, if no result, yet */
718 if (syncing && syncing->result) {
719 i = syncing->result - 1;
720 ii = 0;
721 break;
722 }
723 }
724 } else
725 break;
726 }
727
728 /* In the following, syncers get changed back to channels,
729 and channel puts */
730 if (ii >= n) {
731 Scheme_Channel_Syncer **ws, *w;
732
/* One syncer per item: reuse syncers that were passed in through o
   (restoring the underlying object in semas[i]) and allocate the rest. */
733 ws = MALLOC_N(Scheme_Channel_Syncer*, n);
734 for (i = 0; i < n; i++) {
735 if (semas[i]->so.type == scheme_channel_syncer_type) {
736 ws[i] = (Scheme_Channel_Syncer *)semas[i];
737 semas[i] = (Scheme_Sema *)ws[i]->obj;
738 } else {
739 w = MALLOC_ONE_RT(Scheme_Channel_Syncer);
740 ws[i] = w;
741 w->so.type = scheme_channel_syncer_type;
742 w->p = scheme_current_thread;
743 w->syncing = syncing;
744 w->obj = (Scheme_Object *)semas[i];
745 w->syncing_i = i;
746 }
747 }
748
749 while (1) {
750 int out_of_a_line;
751
752 /* Get into line */
753 for (i = 0; i < n; i++) {
754 if (!ws[i]->in_line) {
755 get_into_line(semas[i], ws[i]);
756 }
757 }
758
759 if (!scheme_current_thread->next) {
760 void **a;
761
762 /* We're not allowed to suspend the main thread. Delay
763 breaks so we get a chance to clean up. */
764 scheme_current_thread->suspend_break++;
765
/* Argument pack read back by out_of_line(): count, syncers, thread. */
766 a = MALLOC_N(void*, 3);
767 a[0] = scheme_make_integer(n);
768 a[1] = ws;
769 a[2] = scheme_current_thread;
770
771 scheme_main_was_once_suspended = 0;
772
773 scheme_block_until(out_of_line, NULL, (Scheme_Object *)a, (float)0.0);
774
775 --scheme_current_thread->suspend_break;
776 } else {
777 /* Mark the thread to indicate that we need to clean up
778 if the thread is killed. */
779 int old_nkc;
780 old_nkc = (scheme_current_thread->running & MZTHREAD_NEED_KILL_CLEANUP);
781 if (!old_nkc)
782 scheme_current_thread->running += MZTHREAD_NEED_KILL_CLEANUP;
783 scheme_weak_suspend_thread(scheme_current_thread);
784 if (!old_nkc && (scheme_current_thread->running & MZTHREAD_NEED_KILL_CLEANUP))
785 scheme_current_thread->running -= MZTHREAD_NEED_KILL_CLEANUP;
786 }
787
788 /* We've been resumed. But was it for the semaphore, or a signal? */
789 out_of_a_line = 0;
790
791 /* If we get the post, we must return WITHOUT BLOCKING.
792 GRacket, for example, depends on this special property, which
793 ensures that the thread can't be broken or killed between
794 receiving the post and returning. */
795
796 if (!syncing) {
797 /* Poster can't be sure that we really will get it,
798 so we have to decrement the sema count here. */
799 i = 0;
800 for (ii = 0; ii < n; ii++) {
801 i = (start_pos + ii) % n;
802 if (ws[i]->picked) {
803 out_of_a_line = 1;
804 if (semas[i]->value) {
805 if (semas[i]->value > 0)
806 --(semas[i]->value);
807 break;
808 }
809 }
810 }
/* i == n marks "picked, but no count was left to take". */
811 if (ii >= n)
812 i = n;
813 } else {
814 if (syncing->result) {
815 out_of_a_line = 1;
816 i = syncing->result - 1;
817 } else {
818 out_of_a_line = 0;
819 i = n;
820 }
821 }
822
823 if (!out_of_a_line) {
824 /* We weren't woken by any semaphore/channel. Get out of line, block once
825 (to handle breaks/kills) and then loop to get back into line. */
826 for (i = 0; i < n; i++) {
827 if (ws[i]->in_line)
828 get_outof_line(semas[i], ws[i]);
829 }
830
831 scheme_thread_block(0); /* ok if it returns multiple times */
832 scheme_current_thread->ran_some = 1;
833 /* [but why would it return multiple times?! there must have been a reason...] */
834 } else {
835
836 if ((scheme_current_thread->running & MZTHREAD_KILLED)
837 || ((scheme_current_thread->running & MZTHREAD_USER_SUSPENDED)
838 && !(scheme_current_thread->running & MZTHREAD_NEED_SUSPEND_CLEANUP))) {
839 /* We've been killed or suspended! */
840 i = -1;
841 }
842
843 /* We got a post from semas[i], or we were killed.
844 Did any (other) semaphore pick us?
845 (This only happens when syncing == NULL.) */
846 if (!syncing) {
847 int j;
848
849 for (j = 0; j < n; j++) {
850 if (j != i) {
851 if (ws[j]->picked) {
852 if (semas[j]->value) {
853 /* Consume the value and repost, because no one else
854 has been told to go, and we're accepting a different post. */
855 if (semas[j]->value > 0)
856 --semas[j]->value;
857 scheme_post_sema((Scheme_Object *)semas[j]);
858 }
859 }
860 }
861 }
862 }
863
864 /* If we're done, get out of all lines that we're still in. */
865 if (i < n) {
866 int j;
867 for (j = 0; j < n; j++) {
868 if (ws[j]->in_line)
869 get_outof_line(semas[j], ws[j]);
870 }
871 }
872
873 if (i == -1) {
874 scheme_thread_block(0); /* dies or suspends */
875 scheme_current_thread->ran_some = 1;
876 }
877
878 if (i < n)
879 break;
880 }
881
882 /* Otherwise: !syncing and someone stole the post, or we were
883 suspended and we have to start over. Either way, poll then
884 loop to get back in line and try again. */
885 for (ii = 0; ii < n; ii++) {
886 i = (start_pos + ii) % n;
887
888 if (semas[i]->so.type == scheme_sema_type) {
889 if (semas[i]->value) {
890 if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
891 --semas[i]->value;
892 if (syncing) {
893 syncing->result = i + 1;
894 if (syncing->accepts && syncing->accepts[i])
895 scheme_accept_sync(syncing, i);
896 }
897 break;
898 }
899 } else if (semas[i]->so.type == scheme_never_evt_type) {
900 /* Never ready. */
901 } else if (try_channel(semas[i], syncing, i, NULL))
902 break;
903 }
904
905 if (ii < n) {
906 /* Get out of any line that we still might be in: */
907 int j;
908 for (j = 0; j < n; j++) {
909 if (ws[j]->in_line)
910 get_outof_line(semas[j], ws[j]);
911 }
912
913 break;
914 }
915
916 if (!syncing) {
917 /* Looks like this thread is a victim of unfair semaphore access.
918 Go into fair mode by allocating a syncing: */
919 syncing = MALLOC_ONE_RT(Syncing);
920 #ifdef MZTAG_REQUIRED
921 syncing->type = scheme_rt_syncing;
922 #endif
923 syncing->start_pos = start_pos;
924
925 /* Get out of all lines, and set syncing field before we get back in line: */
926 {
927 int j;
928 for (j = 0; j < n; j++) {
929 if (ws[j]->in_line)
930 get_outof_line(semas[j], ws[j]);
931 ws[j]->syncing = syncing;
932 }
933 }
934 }
935 /* Back to top of loop to sync again */
936 }
937 }
/* Convert the selected index to the 1-based result; i == -1 (the
   killed/suspended path) yields 0. */
938 v = i + 1;
939 }
940
941 return v;
942 }
943
/* General path for scheme_wait_sema(): wraps `o` in a one-element
   array and defers to scheme_wait_semas_chs() with no syncing. */
static int slow_wait_sema(Scheme_Object *o, int just_try)
{
  Scheme_Object *single[1];
  int outcome;

  single[0] = o;

  outcome = scheme_wait_semas_chs(1, single, just_try, NULL);

  return outcome;
}
952
/* Waits on semaphore `o`; just_try is interpreted as in
   scheme_wait_semas_chs(). First tries to take the semaphore directly
   with scheme_try_plain_sema(), except when just_try is negative and
   the current thread has an external break pending; falls back to
   slow_wait_sema() otherwise. */
int scheme_wait_sema(Scheme_Object *o, int just_try)
{
  /* fast path is designed to avoid need for XFORM */
  if ((just_try < 0) && scheme_current_thread->external_break)
    return slow_wait_sema(o, just_try);

  if (scheme_try_plain_sema(o))
    return 1;

  return slow_wait_sema(o, just_try);
}
962
/* Primitive behind "semaphore-try-wait?": checks that p[0] is a
   semaphore and returns scheme_true when a non-blocking wait succeeds,
   scheme_false otherwise. */
static Scheme_Object *block_sema_p(int n, Scheme_Object **p)
{
  if (!SCHEME_SEMAP(p[0]))
    scheme_wrong_contract("semaphore-try-wait?", "semaphore?", 0, n, p);

  if (scheme_wait_sema(p[0], 1))
    return scheme_true;

  return scheme_false;
}
970
/* Primitive behind "semaphore-wait": checks that p[0] is a semaphore,
   waits on it, checks for a break, and returns scheme_void. */
static Scheme_Object *block_sema(int n, Scheme_Object **p)
{
  if (!SCHEME_SEMAP(p[0])) {
    scheme_wrong_contract("semaphore-wait", "semaphore?", 0, n, p);
  }

  (void)scheme_wait_sema(p[0], 0);

  /* In case a break appeared after we received the post,
     check for a break, because scheme_wait_sema() won't: */
  scheme_check_break_now();

  return scheme_void;
}
984
/* Primitive behind "semaphore-wait/enable-break": checks that p[0] is a
   semaphore, waits on it with just_try = -1, and returns scheme_void. */
static Scheme_Object *block_sema_breakable(int n, Scheme_Object **p)
{
  if (!SCHEME_SEMAP(p[0])) {
    scheme_wrong_contract("semaphore-wait/enable-break", "semaphore?", 0, n, p);
  }

  (void)scheme_wait_sema(p[0], -1);

  return scheme_void;
}
994
/* Tells whether thread `p` should be passed over when a post or
   channel rendezvous is looking for a waiter. Returns 1 when p is
   killed or user-suspended. When p has an external break, returns the
   result of scheme_can_break(p); if p has no `next` thread, one level
   of break suspension is removed around that call. Returns 0
   otherwise. */
static int pending_break(Scheme_Thread *p)
{
  int breakable;

  if (p->running & (MZTHREAD_KILLED | MZTHREAD_USER_SUSPENDED))
    return 1;

  if (!p->external_break)
    return 0;

  if (!p->next) {
    /* if p is the main thread, it must have a suspension
       to block on a channel or semaphore: */
    --p->suspend_break;
  }

  breakable = scheme_can_break(p);

  if (!p->next) {
    p->suspend_break++;
  }

  return breakable;
}
1019
1020 /**********************************************************************/
1021 /* Channels */
1022 /**********************************************************************/
1023
scheme_make_channel()1024 Scheme_Object *scheme_make_channel()
1025 {
1026 Scheme_Channel *c;
1027
1028 c = MALLOC_ONE_TAGGED(Scheme_Channel);
1029 c->so.type = scheme_channel_type;
1030
1031 return (Scheme_Object *)c;
1032 }
1033
/* Primitive behind "make-channel"; the arguments are not used. */
static Scheme_Object *make_channel(int n, Scheme_Object **p)
{
  Scheme_Object *chan;

  chan = scheme_make_channel();

  return chan;
}
1038
/* Allocates a channel-put event that pairs channel `ch` with the value
   `v` to be sent, and returns it as a generic Scheme_Object pointer. */
Scheme_Object *scheme_make_channel_put_evt(Scheme_Object *ch, Scheme_Object *v)
{
  Scheme_Channel_Put *put_evt;

  put_evt = MALLOC_ONE_TAGGED(Scheme_Channel_Put);
  put_evt->so.type = scheme_channel_put_type;
  put_evt->val = v;
  put_evt->ch = (Scheme_Channel *)ch;

  return (Scheme_Object *)put_evt;
}
1050
/* Tries to put `v` on channel `ch` without waiting. Returns 0 at once
   when no getter is in the channel's line. Otherwise builds a
   channel-put event, passes it to scheme_sync_timeout() together with
   a leading fixnum 0, and returns whether the result is a true
   value. */
int scheme_try_channel_put(Scheme_Object *ch, Scheme_Object *v)
{
  Scheme_Object *a[2];

  if (!((Scheme_Channel *)ch)->get_first)
    return 0;

  v = scheme_make_channel_put_evt(ch, v);

  a[0] = scheme_make_integer(0);
  a[1] = v;

  v = scheme_sync_timeout(2, a);

  return SCHEME_TRUEP(v);
}
1063
/* Runs the put-side interposition procedures of a chaperoned or
   impersonated channel.
     obj  - the outermost chaperone
     orig - the value being put
   Walks the chain through ->prev until an actual channel is reached.
   At each layer whose redirects field is not a nack-guard event, the
   redirect procedure is applied to (previous layer, current value) and
   its result becomes the current value; for a non-impersonator layer
   the result must be a chaperone of `orig`. Returns the final value. */
Scheme_Object *chaperone_put(Scheme_Object *obj, Scheme_Object *orig)
{
  Scheme_Chaperone *px = (Scheme_Chaperone *)obj;
  Scheme_Object *val = orig;
  Scheme_Object *a[2];
  Scheme_Object *redirect;

  while (!SCHEME_CHANNELP(px)) {
    if (!(SAME_TYPE(SCHEME_TYPE(px->redirects), scheme_nack_guard_evt_type))) {
      a[0] = px->prev;
      a[1] = val;
      redirect = px->redirects;
      val = _scheme_apply(redirect, 2, a);

      if (!(SCHEME_CHAPERONE_FLAGS(px) & SCHEME_CHAPERONE_IS_IMPERSONATOR)) {
        if (!scheme_chaperone_of(val, orig))
          scheme_wrong_chaperoned("channel-put", "result", orig, val);
      }
    }
    /* else: In this case, the `px` is actually an evt chaperone so we
       don't want to handle it here since we're doing a "put" */

    px = (Scheme_Chaperone *)px->prev;
  }

  return val;
}
1094
/* Primitive behind "channel-put-evt". argv[0] must be a channel or a
   chaperone of one, and argv[1] is the value to send. For a chaperoned
   channel the value is first passed through chaperone_put(), and the
   event is built on the underlying channel. */
static Scheme_Object *make_channel_put(int argc, Scheme_Object **argv)
{
  Scheme_Object *target = argv[0];
  Scheme_Object *payload = argv[1];
  Scheme_Object *wrapper = NULL;

  if (SCHEME_NP_CHAPERONEP(target)
      && SCHEME_CHANNELP(SCHEME_CHAPERONE_VAL(target))) {
    wrapper = target;
    target = SCHEME_CHAPERONE_VAL(wrapper);
  } else if (!SCHEME_CHANNELP(argv[0])) {
    scheme_wrong_contract("channel-put-evt", "channel?", 0, argc, argv);
  }

  if (wrapper) {
    payload = chaperone_put(wrapper, argv[1]);
  }

  return scheme_make_channel_put_evt(target, payload);
}
1114
/* Predicate primitive: yields scheme_true when p[0] is a channel, or a
   chaperone whose underlying value is a channel; scheme_false
   otherwise. */
static Scheme_Object *channel_p(int n, Scheme_Object **p)
{
  Scheme_Object *candidate = p[0];

  if (SCHEME_CHANNELP(candidate))
    return scheme_true;

  if (SCHEME_NP_CHAPERONEP(candidate)
      && SCHEME_CHANNELP(SCHEME_CHAPERONE_VAL(candidate)))
    return scheme_true;

  return scheme_false;
}
1123
/* Predicate primitive: yields scheme_true when p[0] carries the
   channel-put event type tag, scheme_false otherwise. */
static Scheme_Object *channel_put_p(int n, Scheme_Object **p)
{
  if (SAME_TYPE(SCHEME_TYPE(p[0]), scheme_channel_put_type))
    return scheme_true;

  return scheme_false;
}
1130
/* Readiness callback for syncing on the "get" side of a channel.

   Calls try_channel() for `ch` with the current syncing record. On
   success the received value is installed as the sync target and 1 is
   returned. On failure ext_get_into_line() is called for `ch` and 0 is
   returned. */
static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  Scheme_Object *received;

  if (!try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, &received)) {
    ext_get_into_line(ch, sinfo);
    return 0;
  }

  scheme_set_sync_target(sinfo, received, NULL, NULL, 0, 0, NULL);

  return 1;
}
1144
/* Readiness callback for syncing on the "put" side of a channel.

   Calls try_channel() with no result slot (NULL). Returns 1 when it
   succeeds; otherwise calls ext_get_into_line() for `ch` and
   returns 0. */
static int channel_put_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  if (!try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, NULL)) {
    ext_get_into_line(ch, sinfo);
    return 0;
  }

  return 1;
}
1154
/* Readiness callback for a channel syncer record: reports ready (1)
   once the record's `picked` field is set, and not ready (0) before
   that. `sinfo` is not consulted. */
static int channel_syncer_ready(Scheme_Object *ch_w, Scheme_Schedule_Info *sinfo)
{
  Scheme_Channel_Syncer *syncer = (Scheme_Channel_Syncer *)ch_w;

  /* When picked, the value, if any, should have been transferred
     already (in which case we would not have made it here, actually). */
  return syncer->picked ? 1 : 0;
}
1167
/* Calls try_channel() on `ch` with no syncing record and no result
   slot, normalizing its outcome to 1 (success) or 0 (failure). */
int scheme_try_channel_get(Scheme_Object *ch)
{
  return try_channel((Scheme_Sema *)ch, NULL, -1, NULL) ? 1 : 0;
}
1175
1176 /* This chaperone only protects the "put" end of the channel because
1177 chaperone-evt is sufficient to protect the "get" end. Thus, it first
1178 wraps the object in an evt chaperone. */
do_chaperone_channel(const char * name,int is_impersonator,int argc,Scheme_Object ** argv)1179 Scheme_Object *do_chaperone_channel(const char *name, int is_impersonator, int argc, Scheme_Object **argv)
1180 {
1181 Scheme_Chaperone *px;
1182 Scheme_Object *val = argv[0];
1183 Scheme_Object *evt;
1184 Scheme_Object *props;
1185
1186 if (SCHEME_CHAPERONEP(val))
1187 val = SCHEME_CHAPERONE_VAL(val);
1188
1189 if (!SCHEME_CHANNELP(val))
1190 scheme_wrong_contract(name, "channel?", 0, argc, argv);
1191 scheme_check_proc_arity(name, 1, 1, argc, argv);
1192 scheme_check_proc_arity(name, 2, 2, argc, argv);
1193
1194 evt = scheme_do_chaperone_evt(name, is_impersonator, 2, argv);
1195
1196 props = scheme_parse_chaperone_props(name, 3, argc, argv);
1197
1198 px = MALLOC_ONE_TAGGED(Scheme_Chaperone);
1199 px->iso.so.type = scheme_chaperone_type;
1200 px->val = val;
1201 px->prev = evt;
1202 px->props = props;
1203 px->redirects = argv[2];
1204
1205 if (is_impersonator)
1206 SCHEME_CHAPERONE_FLAGS(px) |= SCHEME_CHAPERONE_IS_IMPERSONATOR;
1207
1208 return (Scheme_Object *)px;
1209 }
1210
/* Primitive entry point for "chaperone-channel": forwards its
   arguments to do_chaperone_channel() in non-impersonator mode. */
static Scheme_Object *chaperone_channel(int argc, Scheme_Object **argv)
{
  const int as_impersonator = 0;

  return do_chaperone_channel("chaperone-channel", as_impersonator, argc, argv);
}
1215
/* Primitive entry point for "impersonate-channel": forwards its
   arguments to do_chaperone_channel() in impersonator mode.

   The name passed along is the one used in contract and arity error
   reports, so it must be the primitive's actual name,
   "impersonate-channel" (not "impersonator-channel"). */
static Scheme_Object *impersonate_channel(int argc, Scheme_Object **argv)
{
  return do_chaperone_channel("impersonate-channel", 1, argc, argv);
}
1220
1221 /**********************************************************************/
1222 /* Thread mbox */
1223 /**********************************************************************/
1224
/* Ensures that thread `p` has a mailbox semaphore, creating one with
   an initial count of 0 when `p->mbox_sema` is still unset. Does
   nothing if the semaphore already exists. */
static void make_mbox_sema(Scheme_Thread *p)
{
  Scheme_Object *fresh = NULL;

  if (p->mbox_sema)
    return;

  /* Allocate into a local first and only then store into the thread
     record, as the original code does. */
  fresh = scheme_make_sema(0);
  p->mbox_sema = fresh;
}
1233
/* Appends message `o` to the tail of thread `p`'s mailbox queue (a
   chain of raw pairs tracked by mbox_first/mbox_last) and posts the
   thread's mailbox semaphore once, creating the semaphore if needed. */
static void mbox_push(Scheme_Thread *p, Scheme_Object *o)
{
  Scheme_Object *cell;

  cell = scheme_make_raw_pair(o, NULL);

  if (!p->mbox_first)
    p->mbox_first = cell;              /* queue was empty */
  else
    SCHEME_CDR(p->mbox_last) = cell;   /* link after the current tail */
  p->mbox_last = cell;

  make_mbox_sema(p);
  /* Post can't overflow the semaphore, because we'd run out of
     memory for the queue, first. */
  scheme_post_sema(p->mbox_sema);
}
1253
/* Pushes every element of the list `lst` onto the FRONT of thread
   `p`'s mailbox queue, one at a time in list order (so the last
   element of `lst` ends up first in the mailbox).

   Semaphore posts are batched: `pending` counts the messages pushed
   since the last post, minus one. When the list is exhausted or
   `pending` reaches 256, the semaphore count is bumped directly by
   `pending` and a single real post accounts for the remaining message.
   Fuel is then consumed, which might sleep. */
static void mbox_push_front(Scheme_Thread *p, Scheme_Object *lst)
{
  int pending = -1;
  Scheme_Object *rest, *cell;

  make_mbox_sema(p);

  for (rest = lst; !SCHEME_NULLP(rest); ) {
    /* Push one: */
    cell = scheme_make_raw_pair(SCHEME_CAR(rest), p->mbox_first);
    if (!p->mbox_first)
      p->mbox_last = cell;
    p->mbox_first = cell;

    pending++;
    rest = SCHEME_CDR(rest);

    if (SCHEME_NULLP(rest) || (pending == 256)) {
      /* Either done or need to pause to allow breaks/swaps; */
      /* do a single post for all messages so far. */
      ((Scheme_Sema*)p->mbox_sema)->value += pending;
      scheme_post_sema(p->mbox_sema);
      SCHEME_USE_FUEL(pending+1); /* might sleep */
      pending = -1;
    }
  }
}
1282
mbox_pop(Scheme_Thread * p,int dec)1283 static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec)
1284 {
1285 /* Assertion: mbox_first != NULL */
1286 Scheme_Object *r = NULL;
1287
1288 r = SCHEME_CAR(p->mbox_first);
1289 p->mbox_first = SCHEME_CDR(p->mbox_first);
1290 if (!p->mbox_first)
1291 p->mbox_last = NULL;
1292
1293 if (dec)
1294 scheme_try_plain_sema(p->mbox_sema);
1295
1296 return r;
1297 }
1298
/* Primitive behind "thread-send".

   argv[0]  target thread (contract error otherwise)
   argv[1]  message to enqueue
   argv[2]  optional: #f or a failure procedure, validated with
            scheme_check_proc_arity2 when it is not #f

   If the target is still running, the message is appended to its
   mailbox and void is returned. Otherwise: with no third argument an
   exn:fail:contract is raised; with #f the result is #f; with a
   procedure, that procedure is tail-applied to no arguments. */
static Scheme_Object *thread_send(int argc, Scheme_Object **argv)
{
  int status;

  if (!SCHEME_THREADP(argv[0])) {
    scheme_wrong_contract("thread-send", "thread?", 0, argc, argv);
    return NULL;
  }

  /* The #f test is redundant, but keeps it fast as possible. */
  if ((argc > 2) && !SCHEME_FALSEP(argv[2]))
    scheme_check_proc_arity2("thread-send", 0, 2, argc, argv, 1);

  status = ((Scheme_Thread*)argv[0])->running;
  if (MZTHREAD_STILL_RUNNING(status)) {
    mbox_push((Scheme_Thread*)argv[0], argv[1]);
    return scheme_void;
  }

  /* Target is no longer running. */
  if (argc <= 2) {
    scheme_raise_exn(MZEXN_FAIL_CONTRACT, "thread-send: target thread is not running");
    return NULL;
  }

  if (SCHEME_FALSEP(argv[2]))
    return scheme_false;

  return _scheme_tail_apply(argv[2], 0, NULL);
}
1327
/* Primitive behind "thread-receive": returns the next message from the
   current thread's mailbox, waiting on the mailbox semaphore when the
   mailbox is empty. */
static Scheme_Object *thread_receive(int argc, Scheme_Object **argv)
{
  Scheme_Thread *self = scheme_current_thread;
  Scheme_Object *msg;

  /* The mbox semaphore can only be downed by the current thread, so
     receive/try-receive can directly dec+pop without syncing
     (by calling mbox_pop with dec=1). */
  if (self->mbox_first)
    return mbox_pop(self, 1);

  make_mbox_sema(self);

  scheme_wait_sema(self->mbox_sema, 0);
  /* We're relying on atomicity of return after wait succeeds to ensure
     that a semaphore wait guarantees a mailbox dequeue. The wait
     already took the count down, hence dec=0. */
  msg = mbox_pop(self, 0);

  /* Due to that atomicity, though, we're obliged to check for
     a break at this point: */
  scheme_check_break_now();

  return msg;
}
1353
/* Primitive behind "thread-try-receive": pops and returns the first
   message of the current thread's mailbox (also taking the mailbox
   semaphore down via dec=1), or returns #f when the mailbox is
   empty. Never waits. */
static Scheme_Object *thread_try_receive(int argc, Scheme_Object **argv)
{
  Scheme_Thread *self = scheme_current_thread;

  if (!self->mbox_first)
    return scheme_false;

  return mbox_pop(self, 1);
}
1361
/* Primitive behind "thread-receive-evt": takes no meaningful
   arguments and hands back the shared `thread_recv_evt` object. */
static Scheme_Object *thread_receive_evt(int argc, Scheme_Object **argv)
{
  Scheme_Object *evt = thread_recv_evt;

  return evt;
}
1366
/* Readiness callback for the thread-receive event.

   Never reports ready itself (always returns 0). Instead it redirects
   the sync to the mailbox semaphore of the relevant thread: the thread
   stored in sinfo->false_positive_ok when that field is set, the
   current thread otherwise. The semaphore is created on demand, and
   `thread_recv_evt` is passed along to scheme_set_sync_target(). */
static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
  Scheme_Thread *owner;

  owner = sinfo->false_positive_ok;
  if (!owner)
    owner = scheme_current_thread;

  make_mbox_sema(owner);

  scheme_set_sync_target(sinfo, owner->mbox_sema, thread_recv_evt, NULL, 1, 1, NULL);

  return 0;
}
1381
/* Primitive behind "thread-rewind-receive": argv[0] must be a list;
   its elements are pushed onto the front of the current thread's
   mailbox via mbox_push_front(). Returns void, or raises a contract
   error for a non-list argument. */
static Scheme_Object *thread_rewind_receive(int argc, Scheme_Object **argv)
{
  if (!scheme_is_list(argv[0])) {
    scheme_wrong_contract("thread-rewind-receive", "list?", 0, argc, argv);
    return NULL;
  }

  mbox_push_front(scheme_current_thread, argv[0]);

  return scheme_void;
}
1392
1393 /**********************************************************************/
1394 /* alarms */
1395 /**********************************************************************/
1396
/* Primitive behind "alarm-evt": argv[0] must be a real number, which
   is converted to a double and stored as the alarm's `sleep_end`.
   alarm_ready() later compares that value against
   scheme_get_inexact_milliseconds(). */
static Scheme_Object *make_alarm(int argc, Scheme_Object **argv)
{
  Scheme_Alarm *evt;
  double deadline;

  if (!SCHEME_REALP(argv[0]))
    scheme_wrong_contract("alarm-evt", "real?", 0, argc, argv);

  deadline = scheme_get_val_as_double(argv[0]);

  evt = MALLOC_ONE_TAGGED(Scheme_Alarm);
  evt->so.type = scheme_alarm_type;
  evt->sleep_end = deadline;

  return (Scheme_Object *)evt;
}
1414
/* Readiness callback for alarm events.

   First lowers sinfo->sleep_end to this alarm's end time when the
   field is unset (zero) or later than the alarm. Then reports ready
   (1) when the alarm's end time is at or before the current value of
   scheme_get_inexact_milliseconds(), and not ready (0) otherwise. */
static int alarm_ready(Scheme_Object *_a, Scheme_Schedule_Info *sinfo)
{
  Scheme_Alarm *alarm_evt = (Scheme_Alarm *)_a;

  if (!sinfo->sleep_end
      || (alarm_evt->sleep_end < sinfo->sleep_end))
    sinfo->sleep_end = alarm_evt->sleep_end;

  return (alarm_evt->sleep_end <= scheme_get_inexact_milliseconds()) ? 1 : 0;
}
1428
/* Readiness callback that unconditionally reports ready; the argument
   is ignored. */
static int always_ready(Scheme_Object *w)
{
  const int ready = 1;

  return ready;
}
1433
/* Readiness callback that unconditionally reports not ready; the
   argument is ignored. */
static int never_ready(Scheme_Object *w)
{
  const int ready = 0;

  return ready;
}
1438
/* Returns the shared system-idle event, building it on first use.

   The event is a channel-put event that sends void on
   scheme_system_idle_channel, wrapped by scheme_wrap_evt() with
   scheme_void_proc. It is cached in `system_idle_put_evt`, which is
   registered via REGISTER_SO the first time through; later calls
   return the cached object. Arguments are ignored. */
static Scheme_Object *make_sys_idle(int n, Scheme_Object **p)
{
  Scheme_Object *wrap_args[2];

  if (system_idle_put_evt)
    return system_idle_put_evt;

  REGISTER_SO(system_idle_put_evt);
  system_idle_put_evt = scheme_make_channel_put_evt(scheme_system_idle_channel,
                                                    scheme_void);
  wrap_args[0] = system_idle_put_evt;
  wrap_args[1] = scheme_void_proc;
  system_idle_put_evt = scheme_wrap_evt(2, wrap_args);

  return system_idle_put_evt;
}
1453
1454 /**********************************************************************/
1455 /* Precise GC */
1456 /**********************************************************************/
1457
1458 #ifdef MZ_PRECISE_GC
1459
1460 START_XFORM_SKIP;
1461
1462 #include "mzmark_sema.inc"
1463
register_traversers(void)1464 static void register_traversers(void)
1465 {
1466 GC_REG_TRAV(scheme_alarm_type, mark_alarm);
1467 GC_REG_TRAV(scheme_channel_syncer_type, mark_channel_syncer);
1468 }
1469
1470 END_XFORM_SKIP;
1471
1472 #endif
1473
1474 #endif /* NO_SCHEME_THREADS */
1475