1 #include "schpriv.h"
2 #include "schmach.h"
3 #include "schrktio.h"
4
/* Primitive `future?`: returns scheme_true when argv[0] carries the
   scheme_future_type tag, scheme_false otherwise. */
static Scheme_Object *future_p(int argc, Scheme_Object *argv[])
{
  return (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type)
          ? scheme_true
          : scheme_false);
}
12
/* Primitive `fsemaphore?`: returns scheme_true when argv[0] carries the
   scheme_fsemaphore_type tag, scheme_false otherwise. */
Scheme_Object *scheme_fsemaphore_p(int argc, Scheme_Object *argv[])
{
  return (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)
          ? scheme_true
          : scheme_false);
}
20
/* Primitive `futures-enabled?`: the answer is fixed at compile time by
   whether MZ_USE_FUTURES is defined. */
static Scheme_Object *futures_enabled(int argc, Scheme_Object *argv[])
{
#ifndef MZ_USE_FUTURES
  return scheme_false;
#else
  return scheme_true;
#endif
}
29
30
/* Forward declaration, present only in MZ_PRECISE_GC builds; it is called
   from the per-place initialization code below. */
#ifdef MZ_PRECISE_GC
static void register_traversers(void);
#endif
34
35 #ifndef MZ_USE_FUTURES
36
37 /* Futures not enabled, but make a stub module and implementation */
38
/* Future record for the stub implementation (MZ_USE_FUTURES not defined). */
typedef struct future_t {
  Scheme_Object so;               /* type tag; scheme_future() sets it to scheme_future_type */
  Scheme_Object *running_sema;    /* created by touch() before it runs the thunk; other touchers wait on it */
  Scheme_Object *orig_lambda;     /* the thunk; touch() clears it just before applying it */
  Scheme_Object *retval;          /* result stored by touch() once the thunk returns */
  int multiple_count;             /* saved count when retval is SCHEME_MULTIPLE_VALUES */
  Scheme_Object **multiple_array; /* saved values when retval is SCHEME_MULTIPLE_VALUES */
  int no_retval;                  /* set to 1 by touch() when the thunk escaped instead of returning */
} future_t;
48
/* fsemaphore record for the stub implementation: a tagged wrapper around
   an ordinary semaphore object. */
typedef struct fsemaphore_t {
  Scheme_Object so;    /* type tag; set to scheme_fsemaphore_type on creation */
  Scheme_Object *sema; /* semaphore made with scheme_make_sema() that does the real work */
} fsemaphore_t;
53
/* Stub `future`: after the procedure-arity check on argv[0], allocates a
   tagged future record that only remembers the thunk. Nothing is run here;
   the stub touch() applies the thunk later. */
Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
{
  future_t *new_ft;

  scheme_check_proc_arity("future", 0, 0, argc, argv);

  new_ft = MALLOC_ONE_TAGGED(future_t);
  new_ft->orig_lambda = argv[0];
  new_ft->so.type = scheme_future_type;

  return (Scheme_Object *)new_ft;
}
67
/* Stub `touch` (futures disabled): the thunk of a future is run on demand by
   the first Racket thread that touches it; later touches return the saved
   result. Raises a contract error when argv[0] is not a future, and signals
   an error when an earlier run of the thunk escaped without a result. */
static Scheme_Object *touch(int argc, Scheme_Object *argv[])
{
  future_t * volatile ft;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type))
    scheme_wrong_contract("touch", "future?", 0, argc, argv);

  ft = (future_t *)argv[0];

  /* Each pass either finishes (result or error), waits for whoever is
     running the thunk, or runs the thunk here and loops to pick up the
     stored result. */
  while (1) {
    if (ft->retval) {
      if (SAME_OBJ(ft->retval, SCHEME_MULTIPLE_VALUES)) {
        /* reinstall the saved multiple values in the current thread */
        Scheme_Thread *p = scheme_current_thread;
        p->ku.multiple.array = ft->multiple_array;
        p->ku.multiple.count = ft->multiple_count;
      }
      return ft->retval;
    }
    if (ft->no_retval)
      scheme_signal_error("touch: future previously aborted");

    if (ft->running_sema) {
      /* The thunk was already started: wait for the post that marks its
         end, then re-post so any other waiter is released too. */
      scheme_wait_sema(ft->running_sema, 0);
      scheme_post_sema(ft->running_sema);
    } else {
      Scheme_Object *sema;
      future_t *old_ft;
      mz_jmp_buf newbuf, * volatile savebuf;
      Scheme_Thread *p = scheme_current_thread;

      /* In case another Racket thread touches the future. */
      sema = scheme_make_sema(0);
      ft->running_sema = sema;

      /* make this future the thread's current one while the thunk runs */
      old_ft = p->current_ft;
      p->current_ft = ft;

      /* install a local error buffer so an escape out of the thunk lands here */
      savebuf = p->error_buf;
      p->error_buf = &newbuf;
      if (scheme_setjmp(newbuf)) {
        /* escape: record the abort, release waiters, and continue the
           escape through the previously installed buffer */
        ft->no_retval = 1;
        p->current_ft = old_ft;
        scheme_post_sema(ft->running_sema);
        scheme_longjmp(*savebuf, 1);
      } else {
        GC_CAN_IGNORE Scheme_Object *retval, *proc;
        proc = ft->orig_lambda;
        ft->orig_lambda = NULL; /* don't hold on to proc */
        retval = scheme_apply_multi(proc, 0, NULL);
        ft->retval = retval;
        if (SAME_OBJ(retval, SCHEME_MULTIPLE_VALUES)) {
          /* move the values out of the thread record into the future */
          ft->multiple_array = p->ku.multiple.array;
          ft->multiple_count = p->ku.multiple.count;
          p->ku.multiple.array = NULL;
        }
        scheme_post_sema(ft->running_sema);
        p->current_ft = old_ft;
        p->error_buf = savebuf;
      }
    }
  }

  return NULL;
}
132
133
134
/* Stub `processor-count`: without future support the reported count is
   always the fixnum 1. */
static Scheme_Object *processor_count(int argc, Scheme_Object *argv[])
{
  return scheme_make_integer(1);
}
139
/* Stub: with futures disabled this always reports 0, regardless of `now`. */
int scheme_is_multithreaded(int now)
{
  (void)now; /* unused in the stub */
  return 0;
}
144
/* Stub `current-future`: returns the future recorded in the current
   thread's current_ft field, or scheme_false when there is none. */
Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[])
{
  future_t *running = scheme_current_thread->current_ft;

  if (running)
    return (Scheme_Object *)running;

  return scheme_false;
}
151
/* Stub `make-fsemaphore`: obtains the initial count through
   scheme_get_semaphore_init(), then wraps a regular semaphore with that
   count in a tagged fsemaphore record. */
Scheme_Object *scheme_make_fsemaphore(int argc, Scheme_Object *argv[])
{
  intptr_t init_count;
  fsemaphore_t *wrapper;
  Scheme_Object *inner;

  init_count = scheme_get_semaphore_init("make-fsemaphore", argc, argv);

  wrapper = MALLOC_ONE_TAGGED(fsemaphore_t);
  wrapper->so.type = scheme_fsemaphore_type;

  /* keep the result in a local before storing it into the record */
  inner = scheme_make_sema(init_count);
  wrapper->sema = inner;

  return (Scheme_Object*)wrapper;
}
167
/* File-local entry point registered as the `make-fsemaphore` primitive;
   simply forwards to scheme_make_fsemaphore(). */
static Scheme_Object *make_fsemaphore(int argc, Scheme_Object *argv[])
{
  return scheme_make_fsemaphore(argc, argv);
}
172
/* Stub `fsemaphore-post`: posts to the wrapped semaphore and returns void.
   Raises a contract error unless called with exactly one fsemaphore. */
Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *wrapper;

  if (!(argc == 1 && SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)))
    scheme_wrong_contract("fsemaphore-post", "fsemaphore?", 0, argc, argv);

  wrapper = (fsemaphore_t*)argv[0];
  scheme_post_sema(wrapper->sema);

  return scheme_void;
}
184
/* Stub `fsemaphore-wait`: waits on the wrapped semaphore (second argument
   to scheme_wait_sema() is 0) and returns void. Raises a contract error
   unless called with exactly one fsemaphore. */
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *wrapper;

  if (!(argc == 1 && SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)))
    scheme_wrong_contract("fsemaphore-wait", "fsemaphore?", 0, argc, argv);

  wrapper = (fsemaphore_t*)argv[0];
  scheme_wait_sema(wrapper->sema, 0);

  return scheme_void;
}
196
/* Stub `fsemaphore-try-wait?`: calls scheme_wait_sema() on the wrapped
   semaphore with second argument 1 and maps a non-zero result to
   scheme_true, zero to scheme_false. Raises a contract error unless called
   with exactly one fsemaphore. */
Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *wrapper;

  if (!(argc == 1 && SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)))
    scheme_wrong_contract("fsemaphore-try-wait?", "fsemaphore?", 0, argc, argv);

  wrapper = (fsemaphore_t*)argv[0];

  return (scheme_wait_sema(wrapper->sema, 1) ? scheme_true : scheme_false);
}
209
/* Stub `fsemaphore-count`: returns the wrapped semaphore's `value` field as
   a fixnum. Raises a contract error unless called with exactly one
   fsemaphore. */
Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *wrapper;
  Scheme_Sema *inner;

  if (!(argc == 1 && SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)))
    scheme_wrong_contract("fsemaphore-count", "fsemaphore?", 0, argc, argv);

  wrapper = (fsemaphore_t*)argv[0];
  inner = (Scheme_Sema *)wrapper->sema;

  return scheme_make_integer(inner->value);
}
219
/* Stub `would-be-future`: performs the arity check under its own name (so
   errors mention "would-be-future") and then creates an ordinary stub
   future via scheme_future(). */
static Scheme_Object *would_be_future(int argc, Scheme_Object *argv[])
{
  scheme_check_proc_arity("would-be-future", 0, 0, argc, argv);

  return scheme_future(argc, argv);
}
225
/* Stub: there are no future logs without future support, so this only
   returns void. */
static Scheme_Object *reset_future_logs_for_tracking(int argc, Scheme_Object *argv[])
{
  return scheme_void;
}
230
/* Stub: nothing is traced without future support, so this only returns
   void. */
static Scheme_Object *mark_future_trace_end(int argc, Scheme_Object *argv[])
{
  return scheme_void;
}
235
/* Stub: no one-time initialization is needed when futures are disabled. */
void scheme_init_futures_once()
{
  /* intentionally empty */
}
239
/* Stub per-place initialization: the only work is registering the GC
   traversers, and only in MZ_PRECISE_GC builds. */
void scheme_init_futures_per_place()
{
#ifdef MZ_PRECISE_GC
  register_traversers();
#endif
}
246
/* Stub: there is nothing to tear down when futures are disabled. */
void scheme_end_futures_per_place()
{
  /* intentionally empty */
}
250
/* Optimization flags applied to the future primitives when they are
   registered in scheme_init_futures(). In this stub build both map to
   SCHEME_PRIM_SOMETIMES_INLINED; they are set differently below when
   futures are supported. */
#define SCHEME_FUTURE_PRIM_IS_NARY_INLINED SCHEME_PRIM_SOMETIMES_INLINED
#define SCHEME_FUTURE_PRIM_IS_UNARY_INLINED SCHEME_PRIM_SOMETIMES_INLINED
254
255 #else
256
257 #include "future.h"
258 #include <stdlib.h>
259 #include <string.h>
260 #include "jit.h"
261
/* Assertion macro for the futures implementation; expands to MZ_ASSERT. */
#define FUTURE_ASSERT(x) MZ_ASSERT(x)

/* Forward declarations for static functions of the futures-enabled build. */
static Scheme_Object *make_fsemaphore(int argc, Scheme_Object *argv[]);
static Scheme_Object *touch(int argc, Scheme_Object *argv[]);
static Scheme_Object *processor_count(int argc, Scheme_Object *argv[]);
static void futures_init(void);
static void init_future_thread(struct Scheme_Future_State *fs, int i);
static Scheme_Future_Thread_State *alloc_future_thread_state();
static void requeue_future(struct future_t *future, struct Scheme_Future_State *fs);
static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
                                  void *func,
                                  int is_atomic,
                                  int can_suspend,
                                  int for_overflow);
static int capture_future_continuation(struct Scheme_Future_State *fs, future_t *ft, void **storage,
                                       int need_lock, int for_overflow);
278
/* FUTURE_C_STACK_SIZE: C stack size requested when a worker thread is
   created; also the amount by which a worker's stack boundary is shifted
   while a GC is being requested.
   FUTURE_RUNSTACK_SIZE: initial runstack size allocated for each worker. */
#define FUTURE_C_STACK_SIZE 500000
#define FUTURE_RUNSTACK_SIZE 2000

/* FEVENT_BUFFER_SIZE: number of Fevent slots in each event buffer; the
   write position wraps to 0 when it reaches this value. */
#define FEVENT_BUFFER_SIZE 512
#define NO_FUTURE_ID -1
284
/* Kinds of future events. The values index the fevent_strs and
   fevent_long_strs tables, so the order here must match the order of the
   strings there; _FEVENT_COUNT_ is the number of kinds. */
enum {
  FEVENT_CREATE,
  FEVENT_COMPLETE,
  FEVENT_START_WORK,
  FEVENT_START_RTONLY_WORK,
  FEVENT_RESUME_WORK,
  FEVENT_END_WORK,
  FEVENT_RTCALL_ATOMIC,
  FEVENT_HANDLE_RTCALL_ATOMIC,
  FEVENT_RTCALL,
  FEVENT_RTCALL_TOUCH,
  FEVENT_HANDLE_RTCALL,
  FEVENT_RTCALL_RESULT,
  FEVENT_HANDLE_RTCALL_RESULT,
  FEVENT_RTCALL_ABORT,
  FEVENT_HANDLE_RTCALL_ABORT,
  FEVENT_RTCALL_SUSPEND,
  FEVENT_OVERFLOW,
  FEVENT_TOUCH_PAUSE,
  FEVENT_TOUCH_RESUME,
  FEVENT_MISSING,
  FEVENT_STOP_TRACE,
  _FEVENT_COUNT_
};
309
/* Short names for the event kinds, one per FEVENT_ constant and in the
   same order (21 entries). Several kinds share a name on purpose
   ("sync", "block", "result", "abort"). */
static const char * const fevent_strs[] = { "create", "complete",
                                            "start-work", "start-0-work", "start-overflow-work",
                                            "end-work",
                                            "sync", "sync", "block", "touch", "block",
                                            "result", "result", "abort", "abort",
                                            "suspend", "overflow",
                                            "touch-pause", "touch-resume", "missing", "stop-trace" };
/* Long, human-readable descriptions for the event kinds, in the same
   order as the FEVENT_ constants (21 entries). */
static const char * const fevent_long_strs[] = { "created", "completed",
                                                 "started work", "started (process 0, only)", "started (overflow)",
                                                 "ended work",
                                                 "synchronizing with process 0", "synchronizing",
                                                 "BLOCKING on process 0", "touching future", "HANDLING",
                                                 "result from process 0", "result determined",
                                                 "abort from process 0", "abort determined",
                                                 "suspended", "overflow",
                                                 "paused for touch", "resumed for touch",
                                                 "events missing", "stop future tracing" };
327
328
/* Per-place state of the futures implementation. It is malloc'ed and zeroed
   by futures_init() and freed by scheme_end_futures_per_place(). */
typedef struct Scheme_Future_State {
  int thread_pool_size;                     /* capacity of pool_threads; futures_init() uses cpucount * 2 */
  Scheme_Future_Thread_State **pool_threads; /* one slot per worker; NULL until that worker is created */
  int busy_thread_count;

  void *signal_handle;                      /* result of scheme_get_signal_handle() */

  int future_queue_count;
  future_t *future_queue;
  future_t *future_queue_end;
  future_t *future_waiting_atomic;
  future_t *future_waiting_lwc;
  future_t *future_waiting_touch;
  int next_futureid;

  mzrt_mutex *future_mutex; /* BEWARE: don't allocate while holding this lock */
  mzrt_sema *future_pending_sema;
  mzrt_sema *gc_ok_c;                       /* posted by end_gc_not_ok() when need_gc_ok_post is set */
  mzrt_sema *gc_done_c;                     /* posted by scheme_future_continue_after_gc(), once per need_gc_done_post */

  /* gc_not_ok: incremented by start_gc_not_ok(), decremented by end_gc_not_ok();
     wait_for_gc: set while the runtime thread wants workers to pause;
     need_gc_ok_post / need_gc_done_post: pending posts on gc_ok_c / gc_done_c */
  int gc_not_ok, wait_for_gc, need_gc_ok_post, need_gc_done_post;
  int abort_all_futures;                    /* set at place shutdown; makes waiting workers exit */

  int *gc_counter_ptr;                      /* points at scheme_did_gc_count */

  int future_threads_created;               /* number of workers started so far */

  Fevent_Buffer runtime_fevents;            /* event buffer used when recording from the runtime thread */
  Scheme_Object **fevent_syms;              /* symbols for event names, indexed by FEVENT_ constant */
  Scheme_Struct_Type *fevent_prefab;        /* prefab struct type looked up as 'future-event' with 6 fields */
} Scheme_Future_State;
360
361
/* Thread-local pointer to the future state, set by futures_init() and
   cleared by scheme_end_futures_per_place(). */
THREAD_LOCAL_DECL(static Scheme_Future_State *scheme_future_state);
/* Four-slot thread-local scratch area; registered as a GC root in
   futures_init(). */
THREAD_LOCAL_DECL(void *jit_future_storage[4]);

#ifdef MZ_PRECISE_GC
/* Allocator variables defined elsewhere; start_gc_not_ok() zeroes the two
   page pointers to force a worker to ask for memory again. */
THREAD_LOCAL_DECL(extern uintptr_t GC_gen0_alloc_page_ptr);
THREAD_LOCAL_DECL(extern uintptr_t GC_gen0_alloc_page_end);
THREAD_LOCAL_DECL(extern int GC_gen0_alloc_only);
#endif
370
371 static void start_gc_not_ok(Scheme_Future_State *fs);
372 static void end_gc_not_ok(Scheme_Future_Thread_State *fts,
373 Scheme_Future_State *fs,
374 Scheme_Object **current_rs);
375
376 static void *worker_thread_future_loop(void *arg);
377 static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile future, volatile int is_atomic);
378 static future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft);;
379 static future_t *get_pending_future(Scheme_Future_State *fs);
380 static void receive_special_result(future_t *f, Scheme_Object *retval, int clear);
381 static void send_special_result(future_t *f, Scheme_Object *retval);
382 static Scheme_Object *_apply_future_lw(future_t *ft);
383 static Scheme_Object *apply_future_lw(future_t *ft);
384 static int fsemaphore_ready(Scheme_Object *obj);
385 static void init_fevent(Fevent_Buffer *b);
386 static void free_fevent(Fevent_Buffer *b);
387 static int future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, int what);
388 static Scheme_Object *would_be_future(int argc, Scheme_Object *argv[]);
389 static void push_suspended_lw(Scheme_Future_State *fs, future_t *ft);
390 static void pop_suspended_lw(Scheme_Future_State *fs, future_t *ft);
391
392 static Scheme_Object *bad_multi_result_proc;
393 static Scheme_Object *bad_multi_result(int argc, Scheme_Object **argv);
394 static Scheme_Object *reset_future_logs_for_tracking(int argc, Scheme_Object *argv[]);
395 static Scheme_Object *mark_future_trace_end(int argc, Scheme_Object *argv[]);
396
/* Processor count; futures_init() fills it in from rktio_processor_count()
   the first time it finds the value below 1. */
READ_ONLY static int cpucount;
398
/* setjmp/longjmp wrappers for future code: MZ_PRECISE_GC builds go through
   the JIT variants on the buffer's `jb` member, other builds use the
   regular scheme_setjmp/scheme_longjmp. */
#ifdef MZ_PRECISE_GC
# define scheme_future_setjmp(newbuf) scheme_jit_setjmp((newbuf).jb)
# define scheme_future_longjmp(newbuf, v) scheme_jit_longjmp((newbuf).jb, v)
#else
# define scheme_future_setjmp(newbuf) scheme_setjmp(newbuf)
# define scheme_future_longjmp(newbuf, v) scheme_longjmp(newbuf, v)
#endif
406
/* Outside MZ_PRECISE_GC builds these GC hooks expand to nothing, so the
   code below can call them unconditionally. */
#ifndef MZ_PRECISE_GC
# define GC_set_accounting_custodian(c) /* nothing */
# define GC_register_thread(t, c) /* nothing */
# define GC_register_new_thread(t, c) /* nothing */
#endif
412
413 /**********************************************************************/
414 /* Arguments for a newly created future thread */
415 /**********************************************************************/
416
/* Argument block that init_future_thread() builds on its stack and hands to
   worker_thread_future_loop(). The creator marks it GC_CAN_IGNORE. */
typedef struct future_thread_params_t {
  mzrt_sema *ready_sema;        /* creator waits on this after starting the worker, then destroys it */
  struct NewGC *shared_GC;      /* set to GC_instance by the creator */
  Scheme_Future_State *fs;
  Scheme_Future_Thread_State *fts;
  Scheme_Object **runstack_start; /* runstack allocated by the creator for the worker */

  /* The creator registers the targets of these pointers as GC roots once
     ready_sema has been posted; presumably the worker fills them in with
     the addresses of its thread-local variables -- TODO confirm against
     worker_thread_future_loop(). */
  Scheme_Object ***scheme_current_runstack_ptr;
  Scheme_Object ***scheme_current_runstack_start_ptr;
  Scheme_Thread **current_thread_ptr;
  void **jit_future_storage_ptr;
  Scheme_Current_LWC *lwc;
} future_thread_params_t;
430
/* Optimization flags applied to the future primitives when they are
   registered in scheme_init_futures(). With futures supported they map to
   the real inlining flags; they are set differently above when futures are
   not supported. */
#define SCHEME_FUTURE_PRIM_IS_NARY_INLINED SCHEME_PRIM_IS_NARY_INLINED
#define SCHEME_FUTURE_PRIM_IS_UNARY_INLINED SCHEME_PRIM_IS_UNARY_INLINED
434
435 #endif
436
437 /**********************************************************************/
438 /* Plumbing for Racket initialization */
439 /**********************************************************************/
440
441 /* Invoked by the runtime on startup to make primitives known */
scheme_init_futures(Scheme_Startup_Env * newenv)442 void scheme_init_futures(Scheme_Startup_Env *newenv)
443 {
444 Scheme_Object *p;
445
446 scheme_addto_prim_instance("future?",
447 scheme_make_folding_prim(future_p,
448 "future?",
449 1,
450 1,
451 1),
452 newenv);
453
454 p = scheme_make_prim_w_arity(scheme_future, "future", 1, 1);
455 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
456 scheme_addto_prim_instance("future", p, newenv);
457
458 scheme_addto_prim_instance("processor-count",
459 scheme_make_prim_w_arity(processor_count,
460 "processor-count",
461 0,
462 0),
463 newenv);
464
465 p = scheme_make_prim_w_arity(touch, "touch", 1, 1);
466 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
467 scheme_addto_prim_instance("touch", p, newenv);
468
469 p = scheme_make_immed_prim(scheme_current_future,
470 "current-future",
471 0,
472 0);
473 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_NARY_INLINED);
474 scheme_addto_prim_instance("current-future", p, newenv);
475
476 p = scheme_make_immed_prim(scheme_fsemaphore_p,
477 "fsemaphore?",
478 1,
479 1);
480
481 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
482 scheme_addto_prim_instance("fsemaphore?", p, newenv);
483
484 p = scheme_make_immed_prim(make_fsemaphore,
485 "make-fsemaphore",
486 1,
487 1);
488 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
489 scheme_addto_prim_instance("make-fsemaphore", p, newenv);
490
491 p = scheme_make_immed_prim(scheme_fsemaphore_count,
492 "fsemaphore-count",
493 1,
494 1);
495 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
496 scheme_addto_prim_instance("fsemaphore-count", p, newenv);
497
498 p = scheme_make_immed_prim(scheme_fsemaphore_wait,
499 "fsemaphore-wait",
500 1,
501 1);
502 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
503 scheme_addto_prim_instance("fsemaphore-wait", p, newenv);
504
505 p = scheme_make_immed_prim(scheme_fsemaphore_post,
506 "fsemaphore-post",
507 1,
508 1);
509 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
510 scheme_addto_prim_instance("fsemaphore-post", p, newenv);
511
512 p = scheme_make_immed_prim(scheme_fsemaphore_try_wait,
513 "fsemaphore-try-wait?",
514 1,
515 1);
516 SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
517 scheme_addto_prim_instance("fsemaphore-try-wait?", p, newenv);
518
519 ADD_PRIM_W_ARITY("would-be-future", would_be_future, 1, 1, newenv);
520 ADD_PRIM_W_ARITY("futures-enabled?", futures_enabled, 0, 0, newenv);
521 ADD_PRIM_W_ARITY("reset-future-logs-for-tracing!", reset_future_logs_for_tracking, 0, 0, newenv);
522 ADD_PRIM_W_ARITY("mark-future-trace-end!", mark_future_trace_end, 0, 0, newenv);
523 }
524
525 #ifdef MZ_USE_FUTURES
526
scheme_init_futures_once()527 void scheme_init_futures_once()
528 {
529 REGISTER_SO(bad_multi_result_proc);
530 bad_multi_result_proc = scheme_make_prim_w_arity(bad_multi_result, "bad-multi-result", 0, -1);
531 }
532
/* Per-place initialization for the futures-enabled build; all of the work
   is done by futures_init(). */
void scheme_init_futures_per_place()
{
  futures_init();
}
537
/* Swap callback installed by futures_init(): records the current Racket
   thread in this OS thread's future-thread state. The argument is passed
   through untouched. */
static Scheme_Object *set_fts_thread(Scheme_Object *ignored)
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;

  fts->thread = scheme_current_thread;

  return ignored;
}
543
futures_init(void)544 void futures_init(void)
545 {
546 Scheme_Future_State *fs;
547 Scheme_Future_Thread_State *rt_fts;
548 Scheme_Future_Thread_State **ftss;
549 void *hand;
550 Scheme_Object **syms, *sym;
551 Scheme_Struct_Type *stype;
552 int pool_size;
553
554 if (cpucount < 1)
555 cpucount = rktio_processor_count(scheme_rktio);
556
557 fs = (Scheme_Future_State *)malloc(sizeof(Scheme_Future_State));
558 memset(fs, 0, sizeof(Scheme_Future_State));
559 scheme_future_state = fs;
560
561 pool_size = cpucount * 2;
562 ftss = (Scheme_Future_Thread_State **)malloc(pool_size * sizeof(Scheme_Future_Thread_State*));
563 memset(ftss, 0, pool_size * sizeof(Scheme_Future_Thread_State*));
564 fs->pool_threads = ftss;
565 fs->thread_pool_size = pool_size;
566
567 mzrt_mutex_create(&fs->future_mutex);
568 mzrt_sema_create(&fs->future_pending_sema, 0);
569 mzrt_sema_create(&fs->gc_ok_c, 0);
570 mzrt_sema_create(&fs->gc_done_c, 0);
571 fs->gc_counter_ptr = &scheme_did_gc_count;
572
573 /* Create a 'dummy' FTS for the RT thread */
574 rt_fts = alloc_future_thread_state();
575 rt_fts->is_runtime_thread = 1;
576 rt_fts->gen0_size = 1;
577 scheme_future_thread_state = rt_fts;
578
579 scheme_add_swap_callback(set_fts_thread, scheme_false);
580 set_fts_thread(scheme_false);
581
582 REGISTER_SO(fs->future_queue);
583 REGISTER_SO(fs->future_queue_end);
584 REGISTER_SO(fs->future_waiting_atomic);
585 REGISTER_SO(fs->future_waiting_lwc);
586 REGISTER_SO(fs->future_waiting_touch);
587 REGISTER_SO(fs->fevent_syms);
588 REGISTER_SO(fs->fevent_prefab);
589 REGISTER_SO(jit_future_storage);
590
591 hand = scheme_get_signal_handle();
592 fs->signal_handle = hand;
593
594 syms = MALLOC_N(Scheme_Object*, _FEVENT_COUNT_);
595 fs->fevent_syms = syms;
596 sym = scheme_intern_symbol(fevent_strs[FEVENT_HANDLE_RTCALL_ATOMIC]);
597 syms[FEVENT_HANDLE_RTCALL_ATOMIC] = sym;
598 sym = scheme_intern_symbol(fevent_strs[FEVENT_HANDLE_RTCALL]);
599 syms[FEVENT_HANDLE_RTCALL] = sym;
600
601 sym = scheme_intern_symbol("future-event");
602 stype = scheme_lookup_prefab_type(sym, 6);
603 fs->fevent_prefab = stype;
604
605 init_fevent(&fs->runtime_fevents);
606
607 #ifdef MZ_PRECISE_GC
608 register_traversers();
609 #endif
610 }
611
/* Creates worker thread number `i` of the pool and stores its state in
   fs->pool_threads[i].

   Allocates the worker's future-thread state and event buffers, a skeleton
   Scheme_Thread record and a runstack, then starts the OS thread running
   worker_thread_future_loop() and blocks until the worker posts
   params.ready_sema. Afterwards the locations reported back through
   `params` are registered as GC roots. */
static void init_future_thread(Scheme_Future_State *fs, int i)
{
  Scheme_Future_Thread_State *fts;
  GC_CAN_IGNORE future_thread_params_t params;
  Scheme_Thread *skeleton;
  Scheme_Object **runstack_start;
  mz_proc_thread *t;

  /* Create the worker thread pool.  These threads will
     'queue up' and wait for futures to become available. */

  fts = alloc_future_thread_state();
  fts->id = i;

  fts->gen0_size = 1;

  fts->use_fevents1 = 1;
  init_fevent(&fts->fevents1);
  init_fevent(&fts->fevents2);

  params.shared_GC = GC_instance;
  params.fts = fts;
  params.fs = fs;

  /* Make enough of a thread record to deal with multiple values
     and to support GC and memory accounting. */
  skeleton = MALLOC_ONE_TAGGED(Scheme_Thread);
  skeleton->so.type = scheme_thread_type;
  GC_register_new_thread(skeleton, main_custodian);
  skeleton->running = MZTHREAD_RUNNING;

  fts->thread = skeleton;

  {
    Scheme_Object **rs_start;
    intptr_t init_runstack_size = FUTURE_RUNSTACK_SIZE;
    rs_start = scheme_alloc_runstack(init_runstack_size);
    runstack_start = rs_start;
    fts->runstack_size = init_runstack_size;
  }

  /* Fill in GCable values just before creating the thread,
     because the GC ignores `params': */
  params.runstack_start = runstack_start;

  /* `params` lives on this stack frame, so wait for the worker to signal
     that it is ready before continuing (and before `params` goes away). */
  mzrt_sema_create(&params.ready_sema, 0);
  t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, &params, FUTURE_C_STACK_SIZE);
  mzrt_sema_wait(params.ready_sema);
  mzrt_sema_destroy(params.ready_sema);
  params.ready_sema = NULL;

  fts->t = t;

  /* Register the locations handed back through `params` as GC roots
     (presumably the worker's thread-local variables -- confirm against
     worker_thread_future_loop()). */
  scheme_register_static(params.scheme_current_runstack_ptr, sizeof(void*));
  scheme_register_static(params.scheme_current_runstack_start_ptr, sizeof(void*));
  scheme_register_static(params.jit_future_storage_ptr, 4 * sizeof(void*));
  scheme_register_static(params.current_thread_ptr, sizeof(void*));

  fs->pool_threads[i] = fts;
}
672
alloc_future_thread_state()673 static Scheme_Future_Thread_State *alloc_future_thread_state()
674 {
675 Scheme_Future_Thread_State *fts;
676
677 fts = (Scheme_Future_Thread_State *)malloc(sizeof(Scheme_Future_Thread_State));
678 memset(fts, 0, sizeof(Scheme_Future_Thread_State));
679 scheme_register_static(&fts->thread, sizeof(Scheme_Thread*));
680
681 return fts;
682 }
683
scheme_end_futures_per_place()684 void scheme_end_futures_per_place()
685 {
686 Scheme_Future_State *fs = scheme_future_state;
687
688 if (fs) {
689 int i;
690
691 mzrt_mutex_lock(fs->future_mutex);
692 fs->abort_all_futures = 1;
693 fs->wait_for_gc = 1;
694 mzrt_mutex_unlock(fs->future_mutex);
695
696 /* post enough semas to ensure that every future
697 wakes up and tries to disable GC: */
698 for (i = 0; i < fs->thread_pool_size; i++) {
699 if (fs->pool_threads[i]) {
700 mzrt_sema_post(fs->future_pending_sema);
701 mzrt_sema_post(fs->pool_threads[i]->worker_can_continue_sema);
702 }
703 }
704
705 scheme_future_block_until_gc();
706
707 /* wait for all future threads to end: */
708 for (i = 0; i < fs->thread_pool_size; i++) {
709 if (fs->pool_threads[i]) {
710 (void)mz_proc_thread_wait(fs->pool_threads[i]->t);
711
712 free_fevent(&fs->pool_threads[i]->fevents1);
713 free_fevent(&fs->pool_threads[i]->fevents2);
714
715 free(fs->pool_threads[i]);
716 }
717 }
718
719 free_fevent(&fs->runtime_fevents);
720
721 mzrt_mutex_destroy(fs->future_mutex);
722 mzrt_sema_destroy(fs->future_pending_sema);
723 mzrt_sema_destroy(fs->gc_ok_c);
724 mzrt_sema_destroy(fs->gc_done_c);
725
726 free(fs->pool_threads);
727 free(fs);
728
729 scheme_future_state = NULL;
730 }
731 }
732
/* Starts at most one additional worker thread when the pool is not yet
   full and the number of queued futures is at least the number of created
   workers that are not busy. Returns immediately when no worker exists
   yet and nothing is queued. */
static void check_future_thread_creation(Scheme_Future_State *fs)
{
  int queued, busy_workers;

  /* quick unlocked check: nothing created and nothing waiting */
  if (!(fs->future_threads_created || fs->future_queue_count))
    return;

  /* pool already at capacity */
  if (fs->future_threads_created >= fs->thread_pool_size)
    return;

  /* take a consistent snapshot of the two counters under the lock */
  mzrt_mutex_lock(fs->future_mutex);
  queued = fs->future_queue_count;
  busy_workers = fs->busy_thread_count;
  mzrt_mutex_unlock(fs->future_mutex);

  if (queued >= (fs->future_threads_created - busy_workers)) {
    init_future_thread(fs, fs->future_threads_created);
    fs->future_threads_created++;
  }
}
752
/* Enters a "GC not OK" region for the calling thread by incrementing
   fs->gc_not_ok, first waiting for any pause requested through
   fs->wait_for_gc to be lifted. If fs->abort_all_futures is set while a
   pause is pending, the calling thread exits instead of waiting.
   The mutex is released while waiting and re-acquired before returning. */
static void start_gc_not_ok(Scheme_Future_State *fs)
/* must have mutex_lock */
{
  Scheme_Thread *p;

  while (fs->wait_for_gc) {
    int quit = fs->abort_all_futures;
    /* request one post on gc_done_c from scheme_future_continue_after_gc() */
    fs->need_gc_done_post++;
    mzrt_mutex_unlock(fs->future_mutex);
    if (quit) mz_proc_thread_exit(NULL);
    mzrt_sema_wait(fs->gc_done_c);
    mzrt_mutex_lock(fs->future_mutex);
  }

  fs->gc_not_ok++;

#ifdef MZ_PRECISE_GC
  {
    /* If the GC counter moved since this thread last looked, drop its
       allocation page pointers and halve gen0_size (never below 1). */
    Scheme_Future_Thread_State *fts = scheme_future_thread_state;
    if (fts->worker_gc_counter != *fs->gc_counter_ptr) {
      GC_gen0_alloc_page_ptr = 0; /* forces future to ask for memory */
      GC_gen0_alloc_page_end = 0;
      fts->gen0_start = 0;
      if (fts->gen0_size > 1)
        fts->gen0_size >>= 1;
      fts->worker_gc_counter = *fs->gc_counter_ptr;
    }
  }
#endif

  /* reload the runstack registers from the thread record (end_gc_not_ok()
     stores them there and clears the registers) */
  p = scheme_current_thread;
  MZ_RUNSTACK = p->runstack;
  MZ_RUNSTACK_START = p->runstack_start;
}
787
/* Leaves a "GC not OK" region: publishes the runstack limits, saves the
   runstack and continuation-mark registers into the current thread record,
   clears the runstack registers, decrements fs->gc_not_ok and, when
   someone asked for it via need_gc_ok_post, posts gc_ok_c.
   `current_rs` is the current runstack position, or NULL to use the full
   runstack size as the limit. */
static void end_gc_not_ok(Scheme_Future_Thread_State *fts,
                          Scheme_Future_State *fs,
                          Scheme_Object **current_rs)
/* must have mutex_lock */
{
  Scheme_Thread *cur;

  scheme_set_runstack_limits(MZ_RUNSTACK_START,
                             fts->runstack_size,
                             (current_rs
                              ? current_rs XFORM_OK_MINUS MZ_RUNSTACK_START
                              : fts->runstack_size),
                             fts->runstack_size);

  cur = scheme_current_thread;
  cur->runstack_start = MZ_RUNSTACK_START;
  cur->runstack = MZ_RUNSTACK;
  cur->cont_mark_pos = MZ_CONT_MARK_POS;
  cur->cont_mark_stack = MZ_CONT_MARK_STACK;

  /* To ensure that memory accounting goes through the thread
     record, clear these roots: */
  MZ_RUNSTACK_START = NULL;
  MZ_RUNSTACK = NULL;

  /* FIXME: clear scheme_current_thread->ku.multiple.array ? */

  fs->gc_not_ok--;

  if (fs->need_gc_ok_post) {
    fs->need_gc_ok_post = 0;
    mzrt_sema_post(fs->gc_ok_c);
  }
}
820
/* Asks every worker thread to pause and returns once fs->gc_not_ok has
   dropped to 0. Does nothing when there is no future state or no worker
   has been created. Paired with scheme_future_continue_after_gc(). */
void scheme_future_block_until_gc()
{
  Scheme_Future_State *fs = scheme_future_state;
  int i;

  if (!fs) return;
  if (!fs->future_threads_created) return;

  mzrt_mutex_lock(fs->future_mutex);
  fs->wait_for_gc = 1;
  mzrt_mutex_unlock(fs->future_mutex);

  /* Flag each existing worker: set its need-GC flag and, if it still has
     fuel, zero the fuel and shift its stack boundary by the C stack size. */
  for (i = 0; i < fs->thread_pool_size; i++) {
    if (fs->pool_threads[i]) {
      *(fs->pool_threads[i]->need_gc_pointer) = 1;
      if (*(fs->pool_threads[i]->fuel_pointer)) {
        *(fs->pool_threads[i]->fuel_pointer) = 0;
        *(fs->pool_threads[i]->stack_boundary_pointer) += FUTURE_C_STACK_SIZE;
      }
    }
  }

  if (cpucount > 1) {
    /* In principle, we need some sort of fence to ensure that future
       threads see the change to the fuel pointer. The MFENCE
       instruction would do that, but it requires SSE2. The CPUID
       instruction is a non-privileged serializing instruction that
       should be available on any x86 platform that runs threads. */
#if defined(i386) || defined(__i386__) || defined(__x86_64) || defined(__x86_64__) || defined(__amd64__)
# ifdef _MSC_VER
    {
      int r[4];
      __cpuid(r, 0);
    }
# else
    {
#  if defined(i386) || defined(__i386__)
#   define MZ_PUSH_EBX "pushl %%ebx"
#   define MZ_POP_EBX "popl %%ebx"
#  endif
#  if defined(__x86_64) || defined(__x86_64__) || defined(__amd64__)
#   define MZ_PUSH_EBX "pushq %%rbx"
#   define MZ_POP_EBX "popq %%rbx"
#  endif
      int _eax, _ebx, _ecx, _edx, op = 0;
      /* we can't always use EBX, so save and restore it: */
      __asm__ (MZ_PUSH_EBX "\n\t"
               "cpuid \n\t"
               "movl %%ebx, %1 \n\t"
               MZ_POP_EBX
               : "=a" (_eax), "=r" (_ebx), "=c" (_ecx), "=d" (_edx) : "a" (op));
    }
#  undef MZ_PUSH_EBX
#  undef MZ_POP_EBX
# endif
#endif
  }

  /* Wait until no thread is inside a "GC not OK" region; end_gc_not_ok()
     posts gc_ok_c when need_gc_ok_post is set. */
  mzrt_mutex_lock(fs->future_mutex);
  while (fs->gc_not_ok) {
    fs->need_gc_ok_post = 1;
    mzrt_mutex_unlock(fs->future_mutex);
    mzrt_sema_wait(fs->gc_ok_c);
    mzrt_mutex_lock(fs->future_mutex);
  }
  mzrt_mutex_unlock(fs->future_mutex);
}
888
scheme_future_continue_after_gc()889 void scheme_future_continue_after_gc()
890 {
891 Scheme_Future_State *fs = scheme_future_state;
892 int i;
893
894 if (!fs) return;
895
896 for (i = 0; i < fs->thread_pool_size; i++) {
897 if (fs->pool_threads[i]) {
898 *(fs->pool_threads[i]->need_gc_pointer) = 0;
899
900 if (!fs->pool_threads[i]->thread->current_ft
901 || scheme_custodian_is_available(fs->pool_threads[i]->thread->current_ft->cust)) {
902 *(fs->pool_threads[i]->fuel_pointer) = 1;
903 *(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE;
904 } else {
905 /* leave fuel exhausted, which will force the thread into a slow
906 path when it resumes to suspend the computation */
907 }
908 }
909 }
910
911 mzrt_mutex_lock(fs->future_mutex);
912 fs->wait_for_gc = 0;
913 while (fs->need_gc_done_post) {
914 --fs->need_gc_done_post;
915 mzrt_sema_post(fs->gc_done_c);
916 }
917 mzrt_mutex_unlock(fs->future_mutex);
918 }
919
scheme_future_gc_pause()920 void scheme_future_gc_pause()
921 /* Called in future thread */
922 {
923 Scheme_Future_Thread_State *fts = scheme_future_thread_state;
924 Scheme_Future_State *fs = scheme_future_state;
925
926 mzrt_mutex_lock(fs->future_mutex);
927 end_gc_not_ok(fts, fs, MZ_RUNSTACK);
928 start_gc_not_ok(fs); /* waits until wait_for_gc is 0 */
929 mzrt_mutex_unlock(fs->future_mutex);
930 }
931
/* Runs a block/continue cycle without a GC in between. The continue step
   re-evaluates, per worker, whether its current future's custodian is
   available and leaves the fuel exhausted when it is not. */
void scheme_future_check_custodians()
{
  scheme_future_block_until_gc();
  scheme_future_continue_after_gc();
}
937
scheme_future_is_runtime_thread()938 int scheme_future_is_runtime_thread()
939 {
940 Scheme_Future_Thread_State *fts = scheme_future_thread_state;
941 return fts->is_runtime_thread;
942 }
943
944 /**********************************************************************/
945 /* Future-event logging */
946 /**********************************************************************/
947
/* Primitive `reset-future-logs-for-tracing!`: under the future mutex,
   re-initializes the runtime event buffer and both event buffers of the
   calling thread's state and of every existing worker, selecting buffer 1
   as the active one. Returns void; does nothing else when there is no
   future state. */
static Scheme_Object *reset_future_logs_for_tracking(int argc, Scheme_Object **argv)
{
  Scheme_Future_State *fs = scheme_future_state;
  Scheme_Future_Thread_State *rt_fts = scheme_future_thread_state;
  Scheme_Future_Thread_State *worker;
  int i;

  if (!fs)
    return scheme_void;

  mzrt_mutex_lock(fs->future_mutex);

  init_fevent(&fs->runtime_fevents);

  if (rt_fts) {
    init_fevent(&rt_fts->fevents1);
    init_fevent(&rt_fts->fevents2);
    rt_fts->use_fevents1 = 1;
  }

  for (i = 0; i < fs->thread_pool_size; i++) {
    worker = fs->pool_threads[i];
    if (worker) {
      init_fevent(&worker->fevents1);
      init_fevent(&worker->fevents2);
      worker->use_fevents1 = 1;
    }
  }

  mzrt_mutex_unlock(fs->future_mutex);

  return scheme_void;
}
981
get_future_timestamp()982 static double get_future_timestamp() XFORM_SKIP_PROC {
983 #if 1
984 return scheme_get_inexact_milliseconds();
985 #else
986 return 0.0;
987 #endif
988 }
989
/* (Re)initializes an event buffer: releases any previous slot array,
   installs a fresh zero-filled array of FEVENT_BUFFER_SIZE slots, and
   resets the write position and overflow flag. */
static void init_fevent(Fevent_Buffer *b) XFORM_SKIP_PROC
{
  Fevent *slots;

  if (b->a) free(b->a);

  slots = (Fevent *)malloc(FEVENT_BUFFER_SIZE * sizeof(Fevent));
  memset(slots, 0, FEVENT_BUFFER_SIZE * sizeof(Fevent));

  b->a = slots;
  b->overflow = 0;
  b->pos = 0;
}
999
/* Releases the event array owned by `b', if any, and clears the
   pointer so the buffer can be freed again or re-initialized safely. */
static void free_fevent(Fevent_Buffer *b)
{
  if (!b->a)
    return;

  free(b->a);
  b->a = NULL;
}
1007
record_fevent_with_data(int what,int fid,int data)1008 static void record_fevent_with_data(int what, int fid, int data)
1009 XFORM_SKIP_PROC
1010 {
1011 Scheme_Future_Thread_State *fts = scheme_future_thread_state;
1012 Fevent_Buffer *b;
1013
1014 if (!fts->is_runtime_thread) {
1015 if (fts->use_fevents1)
1016 b = &fts->fevents1;
1017 else
1018 b = &fts->fevents2;
1019 } else
1020 b = &scheme_future_state->runtime_fevents;
1021
1022 b->a[b->pos].timestamp = get_future_timestamp();
1023 b->a[b->pos].what = what;
1024 b->a[b->pos].fid = fid;
1025 b->a[b->pos].data = data;
1026
1027 b->pos++;
1028 if (b->pos == FEVENT_BUFFER_SIZE) {
1029 b->overflow = 1;
1030 b->pos = 0;
1031 }
1032 }
1033
record_fevent(int what,int fid)1034 static void record_fevent(int what, int fid) XFORM_SKIP_PROC
1035 /* call with the lock or in the runtime thread */
1036 {
1037 record_fevent_with_data(what, fid, 0);
1038 }
1039
/* Prepares `b' for reading: sets the read cursor `i' and the number of
   entries `count' still to be read.  Without overflow the entries are
   [0, pos); after overflow the whole ring is valid and reading starts
   at `pos', the slot that would be overwritten next. */
static void init_traversal(Fevent_Buffer *b)
{
  if (!b->overflow) {
    b->count = b->pos;
    b->i = 0;
  } else {
    b->i = b->pos;
    b->count = FEVENT_BUFFER_SIZE;
  }
}
1050
/* Marks `b' as empty again once its entries have been read: the write
   position goes back to 0 and the overflow flag is cleared. */
static void end_traversal(Fevent_Buffer *b)
{
  b->pos = 0;
  b->overflow = 0;
}
1056
/* Emits one future event to the future logger at debug level.

   fs         future state; supplies the prefab struct type and the
              cache of interned event-kind symbols
   msg_str    format string handed to scheme_log_w_data(); it receives
              fid, which+1, the long event name, extra_str and timestamp,
              in that order
   extra_str  extra text; also interned into slot 4 for the two
              HANDLE_RTCALL event kinds
   which      index of the originating thread (logged as which+1)
   what       event kind; indexes fevent_syms / fevent_strs /
              fevent_long_strs
   timestamp  time of the event
   fid        future id (slot 0 is #f for FEVENT_MISSING / NO_FUTURE_ID)
   user_data  optional payload for slot 5; NULL is stored as #f

   Returns immediately, without building anything, when the logger does
   not accept debug-level messages. */
static void log_future_event(Scheme_Future_State *fs,
                             const char *msg_str,
                             const char *extra_str,
                             int which,
                             int what,
                             double timestamp,
                             int fid,
                             Scheme_Object *user_data)
{
  Scheme_Object *data, *v;
  Scheme_Logger *fl;

  fl = scheme_get_future_logger();
  if (!scheme_log_level_p(fl, SCHEME_LOG_DEBUG))
    return;

  /* NOTE(review): results of calls are staged in `v' before being
     stored into a slot, presumably so that the slot address is
     computed after any allocation -- confirm before restructuring. */
  data = scheme_make_blank_prefab_struct_instance(fs->fevent_prefab);
  /* slot 0: future id, or #f when there is none */
  if (what == FEVENT_MISSING || fid == NO_FUTURE_ID)
    ((Scheme_Structure *)data)->slots[0] = scheme_false;
  else
    ((Scheme_Structure *)data)->slots[0] = scheme_make_integer(fid);
  /* slot 1: thread index shifted by one */
  ((Scheme_Structure *)data)->slots[1] = scheme_make_integer((which+1));
  /* slot 2: event-kind symbol, interned on first use and cached in fs */
  v = fs->fevent_syms[what];
  if (!v) {
    v = scheme_intern_symbol(fevent_strs[what]);
    fs->fevent_syms[what] = v;
  }
  ((Scheme_Structure *)data)->slots[2] = v;
  /* slot 3: timestamp */
  v = scheme_make_double(timestamp);
  ((Scheme_Structure *)data)->slots[3] = v;
  /* slot 4: extra_str as a symbol for runtime-call events, else #f */
  if (what == FEVENT_HANDLE_RTCALL || what == FEVENT_HANDLE_RTCALL_ATOMIC) {
    v = scheme_intern_symbol(extra_str);
    ((Scheme_Structure *)data)->slots[4] = v;
  } else
    ((Scheme_Structure *)data)->slots[4] = scheme_false;

  /* User data (target fid for creates, alloc amount for allocation) */
  if (!user_data)
    user_data = scheme_false;

  ((Scheme_Structure *)data)->slots[5] = user_data;

  scheme_log_w_data(fl, SCHEME_LOG_DEBUG, 0,
                    data,
                    msg_str,
                    fid,
                    which+1,
                    fevent_long_strs[what],
                    extra_str,
                    timestamp);

}
1109
/* Primitive: logs a FEVENT_STOP_TRACE event stamped with the current
   future timestamp, attributed to thread index -1 and future id 0, with
   no user data.  Always returns void; the arguments are not
   inspected. */
static Scheme_Object *mark_future_trace_end(int argc, Scheme_Object **argv)
{
  Scheme_Future_State *fs;
  double now;

  fs = scheme_future_state;
  now = get_future_timestamp();

  log_future_event(fs,
                   "id %d, process %d: %s: %s; time: %f",
                   "tracing",
                   -1,
                   FEVENT_STOP_TRACE,
                   now,
                   0,
                   NULL);

  return scheme_void;
}
1125
/* Logs a FEVENT_MISSING event for thread `which', reporting that
   events recorded before `timestamp' are no longer available.
   NOTE(review): the `%-' in the format looks like a directive of the
   runtime's own formatter that consumes the fid argument without
   printing it -- confirm against the formatter before changing it. */
static void log_overflow_event(Scheme_Future_State *fs, int which, double timestamp)
{
  const char *fmt = "id ??%-, process %d: %s%s; before time: %f";
  const char *no_extra = "";

  log_future_event(fs, fmt, no_extra, which, FEVENT_MISSING, timestamp, 0, NULL);
}
1137
/* Drains the buffered future events into the future logger, merging
   the runtime buffer and one buffer per pool thread in ascending
   timestamp order.  Does nothing unless the future logger accepts
   debug-level messages.

   Each pool thread has two buffers; flipping `use_fevents1' under the
   future mutex redirects the thread to the other one, and the buffer
   it was using is the one read here. */
static void flush_future_logs(Scheme_Future_State *fs)
{
  Scheme_Future_Thread_State *fts;
  double t, min_t;
  int i, min_which, min_set;
  Fevent_Buffer *b, *min_b;
  Scheme_Object *data_val;

  if (scheme_log_level_p(scheme_get_future_logger(), SCHEME_LOG_DEBUG)) {
    /* Hold lock while swapping buffers: */
    mzrt_mutex_lock(fs->future_mutex);
    for (i = 0; i < fs->thread_pool_size; i++) {
      fts = fs->pool_threads[i];
      if (fts) {
        fts->use_fevents1 = !fts->use_fevents1;
        /* after the flip, the buffer to read is the one NOT selected */
        if (fts->use_fevents1)
          b = &fts->fevents2;
        else
          b = &fts->fevents1;
        init_traversal(b);
      }
    }
    mzrt_mutex_unlock(fs->future_mutex);
    /* the runtime buffer is not double-buffered; it is traversed in place */
    init_traversal(&fs->runtime_fevents);

    /* Report every buffer that wrapped, using the timestamp of the
       oldest entry that survived: */
    if (fs->runtime_fevents.overflow)
      log_overflow_event(fs, -1, fs->runtime_fevents.a[fs->runtime_fevents.i].timestamp);
    for (i = 0; i < fs->thread_pool_size; i++) {
      fts = fs->pool_threads[i];
      if (fts) {
        if (fts->use_fevents1)
          b = &fts->fevents2;
        else
          b = &fts->fevents1;
        if (b->overflow)
          log_overflow_event(fs, i, b->a[b->i].timestamp);
      }
    }

    /* Merge: on each round pick the buffer whose next entry has the
       smallest timestamp, log that entry, and advance that buffer.
       min_which stays -1 when the runtime buffer wins. */
    while (1) {
      min_set = 0;
      min_t = 0;
      min_b = NULL;
      min_which = -1;
      if (fs->runtime_fevents.count) {
        t = fs->runtime_fevents.a[fs->runtime_fevents.i].timestamp;
        if (!min_set || (t < min_t)) {
          min_t = t;
          min_b = &fs->runtime_fevents;
          min_set = 1;
        }
      }
      for (i = 0; i < fs->thread_pool_size; i++) {
        fts = fs->pool_threads[i];
        if (fts) {
          if (fts->use_fevents1)
            b = &fts->fevents2;
          else
            b = &fts->fevents1;

          if (b->count) {
            t = b->a[b->i].timestamp;
            if (!min_set || (t < min_t)) {
              min_t = t;
              min_b = b;
              min_which = i;
              min_set = 1;
            }
          }
        }
      }

      /* every buffer is exhausted */
      if (!min_b)
        break;

      data_val = scheme_make_integer(min_b->a[min_b->i].data);
      log_future_event(fs,
                       "id %d, process %d: %s%s; time: %f",
                       "",
                       min_which,
                       min_b->a[min_b->i].what,
                       min_b->a[min_b->i].timestamp,
                       min_b->a[min_b->i].fid,
                       data_val);

      /* consume the entry; the read cursor wraps like the write cursor */
      --min_b->count;
      min_b->i++;
      if (min_b->i == FEVENT_BUFFER_SIZE)
        min_b->i = 0;
    }

    /* Mark all drained buffers as empty: */
    for (i = 0; i < fs->thread_pool_size; i++) {
      fts = fs->pool_threads[i];
      if (fts) {
        if (fts->use_fevents1)
          b = &fts->fevents2;
        else
          b = &fts->fevents1;
        end_traversal(b);
      }
    }
    end_traversal(&fs->runtime_fevents);
  }
}
1242
1243 /**********************************************************************/
1244 /* Primitive implementations */
1245 /**********************************************************************/
1246
1247 void scheme_wrong_contract_from_ft(const char *who, const char *expected_type, int what, int argc, Scheme_Object **argv);
1248
1249 #define SCHEME_WRONG_CONTRACT_MAYBE_IN_FT(who, expected_type, what, argc, argv) \
1250 if (scheme_use_rtcall) \
1251 scheme_wrong_contract_from_ft(who, expected_type, what, argc, argv); \
1252 else \
1253 scheme_wrong_contract(who, expected_type, what, argc, argv);
1254
1255
/* Allocates and initializes a future for `lambda'.

   lambda   the procedure to run
   enqueue  when non-zero, the future is put on the run queue (unless it
            is oversize) and check_future_thread_creation() is called
   cur_ft   the future that is creating this one, or NULL; used only for
            the FEVENT_CREATE log entry

   Returns the new future.  A future whose procedure is not a native
   closure, or whose max_let_depth is too large for a future thread's
   runstack, is marked PENDING_OVERSIZE and never enqueued. */
static Scheme_Object *make_future(Scheme_Object *lambda, int enqueue, future_t *cur_ft)
/* Called in runtime thread --- as atomic on behalf of a future thread
   if `lambda' is known to be a thunk */
{
  Scheme_Future_State *fs = scheme_future_state;
  int futureid;
  future_t *ft;
  Scheme_Native_Closure *nc;
  Scheme_Native_Lambda *ncd;
  Scheme_Custodian *c;

  /* Only native closures carry code that a future thread can run */
  if (SAME_TYPE(SCHEME_TYPE(lambda), scheme_native_closure_type)) {
    nc = (Scheme_Native_Closure*)lambda;
    ncd = nc->code;
  } else {
    nc = NULL;
    ncd = NULL;
  }

  /* Create the future descriptor and add to the queue as 'pending' */
  ft = MALLOC_ONE_TAGGED(future_t);
  ft->so.type = scheme_future_type;

  ft->orig_lambda = lambda;
  ft->status = PENDING;

  /* The future is owned by the custodian of the creating thread, or of
     the creating future when the thread has no custodian reference */
  if (scheme_current_thread->mref)
    c = scheme_custodian_extract_reference(scheme_current_thread->mref);
  else {
    /* must be in a future thread (if futures can be created in future threads) */
    c = scheme_current_thread->current_ft->cust;
  }
  ft->cust = c;

  /* JIT the code if not already JITted */
  if (ncd) {
    scheme_jit_now(lambda);

    /* NOTE(review): this test uses `>' while
       scheme_can_apply_native_in_future() uses `<', so the two disagree
       when max_let_depth equals the limit -- confirm which is intended. */
    if (ncd->max_let_depth > FUTURE_RUNSTACK_SIZE * sizeof(void*)) {
      /* Can't even call it in a future thread */
      ft->status = PENDING_OVERSIZE;
    }
  } else
    ft->status = PENDING_OVERSIZE;

  /* Id assignment, event logging and enqueueing happen under the lock */
  mzrt_mutex_lock(fs->future_mutex);
  futureid = ++fs->next_futureid;
  ft->id = futureid;
  record_fevent_with_data(FEVENT_CREATE, (cur_ft ? cur_ft->id : NO_FUTURE_ID), futureid);
  if (enqueue) { 
    if (ft->status != PENDING_OVERSIZE)
      enqueue_future(fs, ft);
  }

  mzrt_mutex_unlock(fs->future_mutex);
  if (enqueue)
    check_future_thread_creation(fs);

  return (Scheme_Object*)ft;
}
1316
/* Returns non-zero when the native closure `proc' has a max_let_depth
   strictly below FUTURE_RUNSTACK_SIZE * sizeof(void*).  `proc' is cast
   to a native closure without a type check, so the caller must have
   verified its type. */
int scheme_can_apply_native_in_future(Scheme_Object *proc)
  XFORM_SKIP_PROC /* can be called from future thread */
{
  Scheme_Native_Closure *closure = (Scheme_Native_Closure *)proc;

  return (closure->code->max_let_depth < FUTURE_RUNSTACK_SIZE * sizeof(void*));
}
1322
/* Runtime-thread path of `future': checks that argv[0] is a procedure
   accepting zero arguments, flushes pending future logs, then creates
   and enqueues the future on behalf of `cur_ft'. */
static Scheme_Object *do_make_future(int argc, Scheme_Object *argv[], future_t *cur_ft)
{
  scheme_check_proc_arity("future", 0, 0, argc, argv);

  flush_future_logs(scheme_future_state);

  return make_future(argv[0], 1, cur_ft);
}
1333
/* Implements `future'.  On the runtime thread this delegates to
   do_make_future().  On a future thread (precise-GC builds only) it
   tries to allocate and enqueue the new future locally when argv[0] is
   an already-JITted native thunk that fits a future thread's runstack;
   in every other case the request goes to the runtime thread via
   scheme_rtcall_make_future(). */
Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
  XFORM_SKIP_PROC /* can be called from future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  if (fts->is_runtime_thread) {
    return do_make_future(argc, argv, (scheme_current_thread ? scheme_current_thread->current_ft : NULL));
  }
  else {
    Scheme_Object *proc = argv[0];
#ifdef MZ_PRECISE_GC
    /* Local creation requires: native closure, accepts 0 arguments,
       code is not the on-demand JIT stub, and the runstack need fits */
    if (SAME_TYPE(SCHEME_TYPE(proc), scheme_native_closure_type)
        && scheme_native_arity_check(proc, 0)
        && (((Scheme_Native_Closure *)proc)->code->start_code != scheme_on_demand_jit_code)
        && scheme_can_apply_native_in_future(proc)) {
      /* try to allocate a future in the future thread */
      future_t *ft;
      ft = MALLOC_ONE_TAGGED(future_t);
      if (ft) {
        future_t *cur_ft = scheme_current_thread->current_ft;
        Scheme_Future_State *fs = scheme_future_state;

        ft->so.type = scheme_future_type;
        ft->orig_lambda = proc;
        ft->status = PENDING;
        /* NOTE(review): current_ft is dereferenced here unconditionally,
           so the `cur_ft ?' test below can only take its first arm */
        ft->cust = scheme_current_thread->current_ft->cust;

        mzrt_mutex_lock(fs->future_mutex);
        ft->id = ++fs->next_futureid;
        record_fevent_with_data(FEVENT_CREATE, (cur_ft ? cur_ft->id : NO_FUTURE_ID), ft->id);
        enqueue_future(fs, ft);
        mzrt_mutex_unlock(fs->future_mutex);

        return (Scheme_Object *)ft;
      } else {
        /* It would be nice to encourage allocation of a page for
           the future thread in this case, since it might try to
           allocate more futures. */
        return scheme_rtcall_make_future(proc);
      }
    } else {
      return scheme_rtcall_make_future(proc);
    }
#else
    /* future-local allocation is not supported */
    return scheme_rtcall_make_future(proc);
#endif
  }
}
1382
/* Implements `would-be-future': creates a future for the thunk in
   argv[0] without enqueueing it, marks it as being in tracing mode and
   records the calling thread's future-thread state in it.  The creating
   future reported to make_future() is the current thread's current
   future, when the thread state has a thread. */
static Scheme_Object *would_be_future(int argc, Scheme_Object *argv[])
/* Called in runtime thread */
{
  Scheme_Future_Thread_State *fts;
  future_t *creator;
  future_t *ft;

  scheme_check_proc_arity("would-be-future", 0, 0, argc, argv);

  fts = scheme_future_thread_state;
  if (fts->thread)
    creator = fts->thread->current_ft;
  else
    creator = NULL;

  ft = (future_t*)make_future(argv[0], 0, creator);
  ft->in_tracing_mode = 1;
  ft->fts = scheme_future_thread_state;

  return (Scheme_Object*)ft;
}
1397
/* Runs a would-be future on the current thread with the tracing
   counters raised, so that it executes via future_in_runtime() while
   `futures_slow_path_tracing' and `scheme_use_rtcall' are non-zero.
   The counters, the future's tracing flag and the thread's error buffer
   are restored whether the body returns or escapes; an escape is then
   re-raised to the saved error buffer. */
static void run_would_be_future(future_t *ft)
{
  mz_jmp_buf newbuf, *savebuf;
  Scheme_Thread *p;
  Scheme_Future_State *fs;
  int aborted = 0;

  p = scheme_current_thread;
  fs = scheme_future_state;

  /* Setup the future thread state */
  p->futures_slow_path_tracing++;
  scheme_use_rtcall++;

  /* Install a local escape target so cleanup below always runs */
  savebuf = p->error_buf;
  p->error_buf = &newbuf;

  /* Run the future */
  if (scheme_setjmp(newbuf)) {
    aborted = 1;
  } else {
    future_in_runtime(fs, ft, FEVENT_START_WORK);
  }

  scheme_use_rtcall--;
  p->futures_slow_path_tracing--;
  ft->in_tracing_mode = 0;

  p->error_buf = savebuf;
  /* Propagate the escape now that state has been restored */
  if (aborted)
    scheme_longjmp(*savebuf, 1);

  return;
}
1432
fsemaphore_finalize(void * p,void * data)1433 void fsemaphore_finalize(void *p, void *data)
1434 {
1435 fsemaphore_t *sema;
1436 sema = (fsemaphore_t*)p;
1437 mzrt_mutex_destroy(sema->mut);
1438 }
1439
/* Creates an fsemaphore whose initial count is taken from `ready' via
   scheme_get_semaphore_init() (under the name "make-fsemaphore").  The
   new object gets its own mutex, and fsemaphore_finalize() is
   registered as its finalizer. */
Scheme_Object *scheme_make_fsemaphore_inl(Scheme_Object *ready)
/* Called in runtime thread */
{
  intptr_t initial;
  fsemaphore_t *fsema;

  /* Extract the count first, before anything is allocated */
  initial = scheme_get_semaphore_init("make-fsemaphore", 1, &ready);

  fsema = MALLOC_ONE_TAGGED(fsemaphore_t);
  fsema->so.type = scheme_fsemaphore_type;
  fsema->ready = initial;
  mzrt_mutex_create(&fsema->mut);

  scheme_register_finalizer((void*)fsema, fsemaphore_finalize, NULL, NULL, NULL);

  return (Scheme_Object*)fsema;
}
1458
1459
/* Primitive wrapper for `make-fsemaphore': builds an fsemaphore from
   the initial count given in argv[0]. */
Scheme_Object *make_fsemaphore(int argc, Scheme_Object **argv)
/* Called in runtime thread (atomic/synchronized) */
{
  Scheme_Object *initial_count = argv[0];

  return scheme_make_fsemaphore_inl(initial_count);
}
1465
/* Implements `fsemaphore-count': returns the current `ready' count of
   the fsemaphore in argv[0] as an integer object.  Raises a contract
   error when argv[0] is not an fsemaphore.  The count is read without
   taking the semaphore's mutex. */
Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *fsema;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-count", "fsemaphore?", 0, argc, argv);
  }

  fsema = (fsemaphore_t*)argv[0];

  return scheme_make_integer(fsema->ready);
}
1478
/* Makes `future' runnable again: if its custodian is still available
   the future becomes PENDING and is put back on the run queue;
   otherwise it is left SUSPENDED and not enqueued. */
static void requeue_future_within_lock(future_t *future, Scheme_Future_State *fs)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  if (!scheme_custodian_is_available(future->cust)) {
    /* The future's custodian is shut down, so don't try to
       run it in a future thread anymore */
    future->status = SUSPENDED;
    return;
  }

  future->status = PENDING;
  enqueue_future(fs, future);
}
1492
/* Locking wrapper around requeue_future_within_lock(): takes the
   future mutex for the duration of the requeue. */
static void requeue_future(future_t *future, Scheme_Future_State *fs)
{
  mzrt_mutex_lock(fs->future_mutex);

  requeue_future_within_lock(future, fs);

  mzrt_mutex_unlock(fs->future_mutex);
}
1499
/* Wakes the future at the head of `sema's wait queue, if there is one:
   unlinks it, consumes one unit of the semaphore's count on its behalf,
   sets its pending result to void and puts it back on the run queue.
   Returns 1 when a future was resumed, 0 when the queue was empty. */
static int try_resume_future_from_fsema_wait(fsemaphore_t *sema, Scheme_Future_State *fs)
  XFORM_SKIP_PROC
{
  future_t *waiter = sema->queue_front;
  future_t *successor;

  if (!waiter)
    return 0;

  /* Pop the head of the doubly-linked wait queue */
  successor = waiter->next_in_fsema_queue;
  sema->queue_front = successor;
  waiter->next_in_fsema_queue = NULL;

  if (successor)
    successor->prev_in_fsema_queue = NULL;
  else
    sema->queue_end = NULL;

  sema->ready--;

  waiter->retval_s = scheme_void;

  /* Place the waiting future back on the run queue */
  requeue_future(waiter, fs);

  return 1;
}
1526
/* Implements `fsemaphore-post': increments the count of the fsemaphore
   in argv[0] under its mutex and, when the count was zero beforehand,
   tries to resume one future waiting on it.  Raises a contract error
   when argv[0] is not an fsemaphore.  Returns void. */
Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *fsema;
  Scheme_Future_State *fs;
  int before;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-post", "fsemaphore?", 0, argc, argv);
  }

  fsema = (fsemaphore_t*)argv[0];

#ifdef FSEMA_LOGGING
  printf("[Thread %p]: scheme_fsemaphore_post for sema at %p. Count before V: %d\n",
         pthread_self(),
         fsema,
         fsema->ready);
#endif

  fs = scheme_future_state;

  mzrt_mutex_lock(fsema->mut);

  before = fsema->ready;
  fsema->ready++;
  if (before == 0) {
    /* only a post that leaves zero can unblock a waiter */
    try_resume_future_from_fsema_wait(fsema, fs);
  }

  mzrt_mutex_unlock(fsema->mut);

  return scheme_void;
}
1559
/* Appends `ft' to the tail of `sema's doubly-linked wait queue. */
static void enqueue_future_for_fsema(future_t *ft, fsemaphore_t *sema)
/* This function assumes sema->mut has already been acquired! */
{
  future_t *tail;

  if (!sema->queue_front) {
    /* empty queue: the future is both head and tail */
    sema->queue_front = ft;
    sema->queue_end = ft;
    return;
  }

  tail = sema->queue_end;
  tail->next_in_fsema_queue = ft;
  ft->prev_in_fsema_queue = tail;
  sema->queue_end = ft;
}
1577
/* Blocks in the scheduler until fsemaphore_ready() reports `sema' as
   ready, then hands the semaphore pointer back to the caller. */
static fsemaphore_t *block_until_sema_ready(fsemaphore_t *sema)
{
  /* Kept as a separate small function because it cooperates with the
     GC, unlike the function that calls it; the caller must continue
     with the returned pointer. */
  Scheme_Object *obj = (Scheme_Object*)sema;

  scheme_block_until(fsemaphore_ready, NULL, obj, 0);

  return (fsemaphore_t*)obj;
}
1585
/* Implements `fsemaphore-wait': consumes one unit of the count of the
   fsemaphore in argv[0], waiting first when the count is zero.

   - Runtime thread: returns void immediately when the thread is in
     would-be-future tracing; otherwise blocks through
     block_until_sema_ready() until the count is non-zero.
   - Future thread: the current future's continuation is captured
     (locally, or by the runtime thread on request), the future is
     marked WAITING_FOR_FSEMA and then either requeued (if the count
     became non-zero meanwhile) or parked on the semaphore's wait queue.
     Control then leaves through scheme_future_longjmp(), so this
     call does not return on that path.

   Raises a contract error when argv[0] is not an fsemaphore. */
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *sema;
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  Scheme_Future_State *fs = scheme_future_state;
  void *storage[4];

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-wait", "fsemaphore?", 0, argc, argv);
  }

  sema = (fsemaphore_t*)argv[0];

#ifdef FSEMA_LOGGING
  printf("[Thread %p]: scheme_fsemaphore_wait for sema at %p. Count before P: %d\n",
         pthread_self(),
         sema,
         sema->ready);
#endif

  mzrt_mutex_lock(sema->mut);
  if (!sema->ready) {
    if (fts->is_runtime_thread) {
      /* Then we are on the runtime thread -- if in would-be-future
         mode, should we just return?  Otherwise, block and wait
         for the fsema to be ready while cooperating with the
         scheduler */
      if (scheme_current_thread->futures_slow_path_tracing) {
        mzrt_mutex_unlock(sema->mut);
        return scheme_void;
      }

      /* The mutex is released while blocked and re-taken to re-check,
         since another waiter may have consumed the count first */
      do {
        mzrt_mutex_unlock(sema->mut);
        sema = block_until_sema_ready(sema);
        mzrt_mutex_lock(sema->mut);
      } while (!sema->ready);
    } else {
      /* On a future thread, suspend the future (to be
         resumed whenever the fsema becomes ready */
      future_t *future = fts->thread->current_ft;
      /* Stash both pointers so they can be re-read after the capture */
      jit_future_storage[0] = (void*)sema;
      jit_future_storage[1] = (void*)future;
      if (!future) {
        /* Should never be here */
        scheme_log_abort("fsemaphore-wait: future was NULL for future thread.");
        abort();
      }

      /* Setup for LWC capture */
      mzrt_mutex_unlock(sema->mut);
      scheme_fill_lwc_end();
      future->lwc = scheme_current_lwc;
      future->fts = fts;
      future->prim_protocol = SIG_s_s;

      /* Try to capture it locally (on this thread) */
      if (GC_gen0_alloc_page_ptr
          && capture_future_continuation(fs, future, storage, 0, 0)) {
        /* capture sets fts->thread->current_ft to NULL */
        mzrt_mutex_lock(fs->future_mutex);
      } else {
        /* Can't capture the continuation locally, so ask the runtime
           thread to do it */
        mzrt_mutex_lock(fs->future_mutex);
        if (!future->in_queue_waiting_for_lwc) {
          future->next_waiting_lwc = fs->future_waiting_lwc;
          fs->future_waiting_lwc = future;
          future->in_queue_waiting_for_lwc = 1;
        }
        future->want_lw = 1;
      }
      /* fs->future_mutex is held on both paths from here */
      future->status = WAITING_FOR_FSEMA;

      scheme_signal_received_at(fs->signal_handle);
      if (fts->thread->current_ft) {
        /* Will get here if relying on runtime thread to capture for us --
           wait for the signal that LW continuation was captured
           by the runtime thread. */
        future->can_continue_sema = fts->worker_can_continue_sema;
        end_gc_not_ok(fts, fs, MZ_RUNSTACK);
        mzrt_mutex_unlock(fs->future_mutex);

        mzrt_sema_wait(fts->worker_can_continue_sema);

        mzrt_mutex_lock(fs->future_mutex);
        start_gc_not_ok(fs);
      }
      mzrt_mutex_unlock(fs->future_mutex);

      FUTURE_ASSERT(!fts->thread->current_ft);

      /* Fetch the future and sema pointers again, in case moved by a GC */
      sema = (fsemaphore_t*)jit_future_storage[0];
      future = (future_t*)jit_future_storage[1];

      FUTURE_ASSERT(future->suspended_lw);
      FUTURE_ASSERT(!future->can_continue_sema);

      /* Check again to see whether the sema has become ready */
      mzrt_mutex_lock(sema->mut);
      if (sema->ready) {
        /* Then resume the future immediately (requeue) */
        sema->ready--;
        requeue_future(future, fs);
      } else {
        /* Add the future to the sema's wait queue */
        enqueue_future_for_fsema(future, sema);
      }

      mzrt_mutex_unlock(sema->mut);

      /* Jump back to the worker thread future loop (this thread
         is now free) */
      scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
    }
  }

  /* Semaphore is ready -- decrement and continue */
  sema->ready--;
  mzrt_mutex_unlock(sema->mut);
  return scheme_void;
}
1710
/* Implements `fsemaphore-try-wait?': under the semaphore's mutex,
   consumes one unit of the count if it is non-zero.  Returns #t when a
   unit was taken and #f otherwise; never blocks on the count.  Raises a
   contract error when argv[0] is not an fsemaphore. */
Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *fsema;
  Scheme_Object *acquired = scheme_false;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-try-wait?", "fsemaphore?", 0, argc, argv);
  }

  fsema = (fsemaphore_t*)argv[0];

  mzrt_mutex_lock(fsema->mut);
  if (fsema->ready) {
    fsema->ready--;
    acquired = scheme_true;
  }
  mzrt_mutex_unlock(fsema->mut);

  return acquired;
}
1733
/* Readiness predicate for an fsemaphore: returns its current `ready'
   count, read under the semaphore's mutex (non-zero means ready). */
static int fsemaphore_ready(Scheme_Object *obj)
/* Called in runtime thread by Racket scheduler */
{
  fsemaphore_t *fsema = (fsemaphore_t*)obj;
  int count;

  mzrt_mutex_lock(fsema->mut);
  count = fsema->ready;
  mzrt_mutex_unlock(fsema->mut);

  return count;
}
1744
1745
/* Readiness predicate for a future: returns 1 unless the future's
   status is RUNNING, WAITING_FOR_FSEMA or HANDLING_PRIM, in which case
   it returns 0.  The status is read under the future mutex. */
int future_ready(Scheme_Object *obj)
/* Called in runtime thread by Racket scheduler */
{
  Scheme_Future_State *fs = scheme_future_state;
  future_t *ft = (future_t*)obj;
  int busy;

  mzrt_mutex_lock(fs->future_mutex);
  busy = ((ft->status == RUNNING)
          || (ft->status == WAITING_FOR_FSEMA)
          || (ft->status == HANDLING_PRIM));
  mzrt_mutex_unlock(fs->future_mutex);

  return !busy;
}
1763
/* Unlinks `ft' from the doubly-linked future run queue, fixing up the
   queue head/tail in `fs' when `ft' was at either end, and clears the
   future's own links.  The queue count is decremented only if the
   future was flagged as being in the queue. */
static void dequeue_future(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called from both future and runtime threads */
{
  future_t *before = ft->prev;
  future_t *after = ft->next;

  if (before)
    before->next = after;
  else
    fs->future_queue = after;

  if (after)
    after->prev = before;
  else
    fs->future_queue_end = before;

  ft->next = NULL;
  ft->prev = NULL;

  if (ft->in_future_queue) {
    --fs->future_queue_count;
    ft->in_future_queue = 0;
  }
}
1786
/* Finishes a runtime call made on behalf of `future'.  A future with a
   captured continuation is requeued so that some future thread can
   resume it; otherwise the worker thread blocked on the future's
   `can_continue_sema', if any, is released. */
static void complete_rtcall(Scheme_Future_State *fs, future_t *future)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  mzrt_sema *wakeup;

  if (future->suspended_lw) {
    /* Re-enqueue the future so that some future thread can continue */
    requeue_future_within_lock(future, fs);
    return;
  }

  /* Signal the waiting worker thread that it
     can continue running machine code */
  future->want_lw = 0; /* needed if we get here via direct_future_to_future_touch() */

  wakeup = future->can_continue_sema;
  if (wakeup) {
    FUTURE_ASSERT(!future->in_atomic_queue);
    /* clear the field before posting */
    future->can_continue_sema = NULL;
    mzrt_sema_post(wakeup);
  }
}
1806
/* Delivers the result of `ft' directly to `t_ft', a future that is
   waiting to touch it: the result is received from `ft', installed as
   `t_ft's pending return value, and `t_ft's runtime call is then
   completed as though the runtime thread had handled it. */
static void direct_future_to_future_touch(Scheme_Future_State *fs, future_t *ft, future_t *t_ft)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  Scheme_Object *result = ft->retval;

  /* hand the value over: out of `ft', into `t_ft' */
  receive_special_result(ft, result, 0);
  t_ft->retval_s = result;
  send_special_result(t_ft, result);

  t_ft->arg_S1 = NULL;
  t_ft->status = HANDLING_PRIM; /* handled as if by runtime thread */

  complete_rtcall(fs, t_ft);
}
1822
/* If `ft' is waiting for the runtime thread to run `touch' on its
   behalf, returns the future it wants to touch (the first argument of
   the pending call); otherwise returns NULL.  When the continuation of
   `ft' has been captured, the argument pointer is first adjusted via
   scheme_adjust_runstack_argument(). */
static future_t *get_future_for_touch(future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  Scheme_Object **args;

  if ((ft->status != WAITING_FOR_PRIM) || (ft->prim_func != touch))
    return NULL;

  /* try to enqueue it... */
  args = ft->arg_S1;
  if (ft->suspended_lw)
    args = scheme_adjust_runstack_argument(ft->suspended_lw, args);

  return (future_t *)args[0];
}
1836
/* Walks the list of weak boxes in `ft->touching' and, for every future
   still referenced there that is currently waiting to touch `ft',
   delivers the result of `ft' directly.  The list is detached from
   `ft' before it is walked. */
static void trigger_added_touches(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* lock held; called from both future and runtime threads */
{
  Scheme_Object *cell, *wb;
  future_t *t_ft;

  if (!ft->touching)
    return;

  cell = ft->touching;
  ft->touching = NULL;

  for (; !SCHEME_NULLP(cell); cell = SCHEME_CDR(cell)) {
    wb = SCHEME_CAR(cell);
    t_ft = (future_t *)SCHEME_WEAK_BOX_VAL(wb);

    /* skip cleared boxes and futures that are no longer touching `ft' */
    if (t_ft && (get_future_for_touch(t_ft) == ft))
      direct_future_to_future_touch(fs, ft, t_ft);
  }
}
1856
shallower_apply_future_lw_k(void)1857 static Scheme_Object *shallower_apply_future_lw_k(void)
1858 {
1859 Scheme_Thread *p = scheme_current_thread;
1860 future_t *ft = (future_t *)p->ku.k.p1;
1861
1862 p->ku.k.p1 = NULL;
1863
1864 return apply_future_lw(ft);
1865 }
1866
/* Runs (or resumes) the future `ft' on the current thread.

   fs    future state
   ft    the future to run; becomes the thread's current future for the
         duration of the call
   what  event kind recorded when work starts

   Returns 1 when the future is finished and 0 when one level of its
   suspended continuation stack was popped and more work remains.  If
   the future's body escapes, the future is marked as having no result
   and FINISHED, and the escape is re-raised to the error buffer that
   was installed on entry. */
static int future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, int what)
{
  mz_jmp_buf newbuf, * volatile savebuf;
  Scheme_Thread *p = scheme_current_thread;
  Scheme_Object * volatile retval;
  future_t * volatile old_ft;
  int done;

  old_ft = p->current_ft;
  p->current_ft = ft;

  /* Install a local escape target so the bookkeeping below always runs */
  savebuf = p->error_buf;
  p->error_buf = &newbuf;

  record_fevent(what, ft->id);

  if (scheme_setjmp(newbuf)) {
    /* the body escaped: there is no result */
    ft->no_retval = 1;
    retval = NULL;
  } else {
    if (ft->suspended_lw) {
      /* Resume a captured continuation, going through the stack-overflow
         handler when the applicability check returns more than 1 */
      if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 1) > 1) {
        p->ku.k.p1 = ft;
        retval = scheme_handle_stack_overflow(shallower_apply_future_lw_k);
      } else
        retval = apply_future_lw(ft);
    } else {
      if (ft->suspended_lw_stack) {
        /* Re-issue the application recorded in the top stack frame:
           [1] continuation (for its marks), [2] operator,
           [3] argument count, [4] arguments, [5] multi-value flag */
        Scheme_Object *rator, **argv;
        int argc;
        Scheme_Lightweight_Continuation *lc;
        rator = (Scheme_Object *)ft->suspended_lw_stack[2];
        argc = SCHEME_INT_VAL((Scheme_Object *)ft->suspended_lw_stack[3]);
        argv = (Scheme_Object **)ft->suspended_lw_stack[4];
        ft->suspended_lw_stack[2] = NULL;
        ft->suspended_lw_stack[4] = NULL;

        lc = (Scheme_Lightweight_Continuation *)ft->suspended_lw_stack[1];
        scheme_restore_lightweight_continuation_marks(lc);

        if (ft->suspended_lw_stack[5])
          retval = _scheme_apply_multi(rator, argc, argv);
        else
          retval = _scheme_apply(rator, argc, argv);
      } else
        /* fresh start: call the original thunk */
        retval = scheme_apply_multi(ft->orig_lambda, 0, NULL);
    }
    send_special_result(ft, retval);
  }

  p->error_buf = savebuf;
  p->current_ft = old_ft;

  ft->retval = retval;

  mzrt_mutex_lock(fs->future_mutex);

  if (ft->suspended_lw_stack && retval) {
    /* one frame done; outer frames are still pending */
    pop_suspended_lw(fs, ft);
    done = 0;
  } else {
    if (!retval)
      ft->suspended_lw_stack = NULL;
    ft->status = FINISHED;
    trigger_added_touches(fs, ft);
    done = 1;
  }
  record_fevent(FEVENT_COMPLETE, ft->id);
  mzrt_mutex_unlock(fs->future_mutex);

  record_fevent(FEVENT_END_WORK, ft->id);

  /* Propagate the escape now that state has been restored */
  if (!retval) {
    scheme_longjmp(*savebuf, 1);
  }

  return done;
}
1945
/* Policy hook: returns non-zero when the runtime thread should run a
   pending future (or its suspended continuation) itself instead of
   waiting for a future thread.  Currently always 1.  Declared with
   `(void)' so the definition is a real prototype and stray arguments
   are diagnosed. */
static int prefer_to_apply_future_in_runtime(void)
/* Called with the future mutex held. */
{
  /* Policy question: if the runtime thread is blocked on a
     future, should we just run the future (or its suspended continuation)
     directly in the runtime thread?

     If we don't, then we're better able to handle non-blocking requests
     from future threads. At the same time, the runtime thread shouldn't
     wait if no one is working on the future. We err on the safe side
     and always run when we're waiting on the future: */
  return 1;
}
1959
/* Runtime-thread implementation of `touch': drives the future in
   argv[0] until it has a result and returns that result.

   Each loop iteration inspects the future's status under the future
   mutex and either runs the future here, services a runtime call it is
   waiting on, or just waits; every branch releases the mutex before
   doing any work.  Between iterations the thread yields to the
   scheduler and blocks until future_ready() holds.

   Raises a contract error when argv[0] is not a future, and signals
   "touch: future previously aborted" when the future ended without a
   result. */
Scheme_Object *general_touch(int argc, Scheme_Object *argv[])
/* Called in runtime thread */
{
  Scheme_Future_State *fs = scheme_future_state;
  Scheme_Object *retval = NULL;
  future_t *ft;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type))
    scheme_wrong_contract("touch", "future?", 0, argc, argv);

  ft = (future_t*)argv[0];

  /* Spin waiting for primitive calls or a return value from
     the worker thread */
  while (1) {
    mzrt_mutex_lock(fs->future_mutex);
    /* Case 1: nobody is running the future and it can be run here */
    if ((((ft->status == PENDING)
          && prefer_to_apply_future_in_runtime())
         || (ft->status == PENDING_OVERSIZE)
         || (ft->status == SUSPENDED))
        && (!ft->suspended_lw
            || scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0)))
      {
        int what = FEVENT_START_WORK;
        if (ft->status == PENDING_OVERSIZE) {
          what = FEVENT_START_RTONLY_WORK;
        } else if (ft->status != SUSPENDED) {
          /* PENDING: take it off the run queue first */
          dequeue_future(fs, ft);
          if (ft->suspended_lw_stack)
            what = FEVENT_RESUME_WORK;
        }
        ft->status = RUNNING;
        mzrt_mutex_unlock(fs->future_mutex);
        if (ft->in_tracing_mode) {
          run_would_be_future(ft);
          retval = ft->retval;
          break;
        } else {
          /* a zero result means more work remains, so loop again */
          if (future_in_runtime(fs, ft, what)) {
            retval = ft->retval;
            break;
          }
        }
      }
    /* Case 2: the future is being worked on elsewhere */
    else if ((ft->status == RUNNING)
        || (ft->status == WAITING_FOR_FSEMA)
        || (ft->status == HANDLING_PRIM))
      {
        /* someone else got to it first */
        mzrt_mutex_unlock(fs->future_mutex);
      }
    /* Case 3: a result is already available */
    else if (ft->status == FINISHED)
      {
        retval = ft->retval;

        mzrt_mutex_unlock(fs->future_mutex);

        break;
      }
    /* Case 4: the future needs the runtime thread to run a primitive */
    else if (ft->status == WAITING_FOR_PRIM)
      {
        if (ft->in_atomic_queue) {
          /* Should be in the atomic-wait queue, so
             handle those actions now: */
          mzrt_mutex_unlock(fs->future_mutex);
          scheme_check_future_work();
        } else {
          /* Invoke the primitive and stash the result.
             Release the lock so other threads can manipulate the queue
             while the runtime call executes. */
          ft->status = HANDLING_PRIM;
          ft->want_lw = 0;
          mzrt_mutex_unlock(fs->future_mutex);
          invoke_rtcall(fs, ft, 0);
        }
      }
    /* Case 5: a continuation may have been captured since the last look */
    else if (ft->maybe_suspended_lw && (ft->status != WAITING_FOR_FSEMA))
      {
        ft->maybe_suspended_lw = 0;
        if (ft->suspended_lw) {
          if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0)
              && prefer_to_apply_future_in_runtime()) {
            if (ft->status != SUSPENDED)
              dequeue_future(fs, ft);
            ft->status = RUNNING;
            /* may raise an exception or escape: */
            mzrt_mutex_unlock(fs->future_mutex);
            (void)future_in_runtime(fs, ft, FEVENT_START_WORK);
          } else {
            /* Someone needs to handle the future. We're banking on some
               future thread eventually picking up the future, which is
               not actually guaranteed if they're all busy looping... */
            mzrt_mutex_unlock(fs->future_mutex);
          }
        } else {
          mzrt_mutex_unlock(fs->future_mutex);
        }
      }
    else
      {
        mzrt_mutex_unlock(fs->future_mutex);
      }

    scheme_thread_block(0.0); /* to ensure check for futures work */

    record_fevent(FEVENT_TOUCH_PAUSE, ft->id);
    scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0);
    record_fevent(FEVENT_TOUCH_RESUME, ft->id);
  }

  /* A NULL result means the future ended without producing a value */
  if (!retval) {
    scheme_signal_error("touch: future previously aborted");
  }

  receive_special_result(ft, retval, 0);

  flush_future_logs(fs);

  return retval;
}
2080
/* Implements the `touch' primitive: obtain the result of the future in
   argv[0].

   Runtime thread: optionally logs the touch (only when the thread is
   currently running a future in tracing mode and argv[0] really is a
   future) and then hands off to general_touch().

   Future thread: if argv[0] is a future whose status is FINISHED, its
   stored result is delivered directly. Otherwise, under MZ_PRECISE_GC,
   the current future is linked into the target's `touching' chain when
   both allocations succeed; in every remaining case the call is
   forwarded through scheme_rtcall_iS_s().

   Returns the value produced by whichever of those paths is taken. */
Scheme_Object *touch(int argc, Scheme_Object *argv[])
  XFORM_SKIP_PROC
/* can be called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;

  if (fts->is_runtime_thread) {
    future_t *ft;
    /* The type test guards the cast below: without it a non-future
       argument would be dereferenced as a future_t just to log its id.
       Reporting the bad argument is left to general_touch(). */
    if (fts->thread
        && (ft = fts->thread->current_ft)
        && ft->in_tracing_mode
        && SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type)) {
      future_t *targ_ft = (future_t*)argv[0];
      Scheme_Future_State *fs = scheme_future_state;
      Scheme_Object *targid_obj;
      targid_obj = scheme_make_integer(targ_ft->id);
      log_future_event(fs,
                       "id %d, process %d: %s: %s; time: %f",
                       "touch",
                       -1,
                       FEVENT_RTCALL_TOUCH,
                       get_future_timestamp(),
                       ft->id,
                       targid_obj);
    }

    return general_touch(argc, argv);
  } else {
    if (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type)) {
      Scheme_Future_State *fs = scheme_future_state;
      future_t *ft = (future_t *)argv[0];
      int status;

      /* Sample the status under the lock; the result itself is read
         afterwards, only once the status is known to be FINISHED. */
      mzrt_mutex_lock(fs->future_mutex);
      status = ft->status;
      mzrt_mutex_unlock(fs->future_mutex);

      if (status == FINISHED) {
        Scheme_Object *retval = ft->retval;
        receive_special_result(ft, retval, 0);
        return retval;
      } else {
#ifdef MZ_PRECISE_GC
        /* Try adding current future to ft's chain of touching futures */
        Scheme_Object *wb, *pr;
        future_t *current_ft = scheme_current_thread->current_ft;

        /* Either allocation may yield NULL; in that case nothing is
           linked and the call simply falls through to the runtime call. */
        wb = GC_malloc_weak_box(current_ft, NULL, 0, 0);
        if (wb) {
          pr = GC_malloc_pair(wb, scheme_null);
          if (pr) {
            mzrt_mutex_lock(fs->future_mutex);
            if (ft->status != FINISHED) {
              if (ft->touching)
                SCHEME_CDR(pr) = ft->touching;
              ft->touching = pr;
              /* Flag consumed (and cleared) by future_do_runtimecall() */
              current_ft->in_future_specific_touch_queue = 1;
              mzrt_mutex_unlock(fs->future_mutex);
            } else {
              /* `ft' switched to FINISHED while we were trying to add,
                 so carry on with its result */
              Scheme_Object *retval = ft->retval;
              mzrt_mutex_unlock(fs->future_mutex);
              receive_special_result(ft, retval, 0);
              return retval;
            }
          }
        }
#endif
      }
    }
    return scheme_rtcall_iS_s("touch", FSRC_PRIM, touch, argc, argv);
  }
}
2153
scheme_is_multithreaded(int now)2154 int scheme_is_multithreaded(int now)
2155 {
2156 if (!now)
2157 return 1;
2158 else {
2159 Scheme_Future_State *fs = scheme_future_state;
2160 return (fs && fs->future_threads_created);
2161 }
2162 }
2163
/* Primitive returning the value of rktio_processor_count() for the
   global `scheme_rktio' handle, boxed with scheme_make_integer().
   argc and argv are not used. */
Scheme_Object *processor_count(int argc, Scheme_Object *argv[])
/* Called in runtime thread */
{
  int cpus;

  cpus = rktio_processor_count(scheme_rktio);

  return scheme_make_integer(cpus);
}
2169
/* Primitive returning the future attached to the current thread
   (`scheme_current_thread->current_ft'), or scheme_false when there is
   none. argc and argv are not used. */
Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[])
  XFORM_SKIP_PROC
/* Called from any thread (either runtime or future) */
{
  future_t *running = scheme_current_thread->current_ft;

  if (!running)
    return scheme_false;

  return (Scheme_Object *)running;
}
2178
2179 /* Entry point for a worker thread allocated for
2180 executing futures. This function will never terminate
2181 (until the process dies). */
/* Parameters:
     arg - a `future_thread_params_t *'. It is read for `fts', `fs',
           `shared_GC', `runstack_start' and `ready_sema', and is written
           with the addresses of this thread's runstack, current-thread
           and JIT-storage variables plus `lwc' before `ready_sema' is
           posted.
   Returns: nominally NULL; the `while (1)' loop below contains no
            `break', so the trailing `return' is not reached.
   Shape: one-time setup, then a loop that waits on
   `fs->future_pending_sema', takes a pending future (if any), runs it,
   and records the outcome while holding `fs->future_mutex'. */
worker_thread_future_loop(void * arg)2182 void *worker_thread_future_loop(void *arg)
2183 XFORM_SKIP_PROC
2184 /* Called in future thread; runtime thread is blocked until ready_sema
2185 is signaled. */
2186 {
2187 /* valid only until signaling */
2188 future_thread_params_t *params = (future_thread_params_t *)arg;
2189 Scheme_Future_Thread_State *fts = params->fts;
2190 Scheme_Future_State *fs = params->fs;
2191 Scheme_Object *v;
2192 Scheme_Native_Proc *jitcode;
2193 future_t *ft;
2194 mz_jmp_buf newbuf;
2195 int fid, what;
2196
/* Install the shared future state and this thread's own state, as
   handed over through `params'. */
2197 scheme_future_state = fs;
2198 scheme_future_thread_state = fts;
2199
2200 GC_instance = params->shared_GC;
2201
2202 GC_gen0_alloc_only = 1;
2203
2204 /* Set processor affinity */
2205 /*mzrt_mutex_lock(fs->future_mutex);
2206 static uintptr_t cur_cpu_mask = 1;
2207 if (pthread_setaffinity_np(pthread_self(), sizeof(g_cur_cpu_mask), &g_cur_cpu_mask))
2208 {
2209 printf(
2210 "Could not set CPU affinity (%lu) for thread %p!\n",
2211 ++g_cur_cpu_mask,
2212 pthread_self());
2213 }
2214
2215 mzrt_mutex_unlock(fs->future_mutex);
2216 */
2217
2218 scheme_configure_floating_point();
2219
2220 mzrt_sema_create(&fts->worker_can_continue_sema, 0);
2221
2222 scheme_use_rtcall = 1;
2223
2224 scheme_current_thread = fts->thread;
2225
2226 scheme_fuel_counter = 1;
/* The stack boundary is computed relative to the address of the local
   `v' in this frame. */
2227 scheme_jit_stack_boundary = ((uintptr_t)&v) - FUTURE_C_STACK_SIZE;
2228
2229 fts->need_gc_pointer = &scheme_future_need_gc_pause;
2230 fts->fuel_pointer = &scheme_fuel_counter;
2231 fts->stack_boundary_pointer = &scheme_jit_stack_boundary;
2232
2233 MZ_RUNSTACK_START = params->runstack_start;
2234 MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size;
2235
/* Hand the addresses of this thread's variables back to the creator
   through `params'. */
2236 params->scheme_current_runstack_ptr = &scheme_current_runstack;
2237 params->scheme_current_runstack_start_ptr = &scheme_current_runstack_start;
2238 params->current_thread_ptr = &scheme_current_thread;
2239 params->jit_future_storage_ptr = &jit_future_storage[0];
2240
2241 scheme_init_thread_lwc();
2242 params->lwc = scheme_current_lwc;
2243
/* Last use of `params': per the comments above it is valid only until
   this signal, and it is not touched again below. */
2244 mzrt_sema_post(params->ready_sema);
2245
2246 scheme_current_thread->runstack = MZ_RUNSTACK;
2247 scheme_current_thread->runstack_start = MZ_RUNSTACK_START;
2248
2249 while (1) {
/* Block until the pending-work semaphore is posted, then look for a
   future with future_mutex held. */
2250 mzrt_sema_wait(fs->future_pending_sema);
2251 mzrt_mutex_lock(fs->future_mutex);
2252 start_gc_not_ok(fs);
2253
2254 ft = get_pending_future(fs);
2255
2256 if (ft) {
2257 FUTURE_ASSERT(!ft->in_atomic_queue);
2258 FUTURE_ASSERT(!ft->in_future_queue);
2259
2260 fs->busy_thread_count++;
2261
/* `what' and `fid' are assigned only on this path and are used only
   inside this same `if (ft)' branch. `fid' is captured now because `ft'
   is re-fetched (and may become NULL) after the future runs. */
2262 if (ft->suspended_lw_stack)
2263 what = FEVENT_RESUME_WORK;
2264 else
2265 what = FEVENT_START_WORK;
2266 fid = ft->id;
2267 record_fevent(what, fid);
2268
2269 /* Work is available for this thread */
2270 ft->status = RUNNING;
2271 ft->maybe_suspended_lw = 0;
/* The lock is released for the duration of the run and re-acquired
   after it. */
2272 mzrt_mutex_unlock(fs->future_mutex);
2273
2274 ft->thread_short_id = fts->id;
2275
2276 /* Set up the JIT compiler for this thread */
2277 scheme_jit_fill_threadlocal_table();
2278
2279 fts->thread->current_ft = ft;
2280 GC_register_thread(fts->thread, ft->cust);
2281
2282 MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size;
2283 MZ_CONT_MARK_STACK = 0;
2284 MZ_CONT_MARK_POS = (MZ_MARK_POS_TYPE)1;
2285
2286 if (ft->suspended_lw) {
2287 /* invoke a lightweight continuation */
/* error_buf is pointed at `newbuf' so that a scheme_future_longjmp() on
   it (as done in future_do_runtimecall()) lands in the non-zero branch
   below; presumably this pair has setjmp/longjmp semantics. */
2288 scheme_current_thread->error_buf = &newbuf;
2289 if (scheme_future_setjmp(newbuf)) {
2290 /* failed or suspended */
2291 v = NULL;
2292 } else {
2293 v = _apply_future_lw(ft);
2294 }
2295 } else {
2296 /* Run the code:
2297 The lambda passed to a future will always be a parameterless
2298 function.
2299 From this thread's perspective, this call will never return
2300 until all the work to be done in the future has been completed,
2301 including runtime calls.
2302 If jitcode asks the runtime thread to do work, then
2303 a GC can occur. */
2304
2305 scheme_current_thread->error_buf = &newbuf;
2306 if (scheme_future_setjmp(newbuf)) {
2307 /* failed or suspended */
2308 v = NULL;
2309 } else {
2310 Scheme_Object *rator, **argv;
2311 int argc;
2312
2313 scheme_fill_lwc_start();
2314
/* Resuming after an overflow: the callee and its arguments come from
   slots 2-4 of the suspended stack record (filled by
   push_suspended_lw()); slots 2 and 4 are cleared once read. */
2315 if (ft->suspended_lw_stack) {
2316 Scheme_Lightweight_Continuation *lc;
2317
2318 lc = (Scheme_Lightweight_Continuation *)ft->suspended_lw_stack[1];
2319 scheme_restore_lightweight_continuation_marks(lc); /* might trigger GC */
2320 ft = fts->thread->current_ft;
2321
2322 rator = (Scheme_Object *)ft->suspended_lw_stack[2];
2323 argc = SCHEME_INT_VAL((Scheme_Object *)ft->suspended_lw_stack[3]);
2324 argv = (Scheme_Object **)ft->suspended_lw_stack[4];
2325 ft->suspended_lw_stack[2] = NULL;
2326 ft->suspended_lw_stack[4] = NULL;
2327 } else {
2328 rator = ft->orig_lambda;
2329 argc = 0;
2330 argv = NULL;
2331 }
2332
2333 jitcode = ((Scheme_Native_Closure *)rator)->code->start_code;
2334 v = scheme_call_as_lightweight_continuation(jitcode, rator, argc, argv);
2335 if (SAME_OBJ(v, SCHEME_TAIL_CALL_WAITING))
2336 v = scheme_force_value_same_mark_as_lightweight_continuation(v);
2337 }
2338 }
2339
2340 /* Get future again, since a GC may have occurred or
2341 future may have been suspended */
2342 ft = fts->thread->current_ft;
2343
2344 mzrt_mutex_lock(fs->future_mutex);
2345
2346 if (!ft) {
2347 /* continuation of future will be requeued, and this future
2348 thread can do something else */
2349 } else {
2350 FUTURE_ASSERT(v || ft->no_retval);
2351
/* future_do_runtimecall() sets no_retval to -1 just before jumping out
   of an aborted future; in that case the result fields are left
   untouched and only the suspended stack record is dropped. */
2352 if (ft->no_retval >= 0) {
2353 /* Set the return val in the descriptor */
2354 ft->retval = v;
2355 /* In case of multiple values: */
2356 send_special_result(ft, v);
2357
2358 if (ft->suspended_lw_stack) {
2359 if (!ft->suspended_lw_stack[5] && SAME_OBJ(v, SCHEME_MULTIPLE_VALUES)) {
2360 /* multiple results are not allowed; keep the same lw stack,
2361 but replace the function to call: */
2362 ft->status = PENDING_OVERSIZE;
2363 ft->suspended_lw_stack[2] = bad_multi_result_proc;
2364 ft->suspended_lw_stack[3] = scheme_make_integer(ft->multiple_count);
2365 ft->suspended_lw_stack[4] = ft->multiple_array;
2366 ft->retval_s = NULL;
2367 ft->multiple_array = NULL;
2368 } else
2369 pop_suspended_lw(fs, ft);
2370 } else {
2371 /* Update the status */
2372 ft->status = FINISHED;
2373 trigger_added_touches(fs, ft);
2374 }
2375 record_fevent(FEVENT_COMPLETE, fid);
2376 } else {
2377 ft->suspended_lw_stack = NULL;
2378 }
2379
/* Detach the future from this thread and re-register the thread with
   `main_custodian'. */
2380 fts->thread->current_ft = NULL;
2381 GC_register_thread(fts->thread, main_custodian);
2382 }
2383
2384 /* Clear stacks */
2385 MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size;
2386 MZ_CONT_MARK_STACK = 0;
2387
/* `ft' is non-NULL here only when the future was still attached to this
   thread after the run, i.e. the branch above updated its state. */
2388 if (ft)
2389 scheme_signal_received_at(fs->signal_handle);
2390
2391 record_fevent(FEVENT_END_WORK, fid);
2392
2393 --fs->busy_thread_count;
2394 }
2395
/* future_mutex is held on both paths that reach this point (taken at
   the top of the loop, or re-taken after the run) and is released
   here. */
2396 end_gc_not_ok(fts, fs, NULL);
2397 mzrt_mutex_unlock(fs->future_mutex);
2398 }
2399
2400 return NULL;
2401 }
2402
_apply_future_lw(future_t * ft)2403 static Scheme_Object *_apply_future_lw(future_t *ft)
2404 XFORM_SKIP_PROC
2405 /* Called from any thread (either runtime or future) */
2406 {
2407 struct Scheme_Lightweight_Continuation *lw = ft->suspended_lw;
2408 Scheme_Object *v;
2409 int result_is_rs_plus_two;
2410
2411 ft->suspended_lw = NULL;
2412
2413 v = ft->retval_s;
2414 if (ft->retval_is_rs_plus_two) {
2415 result_is_rs_plus_two = 1;
2416 ft->retval_is_rs_plus_two = 0;
2417 } else {
2418 ft->retval_s = NULL;
2419 receive_special_result(ft, v, 1);
2420 result_is_rs_plus_two = 0;
2421 }
2422
2423 FUTURE_ASSERT((ft->prim_protocol != SIG_ON_DEMAND) == !result_is_rs_plus_two);
2424 FUTURE_ASSERT(v || (ft->prim_protocol != SIG_ALLOC));
2425
2426 v = scheme_apply_lightweight_continuation(lw, v, result_is_rs_plus_two,
2427 FUTURE_RUNSTACK_SIZE);
2428
2429 if (SAME_OBJ(v, SCHEME_TAIL_CALL_WAITING)) {
2430 if (scheme_future_thread_state->is_runtime_thread)
2431 v = scheme_force_value_same_mark(v);
2432 else
2433 v = scheme_force_value_same_mark_as_lightweight_continuation(v);
2434 }
2435
2436 return v;
2437 }
2438
apply_future_lw_k(void)2439 static void *apply_future_lw_k(void)
2440 {
2441 Scheme_Thread *p = scheme_current_thread;
2442 future_t *ft = (future_t *)p->ku.k.p1;
2443
2444 p->ku.k.p1 = NULL;
2445
2446 return _apply_future_lw(ft);
2447 }
2448
apply_future_lw(future_t * ft)2449 static Scheme_Object *apply_future_lw(future_t *ft)
2450 {
2451 Scheme_Thread *p = scheme_current_thread;
2452
2453 p->ku.k.p1 = ft;
2454
2455 return (Scheme_Object *)scheme_top_level_do(apply_future_lw_k, 0);
2456 }
2457
/* Captures the lightweight continuation of the running future `ft' and
   detaches the future from its worker thread.

   Parameters:
     fs           - shared future state; only its `future_mutex' is used.
     ft           - the future to suspend; `ft->fts' and `ft->lwc' must
                    already be set by the caller.
     storage      - caller-provided scratch array (callers in this file
                    pass `void *storage[4]'). Slot 2 holds `ft' and, for
                    an overflow capture, slot 3 holds the new stack
                    record; both are re-read after each step so that the
                    local copies are refreshed. The array is also passed
                    to scheme_capture_lightweight_continuation().
     need_lock    - non-zero: take `fs->future_mutex' after the capture.
     for_overflow - non-zero: also allocate a 6-slot stack record and
                    push it onto `ft->suspended_lw_stack'; slot 5 of the
                    record is scheme_true when for_overflow > 1.

   Returns 0 if the capture could not be done (allocation or capture
   failed, or, without MZ_PRECISE_GC, `scheme_use_rtcall' is set), and 1
   otherwise.

   Locking: when `need_lock' is non-zero and the capture itself
   succeeded, the function returns 1 with `fs->future_mutex' HELD --
   including the early return taken when `ft->want_lw' was already
   cleared -- and the caller must release it. On a 0 return the mutex
   has not been taken. */
static int capture_future_continuation(Scheme_Future_State *fs, future_t *ft, void **storage,
                                       int need_lock, int for_overflow)
  XFORM_SKIP_PROC
/* The lock is *not* held when calling this function.
   This function explicitly cooperates with the GC by storing the
   pointers it needs to save across a collection in `storage', so
   it can be used in a future thread. If future-thread-local
   allocation fails, the result is 0. */
{
  Scheme_Lightweight_Continuation *lw;
  Scheme_Object **arg_S;
  void **stack;

#ifndef MZ_PRECISE_GC
  if (scheme_use_rtcall)
    return 0;
#endif

  storage[2] = ft;

  if (for_overflow) {
    stack = MALLOC_N(void*, 6);
    if (!stack) return 0;
    storage[3] = stack;
    ft = (future_t *)storage[2];
  } else
    stack = NULL;

  lw = scheme_capture_lightweight_continuation(ft->fts->thread, ft->lwc, storage);
  if (!lw) return 0;

  ft = (future_t *)storage[2];
  /* storage[3] is written only on the overflow path above, so it is
     read back only on that path; otherwise `stack' stays NULL rather
     than picking up an uninitialized slot of the caller's array. */
  if (for_overflow)
    stack = (void **)storage[3];

  if (need_lock) {
    mzrt_mutex_lock(fs->future_mutex);

    if (!ft->want_lw) {
      /* Future can already continue. This can only happen
         if ft was blocked on another future, and the other
         future decided that it could continue while we were
         trying to grab the continuation. Drop the captured
         continuation. */
      return 1;
    }

    ft->want_lw = 0;
  }

  ft->fts->thread->current_ft = NULL; /* tells worker thread that it no longer
                                         needs to handle the future */
  GC_register_thread(ft->fts->thread, main_custodian);

  ft->suspended_lw = lw;
  ft->maybe_suspended_lw = 1;

  /* Re-point any saved runstack arguments via
     scheme_adjust_runstack_argument(), given the captured continuation. */
  if (ft->arg_S0) {
    arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S0);
    ft->arg_S0 = arg_S;
  }
  if (ft->arg_S1) {
    arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S1);
    ft->arg_S1 = arg_S;
  }
  if (ft->arg_S2) {
    arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S2);
    ft->arg_S2 = arg_S;
  }

  if (for_overflow) {
    /* Slot 0 links to the previous record; slots 1-4 are filled later
       by push_suspended_lw(). */
    stack[0] = ft->suspended_lw_stack;
    stack[5] = ((for_overflow > 1) ? scheme_true : NULL);
    ft->suspended_lw_stack = stack;
  }

  return 1;
}
2535
/* Moves the just-captured continuation and the pending call description
   of `ft' into the top record of `ft->suspended_lw_stack', then marks
   the future PENDING and enqueues it.

   Record layout written here: [1] continuation, [2] arg_s0,
   [3] arg_i0 boxed as an integer, [4] arg_S0. The source fields
   `suspended_lw', `arg_s0' and `arg_S0' are cleared. */
static void push_suspended_lw(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread; needs lock */
{
  void **frame = ft->suspended_lw_stack;

  frame[1] = ft->suspended_lw;
  frame[2] = ft->arg_s0;
  frame[3] = scheme_make_integer(ft->arg_i0);
  frame[4] = ft->arg_S0;

  ft->suspended_lw = NULL;
  ft->arg_s0 = NULL;
  ft->arg_S0 = NULL;

  ft->status = PENDING;
  (void)enqueue_future(fs, ft);
}
2552
/* Pops the top record of `ft->suspended_lw_stack': the future's current
   result moves from `retval' to `retval_s', the continuation saved in
   slot 1 becomes `ft->suspended_lw' again, and the stack pointer steps
   back to the record linked from slot 0. The future is then marked
   PENDING and enqueued. */
static void pop_suspended_lw(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread; needs lock */
{
  void **frame = ft->suspended_lw_stack;

  /* The value just computed becomes the pending input of the saved
     continuation. */
  ft->retval_s = ft->retval;
  ft->retval = NULL;

  ft->suspended_lw = (Scheme_Lightweight_Continuation *)frame[1];
  ft->maybe_suspended_lw = 1;

  ft->suspended_lw_stack = (void **)frame[0];

  ft->status = PENDING;
  (void)enqueue_future(fs, ft);
}
2568
/* Services the requests that future threads have queued for the runtime
   thread. Does nothing when there is no future state, or when
   `fs->future_threads_created' is still zero after
   check_future_thread_creation().

   In order, it drains three queues hanging off the future state:
     1. `future_waiting_atomic' - futures whose pending primitive is
        flagged atomic; each is run through invoke_rtcall().
     2. `future_waiting_touch'  - futures waiting to touch another
        future; each is chained onto the target's `touching' list (or
        handled directly if the target already FINISHED).
     3. `future_waiting_lwc'    - futures that asked for their
        continuation to be captured.
   Finally it restores fuel for pool threads that have none and no
   current future.

   `fs->future_mutex' is taken and released around each queue
   manipulation; it is not held across invoke_rtcall() or across the
   allocations in step 2. */
scheme_check_future_work()2569 void scheme_check_future_work()
2570 /* Called in the runtime thread by the scheduler */
2571 {
2572 /* Check for work that future threads need from the runtime thread
2573 and that can be done in any Racket thread (e.g., get a new page
2574 for allocation). */
2575 future_t *ft, *other_ft;
2576 Scheme_Future_State *fs = scheme_future_state;
2577 mzrt_sema *can_continue_sema;
2578 int more;
2579
2580 if (!fs) return;
2581
2582 flush_future_logs(fs);
2583
2584 check_future_thread_creation(fs);
2585
2586 if (!fs->future_threads_created) return;
2587
/* Queue 1: pop one future per iteration under the lock; `more' stays 1
   as long as something was popped, even if that entry turned out not to
   need handling (`ft' reset to NULL). */
2588 more = 1;
2589 while (more) {
2590 /* Try to get a future waiting on a atomic operation */
2591 mzrt_mutex_lock(fs->future_mutex);
2592 ft = fs->future_waiting_atomic;
2593 if (ft) {
2594 fs->future_waiting_atomic = ft->next_waiting_atomic;
2595 ft->next_waiting_atomic = NULL;
2596 ft->in_atomic_queue = 0;
2597 if ((ft->status == WAITING_FOR_PRIM) && ft->rt_prim_is_atomic) {
2598 ft->status = HANDLING_PRIM;
2599 ft->want_lw = 0; /* we expect to handle it quickly,
2600 so the future thread should just wait */
2601 } else
2602 ft = NULL;
2603 more = 1;
2604 } else
2605 more = 0;
2606 mzrt_mutex_unlock(fs->future_mutex);
2607
2608 if (ft)
2609 invoke_rtcall(fs, ft, 1);
2610 }
2611
/* Queue 2: same popping pattern. The weak box and pair are allocated
   with the lock released, so the target's status is re-checked after
   the lock is re-taken. */
2612 more = 1;
2613 while (more) {
2614 /* Try to get a future that's waiting to touch another future: */
2615 mzrt_mutex_lock(fs->future_mutex);
2616 ft = fs->future_waiting_touch;
2617 if (ft) {
2618 fs->future_waiting_touch = ft->next_waiting_touch;
2619 ft->next_waiting_touch = NULL;
2620 ft->in_touch_queue = 0;
2621 other_ft = get_future_for_touch(ft);
2622 more = 1;
2623 } else {
2624 other_ft = NULL;
2625 more = 0;
2626 }
2627 mzrt_mutex_unlock(fs->future_mutex);
2628
2629 if (other_ft) {
2630 /* Chain to `ft' from `other_ft': */
2631 Scheme_Object *wb, *pr;
2632
2633 wb = scheme_make_weak_box((Scheme_Object *)ft);
2634 pr = scheme_make_pair(wb, scheme_null);
2635
2636 mzrt_mutex_lock(fs->future_mutex);
2637 if (other_ft->status == FINISHED) {
2638 /* Completed while we tried to allocate a chain link. */
2639 direct_future_to_future_touch(fs, other_ft, ft);
2640 } else {
2641 /* enqueue */
2642 if (other_ft->touching)
2643 SCHEME_CDR(pr) = other_ft->touching;
2644 other_ft->touching = pr;
2645 }
2646 mzrt_mutex_unlock(fs->future_mutex);
2647 }
2648 }
2649
/* Queue 3: capture continuations on behalf of future threads. */
2650 while (1) {
2651 /* Try to get a future waiting to be suspended */
2652 mzrt_mutex_lock(fs->future_mutex);
2653 ft = fs->future_waiting_lwc;
2654 if (ft) {
2655 fs->future_waiting_lwc = ft->next_waiting_lwc;
2656 ft->next_waiting_lwc = NULL;
2657 ft->in_queue_waiting_for_lwc = 0;
/* NOTE(review): when the popped future no longer wants a continuation,
   `ft' becomes NULL and the loop leaves through the `break' below even
   if more entries remain queued; unlike queues 1 and 2 there is no
   `more' flag here. Remaining entries appear to wait for the next call
   -- confirm this is intended. */
2658 if (!ft->want_lw)
2659 ft = NULL;
2660 else {
2661 /* If ft is touching another future, then the other
2662 future may resume ft while we grab the continuation.
2663 Withhold ft->can_continue_sema for now, so that we can
2664 capture the continuation, and then double-check
2665 afterward whether the continuation wants a lwc: */
2666 can_continue_sema = ft->can_continue_sema;
2667 ft->can_continue_sema = NULL;
2668 FUTURE_ASSERT(!ft->in_atomic_queue);
2669 }
2670 }
2671 mzrt_mutex_unlock(fs->future_mutex);
2672
/* `can_continue_sema' is assigned on exactly the path that leaves `ft'
   non-NULL, so it is always set before the uses inside this branch. */
2673 if (ft) {
2674 void *storage[4];
2675
/* A non-zero result means capture_future_continuation() returned with
   future_mutex held (need_lock is 1), so this branch unlocks; the zero
   branch must take the lock itself. */
2676 if (capture_future_continuation(fs, ft, storage, 1,
2677 ((ft->status == WAITING_FOR_OVERFLOW)
2678 ? ft->arg_i1
2679 : 0))) {
2680 /* capture performs mzrt_mutex_lock(fs->future_mutex) on success. */
2681 if (ft->suspended_lw) {
2682 FUTURE_ASSERT((ft->status == WAITING_FOR_PRIM)
2683 || (ft->status == WAITING_FOR_FSEMA)
2684 || (ft->status == WAITING_FOR_OVERFLOW));
2685 if (ft->status == WAITING_FOR_OVERFLOW) {
2686 push_suspended_lw(fs, ft);
2687 }
2688 } else
2689 FUTURE_ASSERT(ft->status != RUNNING);
2690 mzrt_mutex_unlock(fs->future_mutex);
2691 } else {
2692 /* Couldn't capture the continuation. */
2693 FUTURE_ASSERT(ft->status != RUNNING);
2694 if (can_continue_sema) {
2695 /* may need to reinstall the semaphore */
2696 mzrt_mutex_lock(fs->future_mutex);
2697 if ((ft->status == WAITING_FOR_PRIM)
2698 || (ft->status == WAITING_FOR_FSEMA)) {
2699 ft->can_continue_sema = can_continue_sema;
2700 can_continue_sema = NULL;
2701 }
2702 mzrt_mutex_unlock(fs->future_mutex);
2703 }
2704 }
2705 /* Signal the waiting worker thread that it can continue, since
2706 we either captured the continuation or the result became
2707 available meanwhile: */
/* The local is NULL here if the semaphore was reinstalled on the future
   above (or was never set on it), in which case nothing is posted. */
2708 if (can_continue_sema)
2709 mzrt_sema_post(can_continue_sema);
2710 } else
2711 break;
2712 }
2713
2714 /* If any future thread has its fuel revoked (must have been a custodian
2715 shutdown) but doesn't have a future (shutdown future must have been
2716 handled), then we can restore the thread's fuel. Races are
2717 possible, but they should be rare, and they lead at worst to bad
2718 performance. */
2719 {
2720 int i;
2721 for (i = 0; i < fs->thread_pool_size; i++) {
2722 if (fs->pool_threads[i]) {
2723 if (!*(fs->pool_threads[i]->fuel_pointer)
2724 && !fs->pool_threads[i]->thread->current_ft) {
/* Fuel goes back to 1 and the stack boundary is lowered by
   FUTURE_C_STACK_SIZE; presumably this undoes an adjustment made when
   the fuel was revoked -- that code is not in this part of the file. */
2725 *(fs->pool_threads[i]->fuel_pointer) = 1;
2726 *(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE;
2727 }
2728 }
2729 }
2730 }
2731 }
2732
/* Hands the current future's pending operation over to the runtime
   thread and waits for (or suspends until) its completion.

   Parameters:
     fts          - state of the calling thread; the future is taken
                    from `fts->thread->current_ft'.
     func         - the primitive being requested; stored in
                    `future->prim_func' and compared against
                    scheme_even_p / scheme_odd_p (run inline) and
                    `touch' (queued for touch handling).
     is_atomic    - stored in `future->rt_prim_is_atomic'; when set and
                    suspension is not insisted on, the future is put on
                    `fs->future_waiting_atomic'.
     can_suspend  - zero forces both suspend flags off.
     for_overflow - non-zero marks an overflow request: suspension is
                    insisted on, no primitive is recorded, and the value
                    is saved in `future->arg_i1' if the local capture
                    did not happen.

   Control flow on exit: returns normally when the runtime call produced
   a result (status set back to RUNNING) or when one of the early
   returns is taken; otherwise leaves via scheme_future_longjmp() on the
   thread's error_buf -- either because the future's continuation was
   captured and requeued, or because the call flagged `no_retval'. */
future_do_runtimecall(Scheme_Future_Thread_State * fts,void * func,int is_atomic,int can_suspend,int for_overflow)2733 static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
2734 void *func,
2735 int is_atomic,
2736 int can_suspend,
2737 int for_overflow)
2738 XFORM_SKIP_PROC
2739 /* Called in either future or runtime thread. Can only be called in the runtime thread
2740 if we are in slow-path trace mode (i.e. we are running a future that is bound to the
2741 runtime thread so we can log all of its primitive applications). */
2742 {
2743 future_t *future;
2744 Scheme_Future_State *fs = scheme_future_state;
2745 void *storage[4];
2746 int insist_to_suspend, prefer_to_suspend, fid;
2747
2748 #ifdef MZ_PRECISE_GC
2749 if (for_overflow && (!GC_gen0_alloc_page_ptr || fts->local_capture_failed)) {
2750 /* To increase the chance that later overflows can be handled
2751 without blocking, get more memory for this future thread. The
2752 `local_capture_failed' flag is a heuristic that might be
2753 improved by checking the available memory against an estimate
2754 of the needed memory. */
2755 fts->local_capture_failed = 0;
2756 GC_gen0_alloc_page_ptr = scheme_rtcall_alloc();
2757 }
2758 #endif
2759
2760 /* Fetch the future descriptor for this thread */
2761 future = fts->thread->current_ft;
2762
2763 FUTURE_ASSERT(!future->in_atomic_queue);
2764 FUTURE_ASSERT(!future->in_future_queue);
2765
2766 if (!for_overflow) {
2767 /* Check if this prim in fact does have a
2768 safe C version */
/* Fast path: these two primitives are invoked directly on the calling
   thread with (arg_i0, arg_S1); the result goes to `retval_s' and no
   hand-off takes place. */
2769 if (func == scheme_even_p || func == scheme_odd_p) {
2770 prim_iS_s f = (prim_iS_s)func;
2771 Scheme_Object *ret;
2772 ret = f(future->arg_i0, future->arg_S1);
2773 future->retval_s = ret;
2774 return;
2775 }
2776 }
2777
2778 /* Check whether we are in slow-path trace mode */
/* On the runtime thread the call is performed synchronously through
   invoke_rtcall(); none of the locking or queueing below is used. */
2779 if (fts->is_runtime_thread) {
2780 /* On runtime thread - must be slow-path tracing */
2781 future->prim_func = func;
2782 future->rt_prim_is_atomic = 0;
2783 future->status = WAITING_FOR_PRIM;
2784 invoke_rtcall(scheme_future_state, future, 0);
2785 fts->worker_gc_counter = *fs->gc_counter_ptr;
2786
2787 return;
2788 }
2789
2790 scheme_fill_lwc_end();
2791 future->lwc = scheme_current_lwc;
2792 future->fts = fts;
2793
/* Kept in a local because `future' is re-fetched later and can then be
   NULL, while events still need the id. */
2794 fid = future->id;
2795
2796 /* If for_overflow, then a suspend is required. Otherwise...
2797 Policy question: When should the future thread suspend
2798 the current future? It costs something to suspend and
2799 resume a future.
2800 The current policy:
2801 Always suspend for a non-atomic (i.e, "unsafe") operation,
2802 because there's no guarantee that `touch' will allow progress
2803 anytime soon. For atomic operations, only suspend if there's
2804 more work available in the future queue, and only if we
2805 can suspend ourselves (because asking the runtime thread
2806 to suspend wouldn't accomplish anything). */
2807 insist_to_suspend = !is_atomic || for_overflow;
2808 prefer_to_suspend = (insist_to_suspend || fs->future_queue_count);
2809
2810 if (!scheme_custodian_is_available(future->cust)) {
2811 insist_to_suspend = 1;
2812 prefer_to_suspend = 1;
2813 }
2814
2815 if (!can_suspend) {
2816 insist_to_suspend = 0;
2817 prefer_to_suspend = 0;
2818 }
2819
/* The capture is attempted for its side effect only (note the empty
   body). Whether it succeeded is detected below by testing
   `fts->thread->current_ft', which a successful capture sets to NULL.
   need_lock is 0, so the mutex is not taken by the capture. */
2820 if (prefer_to_suspend
2821 && GC_gen0_alloc_page_ptr
2822 && capture_future_continuation(fs, future, storage, 0, for_overflow)) {
2823 /* this future thread will suspend handling the future
2824 continuation until the result of the blocking call is ready;
2825 fts->thread->current_ft was set to NULL */
2826 }
2827
2828 mzrt_mutex_lock(fs->future_mutex);
2829
2830 if (for_overflow) {
2831 record_fevent(FEVENT_OVERFLOW, fid);
2832 } else if (func == touch) {
2833 record_fevent(FEVENT_RTCALL_TOUCH, fid);
2834 } else {
2835 record_fevent(is_atomic ? FEVENT_RTCALL_ATOMIC : FEVENT_RTCALL, fid);
2836 }
2837
/* Publish the request in the future descriptor. For an overflow whose
   local capture failed, the request is left for the runtime thread:
   the status becomes WAITING_FOR_OVERFLOW and `local_capture_failed'
   is raised for the next call. */
2838 if (for_overflow) {
2839 if (!fts->thread->current_ft) {
2840 /* capture complete; re-enqueue so that it continues on fresh stack */
2841 push_suspended_lw(fs, future);
2842 } else {
2843 future->status = WAITING_FOR_OVERFLOW;
2844 future->arg_i1 = for_overflow;
2845 fts->local_capture_failed = 1;
2846 }
2847 } else {
2848 /* Set up the arguments for the runtime call
2849 to be picked up by the main rt thread */
2850 future->prim_func = func;
2851 future->rt_prim_is_atomic = is_atomic;
2852 future->status = WAITING_FOR_PRIM;
2853 }
2854
/* Still attached to this thread (no local capture): queue the future on
   the atomic-wait list and/or the waiting-for-capture list. */
2855 if (fts->thread->current_ft) {
2856 if (is_atomic && !insist_to_suspend) {
2857 FUTURE_ASSERT(!future->in_atomic_queue);
2858 FUTURE_ASSERT(!future->in_future_queue);
2859 FUTURE_ASSERT(func != touch);
2860 future->next_waiting_atomic = fs->future_waiting_atomic;
2861 fs->future_waiting_atomic = future;
2862 future->in_atomic_queue = 1;
2863 }
2864
2865 if (insist_to_suspend) {
2866 /* couldn't capture the continuation locally, so ask
2867 the runtime thread to capture it: */
2868 if (!future->in_queue_waiting_for_lwc) {
2869 future->next_waiting_lwc = fs->future_waiting_lwc;
2870 fs->future_waiting_lwc = future;
2871 future->in_queue_waiting_for_lwc = 1;
2872 }
2873 future->want_lw = 1;
2874 /* In case of for_overflow, runtime thread is responsible for
2875 enqueuing the future to continue. */
2876 }
2877 }
2878
2879 if (func == touch) {
2880 if (!future->in_future_specific_touch_queue) {
2881 /* Ask the runtime thread to put this future on the queue
2882 of the future being touched: */
2883 if (!future->in_touch_queue) {
2884 future->next_waiting_touch = fs->future_waiting_touch;
2885 fs->future_waiting_touch = future;
2886 future->in_touch_queue = 1;
2887 }
2888 } else {
2889 future->in_future_specific_touch_queue = 0; /* done with back-door argument */
2890 }
2891 }
2892
2893 scheme_signal_received_at(fs->signal_handle);
2894
/* Block only if the future is still attached to this thread. The mutex
   is dropped across the wait and re-taken afterwards, so both paths
   reach the code below with the mutex held. */
2895 if (fts->thread->current_ft) {
2896 /* Wait for the signal that the RT call is finished
2897 or a lightweight continuation has been captured: */
2898 future->can_continue_sema = fts->worker_can_continue_sema;
2899 end_gc_not_ok(fts, fs, MZ_RUNSTACK); /* we rely on this putting MZ_CONT_MARK_STACK into the thread record */
2900 mzrt_mutex_unlock(fs->future_mutex);
2901
2902 mzrt_sema_wait(fts->worker_can_continue_sema);
2903
2904 mzrt_mutex_lock(fs->future_mutex);
2905 start_gc_not_ok(fs);
2906 }
2907
2908 /* Fetch the future instance again, in case the GC has moved the pointer
2909 or the future has been requeued. */
2910 future = fts->thread->current_ft;
2911
2912 FUTURE_ASSERT(!future || !future->can_continue_sema);
2913 FUTURE_ASSERT(!future || !for_overflow);
2914 FUTURE_ASSERT(!future || !future->in_atomic_queue);
2915
2916 if (future) {
2917 future->want_lw = 0;
2918 FUTURE_ASSERT(future->status == HANDLING_PRIM);
2919 if (future->no_retval) {
2920 record_fevent(FEVENT_RTCALL_ABORT, fid);
2921 future->status = FINISHED;
2922 trigger_added_touches(fs, future);
2923 } else {
2924 record_fevent(FEVENT_RTCALL_RESULT, fid);
2925 future->status = RUNNING;
2926 }
2927 } else {
2928 if (!for_overflow)
2929 record_fevent(FEVENT_RTCALL_SUSPEND, fid);
2930 }
2931
2932 mzrt_mutex_unlock(fs->future_mutex);
2933
/* Both jumps below target the thread's error_buf, which
   worker_thread_future_loop() points at its own jump buffer before
   running a future. `no_retval' is set to -1 first so that the code
   receiving the jump can tell an abort apart. */
2934 if (!future) {
2935 /* future continuation was requeued */
2936 scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
2937 } else if (future->no_retval) {
2938 /* there was an error => abort the future */
2939 future->no_retval = -1;
2940 scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
2941 } else {
2942 FUTURE_ASSERT(future->status == RUNNING);
2943 record_fevent(FEVENT_START_WORK, fid);
2944 }
2945 }
2946
/* Procedure body used when a future produced multiple values where a
   single one was required: reports the problem through
   scheme_wrong_return_arity() with an expected count of 1 and the
   actual values in argc/argv. The NULL result is returned only if that
   call comes back. */
static Scheme_Object *bad_multi_result(int argc, Scheme_Object **argv)
{
  Scheme_Object *no_value = NULL;

  scheme_wrong_return_arity(NULL, 1, argc, argv, NULL);

  return no_value;
}
2952
2953 /**********************************************************************/
2954 /* Functions for primitive invocation */
2955 /**********************************************************************/
/* Reports a contract violation on behalf of the future running on this
   thread. The five arguments are recorded in the future descriptor
   under the SIG_WRONG_TYPE_EXN protocol and the request is passed to
   future_do_runtimecall() as a non-atomic, suspendable, non-overflow
   call with no primitive function. */
void scheme_wrong_contract_from_ft(const char *who, const char *expected_type, int what, int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *cur = state->thread->current_ft;

  cur->prim_protocol = SIG_WRONG_TYPE_EXN;

  /* Arguments of the failed check, in parameter order */
  cur->arg_str0 = who;
  cur->arg_str1 = expected_type;
  cur->arg_i2 = what;
  cur->arg_i3 = argc;
  cur->arg_S4 = argv;

  /* Bookkeeping about the request itself */
  cur->time_of_request = get_future_timestamp();
  cur->source_of_request = who;

  future_do_runtimecall(state, NULL, 0, 1, 0);

  /* Nothing reads the future after the call. If code is ever added
     here, re-fetch it first (state->thread->current_ft), in case a GC
     moved it. */
}
2977
/* Requests an on-demand runtime call (SIG_ON_DEMAND protocol) for the
   future running on this thread.

   `argv' is expected to be MZ_RUNSTACK + 2. If it is not and the future
   is in tracing mode, the work is done immediately by
   scheme_on_demand(argv) and that result is returned. Otherwise the
   runstack pointer is recorded in `arg_S0', the request goes through
   future_do_runtimecall() as an atomic, suspendable call, and
   MZ_RUNSTACK + 2 is returned. */
Scheme_Object **scheme_rtcall_on_demand(Scheme_Object **argv)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *cur = state->thread->current_ft;

  cur->prim_protocol = SIG_ON_DEMAND;

  if (argv != (MZ_RUNSTACK + 2)) {
    if (!cur->in_tracing_mode) {
      /* An off-runstack argv is only legal while tracing. */
      FUTURE_ASSERT(0);
    } else {
      return scheme_on_demand(argv);
    }
  }

  cur->arg_S0 = MZ_RUNSTACK;

  cur->time_of_request = get_future_timestamp();
  cur->source_of_request = "[jit_on_demand]";
  cur->source_type = FSRC_OTHER;

  future_do_runtimecall(state, NULL, 1, 1, 0);

  /* Re-read the future: a GC may have moved it during the call. */
  cur = state->thread->current_ft;

  cur->arg_S0 = NULL;
  cur->retval_is_rs_plus_two = 0;

  return MZ_RUNSTACK + 2;
}
3011
/* Asks the runtime to create an fsemaphore (SIG_MAKE_FSEMAPHORE
   protocol) for the future running on this thread.

   `ready' is passed along in `arg_s1'. The request is marked atomic
   only when `ready' is a fixnum in the range [0, 1024); any other value
   is sent as a non-atomic request. Returns the value left in the
   future's `retval_s', which is cleared afterwards. */
Scheme_Object *scheme_rtcall_make_fsemaphore(Scheme_Object *ready)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *cur = state->thread->current_ft;
  Scheme_Object *result;
  int atomic_ok;

  cur->prim_protocol = SIG_MAKE_FSEMAPHORE;
  cur->arg_s1 = ready;
  cur->time_of_request = get_future_timestamp();
  cur->source_of_request = "[make_fsemaphore]";
  cur->source_type = FSRC_OTHER;

  /* conservative check for when creation can succeed atomically
     (because it won't raise an error): */
  atomic_ok = 0;
  if (SCHEME_INTP(ready)) {
    if ((SCHEME_INT_VAL(ready) >= 0) && (SCHEME_INT_VAL(ready) < 1024))
      atomic_ok = 1;
  }

  future_do_runtimecall(state, NULL, atomic_ok, 1, 0);

  /* Re-read the future: a GC may have moved it during the call. */
  cur = state->thread->current_ft;

  result = cur->retval_s;
  cur->retval_s = NULL;

  return result;
}
3046
Scheme_Object *scheme_rtcall_make_future(Scheme_Object *proc)
  XFORM_SKIP_PROC
/* Called in future thread.

   Issues a SIG_FUTURE runtime call with `proc' as the argument and
   returns the value left in the future's `retval_s'. */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *ft = state->thread->current_ft;
  Scheme_Object *result;
  int atomic_ok;

  /* The request is flagged atomic only for a native closure that
     accepts zero arguments; this check runs before any request
     field is written */
  atomic_ok = (SAME_TYPE(SCHEME_TYPE(proc), scheme_native_closure_type)
               && scheme_native_arity_check(proc, 0)) ? 1 : 0;

  ft->prim_protocol = SIG_FUTURE;
  ft->arg_s1 = proc;
  ft->time_of_request = get_future_timestamp();
  ft->source_type = FSRC_OTHER;
  ft->source_of_request = "future";

  future_do_runtimecall(state, NULL, atomic_ok, 1, 0);

  /* Re-read the future pointer, since a GC may have moved it */
  ft = state->thread->current_ft;

  /* Take the result and clear the slot */
  result = ft->retval_s;
  ft->retval_s = NULL;

  return result;
}
3077
void scheme_rtcall_allocate_values(int count, Scheme_Thread *t)
  XFORM_SKIP_PROC
/* Called in future thread.

   Issues a SIG_ALLOC_VALUES runtime call, passing `count' in arg_i0
   and the thread `t' in arg_s0. */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *ft = state->thread->current_ft;

  ft->prim_protocol = SIG_ALLOC_VALUES;
  ft->arg_s0 = (Scheme_Object *)t;
  ft->arg_i0 = count;

  ft->time_of_request = get_future_timestamp();
  ft->source_type = FSRC_OTHER;
  ft->source_of_request = "[allocate_values]";

  future_do_runtimecall(state, NULL, 1, 0, 0);

  /* Re-read the future pointer, since a GC may have moved it */
  ft = state->thread->current_ft;

  /* Drop the reference to the thread argument */
  ft->arg_s0 = NULL;
}
3101
scheme_rtcall_allocate_structure(int count,Scheme_Struct_Type * t)3102 Scheme_Structure *scheme_rtcall_allocate_structure(int count, Scheme_Struct_Type *t)
3103 XFORM_SKIP_PROC
3104 /* Called in future thread */
3105 {
3106 Scheme_Future_Thread_State *fts = scheme_future_thread_state;
3107 future_t *future = fts->thread->current_ft;
3108 Scheme_Object *retval;
3109
3110 future->prim_protocol = SIG_ALLOC_STRUCT;
3111
3112 future->arg_i0 = count;
3113 future->arg_s0 = (Scheme_Object *)t;
3114
3115 future->time_of_request = get_future_timestamp();
3116 future->source_of_request = "[allocate_structure]";
3117 future->source_type = FSRC_OTHER;
3118
3119 future_do_runtimecall(fts, NULL, 1, 0, 0);
3120
3121 /* Fetch the future again, in case moved by a GC */
3122 future = fts->thread->current_ft;
3123
3124 future->arg_s0 = NULL;
3125
3126 retval = future->retval_s;
3127 future->retval_s = NULL;
3128
3129 return (Scheme_Structure *)retval;
3130 }
3131
scheme_rtcall_allocate_vector(int count)3132 Scheme_Object *scheme_rtcall_allocate_vector(int count)
3133 XFORM_SKIP_PROC
3134 /* Called in future thread */
3135 {
3136 Scheme_Future_Thread_State *fts = scheme_future_thread_state;
3137 future_t *future = fts->thread->current_ft;
3138 Scheme_Object *retval;
3139
3140 future->prim_protocol = SIG_ALLOC_VECTOR;
3141
3142 future->arg_i0 = count;
3143
3144 future->time_of_request = get_future_timestamp();
3145 future->source_of_request = "[allocate_structure]";
3146 future->source_type = FSRC_OTHER;
3147
3148 future_do_runtimecall(fts, NULL, 1, 0, 0);
3149
3150 /* Fetch the future again, in case moved by a GC */
3151 future = fts->thread->current_ft;
3152
3153 future->arg_s0 = NULL;
3154
3155 retval = future->retval_s;
3156 future->retval_s = NULL;
3157
3158 return retval;
3159 }
3160
Scheme_Object *scheme_rtcall_tail_apply(Scheme_Object *rator, int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
/* Called in future thread.

   Issues a SIG_TAIL_APPLY runtime call for applying `rator' to
   `argc' arguments in `argv', then returns the value left in the
   future's `retval_s'. Multiple-values and tail-call results are
   copied into the current thread by receive_special_result(). */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *ft = state->thread->current_ft;
  Scheme_Object *result;

  ft->prim_protocol = SIG_TAIL_APPLY;

  ft->arg_i0 = argc;
  ft->arg_S0 = argv;
  ft->arg_s0 = rator;

  ft->time_of_request = get_future_timestamp();
  ft->source_type = FSRC_OTHER;
  ft->source_of_request = "[tail-call]";

  future_do_runtimecall(state, NULL, 1, 0, 0);

  /* Re-read the future pointer, since a GC may have moved it */
  ft = state->thread->current_ft;

  /* Take the result; clear the argument and result slots */
  result = ft->retval_s;
  ft->retval_s = NULL;
  ft->arg_S0 = NULL;
  ft->arg_s0 = NULL;

  receive_special_result(ft, result, 1);

  return result;
}
3194
Scheme_Object *scheme_rtcall_apply_with_new_stack(Scheme_Object *rator, int argc, Scheme_Object **argv,
                                                  int multi)
  XFORM_SKIP_PROC
/* Called in future thread; rator is a native closure with a runstack limit that fits.

   Issues a SIG_APPLY_AFRESH runtime call for applying `rator' to
   `argc' arguments in `argv'; `multi' is forwarded in arg_i1 and
   also selects the last argument of future_do_runtimecall() (2 when
   non-zero, 1 otherwise). Returns the value left in `retval_s'. */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *ft = state->thread->current_ft;
  Scheme_Object *result;

  ft->prim_protocol = SIG_APPLY_AFRESH;

  ft->arg_i0 = argc;
  ft->arg_i1 = multi;
  ft->arg_S0 = argv;
  ft->arg_s0 = rator;

  ft->time_of_request = get_future_timestamp();
  ft->source_type = FSRC_OTHER;
  ft->source_of_request = "[stack-overflow]";

  future_do_runtimecall(state, NULL, 1, 1, (multi ? 2 : 1));

  /* Re-read the future pointer, since a GC may have moved it */
  ft = state->thread->current_ft;

  /* Take the result; clear the argument and result slots */
  result = ft->retval_s;
  ft->retval_s = NULL;
  ft->arg_S0 = NULL;
  ft->arg_s0 = NULL;

  receive_special_result(ft, result, 1);

  return result;
}
3230
3231 #ifdef MZ_PRECISE_GC
scheme_rtcall_alloc()3232 uintptr_t scheme_rtcall_alloc()
3233 XFORM_SKIP_PROC
3234 /* Called in future thread, possibly during future_do_runtimecall() */
3235 {
3236 future_t *future;
3237 uintptr_t retval;
3238 Scheme_Future_Thread_State *fts = scheme_future_thread_state;
3239 intptr_t align, sz;
3240 double time_of_request;
3241 const char *source_of_request;
3242 int source_type;
3243 int prim_protocol;
3244 int arg_i0;
3245
3246 align = GC_alloc_alignment();
3247
3248 /* Do we actually still have space? */
3249 if (fts->gen0_start) {
3250 intptr_t cur;
3251 cur = GC_gen0_alloc_page_ptr;
3252 if (cur < (GC_gen0_alloc_page_end - align)) {
3253 if (cur & (align - 1)) {
3254 /* round up to next page boundary */
3255 cur &= ~(align - 1);
3256 cur += align;
3257 }
3258 cur += fts->gen0_initial_offset;
3259 return cur;
3260 }
3261 }
3262
3263 /* Grow nursery size as long as we don't trigger a GC */
3264 if (fts->gen0_size < 16)
3265 fts->gen0_size <<= 1;
3266
3267 future = fts->thread->current_ft;
3268 time_of_request = future->time_of_request;
3269 source_of_request = future->source_of_request;
3270 source_type = future->source_type;
3271 prim_protocol = future->prim_protocol;
3272 arg_i0 = future->arg_i0;
3273
3274 while (1) {
3275 future->time_of_request = get_future_timestamp();
3276 future->source_of_request = "[allocate memory]";
3277 future->source_type = FSRC_OTHER;
3278
3279 future->prim_protocol = SIG_ALLOC;
3280 future->arg_i0 = fts->gen0_size;
3281
3282 /* don't suspend, because this might be a nested call: */
3283 future_do_runtimecall(fts, NULL, 1, 0, 0);
3284
3285 future = fts->thread->current_ft;
3286 retval = future->alloc_retval;
3287 sz = future->alloc_sz_retval;
3288 future->alloc_retval = 0;
3289
3290 if (fts->worker_gc_counter == future->alloc_retval_counter) {
3291 fts->gen0_start = retval;
3292 fts->gen0_initial_offset = retval & (align - 1);
3293 break;
3294 }
3295 }
3296
3297 future->time_of_request = time_of_request;
3298 future->source_of_request = source_of_request;
3299 future->source_type = source_type;
3300 future->prim_protocol = prim_protocol;
3301 future->arg_i0 = arg_i0;
3302
3303 GC_gen0_alloc_page_end = retval + sz;
3304
3305 return retval;
3306 }
3307 #endif
3308
void scheme_rtcall_new_mark_segment(Scheme_Thread *p)
  XFORM_SKIP_PROC
/* Called in future thread.

   Issues a SIG_ALLOC_MARK_SEGMENT runtime call, passing the thread
   `p' in arg_s0. */
{
  Scheme_Future_Thread_State *state = scheme_future_thread_state;
  future_t *ft;

  ft = state->thread->current_ft;

  ft->time_of_request = get_future_timestamp();
  ft->source_type = FSRC_OTHER;
  ft->source_of_request = "[allocate_mark_segment]";

  ft->arg_s0 = (Scheme_Object *)p;
  ft->prim_protocol = SIG_ALLOC_MARK_SEGMENT;

  future_do_runtimecall(state, NULL, 1, 0, 0);
}
3326
/* Pushes continuation marks for future `f' using frame data `d':
   from its suspended lightweight continuation when it has one,
   otherwise from its future thread's Scheme thread when that is
   set. Returns the pushing function's result, or 0 when neither
   source exists. */
static int push_marks(future_t *f, Scheme_Cont_Frame_Data *d)
{
  if (f->suspended_lw)
    return scheme_push_marks_from_lightweight_continuation(f->suspended_lw, d);

  if (f->fts->thread)
    return scheme_push_marks_from_thread(f->fts->thread, d);

  return 0;
}
3337
/* Counterpart to push_marks(): pops the continuation frame described
   by `d'. do_invoke_rtcall() calls this only when push_marks()
   returned non-zero. */
static void pop_marks(Scheme_Cont_Frame_Data *d)
{
  scheme_pop_continuation_frame(d);
}
3342
static void receive_special_result(future_t *f, Scheme_Object *retval, int clear)
  XFORM_SKIP_PROC
/* Called in future or runtime thread.

   When `retval' is one of the two special markers, copies the
   matching state from future `f' into the current thread: the
   multiple-values array and count, or the pending tail call's
   rator, rands, and rand count. A non-zero `clear' also NULLs the
   pointers in `f' afterwards. Any other `retval' is ignored. */
{
  Scheme_Thread *thread;

  if (SAME_OBJ(retval, SCHEME_MULTIPLE_VALUES)) {
    thread = scheme_current_thread;

    thread->ku.multiple.array = f->multiple_array;
    thread->ku.multiple.count = f->multiple_count;
    if (clear)
      f->multiple_array = NULL;
    return;
  }

  if (!SAME_OBJ(retval, SCHEME_TAIL_CALL_WAITING))
    return;

  thread = scheme_current_thread;

  thread->ku.apply.tail_rator = f->tail_rator;
  thread->ku.apply.tail_rands = f->tail_rands;
  thread->ku.apply.tail_num_rands = f->num_tail_rands;
  if (clear) {
    f->tail_rator = NULL;
    f->tail_rands = NULL;
  }
}
3366
3367 #include "jit_ts_future_glue.c"
3368
static void send_special_result(future_t *f, Scheme_Object *retval)
  XFORM_SKIP_PROC
/* Called in future or runtime thread.

   Inverse of receive_special_result(): when `retval' is one of the
   two special markers, moves the matching state out of the current
   thread and into future `f', NULLing the thread's pointers. Any
   other `retval' is ignored. */
{
  Scheme_Thread *thread;

  if (SAME_OBJ(retval, SCHEME_MULTIPLE_VALUES)) {
    thread = scheme_current_thread;

    f->multiple_array = thread->ku.multiple.array;
    f->multiple_count = thread->ku.multiple.count;
    /* If the values live in the thread's own buffer, detach it */
    if (SAME_OBJ(thread->ku.multiple.array, thread->values_buffer))
      thread->values_buffer = NULL;
    thread->ku.multiple.array = NULL;
    return;
  }

  if (!SAME_OBJ(retval, SCHEME_TAIL_CALL_WAITING))
    return;

  thread = scheme_current_thread;

  f->tail_rator = thread->ku.apply.tail_rator;
  f->tail_rands = thread->ku.apply.tail_rands;
  f->num_tail_rands = thread->ku.apply.tail_num_rands;
  thread->ku.apply.tail_rator = NULL;
  thread->ku.apply.tail_rands = NULL;

  if (f->tail_rands == thread->tail_buffer) {
    /* This only happens in the runtime thread; we need to
       disconnect the tail buffer from `f->tail_rands' in
       case of a GC. Beware that XFORM is disabled here. */
    Scheme_Object **fresh;
    thread->tail_buffer = NULL; /* so args aren't zeroed */
    fresh = MALLOC_N(Scheme_Object *, thread->tail_buffer_size);
    thread = scheme_current_thread; /* in case GC moves the thread */
    thread->tail_buffer = fresh;
  }
}
3402
/* When future `ft' has a suspended lightweight continuation, replaces
   the runstack pointer `arg_Sx' with the result of
   scheme_adjust_runstack_argument(); otherwise leaves it alone.
   Expands to a bare `if' statement, so use it only as a complete
   statement and never directly before an `else'. */
#define ADJUST_RS_ARG(ft, arg_Sx) if (ft->suspended_lw) arg_Sx = scheme_adjust_runstack_argument(ft->suspended_lw, arg_Sx)
3404
/* Does the work of actually invoking a primitive on behalf of a
   future. This function is always invoked on the main (runtime)
   thread.

   Steps:
    1. If the future logger accepts debug-level messages, log the
       request with a readable source name.
    2. For FSRC_RATOR, FSRC_MARKS, and FSRC_PRIM requests outside
       tracing mode, push the future's continuation marks.
    3. Dispatch on `future->prim_protocol'; each case moves its
       arguments out of the future, clears the pointer slots, does
       the work, and stores any result back into the future.
    4. Pop the marks if any were pushed, record the result event, and
       call complete_rtcall() while holding `fs->future_mutex'. */
static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future)
/* Called in runtime thread */
{
  Scheme_Cont_Frame_Data mark_d;
  int need_pop;

#ifdef DEBUG_FUTURES
  g_rtcall_count++;
#endif

  if (scheme_log_level_p(scheme_get_future_logger(), SCHEME_LOG_DEBUG)) {
    const char *src;
    Scheme_Object *userdata;

    /* Start from the label supplied with the request, then prefer the
       procedure's or primitive's own name when one can be found */
    src = future->source_of_request;
    if (future->source_type == FSRC_RATOR) {
      int len;
      if (SCHEME_PROCP(future->arg_s0)) {
        const char *src2;
        src2 = scheme_get_proc_name(future->arg_s0, &len, 1);
        if (src2) src = src2;
      }
    } else if (future->source_type == FSRC_PRIM) {
      const char *src2;
      src2 = scheme_look_for_primitive(future->prim_func);
      if (src2) src = src2;
    }


    flush_future_logs(fs);

    /* use log_future_event so we can include `src' in the message: */
    userdata = NULL;
    switch (future->prim_protocol)
      {
      case SIG_ALLOC:
        {
          /* Attach the requested size (arg_i0) to the event */
          userdata = scheme_make_integer(future->arg_i0);
          break;
        }
      case SIG_ON_DEMAND:
        {
          /* Closure is first in runstack */
          GC_CAN_IGNORE Scheme_Object **rs = future->arg_S0;
          ADJUST_RS_ARG(future, rs);
          userdata = scheme_object_name(rs[0]);
          if (!userdata)
            userdata = scheme_intern_symbol("[unknown]");

          break;
        }
      }

    log_future_event(fs,
                     "id %d, process %d: %s: %s; time: %f",
                     src,
                     -1,
                     (future->rt_prim_is_atomic ? FEVENT_HANDLE_RTCALL_ATOMIC : FEVENT_HANDLE_RTCALL),
                     get_future_timestamp(),
                     future->id,
                     userdata);
  }

  /* Marks are pushed only for these three source types, and never
     in tracing mode */
  if (((future->source_type == FSRC_RATOR)
       || (future->source_type == FSRC_MARKS)
       || (future->source_type == FSRC_PRIM))
      && !future->in_tracing_mode)
    need_pop = push_marks(future, &mark_d);
  else
    need_pop = 0;

  switch (future->prim_protocol)
    {
    case SIG_ON_DEMAND:
      {
        GC_CAN_IGNORE Scheme_Object **arg_S0 = future->arg_S0;
        future->arg_S0 = NULL;

        ADJUST_RS_ARG(future, arg_S0);

        scheme_on_demand_with_args(arg_S0, arg_S0, 2);

        /* Flag the future: its result is at runstack + 2 */
        future->retval_is_rs_plus_two = 1;

        break;
      }
#ifdef MZ_PRECISE_GC
    case SIG_ALLOC:
      {
        uintptr_t ret, sz;
        int amt = future->arg_i0;
        ret = GC_make_jit_nursery_page(amt, &sz);
        future->alloc_retval = ret;
        future->alloc_sz_retval = sz;
        /* Record the GC count alongside the page so that the
           requester can compare it with its own counter */
        future->alloc_retval_counter = scheme_did_gc_count;
        break;
      }
#endif
    case SIG_ALLOC_MARK_SEGMENT:
      {
        GC_CAN_IGNORE Scheme_Thread *p_seg;
        p_seg = (Scheme_Thread *)future->arg_s0;
        future->arg_s0 = NULL;
        scheme_new_mark_segment(p_seg);
        break;
      }
    case SIG_MAKE_FSEMAPHORE:
      {
        Scheme_Object *s = future->arg_s1;
        future->arg_s1 = NULL;
        s = scheme_make_fsemaphore_inl(s);
        future->retval_s = s;
        break;
      }
    case SIG_FUTURE:
      {
        GC_CAN_IGNORE Scheme_Object *s = future->arg_s1;
        future->arg_s1 = NULL;
        s = make_future(s, 1, future);
        future->retval_s = s;
        break;
      }
    case SIG_ALLOC_VALUES:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;

        future->arg_s0 = NULL;

        scheme_jit_allocate_values(future->arg_i0, (Scheme_Thread *)arg_s0);

        break;
      }
    case SIG_ALLOC_STRUCT:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;
        GC_CAN_IGNORE Scheme_Structure *res;

        future->arg_s0 = NULL;

        res = scheme_jit_allocate_structure(future->arg_i0, (Scheme_Struct_Type *)arg_s0);

        future->retval_s = (Scheme_Object *)res;

        break;
      }
    case SIG_ALLOC_VECTOR:
      {
        GC_CAN_IGNORE Scheme_Object *res;
        intptr_t count = future->arg_i0;

        future->arg_s0 = NULL;

        /* Allocate with the future's custodian set for accounting,
           then reset it to NULL */
        GC_set_accounting_custodian(future->cust);

        res = scheme_malloc_tagged(sizeof(Scheme_Vector)
                                   + ((count - mzFLEX_DELTA) * sizeof(Scheme_Object *)));
        if (res) {
          res->type = scheme_vector_type;
          SCHEME_VEC_SIZE(res) = count;
        }

        GC_set_accounting_custodian(NULL);

        future->retval_s = res;

        break;
      }
    case SIG_TAIL_APPLY:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;
        GC_CAN_IGNORE Scheme_Object **arg_S0 = future->arg_S0;
        GC_CAN_IGNORE Scheme_Object *retval;

        future->arg_s0 = NULL;
        future->arg_S0 = NULL;

        retval = _scheme_tail_apply(arg_s0, future->arg_i0, arg_S0);

        /* Multiple-values and tail-call state is moved from the
           current thread into the future */
        future->retval_s = retval;
        send_special_result(future, retval);

        break;
      }
    case SIG_WRONG_TYPE_EXN:
      {
        const char *who;
        const char *expected_type;
        int what;
        int argc;
        Scheme_Object **argv;

        who = future->arg_str0;
        expected_type = future->arg_str1;
        what = future->arg_i2;
        argc = future->arg_i3;
        argv = future->arg_S4;

        future->arg_str0 = NULL;
        future->arg_str1 = NULL;
        future->arg_S4 = NULL;

        ADJUST_RS_ARG(future, argv);

        scheme_wrong_contract(who, expected_type, what, argc, argv);

        /* doesn't return */

        break;
      }
    case SIG_APPLY_AFRESH:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;
        GC_CAN_IGNORE Scheme_Object **arg_S0 = future->arg_S0;
        GC_CAN_IGNORE Scheme_Object *retval;

        /* This code is used only for would-be futures: */
        FUTURE_ASSERT(future->in_tracing_mode);

        future->arg_s0 = NULL;
        future->arg_S0 = NULL;

        /* arg_i1 carries the requester's `multi' flag */
        if (future->arg_i1)
          retval = _scheme_apply_multi(arg_s0, future->arg_i0, arg_S0);
        else
          retval = _scheme_apply(arg_s0, future->arg_i0, arg_S0);

        future->retval_s = retval;
        send_special_result(future, retval);

        break;
      }
      /* The remaining cases come from the included glue file, which
         uses JIT_TS_LOCALIZE to copy each argument out of `future'
         into a local of the same name */
# define JIT_TS_LOCALIZE(t, f) GC_CAN_IGNORE t f = future->f
# include "jit_ts_runtime_glue.c"
    default:
      scheme_signal_error("unknown protocol %d", future->prim_protocol);
      break;
    }

  if (need_pop)
    pop_marks(&mark_d);

  record_fevent(FEVENT_HANDLE_RTCALL_RESULT, future->id);

  /* Completion is done with the future mutex held */
  mzrt_mutex_lock(fs->future_mutex);
  complete_rtcall(fs, future);
  mzrt_mutex_unlock(fs->future_mutex);
}
3655
/* Function-pointer type that do_invoke_rtcall_k() is cast to when it
   is handed to scheme_handle_stack_overflow(). */
typedef Scheme_Object *(*overflow_k_t)(void);

/* Continuation-style entry point for do_invoke_rtcall(): the future
   state and the future are passed through the current thread's
   `ku.k.p1' and `ku.k.p2' slots (invoke_rtcall() fills them in before
   calling scheme_top_level_do()). Returns scheme_void, or the result
   of scheme_handle_stack_overflow() when the stack check fires. */
static void *do_invoke_rtcall_k(void)
{
  Scheme_Thread *p = scheme_current_thread;
  Scheme_Future_State *fs = (Scheme_Future_State *)p->ku.k.p1;
  future_t *future = (future_t *)p->ku.k.p2;

#ifdef DO_STACK_CHECK
  {
    /* NOTE(review): mzstkchk.h presumably supplies the stack-depth
       condition that guards the `return' below -- confirm. The
       argument slots are still set at this point, so the retried
       call reads the same `fs' and `future'. */
# include "mzstkchk.h"
    return scheme_handle_stack_overflow((overflow_k_t)do_invoke_rtcall_k);
  }
#endif

  /* Clear the argument slots now that they have been read */
  p->ku.k.p1 = NULL;
  p->ku.k.p2 = NULL;

  do_invoke_rtcall(fs, future);

  return scheme_void;
}
3678
/* Runs the runtime call requested by `future' with a fresh error
   escape buffer installed on the current thread.

   Normal path: a request whose `rt_prim_is_atomic' is set goes
   straight to do_invoke_rtcall(); any other request goes through
   scheme_top_level_do() via do_invoke_rtcall_k(). The previous
   `error_buf' is restored afterwards.

   Escape path (scheme_setjmp() returns non-zero): records an abort
   event, sets `no_retval' on the future, then either abandons the
   future or posts its `can_continue_sema'. After that the escape is
   re-raised through the saved buffer, unless `is_atomic' is set, in
   which case the process is aborted.

   `fs', `future', `is_atomic', and `savebuf' are volatile because
   they are read after scheme_setjmp() returns a second time. */
static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile future,
                          volatile int is_atomic)
{
  Scheme_Thread *p = scheme_current_thread;
  mz_jmp_buf newbuf, * volatile savebuf;

  FUTURE_ASSERT(!future->want_lw);
  FUTURE_ASSERT(!is_atomic || future->rt_prim_is_atomic);
  FUTURE_ASSERT(!future->in_atomic_queue);

  savebuf = p->error_buf;
  p->error_buf = &newbuf;
  if (scheme_setjmp(newbuf)) {
    /* An escape happened while the runtime call was running */
    record_fevent(FEVENT_HANDLE_RTCALL_ABORT, future->id);
    mzrt_mutex_lock(fs->future_mutex);
    future->no_retval = 1;

    /* If running on a would-be future, no extra work required here */
    if (future->suspended_lw || scheme_current_thread->futures_slow_path_tracing) {
      /* Abandon the future */
      future->status = FINISHED;
      future->retval = 0;
      future->suspended_lw = NULL;
      trigger_added_touches(fs, future);
      mzrt_mutex_unlock(fs->future_mutex);
    } else {
      /* Signal the waiting worker thread that it
         can continue running machine code */
      mzrt_sema *can_continue_sema = future->can_continue_sema;
      FUTURE_ASSERT(!future->in_atomic_queue);
      future->can_continue_sema = NULL;
      mzrt_sema_post(can_continue_sema);
      mzrt_mutex_unlock(fs->future_mutex);
    }
    /* A failure inside an atomic request is treated as fatal */
    if (is_atomic) {
      scheme_log_abort("internal error: failure during atomic");
      abort();
    }
    /* Continue the escape through the buffer that was installed
       before this call */
    scheme_longjmp(*savebuf, 1);
  } else {
    if (future->rt_prim_is_atomic) {
      do_invoke_rtcall(fs, future);
    } else {
      /* call with continuation barrier. */
      p->ku.k.p1 = fs;
      p->ku.k.p2 = future;

      (void)scheme_top_level_do(do_invoke_rtcall_k, 1);
    }
  }
  p->error_buf = savebuf;
}
3731
3732
3733 /**********************************************************************/
3734 /* Helpers for manipulating the futures queue */
3735 /**********************************************************************/
3736
future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread with lock held.

   Appends `ft' to the end of the pending-future queue in `fs', bumps
   the queue count, marks `ft' as queued, and posts
   `future_pending_sema'. `ft' must not already be in the future
   queue or the atomic queue. Returns `ft'. */
{
  future_t *tail;

  FUTURE_ASSERT(!ft->in_atomic_queue);
  FUTURE_ASSERT(!ft->in_future_queue);

  /* Link after the current last element, if there is one */
  tail = fs->future_queue_end;
  if (tail) {
    tail->next = ft;
    ft->prev = tail;
  }
  fs->future_queue_end = ft;

  /* An empty queue gets `ft' as its head as well */
  if (!fs->future_queue)
    fs->future_queue = ft;

  ft->in_future_queue = 1;
  fs->future_queue_count++;

  /* Signal that a future is pending */
  mzrt_sema_post(fs->future_pending_sema);

  return ft;
}
3759
get_pending_future(Scheme_Future_State * fs)3760 future_t *get_pending_future(Scheme_Future_State *fs)
3761 XFORM_SKIP_PROC
3762 /* Called in future thread with lock held */
3763 {
3764 future_t *f;
3765
3766 while (1) {
3767 f = fs->future_queue;
3768 if (f) {
3769 FUTURE_ASSERT(f->in_future_queue);
3770 dequeue_future(fs, f);
3771 if (!scheme_custodian_is_available(f->cust)) {
3772 f->status = SUSPENDED;
3773 } else {
3774 return f;
3775 }
3776 } else
3777 return NULL;
3778 }
3779 }
3780
3781 #endif
3782
3783 /**********************************************************************/
3784 /* Precise GC */
3785 /**********************************************************************/
3786
3787 #ifdef MZ_PRECISE_GC
3788
3789 START_XFORM_SKIP;
3790
3791 #include "mzmark_future.inc"
3792
/* Registers the GC traversers for the future and fsemaphore type
   tags. With MZ_USE_FUTURES the `future' and `fsemaphore' traversers
   are registered; otherwise the `sequential_future' and
   `sequential_fsemaphore' ones are used instead. */
static void register_traversers(void)
{
#ifdef MZ_USE_FUTURES
  GC_REG_TRAV(scheme_future_type, future);
  GC_REG_TRAV(scheme_fsemaphore_type, fsemaphore);
#else
  GC_REG_TRAV(scheme_future_type, sequential_future);
  GC_REG_TRAV(scheme_fsemaphore_type, sequential_fsemaphore);
#endif
}
3803
3804 END_XFORM_SKIP;
3805
3806 #endif
3807