#include "schpriv.h"
#include "schmach.h"
#include "schrktio.h"

static Scheme_Object *future_p(int argc, Scheme_Object *argv[])
{
  if (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type))
    return scheme_true;
  else
    return scheme_false;
}

Scheme_Object *scheme_fsemaphore_p(int argc, Scheme_Object *argv[])
{
  if (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
    return scheme_true;
  else
    return scheme_false;
}

static Scheme_Object *futures_enabled(int argc, Scheme_Object *argv[])
{
#ifdef MZ_USE_FUTURES
  return scheme_true;
#else
  return scheme_false;
#endif
}


#ifdef MZ_PRECISE_GC
static void register_traversers(void);
#endif

#ifndef MZ_USE_FUTURES

/* Futures not enabled, but make a stub module and implementation */

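/* In this stub build, `future` merely records the thunk, and `touch`
   evaluates it on the runtime thread the first time the future is
   touched, caching the result.  So, for example,
   (touch (future (lambda () (+ 1 2)))) simply computes 3, with no
   parallelism involved. */
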
typedef struct future_t {
  Scheme_Object so;
  Scheme_Object *running_sema;
  Scheme_Object *orig_lambda;
  Scheme_Object *retval;
  int multiple_count;
  Scheme_Object **multiple_array;
  int no_retval;
} future_t;

typedef struct fsemaphore_t {
  Scheme_Object so;
  Scheme_Object *sema;
} fsemaphore_t;
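
/* Stub fsemaphores are just wrappers around ordinary Scheme
   semaphores, so the fsemaphore operations below delegate to
   scheme_make_sema, scheme_post_sema, and scheme_wait_sema. */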

Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
{
  future_t *ft;

  scheme_check_proc_arity("future", 0, 0, argc, argv);

  ft = MALLOC_ONE_TAGGED(future_t);
  ft->so.type = scheme_future_type;

  ft->orig_lambda = argv[0];

  return (Scheme_Object *)ft;
}

static Scheme_Object *touch(int argc, Scheme_Object *argv[])
{
  future_t * volatile ft;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type))
    scheme_wrong_contract("touch", "future?", 0, argc, argv);

  ft = (future_t *)argv[0];

  while (1) {
    if (ft->retval) {
      if (SAME_OBJ(ft->retval, SCHEME_MULTIPLE_VALUES)) {
        Scheme_Thread *p = scheme_current_thread;
        p->ku.multiple.array = ft->multiple_array;
        p->ku.multiple.count = ft->multiple_count;
      }
      return ft->retval;
    }
    if (ft->no_retval)
      scheme_signal_error("touch: future previously aborted");

    if (ft->running_sema) {
      scheme_wait_sema(ft->running_sema, 0);
      scheme_post_sema(ft->running_sema);
    } else {
      Scheme_Object *sema;
      future_t *old_ft;
      mz_jmp_buf newbuf, * volatile savebuf;
      Scheme_Thread *p = scheme_current_thread;

      /* In case another Racket thread touches the future. */
      sema = scheme_make_sema(0);
      ft->running_sema = sema;

      old_ft = p->current_ft;
      p->current_ft = ft;

      savebuf = p->error_buf;
      p->error_buf = &newbuf;
      if (scheme_setjmp(newbuf)) {
        ft->no_retval = 1;
        p->current_ft = old_ft;
        scheme_post_sema(ft->running_sema);
        scheme_longjmp(*savebuf, 1);
      } else {
        GC_CAN_IGNORE Scheme_Object *retval, *proc;
        proc = ft->orig_lambda;
        ft->orig_lambda = NULL; /* don't hold on to proc */
        retval = scheme_apply_multi(proc, 0, NULL);
        ft->retval = retval;
        if (SAME_OBJ(retval, SCHEME_MULTIPLE_VALUES)) {
          ft->multiple_array = p->ku.multiple.array;
          ft->multiple_count = p->ku.multiple.count;
          p->ku.multiple.array = NULL;
        }
        scheme_post_sema(ft->running_sema);
        p->current_ft = old_ft;
        p->error_buf = savebuf;
      }
    }
  }

  return NULL;
}



static Scheme_Object *processor_count(int argc, Scheme_Object *argv[])
{
  return scheme_make_integer(1);
}

int scheme_is_multithreaded(int now)
{
  return 0;
}

Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[])
{
  future_t *ft = scheme_current_thread->current_ft;

  return (ft ? (Scheme_Object *)ft : scheme_false);
}

Scheme_Object *scheme_make_fsemaphore(int argc, Scheme_Object *argv[])
{
  intptr_t v;
  fsemaphore_t *fsema;
  Scheme_Object *sema;

  v = scheme_get_semaphore_init("make-fsemaphore", argc, argv);

  fsema = MALLOC_ONE_TAGGED(fsemaphore_t);
  fsema->so.type = scheme_fsemaphore_type;
  sema = scheme_make_sema(v);
  fsema->sema = sema;

  return (Scheme_Object*)fsema;
}

static Scheme_Object *make_fsemaphore(int argc, Scheme_Object *argv[])
{
  return scheme_make_fsemaphore(argc, argv);
}

Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *fsema;
  if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
    scheme_wrong_contract("fsemaphore-post", "fsemaphore?", 0, argc, argv);

  fsema = (fsemaphore_t*)argv[0];
  scheme_post_sema(fsema->sema);

  return scheme_void;
}

Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *fsema;
  if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
    scheme_wrong_contract("fsemaphore-wait", "fsemaphore?", 0, argc, argv);

  fsema = (fsemaphore_t*)argv[0];
  scheme_wait_sema(fsema->sema, 0);

  return scheme_void;
}

Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *fsema;
  if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
    scheme_wrong_contract("fsemaphore-try-wait?", "fsemaphore?", 0, argc, argv);

  fsema = (fsemaphore_t*)argv[0];
  if (scheme_wait_sema(fsema->sema, 1))
    return scheme_true;

  return scheme_false;
}

Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object *argv[])
{
  fsemaphore_t *fsema;
  if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
    scheme_wrong_contract("fsemaphore-count", "fsemaphore?", 0, argc, argv);

  fsema = (fsemaphore_t*)argv[0];
  return scheme_make_integer(((Scheme_Sema *)fsema->sema)->value);
}

static Scheme_Object *would_be_future(int argc, Scheme_Object *argv[])
{
  scheme_check_proc_arity("would-be-future", 0, 0, argc, argv);
  return scheme_future(argc, argv);
}

static Scheme_Object *reset_future_logs_for_tracking(int argc, Scheme_Object *argv[])
{
  return scheme_void;
}

static Scheme_Object *mark_future_trace_end(int argc, Scheme_Object *argv[])
{
  return scheme_void;
}

void scheme_init_futures_once()
{
}

void scheme_init_futures_per_place()
{
#ifdef MZ_PRECISE_GC
  register_traversers();
#endif
}

void scheme_end_futures_per_place()
{
}

/* Set differently below when futures are supported */
#define SCHEME_FUTURE_PRIM_IS_NARY_INLINED  SCHEME_PRIM_SOMETIMES_INLINED
#define SCHEME_FUTURE_PRIM_IS_UNARY_INLINED SCHEME_PRIM_SOMETIMES_INLINED

#else

#include "future.h"
#include <stdlib.h>
#include <string.h>
#include "jit.h"

#define FUTURE_ASSERT(x) MZ_ASSERT(x)

static Scheme_Object *make_fsemaphore(int argc, Scheme_Object *argv[]);
static Scheme_Object *touch(int argc, Scheme_Object *argv[]);
static Scheme_Object *processor_count(int argc, Scheme_Object *argv[]);
static void futures_init(void);
static void init_future_thread(struct Scheme_Future_State *fs, int i);
static Scheme_Future_Thread_State *alloc_future_thread_state();
static void requeue_future(struct future_t *future, struct Scheme_Future_State *fs);
static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
                                  void *func,
                                  int is_atomic,
                                  int can_suspend,
                                  int for_overflow);
static int capture_future_continuation(struct Scheme_Future_State *fs, future_t *ft, void **storage,
                                       int need_lock, int for_overflow);

#define FUTURE_C_STACK_SIZE 500000
#define FUTURE_RUNSTACK_SIZE 2000

#define FEVENT_BUFFER_SIZE    512
#define NO_FUTURE_ID -1

enum {
  FEVENT_CREATE,
  FEVENT_COMPLETE,
  FEVENT_START_WORK,
  FEVENT_START_RTONLY_WORK,
  FEVENT_RESUME_WORK,
  FEVENT_END_WORK,
  FEVENT_RTCALL_ATOMIC,
  FEVENT_HANDLE_RTCALL_ATOMIC,
  FEVENT_RTCALL,
  FEVENT_RTCALL_TOUCH,
  FEVENT_HANDLE_RTCALL,
  FEVENT_RTCALL_RESULT,
  FEVENT_HANDLE_RTCALL_RESULT,
  FEVENT_RTCALL_ABORT,
  FEVENT_HANDLE_RTCALL_ABORT,
  FEVENT_RTCALL_SUSPEND,
  FEVENT_OVERFLOW,
  FEVENT_TOUCH_PAUSE,
  FEVENT_TOUCH_RESUME,
  FEVENT_MISSING,
  FEVENT_STOP_TRACE,
  _FEVENT_COUNT_
};
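
/* The two tables below are indexed by the event constants above, so
   their entries must stay in the same order as the enumeration. */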

static const char * const fevent_strs[] = { "create", "complete",
                                            "start-work", "start-0-work", "start-overflow-work",
                                            "end-work",
                                            "sync", "sync", "block", "touch", "block",
                                            "result", "result", "abort", "abort",
                                            "suspend", "overflow",
                                            "touch-pause", "touch-resume", "missing", "stop-trace" };
static const char * const fevent_long_strs[] = { "created", "completed",
                                                 "started work", "started (process 0, only)", "started (overflow)",
                                                 "ended work",
                                                 "synchronizing with process 0", "synchronizing",
                                                 "BLOCKING on process 0", "touching future", "HANDLING",
                                                 "result from process 0", "result determined",
                                                 "abort from process 0", "abort determined",
                                                 "suspended", "overflow",
                                                 "paused for touch", "resumed for touch",
                                                 "events missing", "stop future tracing" };
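
/* The long strings are spliced into the "%s"-style formats passed to
   log_future_event() below; for example, an FEVENT_COMPLETE entry for
   future 1 on process 2 would produce a message along the lines of
   "id 1, process 2: completed; time: 1234.5". */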


typedef struct Scheme_Future_State {
  int thread_pool_size;
  Scheme_Future_Thread_State **pool_threads;
  int busy_thread_count;

  void *signal_handle;

  int future_queue_count;
  future_t *future_queue;
  future_t *future_queue_end;
  future_t *future_waiting_atomic;
  future_t *future_waiting_lwc;
  future_t *future_waiting_touch;
  int next_futureid;

  mzrt_mutex *future_mutex; /* BEWARE: don't allocate while holding this lock */
  mzrt_sema *future_pending_sema;
  mzrt_sema *gc_ok_c;
  mzrt_sema *gc_done_c;

  int gc_not_ok, wait_for_gc, need_gc_ok_post, need_gc_done_post;
  int abort_all_futures;

  int *gc_counter_ptr;

  int future_threads_created;

  Fevent_Buffer runtime_fevents;
  Scheme_Object **fevent_syms;
  Scheme_Struct_Type *fevent_prefab;
} Scheme_Future_State;


THREAD_LOCAL_DECL(static Scheme_Future_State *scheme_future_state);
THREAD_LOCAL_DECL(void *jit_future_storage[4]);

#ifdef MZ_PRECISE_GC
THREAD_LOCAL_DECL(extern uintptr_t GC_gen0_alloc_page_ptr);
THREAD_LOCAL_DECL(extern uintptr_t GC_gen0_alloc_page_end);
THREAD_LOCAL_DECL(extern int GC_gen0_alloc_only);
#endif

static void start_gc_not_ok(Scheme_Future_State *fs);
static void end_gc_not_ok(Scheme_Future_Thread_State *fts,
                          Scheme_Future_State *fs,
                          Scheme_Object **current_rs);

static void *worker_thread_future_loop(void *arg);
static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile future, volatile int is_atomic);
static future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft);
static future_t *get_pending_future(Scheme_Future_State *fs);
static void receive_special_result(future_t *f, Scheme_Object *retval, int clear);
static void send_special_result(future_t *f, Scheme_Object *retval);
static Scheme_Object *_apply_future_lw(future_t *ft);
static Scheme_Object *apply_future_lw(future_t *ft);
static int fsemaphore_ready(Scheme_Object *obj);
static void init_fevent(Fevent_Buffer *b);
static void free_fevent(Fevent_Buffer *b);
static int future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, int what);
static Scheme_Object *would_be_future(int argc, Scheme_Object *argv[]);
static void push_suspended_lw(Scheme_Future_State *fs, future_t *ft);
static void pop_suspended_lw(Scheme_Future_State *fs, future_t *ft);

static Scheme_Object *bad_multi_result_proc;
static Scheme_Object *bad_multi_result(int argc, Scheme_Object **argv);
static Scheme_Object *reset_future_logs_for_tracking(int argc, Scheme_Object *argv[]);
static Scheme_Object *mark_future_trace_end(int argc, Scheme_Object *argv[]);

READ_ONLY static int cpucount;

#ifdef MZ_PRECISE_GC
# define scheme_future_setjmp(newbuf) scheme_jit_setjmp((newbuf).jb)
# define scheme_future_longjmp(newbuf, v) scheme_jit_longjmp((newbuf).jb, v)
#else
# define scheme_future_setjmp(newbuf) scheme_setjmp(newbuf)
# define scheme_future_longjmp(newbuf, v) scheme_longjmp(newbuf, v)
#endif

#ifndef MZ_PRECISE_GC
# define GC_set_accounting_custodian(c) /* nothing */
# define GC_register_thread(t, c) /* nothing */
# define GC_register_new_thread(t, c) /* nothing */
#endif

/**********************************************************************/
/* Arguments for a newly created future thread                        */
/**********************************************************************/

typedef struct future_thread_params_t {
  mzrt_sema *ready_sema;
  struct NewGC *shared_GC;
  Scheme_Future_State *fs;
  Scheme_Future_Thread_State *fts;
  Scheme_Object **runstack_start;

  Scheme_Object ***scheme_current_runstack_ptr;
  Scheme_Object ***scheme_current_runstack_start_ptr;
  Scheme_Thread **current_thread_ptr;
  void **jit_future_storage_ptr;
  Scheme_Current_LWC *lwc;
} future_thread_params_t;

/* Set differently above when futures are not supported */
#define SCHEME_FUTURE_PRIM_IS_NARY_INLINED  SCHEME_PRIM_IS_NARY_INLINED
#define SCHEME_FUTURE_PRIM_IS_UNARY_INLINED SCHEME_PRIM_IS_UNARY_INLINED

#endif

/**********************************************************************/
/* Plumbing for Racket initialization                                 */
/**********************************************************************/

/* Invoked by the runtime on startup to make primitives known */
void scheme_init_futures(Scheme_Startup_Env *newenv)
{
  Scheme_Object *p;

  scheme_addto_prim_instance("future?",
                             scheme_make_folding_prim(future_p,
                                                      "future?",
                                                      1,
                                                      1,
                                                      1),
                             newenv);

  p = scheme_make_prim_w_arity(scheme_future, "future", 1, 1);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("future", p, newenv);

  scheme_addto_prim_instance("processor-count",
                             scheme_make_prim_w_arity(processor_count,
                                                      "processor-count",
                                                      0,
                                                      0),
                             newenv);

  p = scheme_make_prim_w_arity(touch, "touch", 1, 1);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("touch", p, newenv);

  p = scheme_make_immed_prim(scheme_current_future,
                             "current-future",
                             0,
                             0);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_NARY_INLINED);
  scheme_addto_prim_instance("current-future", p, newenv);

  p = scheme_make_immed_prim(scheme_fsemaphore_p,
                             "fsemaphore?",
                             1,
                             1);

  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("fsemaphore?", p, newenv);

  p = scheme_make_immed_prim(make_fsemaphore,
                             "make-fsemaphore",
                             1,
                             1);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("make-fsemaphore", p, newenv);

  p = scheme_make_immed_prim(scheme_fsemaphore_count,
                             "fsemaphore-count",
                             1,
                             1);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("fsemaphore-count", p, newenv);

  p = scheme_make_immed_prim(scheme_fsemaphore_wait,
                             "fsemaphore-wait",
                             1,
                             1);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("fsemaphore-wait", p, newenv);

  p = scheme_make_immed_prim(scheme_fsemaphore_post,
                             "fsemaphore-post",
                             1,
                             1);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("fsemaphore-post", p, newenv);

  p = scheme_make_immed_prim(scheme_fsemaphore_try_wait,
                             "fsemaphore-try-wait?",
                             1,
                             1);
  SCHEME_PRIM_PROC_FLAGS(p) |= scheme_intern_prim_opt_flags(SCHEME_FUTURE_PRIM_IS_UNARY_INLINED);
  scheme_addto_prim_instance("fsemaphore-try-wait?", p, newenv);

  ADD_PRIM_W_ARITY("would-be-future", would_be_future, 1, 1, newenv);
  ADD_PRIM_W_ARITY("futures-enabled?", futures_enabled, 0, 0, newenv);
  ADD_PRIM_W_ARITY("reset-future-logs-for-tracing!", reset_future_logs_for_tracking, 0, 0, newenv);
  ADD_PRIM_W_ARITY("mark-future-trace-end!", mark_future_trace_end, 0, 0, newenv);
}
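
/* Together, these bindings form the Racket-level API; a typical use
   is something like
     (let ([f (future (lambda () (heavy-computation)))])
       (other-work)
       (touch f))
   where `heavy-computation` and `other-work` stand in for arbitrary
   user code. */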

#ifdef MZ_USE_FUTURES

void scheme_init_futures_once()
{
  REGISTER_SO(bad_multi_result_proc);
  bad_multi_result_proc = scheme_make_prim_w_arity(bad_multi_result, "bad-multi-result", 0, -1);
}

void scheme_init_futures_per_place()
{
  futures_init();
}

static Scheme_Object *set_fts_thread(Scheme_Object *ignored)
{
  scheme_future_thread_state->thread = scheme_current_thread;
  return ignored;
}

void futures_init(void)
{
  Scheme_Future_State *fs;
  Scheme_Future_Thread_State *rt_fts;
  Scheme_Future_Thread_State **ftss;
  void *hand;
  Scheme_Object **syms, *sym;
  Scheme_Struct_Type *stype;
  int pool_size;

  if (cpucount < 1)
    cpucount = rktio_processor_count(scheme_rktio);

  fs = (Scheme_Future_State *)malloc(sizeof(Scheme_Future_State));
  memset(fs, 0, sizeof(Scheme_Future_State));
  scheme_future_state = fs;

  pool_size = cpucount * 2;
  ftss = (Scheme_Future_Thread_State **)malloc(pool_size * sizeof(Scheme_Future_Thread_State*));
  memset(ftss, 0, pool_size * sizeof(Scheme_Future_Thread_State*));
  fs->pool_threads = ftss;
  fs->thread_pool_size = pool_size;

  mzrt_mutex_create(&fs->future_mutex);
  mzrt_sema_create(&fs->future_pending_sema, 0);
  mzrt_sema_create(&fs->gc_ok_c, 0);
  mzrt_sema_create(&fs->gc_done_c, 0);
  fs->gc_counter_ptr = &scheme_did_gc_count;

  /* Create a 'dummy' FTS for the RT thread */
  rt_fts = alloc_future_thread_state();
  rt_fts->is_runtime_thread = 1;
  rt_fts->gen0_size = 1;
  scheme_future_thread_state = rt_fts;

  scheme_add_swap_callback(set_fts_thread, scheme_false);
  set_fts_thread(scheme_false);

  REGISTER_SO(fs->future_queue);
  REGISTER_SO(fs->future_queue_end);
  REGISTER_SO(fs->future_waiting_atomic);
  REGISTER_SO(fs->future_waiting_lwc);
  REGISTER_SO(fs->future_waiting_touch);
  REGISTER_SO(fs->fevent_syms);
  REGISTER_SO(fs->fevent_prefab);
  REGISTER_SO(jit_future_storage);

  hand = scheme_get_signal_handle();
  fs->signal_handle = hand;

  syms = MALLOC_N(Scheme_Object*, _FEVENT_COUNT_);
  fs->fevent_syms = syms;
  sym = scheme_intern_symbol(fevent_strs[FEVENT_HANDLE_RTCALL_ATOMIC]);
  syms[FEVENT_HANDLE_RTCALL_ATOMIC] = sym;
  sym = scheme_intern_symbol(fevent_strs[FEVENT_HANDLE_RTCALL]);
  syms[FEVENT_HANDLE_RTCALL] = sym;

  sym = scheme_intern_symbol("future-event");
  stype = scheme_lookup_prefab_type(sym, 6);
  fs->fevent_prefab = stype;

  init_fevent(&fs->runtime_fevents);

#ifdef MZ_PRECISE_GC
  register_traversers();
#endif
}

static void init_future_thread(Scheme_Future_State *fs, int i)
{
  Scheme_Future_Thread_State *fts;
  GC_CAN_IGNORE future_thread_params_t params;
  Scheme_Thread *skeleton;
  Scheme_Object **runstack_start;
  mz_proc_thread *t;

  /* Create the worker thread pool.  These threads will
     'queue up' and wait for futures to become available. */

  fts = alloc_future_thread_state();
  fts->id = i;

  fts->gen0_size = 1;

  fts->use_fevents1 = 1;
  init_fevent(&fts->fevents1);
  init_fevent(&fts->fevents2);

  params.shared_GC = GC_instance;
  params.fts = fts;
  params.fs = fs;

  /* Make enough of a thread record to deal with multiple values
     and to support GC and memory accounting. */
  skeleton = MALLOC_ONE_TAGGED(Scheme_Thread);
  skeleton->so.type = scheme_thread_type;
  GC_register_new_thread(skeleton, main_custodian);
  skeleton->running = MZTHREAD_RUNNING;

  fts->thread = skeleton;

  {
    Scheme_Object **rs_start;
    intptr_t init_runstack_size = FUTURE_RUNSTACK_SIZE;
    rs_start = scheme_alloc_runstack(init_runstack_size);
    runstack_start = rs_start;
    fts->runstack_size = init_runstack_size;
  }

  /* Fill in GCable values just before creating the thread,
     because the GC ignores `params': */
  params.runstack_start = runstack_start;

  mzrt_sema_create(&params.ready_sema, 0);
  t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, &params, FUTURE_C_STACK_SIZE);
  mzrt_sema_wait(params.ready_sema);
  mzrt_sema_destroy(params.ready_sema);
  params.ready_sema = NULL;

  fts->t = t;

  scheme_register_static(params.scheme_current_runstack_ptr, sizeof(void*));
  scheme_register_static(params.scheme_current_runstack_start_ptr, sizeof(void*));
  scheme_register_static(params.jit_future_storage_ptr, 4 * sizeof(void*));
  scheme_register_static(params.current_thread_ptr, sizeof(void*));

  fs->pool_threads[i] = fts;
}

static Scheme_Future_Thread_State *alloc_future_thread_state()
{
  Scheme_Future_Thread_State *fts;

  fts = (Scheme_Future_Thread_State *)malloc(sizeof(Scheme_Future_Thread_State));
  memset(fts, 0, sizeof(Scheme_Future_Thread_State));
  scheme_register_static(&fts->thread, sizeof(Scheme_Thread*));

  return fts;
}

void scheme_end_futures_per_place()
{
  Scheme_Future_State *fs = scheme_future_state;

  if (fs) {
    int i;

    mzrt_mutex_lock(fs->future_mutex);
    fs->abort_all_futures = 1;
    fs->wait_for_gc = 1;
    mzrt_mutex_unlock(fs->future_mutex);

    /* post enough semas to ensure that every future
       wakes up and tries to disable GC: */
    for (i = 0; i < fs->thread_pool_size; i++) {
      if (fs->pool_threads[i]) {
        mzrt_sema_post(fs->future_pending_sema);
        mzrt_sema_post(fs->pool_threads[i]->worker_can_continue_sema);
      }
    }

    scheme_future_block_until_gc();

    /* wait for all future threads to end: */
    for (i = 0; i < fs->thread_pool_size; i++) {
      if (fs->pool_threads[i]) {
        (void)mz_proc_thread_wait(fs->pool_threads[i]->t);

        free_fevent(&fs->pool_threads[i]->fevents1);
        free_fevent(&fs->pool_threads[i]->fevents2);

        free(fs->pool_threads[i]);
      }
    }

    free_fevent(&fs->runtime_fevents);

    mzrt_mutex_destroy(fs->future_mutex);
    mzrt_sema_destroy(fs->future_pending_sema);
    mzrt_sema_destroy(fs->gc_ok_c);
    mzrt_sema_destroy(fs->gc_done_c);

    free(fs->pool_threads);
    free(fs);

    scheme_future_state = NULL;
  }
}

static void check_future_thread_creation(Scheme_Future_State *fs)
{
  if (!fs->future_threads_created && !fs->future_queue_count)
    return;

  if (fs->future_threads_created < fs->thread_pool_size) {
    int count, busy;

    mzrt_mutex_lock(fs->future_mutex);
    count = fs->future_queue_count;
    busy = fs->busy_thread_count;
    mzrt_mutex_unlock(fs->future_mutex);

    if (count >= (fs->future_threads_created - busy)) {
      init_future_thread(fs, fs->future_threads_created);
      fs->future_threads_created++;
    }
  }
}

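/* GC-handshake overview: each future thread holds gc_not_ok
   incremented while it may touch GCable memory.
   scheme_future_block_until_gc() sets wait_for_gc and then waits
   until gc_not_ok drops to zero; start_gc_not_ok() makes a future
   thread wait, in turn, while wait_for_gc is set. The
   need_gc_ok_post/need_gc_done_post fields record whether (and how
   many) semaphore posts are owed on each side of the handshake. */
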
static void start_gc_not_ok(Scheme_Future_State *fs)
/* must have mutex_lock */
{
  Scheme_Thread *p;

  while (fs->wait_for_gc) {
    int quit = fs->abort_all_futures;
    fs->need_gc_done_post++;
    mzrt_mutex_unlock(fs->future_mutex);
    if (quit) mz_proc_thread_exit(NULL);
    mzrt_sema_wait(fs->gc_done_c);
    mzrt_mutex_lock(fs->future_mutex);
  }

  fs->gc_not_ok++;

#ifdef MZ_PRECISE_GC
  {
    Scheme_Future_Thread_State *fts = scheme_future_thread_state;
    if (fts->worker_gc_counter != *fs->gc_counter_ptr) {
      GC_gen0_alloc_page_ptr = 0; /* forces future to ask for memory */
      GC_gen0_alloc_page_end = 0;
      fts->gen0_start = 0;
      if (fts->gen0_size > 1)
        fts->gen0_size >>= 1;
      fts->worker_gc_counter = *fs->gc_counter_ptr;
    }
  }
#endif

  p = scheme_current_thread;
  MZ_RUNSTACK = p->runstack;
  MZ_RUNSTACK_START = p->runstack_start;
}

static void end_gc_not_ok(Scheme_Future_Thread_State *fts,
                          Scheme_Future_State *fs,
                          Scheme_Object **current_rs)
/* must have mutex_lock */
{
  Scheme_Thread *p;

  scheme_set_runstack_limits(MZ_RUNSTACK_START,
                             fts->runstack_size,
                             (current_rs
                              ? current_rs XFORM_OK_MINUS MZ_RUNSTACK_START
                              : fts->runstack_size),
                             fts->runstack_size);
  p = scheme_current_thread;
  p->runstack = MZ_RUNSTACK;
  p->runstack_start = MZ_RUNSTACK_START;
  p->cont_mark_stack = MZ_CONT_MARK_STACK;
  p->cont_mark_pos = MZ_CONT_MARK_POS;

  /* To ensure that memory accounting goes through the thread
     record, clear these roots: */
  MZ_RUNSTACK = NULL;
  MZ_RUNSTACK_START = NULL;

  /* FIXME: clear scheme_current_thread->ku.multiple.array ? */

  --fs->gc_not_ok;
  if (fs->need_gc_ok_post) {
    fs->need_gc_ok_post = 0;
    mzrt_sema_post(fs->gc_ok_c);
  }
}

void scheme_future_block_until_gc()
{
  Scheme_Future_State *fs = scheme_future_state;
  int i;

  if (!fs) return;
  if (!fs->future_threads_created) return;

  mzrt_mutex_lock(fs->future_mutex);
  fs->wait_for_gc = 1;
  mzrt_mutex_unlock(fs->future_mutex);

  for (i = 0; i < fs->thread_pool_size; i++) {
    if (fs->pool_threads[i]) {
      *(fs->pool_threads[i]->need_gc_pointer) = 1;
      if (*(fs->pool_threads[i]->fuel_pointer)) {
        *(fs->pool_threads[i]->fuel_pointer) = 0;
        *(fs->pool_threads[i]->stack_boundary_pointer) += FUTURE_C_STACK_SIZE;
      }
    }
  }

  if (cpucount > 1) {
    /* In principle, we need some sort of fence to ensure that future
       threads see the change to the fuel pointer. The MFENCE
       instruction would do that, but it requires SSE2. The CPUID
       instruction is a non-privileged serializing instruction that
       should be available on any x86 platform that runs threads. */
#if defined(i386) || defined(__i386__) || defined(__x86_64) || defined(__x86_64__) || defined(__amd64__)
# ifdef _MSC_VER
    {
      int r[4];
      __cpuid(r, 0);
    }
# else
    {
#  if defined(i386) || defined(__i386__)
#   define MZ_PUSH_EBX "pushl %%ebx"
#   define MZ_POP_EBX "popl %%ebx"
#  endif
#  if defined(__x86_64) || defined(__x86_64__) || defined(__amd64__)
#   define MZ_PUSH_EBX "pushq %%rbx"
#   define MZ_POP_EBX "popq %%rbx"
#  endif
      int _eax, _ebx, _ecx, _edx, op = 0;
      /* we can't always use EBX, so save and restore it: */
      __asm__ (MZ_PUSH_EBX "\n\t"
               "cpuid \n\t"
               "movl %%ebx, %1 \n\t"
               MZ_POP_EBX
               : "=a" (_eax), "=r" (_ebx), "=c" (_ecx), "=d" (_edx) : "a" (op));
    }
#  undef MZ_PUSH_EBX
#  undef MZ_POP_EBX
# endif
#endif
  }

  mzrt_mutex_lock(fs->future_mutex);
  while (fs->gc_not_ok) {
    fs->need_gc_ok_post = 1;
    mzrt_mutex_unlock(fs->future_mutex);
    mzrt_sema_wait(fs->gc_ok_c);
    mzrt_mutex_lock(fs->future_mutex);
  }
  mzrt_mutex_unlock(fs->future_mutex);
}

void scheme_future_continue_after_gc()
{
  Scheme_Future_State *fs = scheme_future_state;
  int i;

  if (!fs) return;

  for (i = 0; i < fs->thread_pool_size; i++) {
    if (fs->pool_threads[i]) {
      *(fs->pool_threads[i]->need_gc_pointer) = 0;

      if (!fs->pool_threads[i]->thread->current_ft
          || scheme_custodian_is_available(fs->pool_threads[i]->thread->current_ft->cust)) {
        *(fs->pool_threads[i]->fuel_pointer) = 1;
        *(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE;
      } else {
        /* leave fuel exhausted, which will force the thread into a slow
           path when it resumes to suspend the computation */
      }
    }
  }

  mzrt_mutex_lock(fs->future_mutex);
  fs->wait_for_gc = 0;
  while (fs->need_gc_done_post) {
    --fs->need_gc_done_post;
    mzrt_sema_post(fs->gc_done_c);
  }
  mzrt_mutex_unlock(fs->future_mutex);
}

void scheme_future_gc_pause()
/* Called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  Scheme_Future_State *fs = scheme_future_state;

  mzrt_mutex_lock(fs->future_mutex);
  end_gc_not_ok(fts, fs, MZ_RUNSTACK);
  start_gc_not_ok(fs); /* waits until wait_for_gc is 0 */
  mzrt_mutex_unlock(fs->future_mutex);
}

void scheme_future_check_custodians()
{
  scheme_future_block_until_gc();
  scheme_future_continue_after_gc();
}
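
/* Note that blocking and immediately continuing is enough for a
   custodian check: scheme_future_continue_after_gc() re-examines each
   thread's current future and re-enables its fuel only if the
   future's custodian is still available. */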

int scheme_future_is_runtime_thread()
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  return fts->is_runtime_thread;
}

/**********************************************************************/
/* Future-event logging                                               */
/**********************************************************************/

static Scheme_Object *reset_future_logs_for_tracking(int argc, Scheme_Object **argv)
{
  Scheme_Future_State *fs;
  Scheme_Future_Thread_State *fts;
  Scheme_Future_Thread_State *rt_fts;

  fs = scheme_future_state;
  rt_fts = scheme_future_thread_state;
  if (fs) {
    int i;
    mzrt_mutex_lock(fs->future_mutex);
    init_fevent(&fs->runtime_fevents);

    if (rt_fts) {
      init_fevent(&rt_fts->fevents1);
      init_fevent(&rt_fts->fevents2);
      rt_fts->use_fevents1 = 1;
    }

    for (i = 0; i < fs->thread_pool_size; i++) {
      fts = fs->pool_threads[i];
      if (fts) {
        init_fevent(&fts->fevents1);
        init_fevent(&fts->fevents2);
        fts->use_fevents1 = 1;
      }
    }
    mzrt_mutex_unlock(fs->future_mutex);

  }

  return scheme_void;
}

static double get_future_timestamp() XFORM_SKIP_PROC {
#if 1
  return scheme_get_inexact_milliseconds();
#else
  return 0.0;
#endif
}

static void init_fevent(Fevent_Buffer *b) XFORM_SKIP_PROC
{
  if (b->a) free(b->a);

  b->pos = 0;
  b->overflow = 0;
  b->a = (Fevent *)malloc(FEVENT_BUFFER_SIZE * sizeof(Fevent));
  memset(b->a, 0, FEVENT_BUFFER_SIZE * sizeof(Fevent));
}

static void free_fevent(Fevent_Buffer *b)
{
  if (b->a) {
    free(b->a);
    b->a = NULL;
  }
}

static void record_fevent_with_data(int what, int fid, int data)
  XFORM_SKIP_PROC
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  Fevent_Buffer *b;

  if (!fts->is_runtime_thread) {
    if (fts->use_fevents1)
      b = &fts->fevents1;
    else
      b = &fts->fevents2;
  } else
    b = &scheme_future_state->runtime_fevents;

  b->a[b->pos].timestamp = get_future_timestamp();
  b->a[b->pos].what = what;
  b->a[b->pos].fid = fid;
  b->a[b->pos].data = data;

  b->pos++;
  if (b->pos == FEVENT_BUFFER_SIZE) {
    b->overflow = 1;
    b->pos = 0;
  }
}
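
/* Each buffer is used as a ring: when `pos' wraps around, older
   events are overwritten and `overflow' is set, which later causes
   flush_future_logs() to emit an FEVENT_MISSING entry for the
   affected buffer. */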

static void record_fevent(int what, int fid) XFORM_SKIP_PROC
/* call with the lock or in the runtime thread */
{
  record_fevent_with_data(what, fid, 0);
}

static void init_traversal(Fevent_Buffer *b)
{
  if (b->overflow) {
    b->count = FEVENT_BUFFER_SIZE;
    b->i = b->pos;
  } else {
    b->i = 0;
    b->count = b->pos;
  }
}

static void end_traversal(Fevent_Buffer *b)
{
  b->overflow = 0;
  b->pos = 0;
}

static void log_future_event(Scheme_Future_State *fs,
                             const char *msg_str,
                             const char *extra_str,
                             int which,
                             int what,
                             double timestamp,
                             int fid,
                             Scheme_Object *user_data)
{
  Scheme_Object *data, *v;
  Scheme_Logger *fl;

  fl = scheme_get_future_logger();
  if (!scheme_log_level_p(fl, SCHEME_LOG_DEBUG))
    return;

  data = scheme_make_blank_prefab_struct_instance(fs->fevent_prefab);
  if (what == FEVENT_MISSING || fid == NO_FUTURE_ID)
    ((Scheme_Structure *)data)->slots[0] = scheme_false;
  else
    ((Scheme_Structure *)data)->slots[0] = scheme_make_integer(fid);
  ((Scheme_Structure *)data)->slots[1] = scheme_make_integer((which+1));
  v = fs->fevent_syms[what];
  if (!v) {
    v = scheme_intern_symbol(fevent_strs[what]);
    fs->fevent_syms[what] = v;
  }
  ((Scheme_Structure *)data)->slots[2] = v;
  v = scheme_make_double(timestamp);
  ((Scheme_Structure *)data)->slots[3] = v;
  if (what == FEVENT_HANDLE_RTCALL || what == FEVENT_HANDLE_RTCALL_ATOMIC) {
    v = scheme_intern_symbol(extra_str);
    ((Scheme_Structure *)data)->slots[4] = v;
  } else
    ((Scheme_Structure *)data)->slots[4] = scheme_false;
  /* User data (target fid for creates, alloc amount for allocation) */
  if (!user_data)
    user_data = scheme_false;

  ((Scheme_Structure *)data)->slots[5] = user_data;

  scheme_log_w_data(fl, SCHEME_LOG_DEBUG, 0,
                    data,
                    msg_str,
                    fid,
                    which+1,
                    fevent_long_strs[what],
                    extra_str,
                    timestamp);

}

static Scheme_Object *mark_future_trace_end(int argc, Scheme_Object **argv)
{
  Scheme_Future_State *fs;
  fs = scheme_future_state;
  log_future_event(fs,
                   "id %d, process %d: %s: %s; time: %f",
                   "tracing",
                   -1,
                   FEVENT_STOP_TRACE,
                   get_future_timestamp(),
                   0,
                   0);

  return scheme_void;
}

static void log_overflow_event(Scheme_Future_State *fs, int which, double timestamp)
{
  log_future_event(fs,
                   "id ??%-, process %d: %s%s; before time: %f",
                   "",
                   which,
                   FEVENT_MISSING,
                   timestamp,
                   0,
                   NULL);
}

static void flush_future_logs(Scheme_Future_State *fs)
{
  Scheme_Future_Thread_State *fts;
  double t, min_t;
  int i, min_which, min_set;
  Fevent_Buffer *b, *min_b;
  Scheme_Object *data_val;

  if (scheme_log_level_p(scheme_get_future_logger(), SCHEME_LOG_DEBUG)) {
    /* Hold lock while swapping buffers: */
    mzrt_mutex_lock(fs->future_mutex);
    for (i = 0; i < fs->thread_pool_size; i++) {
      fts = fs->pool_threads[i];
      if (fts) {
        fts->use_fevents1 = !fts->use_fevents1;
        if (fts->use_fevents1)
          b = &fts->fevents2;
        else
          b = &fts->fevents1;
        init_traversal(b);
      }
    }
    mzrt_mutex_unlock(fs->future_mutex);
    init_traversal(&fs->runtime_fevents);

    if (fs->runtime_fevents.overflow)
      log_overflow_event(fs, -1, fs->runtime_fevents.a[fs->runtime_fevents.i].timestamp);
    for (i = 0; i < fs->thread_pool_size; i++) {
      fts = fs->pool_threads[i];
      if (fts) {
        if (fts->use_fevents1)
          b = &fts->fevents2;
        else
          b = &fts->fevents1;
        if (b->overflow)
          log_overflow_event(fs, i, b->a[b->i].timestamp);
      }
    }

    while (1) {
      min_set = 0;
      min_t = 0;
      min_b = NULL;
      min_which = -1;
      if (fs->runtime_fevents.count) {
        t = fs->runtime_fevents.a[fs->runtime_fevents.i].timestamp;
        if (!min_set || (t < min_t)) {
          min_t = t;
          min_b = &fs->runtime_fevents;
          min_set = 1;
        }
      }
      for (i = 0; i < fs->thread_pool_size; i++) {
        fts = fs->pool_threads[i];
        if (fts) {
          if (fts->use_fevents1)
            b = &fts->fevents2;
          else
            b = &fts->fevents1;

          if (b->count) {
            t = b->a[b->i].timestamp;
            if (!min_set || (t < min_t)) {
              min_t = t;
              min_b = b;
              min_which = i;
              min_set = 1;
            }
          }
        }
      }

      if (!min_b)
        break;

      data_val = scheme_make_integer(min_b->a[min_b->i].data);
      log_future_event(fs,
                       "id %d, process %d: %s%s; time: %f",
                       "",
                       min_which,
                       min_b->a[min_b->i].what,
                       min_b->a[min_b->i].timestamp,
                       min_b->a[min_b->i].fid,
                       data_val);

      --min_b->count;
      min_b->i++;
      if (min_b->i == FEVENT_BUFFER_SIZE)
        min_b->i = 0;
    }

    for (i = 0; i < fs->thread_pool_size; i++) {
      fts = fs->pool_threads[i];
      if (fts) {
        if (fts->use_fevents1)
          b = &fts->fevents2;
        else
          b = &fts->fevents1;
        end_traversal(b);
      }
    }
    end_traversal(&fs->runtime_fevents);
  }
}

/**********************************************************************/
/* Primitive implementations                                          */
/**********************************************************************/

void scheme_wrong_contract_from_ft(const char *who, const char *expected_type, int what, int argc, Scheme_Object **argv);

#define SCHEME_WRONG_CONTRACT_MAYBE_IN_FT(who, expected_type, what, argc, argv) \
  if (scheme_use_rtcall) \
    scheme_wrong_contract_from_ft(who, expected_type, what, argc, argv); \
  else \
    scheme_wrong_contract(who, expected_type, what, argc, argv);

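/* Presumably, the point of the macro above is that a contract
   violation discovered while scheme_use_rtcall is set (i.e., while
   acting on behalf of a future) must be reported through the
   future's runtime-call protocol, whereas on the runtime thread the
   error can be raised directly. */
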
static Scheme_Object *make_future(Scheme_Object *lambda, int enqueue, future_t *cur_ft)
/* Called in runtime thread --- as atomic on behalf of a future thread
   if `lambda' is known to be a thunk */
{
  Scheme_Future_State *fs = scheme_future_state;
  int futureid;
  future_t *ft;
  Scheme_Native_Closure *nc;
  Scheme_Native_Lambda *ncd;
  Scheme_Custodian *c;

  if (SAME_TYPE(SCHEME_TYPE(lambda), scheme_native_closure_type)) {
    nc = (Scheme_Native_Closure*)lambda;
    ncd = nc->code;
  } else {
    nc = NULL;
    ncd = NULL;
  }

  /* Create the future descriptor and add to the queue as 'pending' */
  ft = MALLOC_ONE_TAGGED(future_t);
  ft->so.type = scheme_future_type;

  ft->orig_lambda = lambda;
  ft->status = PENDING;

  if (scheme_current_thread->mref)
    c = scheme_custodian_extract_reference(scheme_current_thread->mref);
  else {
    /* must be in a future thread (if futures can be created in future threads) */
    c = scheme_current_thread->current_ft->cust;
  }
  ft->cust = c;

  /* JIT the code if not already JITted */
  if (ncd) {
    scheme_jit_now(lambda);

    if (ncd->max_let_depth > FUTURE_RUNSTACK_SIZE * sizeof(void*)) {
      /* Can't even call it in a future thread */
      ft->status = PENDING_OVERSIZE;
    }
  } else
    ft->status = PENDING_OVERSIZE;

  mzrt_mutex_lock(fs->future_mutex);
  futureid = ++fs->next_futureid;
  ft->id = futureid;
  record_fevent_with_data(FEVENT_CREATE, (cur_ft ? cur_ft->id : NO_FUTURE_ID), futureid);
  if (enqueue) {
    if (ft->status != PENDING_OVERSIZE)
      enqueue_future(fs, ft);
  }

  mzrt_mutex_unlock(fs->future_mutex);
  if (enqueue)
    check_future_thread_creation(fs);

  return (Scheme_Object*)ft;
}

int scheme_can_apply_native_in_future(Scheme_Object *proc)
  XFORM_SKIP_PROC /* can be called from future thread */
{
  return (((Scheme_Native_Closure *)proc)->code->max_let_depth < FUTURE_RUNSTACK_SIZE * sizeof(void*));
}
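
/* This is the same bound that make_future checks: max_let_depth is
   apparently measured in bytes, so it is compared against the future
   runstack size scaled by sizeof(void*). */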

static Scheme_Object *do_make_future(int argc, Scheme_Object *argv[], future_t *cur_ft)
{
  Scheme_Future_State *fs;
  scheme_check_proc_arity("future", 0, 0, argc, argv);

  fs = scheme_future_state;
  flush_future_logs(fs);

  return make_future(argv[0], 1, cur_ft);
}

Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
  XFORM_SKIP_PROC /* can be called from future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  if (fts->is_runtime_thread) {
    return do_make_future(argc, argv, (scheme_current_thread ? scheme_current_thread->current_ft : NULL));
  }
  else {
    Scheme_Object *proc = argv[0];
#ifdef MZ_PRECISE_GC
    if (SAME_TYPE(SCHEME_TYPE(proc), scheme_native_closure_type)
        && scheme_native_arity_check(proc, 0)
        && (((Scheme_Native_Closure *)proc)->code->start_code != scheme_on_demand_jit_code)
        && scheme_can_apply_native_in_future(proc)) {
      /* try to allocate a future in the future thread */
      future_t *ft;
      ft = MALLOC_ONE_TAGGED(future_t);
      if (ft) {
        future_t *cur_ft = scheme_current_thread->current_ft;
        Scheme_Future_State *fs = scheme_future_state;

        ft->so.type = scheme_future_type;
        ft->orig_lambda = proc;
        ft->status = PENDING;
        ft->cust = scheme_current_thread->current_ft->cust;

        mzrt_mutex_lock(fs->future_mutex);
        ft->id = ++fs->next_futureid;
        record_fevent_with_data(FEVENT_CREATE, (cur_ft ? cur_ft->id : NO_FUTURE_ID), ft->id);
        enqueue_future(fs, ft);
        mzrt_mutex_unlock(fs->future_mutex);

        return (Scheme_Object *)ft;
      } else {
        /* It would be nice to encourage allocation of a page for
           the future thread in this case, since it might try to
           allocate more futures. */
        return scheme_rtcall_make_future(proc);
      }
    } else {
      return scheme_rtcall_make_future(proc);
    }
#else
    /* future-local allocation is not supported */
    return scheme_rtcall_make_future(proc);
#endif
  }
}

static Scheme_Object *would_be_future(int argc, Scheme_Object *argv[])
  /* Called in runtime thread */
{
  future_t *ft;
  Scheme_Future_Thread_State *fts;
  scheme_check_proc_arity("would-be-future", 0, 0, argc, argv);
  fts = scheme_future_thread_state;

  ft = (future_t*)make_future(argv[0], 0, (fts->thread ? fts->thread->current_ft : NULL));
  ft->in_tracing_mode = 1;
  ft->fts = scheme_future_thread_state;

  return (Scheme_Object*)ft;
}

static void run_would_be_future(future_t *ft)
{
  mz_jmp_buf newbuf, *savebuf;
  Scheme_Thread *p;
  Scheme_Future_State *fs;
  int aborted = 0;

  p = scheme_current_thread;
  fs = scheme_future_state;

  /* Set up the future thread state */
  p->futures_slow_path_tracing++;
  scheme_use_rtcall++;

  savebuf = p->error_buf;
  p->error_buf = &newbuf;

  /* Run the future */
  if (scheme_setjmp(newbuf)) {
    aborted = 1;
  } else {
    future_in_runtime(fs, ft, FEVENT_START_WORK);
  }

  scheme_use_rtcall--;
  p->futures_slow_path_tracing--;
  ft->in_tracing_mode = 0;

  p->error_buf = savebuf;
  if (aborted)
    scheme_longjmp(*savebuf, 1);

  return;
}

void fsemaphore_finalize(void *p, void *data)
{
  fsemaphore_t *sema;
  sema = (fsemaphore_t*)p;
  mzrt_mutex_destroy(sema->mut);
}

Scheme_Object *scheme_make_fsemaphore_inl(Scheme_Object *ready)
/* Called in runtime thread */
{
  fsemaphore_t *sema;
  intptr_t v;

  v = scheme_get_semaphore_init("make-fsemaphore", 1, &ready);

  sema = MALLOC_ONE_TAGGED(fsemaphore_t);
  sema->so.type = scheme_fsemaphore_type;

  mzrt_mutex_create(&sema->mut);
  sema->ready = v;

  scheme_register_finalizer((void*)sema, fsemaphore_finalize, NULL, NULL, NULL);

  return (Scheme_Object*)sema;
}


Scheme_Object *make_fsemaphore(int argc, Scheme_Object **argv)
  /* Called in runtime thread (atomic/synchronized) */
{
  return scheme_make_fsemaphore_inl(argv[0]);
}

Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *sema;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-count", "fsemaphore?", 0, argc, argv);
  }

  sema = (fsemaphore_t*)argv[0];
  return scheme_make_integer(sema->ready);
}

static void requeue_future_within_lock(future_t *future, Scheme_Future_State *fs)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  if (scheme_custodian_is_available(future->cust)) {
    future->status = PENDING;
    enqueue_future(fs, future);
  } else {
    /* The future's custodian is shut down, so don't try to
       run it in a future thread anymore */
    future->status = SUSPENDED;
  }
}

static void requeue_future(future_t *future, Scheme_Future_State *fs)
{
  mzrt_mutex_lock(fs->future_mutex);
  requeue_future_within_lock(future, fs);
  mzrt_mutex_unlock(fs->future_mutex);
}

static int try_resume_future_from_fsema_wait(fsemaphore_t *sema, Scheme_Future_State *fs)
  XFORM_SKIP_PROC
{
  future_t *ft;

  if (!sema->queue_front)
    return 0;

  ft = sema->queue_front;
  sema->queue_front = ft->next_in_fsema_queue;
  ft->next_in_fsema_queue = NULL;

  if (!sema->queue_front)
    sema->queue_end = NULL;
  else
    sema->queue_front->prev_in_fsema_queue = NULL;

  sema->ready--;

  ft->retval_s = scheme_void;

  /* Place the waiting future back on the run queue */
  requeue_future(ft, fs);

  return 1;
}

Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *sema;
  Scheme_Future_State *fs;
  int old_count;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-post", "fsemaphore?", 0, argc, argv);
  }

  sema = (fsemaphore_t*)argv[0];

#ifdef FSEMA_LOGGING
  printf("[Thread %p]: scheme_fsemaphore_post for sema at %p. Count before V: %d\n",
    pthread_self(),
    sema,
    sema->ready);
#endif

  fs = scheme_future_state;
  mzrt_mutex_lock(sema->mut);

  old_count = sema->ready;
  sema->ready++;
  if (!old_count) {
    try_resume_future_from_fsema_wait(sema, fs);
  }

  mzrt_mutex_unlock(sema->mut);
  return scheme_void;
}

static void enqueue_future_for_fsema(future_t *ft, fsemaphore_t *sema)
  /* This function assumes sema->mut has already been acquired! */
{
  future_t *front;

  /* Enqueue this future in the semaphore's queue */
  front = sema->queue_front;
  if (!front) {
    sema->queue_front = ft;
    sema->queue_end = ft;
  } else {
    future_t *end = sema->queue_end;
    end->next_in_fsema_queue = ft;
    ft->prev_in_fsema_queue = end;
    sema->queue_end = ft;
  }
}

static fsemaphore_t *block_until_sema_ready(fsemaphore_t *sema)
{
  /* This little function cooperates with the GC, unlike the
     function that calls it. */
  scheme_block_until(fsemaphore_ready, NULL, (Scheme_Object*)sema, 0);
  return sema;
}

Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *sema;
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  Scheme_Future_State *fs = scheme_future_state;
  void *storage[4];

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-wait", "fsemaphore?", 0, argc, argv);
  }

  sema = (fsemaphore_t*)argv[0];

#ifdef FSEMA_LOGGING
  printf("[Thread %p]: scheme_fsemaphore_wait for sema at %p. Count before P: %d\n",
    pthread_self(),
    sema,
    sema->ready);
#endif

  mzrt_mutex_lock(sema->mut);
  if (!sema->ready) {
    if (fts->is_runtime_thread) {
      /* Then we are on the runtime thread -- if in would-be-future
         mode, should we just return?  Otherwise, block and wait
         for the fsema to be ready while cooperating with the
         scheduler */
      if (scheme_current_thread->futures_slow_path_tracing) {
        mzrt_mutex_unlock(sema->mut);
        return scheme_void;
      }

      do {
        mzrt_mutex_unlock(sema->mut);
        sema = block_until_sema_ready(sema);
        mzrt_mutex_lock(sema->mut);
      } while (!sema->ready);
    } else {
1625       /* On a future thread, suspend the future (to be
1626         resumed whenever the fsema becomes ready */
1627       future_t *future = fts->thread->current_ft;
1628       jit_future_storage[0] = (void*)sema;
1629       jit_future_storage[1] = (void*)future;
1630       if (!future) {
1631         /* Should never be here */
1632         scheme_log_abort("fsemaphore-wait: future was NULL for future thread.");
1633         abort();
1634       }
1635 
1636       /* Setup for LWC capture */
1637       mzrt_mutex_unlock(sema->mut);
1638       scheme_fill_lwc_end();
1639       future->lwc = scheme_current_lwc;
1640       future->fts = fts;
1641       future->prim_protocol = SIG_s_s;
1642 
1643       /* Try to capture it locally (on this thread) */
1644       if (GC_gen0_alloc_page_ptr
1645           && capture_future_continuation(fs, future, storage, 0, 0)) {
1646         /* capture sets fts->thread->current_ft to NULL */
1647         mzrt_mutex_lock(fs->future_mutex);
1648       } else {
1649         /* Can't capture the continuation locally, so ask the runtime
1650             thread to do it */
1651         mzrt_mutex_lock(fs->future_mutex);
1652         if (!future->in_queue_waiting_for_lwc) {
1653           future->next_waiting_lwc = fs->future_waiting_lwc;
1654           fs->future_waiting_lwc = future;
1655           future->in_queue_waiting_for_lwc = 1;
1656         }
1657         future->want_lw = 1;
1658       }
1659       future->status = WAITING_FOR_FSEMA;
1660 
1661       scheme_signal_received_at(fs->signal_handle);
1662       if (fts->thread->current_ft) {
1663         /* Will get here if relying on runtime thread to capture for us --
1664             wait for the signal that LW continuation was captured
1665             by the runtime thread. */
1666         future->can_continue_sema = fts->worker_can_continue_sema;
1667         end_gc_not_ok(fts, fs, MZ_RUNSTACK);
1668         mzrt_mutex_unlock(fs->future_mutex);
1669 
1670         mzrt_sema_wait(fts->worker_can_continue_sema);
1671 
1672         mzrt_mutex_lock(fs->future_mutex);
1673         start_gc_not_ok(fs);
1674       }
1675       mzrt_mutex_unlock(fs->future_mutex);
1676 
1677       FUTURE_ASSERT(!fts->thread->current_ft);
1678 
1679       /* Fetch the future and sema pointers again, in case moved by a GC */
1680       sema = (fsemaphore_t*)jit_future_storage[0];
1681       future = (future_t*)jit_future_storage[1];
1682 
1683       FUTURE_ASSERT(future->suspended_lw);
1684       FUTURE_ASSERT(!future->can_continue_sema);
1685 
1686       /* Check again to see whether the sema has become ready */
1687       mzrt_mutex_lock(sema->mut);
1688       if (sema->ready) {
1689         /* Then resume the future immediately (requeue) */
1690         sema->ready--;
1691         requeue_future(future, fs);
1692       } else {
1693         /* Add the future to the sema's wait queue */
1694         enqueue_future_for_fsema(future, sema);
1695       }
1696 
1697       mzrt_mutex_unlock(sema->mut);
1698 
1699       /* Jump back to the worker thread future loop (this thread
1700           is now free */
1701       scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
1702     }
1703   }
1704 
1705   /* Semaphore is ready -- decrement and continue */
1706   sema->ready--;
1707   mzrt_mutex_unlock(sema->mut);
1708   return scheme_void;
1709 }
1710 
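/* Unlike fsemaphore-wait, fsemaphore-try-wait? never blocks and never
   suspends a future, so it needs none of the capture machinery above:
   it just takes the fsemaphore's lock, checks the count, and
   decrements on success. */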
Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
{
  fsemaphore_t *sema;
  Scheme_Object *ret;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
    SCHEME_WRONG_CONTRACT_MAYBE_IN_FT("fsemaphore-try-wait?", "fsemaphore?", 0, argc, argv);
  }

  sema = (fsemaphore_t*)argv[0];
  mzrt_mutex_lock(sema->mut);
  if (!sema->ready) {
    ret = scheme_false;
  } else {
    sema->ready--;
    ret = scheme_true;
  }

  mzrt_mutex_unlock(sema->mut);
  return ret;
}

static int fsemaphore_ready(Scheme_Object *obj)
/* Called in runtime thread by Racket scheduler */
{
  int ret = 0;
  fsemaphore_t *fsema = (fsemaphore_t*)obj;
  mzrt_mutex_lock(fsema->mut);
  ret = fsema->ready;
  mzrt_mutex_unlock(fsema->mut);
  return ret;
}


int future_ready(Scheme_Object *obj)
/* Called in runtime thread by Racket scheduler */
{
  Scheme_Future_State *fs = scheme_future_state;
  int ret = 0;
  future_t *ft = (future_t*)obj;

  mzrt_mutex_lock(fs->future_mutex);
  if ((ft->status != RUNNING)
      && (ft->status != WAITING_FOR_FSEMA)
      && (ft->status != HANDLING_PRIM)) {
    ret = 1;
  }
  mzrt_mutex_unlock(fs->future_mutex);

  return ret;
}

static void dequeue_future(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called from both future and runtime threads */
{
  if (ft->prev == NULL)
    fs->future_queue = ft->next;
  else
    ft->prev->next = ft->next;

  if (ft->next == NULL)
    fs->future_queue_end = ft->prev;
  else
    ft->next->prev = ft->prev;

  ft->next = NULL;
  ft->prev = NULL;

  if (ft->in_future_queue) {
    --fs->future_queue_count;
    ft->in_future_queue = 0;
  }
}

static void complete_rtcall(Scheme_Future_State *fs, future_t *future)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  if (future->suspended_lw) {
    /* Re-enqueue the future so that some future thread can continue */
    requeue_future_within_lock(future, fs);
  } else {
    /* Signal the waiting worker thread that it
       can continue running machine code */
    future->want_lw = 0; /* needed if we get here via direct_future_to_future_touch() */
    if (future->can_continue_sema) {
      mzrt_sema *can_continue_sema = future->can_continue_sema;
      FUTURE_ASSERT(!future->in_atomic_queue);
      future->can_continue_sema = NULL;
      mzrt_sema_post(can_continue_sema);
    }
  }
}

static void direct_future_to_future_touch(Scheme_Future_State *fs, future_t *ft, future_t *t_ft)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  Scheme_Object *retval = ft->retval;

  receive_special_result(ft, retval, 0);
  t_ft->retval_s = retval;
  send_special_result(t_ft, retval);

  t_ft->arg_S1 = NULL;
  t_ft->status = HANDLING_PRIM; /* handled as if by runtime thread */

  complete_rtcall(fs, t_ft);
}

static future_t *get_future_for_touch(future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  if ((ft->status == WAITING_FOR_PRIM) && (ft->prim_func == touch)) {
    /* try to enqueue it... */
    Scheme_Object **a = ft->arg_S1;
    if (ft->suspended_lw)
      a = scheme_adjust_runstack_argument(ft->suspended_lw, a);
    return (future_t *)a[0];
  } else
    return NULL;
}

static void trigger_added_touches(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* lock held; called from both future and runtime threads */
{
  if (ft->touching) {
    Scheme_Object *touching = ft->touching;
    ft->touching = NULL;
    while (!SCHEME_NULLP(touching)) {
      Scheme_Object *wb = SCHEME_CAR(touching);
      future_t *t_ft = (future_t *)SCHEME_WEAK_BOX_VAL(wb);

      if (t_ft && (get_future_for_touch(t_ft) == ft)) {
        direct_future_to_future_touch(fs, ft, t_ft);
      }

      touching = SCHEME_CDR(touching);
    }
  }
}

static Scheme_Object *shallower_apply_future_lw_k(void)
{
  Scheme_Thread *p = scheme_current_thread;
  future_t *ft = (future_t *)p->ku.k.p1;

  p->ku.k.p1 = NULL;

  return apply_future_lw(ft);
}

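/* Runs (or resumes) the given future directly on the runtime thread.
   Returns 1 when the future reaches FINISHED, or 0 when only a nested
   suspended frame was popped and the future was re-enqueued. */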
static int future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, int what)
{
  mz_jmp_buf newbuf, * volatile savebuf;
  Scheme_Thread *p = scheme_current_thread;
  Scheme_Object * volatile retval;
  future_t * volatile old_ft;
  int done;

  old_ft = p->current_ft;
  p->current_ft = ft;

  savebuf = p->error_buf;
  p->error_buf = &newbuf;

  record_fevent(what, ft->id);

  if (scheme_setjmp(newbuf)) {
    ft->no_retval = 1;
    retval = NULL;
  } else {
    if (ft->suspended_lw) {
      if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 1) > 1) {
        p->ku.k.p1 = ft;
        retval = scheme_handle_stack_overflow(shallower_apply_future_lw_k);
      } else
        retval = apply_future_lw(ft);
    } else {
      if (ft->suspended_lw_stack) {
        Scheme_Object *rator, **argv;
        int argc;
        Scheme_Lightweight_Continuation *lc;
        rator = (Scheme_Object *)ft->suspended_lw_stack[2];
        argc = SCHEME_INT_VAL((Scheme_Object *)ft->suspended_lw_stack[3]);
        argv = (Scheme_Object **)ft->suspended_lw_stack[4];
        ft->suspended_lw_stack[2] = NULL;
        ft->suspended_lw_stack[4] = NULL;

        lc = (Scheme_Lightweight_Continuation *)ft->suspended_lw_stack[1];
        scheme_restore_lightweight_continuation_marks(lc);

        if (ft->suspended_lw_stack[5])
          retval = _scheme_apply_multi(rator, argc, argv);
        else
          retval = _scheme_apply(rator, argc, argv);
      } else
        retval = scheme_apply_multi(ft->orig_lambda, 0, NULL);
    }
    send_special_result(ft, retval);
  }

  p->error_buf = savebuf;
  p->current_ft = old_ft;

  ft->retval = retval;

  mzrt_mutex_lock(fs->future_mutex);

  if (ft->suspended_lw_stack && retval) {
    pop_suspended_lw(fs, ft);
    done = 0;
  } else {
    if (!retval)
      ft->suspended_lw_stack = NULL;
    ft->status = FINISHED;
    trigger_added_touches(fs, ft);
    done = 1;
  }
  record_fevent(FEVENT_COMPLETE, ft->id);
  mzrt_mutex_unlock(fs->future_mutex);

  record_fevent(FEVENT_END_WORK, ft->id);

  if (!retval) {
    scheme_longjmp(*savebuf, 1);
  }

  return done;
}

static int prefer_to_apply_future_in_runtime()
/* Called with the future mutex held. */
{
  /* Policy question: if the runtime thread is blocked on a
     future, should we just run the future (or its suspended continuation)
     directly in the runtime thread?

     If we don't, then we're better able to handle non-blocking requests
     from future threads. At the same time, the runtime thread shouldn't
     wait if no one is working on the future. We err on the safe side
     and always run when we're waiting on the future: */
  return 1;
}

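/* In outline, general_touch() spins over the future's status:
   PENDING/PENDING_OVERSIZE/SUSPENDED futures may be run directly via
   future_in_runtime(); RUNNING, WAITING_FOR_FSEMA, and HANDLING_PRIM
   futures are left to make progress elsewhere; WAITING_FOR_PRIM
   futures get their runtime call invoked here; and FINISHED futures
   yield their result. Between iterations, the loop blocks via
   future_ready() so that it cooperates with the scheduler. */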
Scheme_Object *general_touch(int argc, Scheme_Object *argv[])
/* Called in runtime thread */
{
  Scheme_Future_State *fs = scheme_future_state;
  Scheme_Object *retval = NULL;
  future_t *ft;

  if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type))
    scheme_wrong_contract("touch", "future?", 0, argc, argv);

  ft = (future_t*)argv[0];

  /* Spin waiting for primitive calls or a return value from
     the worker thread */
  while (1) {
    mzrt_mutex_lock(fs->future_mutex);
    if ((((ft->status == PENDING)
          && prefer_to_apply_future_in_runtime())
         || (ft->status == PENDING_OVERSIZE)
         || (ft->status == SUSPENDED))
        && (!ft->suspended_lw
            || scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0)))
      {
        int what = FEVENT_START_WORK;
        if (ft->status == PENDING_OVERSIZE) {
          what = FEVENT_START_RTONLY_WORK;
        } else if (ft->status != SUSPENDED) {
          dequeue_future(fs, ft);
          if (ft->suspended_lw_stack)
            what = FEVENT_RESUME_WORK;
        }
        ft->status = RUNNING;
        mzrt_mutex_unlock(fs->future_mutex);
        if (ft->in_tracing_mode) {
          run_would_be_future(ft);
          retval = ft->retval;
          break;
        } else {
          if (future_in_runtime(fs, ft, what)) {
            retval = ft->retval;
            break;
          }
        }
      }
    else if ((ft->status == RUNNING)
        || (ft->status == WAITING_FOR_FSEMA)
        || (ft->status == HANDLING_PRIM))
      {
        /* someone else got to it first */
        mzrt_mutex_unlock(fs->future_mutex);
      }
    else if (ft->status == FINISHED)
      {
        retval = ft->retval;

        mzrt_mutex_unlock(fs->future_mutex);

        break;
      }
    else if (ft->status == WAITING_FOR_PRIM)
      {
        if (ft->in_atomic_queue) {
          /* Should be in the atomic-wait queue, so
             handle those actions now: */
          mzrt_mutex_unlock(fs->future_mutex);
          scheme_check_future_work();
        } else {
          /* Invoke the primitive and stash the result.
             Release the lock so other threads can manipulate the queue
             while the runtime call executes. */
          ft->status = HANDLING_PRIM;
          ft->want_lw = 0;
          mzrt_mutex_unlock(fs->future_mutex);
          invoke_rtcall(fs, ft, 0);
        }
      }
    else if (ft->maybe_suspended_lw && (ft->status != WAITING_FOR_FSEMA))
      {
        ft->maybe_suspended_lw = 0;
        if (ft->suspended_lw) {
          if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0)
              && prefer_to_apply_future_in_runtime()) {
            if (ft->status != SUSPENDED)
              dequeue_future(fs, ft);
            ft->status = RUNNING;
            /* may raise an exception or escape: */
            mzrt_mutex_unlock(fs->future_mutex);
            (void)future_in_runtime(fs, ft, FEVENT_START_WORK);
          } else {
            /* Someone needs to handle the future. We're banking on some
               future thread eventually picking up the future, which is
               not actually guaranteed if they're all busy looping... */
            mzrt_mutex_unlock(fs->future_mutex);
          }
        } else {
          mzrt_mutex_unlock(fs->future_mutex);
        }
      }
    else
      {
        mzrt_mutex_unlock(fs->future_mutex);
      }

    scheme_thread_block(0.0); /* to ensure check for futures work */

    record_fevent(FEVENT_TOUCH_PAUSE, ft->id);
    scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0);
    record_fevent(FEVENT_TOUCH_RESUME, ft->id);
  }

  if (!retval) {
    scheme_signal_error("touch: future previously aborted");
  }

  receive_special_result(ft, retval, 0);

  flush_future_logs(fs);

  return retval;
}

Scheme_Object *touch(int argc, Scheme_Object *argv[])
  XFORM_SKIP_PROC
/* can be called in a future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  if (fts->is_runtime_thread) {
    future_t *ft;
    if (fts->thread
        && (ft = fts->thread->current_ft)
        && ft->in_tracing_mode) {
      future_t *targ_ft = (future_t*)argv[0];
      Scheme_Future_State *fs = scheme_future_state;
      Scheme_Object *targid_obj;
      targid_obj = scheme_make_integer(targ_ft->id);
      log_future_event(fs,
                       "id %d, process %d: %s: %s; time: %f",
                       "touch",
                       -1,
                       FEVENT_RTCALL_TOUCH,
                       get_future_timestamp(),
                       ft->id,
                       targid_obj);
    }

    return general_touch(argc, argv);
  } else {
    if (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type)) {
      Scheme_Future_State *fs = scheme_future_state;
      future_t *ft = (future_t *)argv[0];
      int status;

      mzrt_mutex_lock(fs->future_mutex);
      status = ft->status;
      mzrt_mutex_unlock(fs->future_mutex);

      if (status == FINISHED) {
        Scheme_Object *retval = ft->retval;
        receive_special_result(ft, retval, 0);
        return retval;
      } else {
#ifdef MZ_PRECISE_GC
        /* Try adding the current future to ft's chain of touching futures */
        Scheme_Object *wb, *pr;
        future_t *current_ft = scheme_current_thread->current_ft;

        wb = GC_malloc_weak_box(current_ft, NULL, 0, 0);
        if (wb) {
          pr = GC_malloc_pair(wb, scheme_null);
          if (pr) {
            mzrt_mutex_lock(fs->future_mutex);
            if (ft->status != FINISHED) {
              if (ft->touching)
                SCHEME_CDR(pr) = ft->touching;
              ft->touching = pr;
              current_ft->in_future_specific_touch_queue = 1;
              mzrt_mutex_unlock(fs->future_mutex);
            } else {
              /* `ft' switched to FINISHED while we were trying to add,
                 so carry on with its result */
              Scheme_Object *retval = ft->retval;
              mzrt_mutex_unlock(fs->future_mutex);
              receive_special_result(ft, retval, 0);
              return retval;
            }
          }
        }
#endif
      }
    }
    return scheme_rtcall_iS_s("touch", FSRC_PRIM, touch, argc, argv);
  }
}

int scheme_is_multithreaded(int now)
{
  if (!now)
    return 1;
  else {
    Scheme_Future_State *fs = scheme_future_state;
    return (fs && fs->future_threads_created);
  }
}

Scheme_Object *processor_count(int argc, Scheme_Object *argv[])
/* Called in runtime thread */
{
  return scheme_make_integer(rktio_processor_count(scheme_rktio));
}

Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[])
  XFORM_SKIP_PROC
/* Called from any thread (either runtime or future) */
{
  future_t *ft = scheme_current_thread->current_ft;

  return (ft ? (Scheme_Object *)ft : scheme_false);
}

/* Entry point for a worker thread allocated for
   executing futures.  This function will never terminate
   (until the process dies). */
void *worker_thread_future_loop(void *arg)
  XFORM_SKIP_PROC
/* Called in future thread; runtime thread is blocked until ready_sema
   is signaled. */
{
  /* valid only until signaling */
  future_thread_params_t *params = (future_thread_params_t *)arg;
  Scheme_Future_Thread_State *fts = params->fts;
  Scheme_Future_State *fs = params->fs;
  Scheme_Object *v;
  Scheme_Native_Proc *jitcode;
  future_t *ft;
  mz_jmp_buf newbuf;
  int fid, what;

  scheme_future_state = fs;
  scheme_future_thread_state = fts;

  GC_instance = params->shared_GC;

  GC_gen0_alloc_only = 1;

  /* Set processor affinity */
  /*mzrt_mutex_lock(fs->future_mutex);
    static uintptr_t cur_cpu_mask = 1;
    if (pthread_setaffinity_np(pthread_self(), sizeof(g_cur_cpu_mask), &g_cur_cpu_mask))
    {
      printf(
        "Could not set CPU affinity (%lu) for thread %p!\n",
        ++g_cur_cpu_mask,
        pthread_self());
    }

    mzrt_mutex_unlock(fs->future_mutex);
  */

  scheme_configure_floating_point();

  mzrt_sema_create(&fts->worker_can_continue_sema, 0);

  scheme_use_rtcall = 1;

  scheme_current_thread = fts->thread;

  scheme_fuel_counter = 1;
  scheme_jit_stack_boundary = ((uintptr_t)&v) - FUTURE_C_STACK_SIZE;

  fts->need_gc_pointer = &scheme_future_need_gc_pause;
  fts->fuel_pointer = &scheme_fuel_counter;
  fts->stack_boundary_pointer = &scheme_jit_stack_boundary;

  MZ_RUNSTACK_START = params->runstack_start;
  MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size;

  params->scheme_current_runstack_ptr = &scheme_current_runstack;
  params->scheme_current_runstack_start_ptr = &scheme_current_runstack_start;
  params->current_thread_ptr = &scheme_current_thread;
  params->jit_future_storage_ptr = &jit_future_storage[0];

  scheme_init_thread_lwc();
  params->lwc = scheme_current_lwc;

  mzrt_sema_post(params->ready_sema);

  scheme_current_thread->runstack = MZ_RUNSTACK;
  scheme_current_thread->runstack_start = MZ_RUNSTACK_START;

  while (1) {
    mzrt_sema_wait(fs->future_pending_sema);
    mzrt_mutex_lock(fs->future_mutex);
    start_gc_not_ok(fs);

    ft = get_pending_future(fs);

    if (ft) {
      FUTURE_ASSERT(!ft->in_atomic_queue);
      FUTURE_ASSERT(!ft->in_future_queue);

      fs->busy_thread_count++;

      if (ft->suspended_lw_stack)
        what = FEVENT_RESUME_WORK;
      else
        what = FEVENT_START_WORK;
      fid = ft->id;
      record_fevent(what, fid);

      /* Work is available for this thread */
      ft->status = RUNNING;
      ft->maybe_suspended_lw = 0;
      mzrt_mutex_unlock(fs->future_mutex);

      ft->thread_short_id = fts->id;

      /* Set up the JIT compiler for this thread */
      scheme_jit_fill_threadlocal_table();

      fts->thread->current_ft = ft;
      GC_register_thread(fts->thread, ft->cust);

      MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size;
      MZ_CONT_MARK_STACK = 0;
      MZ_CONT_MARK_POS = (MZ_MARK_POS_TYPE)1;

      if (ft->suspended_lw) {
        /* invoke a lightweight continuation */
        scheme_current_thread->error_buf = &newbuf;
        if (scheme_future_setjmp(newbuf)) {
          /* failed or suspended */
          v = NULL;
        } else {
          v = _apply_future_lw(ft);
        }
      } else {
        /* Run the code:
           The lambda passed to a future will always be a parameterless
           function.
           From this thread's perspective, this call will never return
           until all the work to be done in the future has been completed,
           including runtime calls.
           If jitcode asks the runtime thread to do work, then
           a GC can occur. */

        scheme_current_thread->error_buf = &newbuf;
        if (scheme_future_setjmp(newbuf)) {
          /* failed or suspended */
          v = NULL;
        } else {
          Scheme_Object *rator, **argv;
          int argc;

          scheme_fill_lwc_start();

          if (ft->suspended_lw_stack) {
            Scheme_Lightweight_Continuation *lc;

            lc = (Scheme_Lightweight_Continuation *)ft->suspended_lw_stack[1];
            scheme_restore_lightweight_continuation_marks(lc); /* might trigger GC */
            ft = fts->thread->current_ft;

            rator = (Scheme_Object *)ft->suspended_lw_stack[2];
            argc = SCHEME_INT_VAL((Scheme_Object *)ft->suspended_lw_stack[3]);
            argv = (Scheme_Object **)ft->suspended_lw_stack[4];
            ft->suspended_lw_stack[2] = NULL;
            ft->suspended_lw_stack[4] = NULL;
          } else {
            rator = ft->orig_lambda;
            argc = 0;
            argv = NULL;
          }

          jitcode = ((Scheme_Native_Closure *)rator)->code->start_code;
          v = scheme_call_as_lightweight_continuation(jitcode, rator, argc, argv);
          if (SAME_OBJ(v, SCHEME_TAIL_CALL_WAITING))
            v = scheme_force_value_same_mark_as_lightweight_continuation(v);
        }
      }

      /* Get the future again, since a GC may have occurred or
         the future may have been suspended */
      ft = fts->thread->current_ft;

      mzrt_mutex_lock(fs->future_mutex);

      if (!ft) {
        /* continuation of future will be requeued, and this future
           thread can do something else */
      } else {
        FUTURE_ASSERT(v || ft->no_retval);

        if (ft->no_retval >= 0) {
          /* Set the return val in the descriptor */
          ft->retval = v;
          /* In case of multiple values: */
          send_special_result(ft, v);

          if (ft->suspended_lw_stack) {
            if (!ft->suspended_lw_stack[5] && SAME_OBJ(v, SCHEME_MULTIPLE_VALUES)) {
              /* multiple results are not allowed; keep the same lw stack,
                 but replace the function to call: */
              ft->status = PENDING_OVERSIZE;
              ft->suspended_lw_stack[2] = bad_multi_result_proc;
              ft->suspended_lw_stack[3] = scheme_make_integer(ft->multiple_count);
              ft->suspended_lw_stack[4] = ft->multiple_array;
              ft->retval_s = NULL;
              ft->multiple_array = NULL;
            } else
              pop_suspended_lw(fs, ft);
          } else {
            /* Update the status */
            ft->status = FINISHED;
            trigger_added_touches(fs, ft);
          }
          record_fevent(FEVENT_COMPLETE, fid);
        } else {
          ft->suspended_lw_stack = NULL;
        }

        fts->thread->current_ft = NULL;
        GC_register_thread(fts->thread, main_custodian);
      }

      /* Clear stacks */
      MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size;
      MZ_CONT_MARK_STACK = 0;

      if (ft)
        scheme_signal_received_at(fs->signal_handle);

      record_fevent(FEVENT_END_WORK, fid);

      --fs->busy_thread_count;
    }

    end_gc_not_ok(fts, fs, NULL);
    mzrt_mutex_unlock(fs->future_mutex);
  }

  return NULL;
}

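/* Applies a previously captured lightweight continuation. When
   `retval_is_rs_plus_two' is set, the pending result follows the
   SIG_ON_DEMAND protocol (delivered on the runstack; see
   scheme_rtcall_on_demand()) rather than arriving as a single value
   in `retval_s'. */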
static Scheme_Object *_apply_future_lw(future_t *ft)
  XFORM_SKIP_PROC
/* Called from any thread (either runtime or future) */
{
  struct Scheme_Lightweight_Continuation *lw = ft->suspended_lw;
  Scheme_Object *v;
  int result_is_rs_plus_two;

  ft->suspended_lw = NULL;

  v = ft->retval_s;
  if (ft->retval_is_rs_plus_two) {
    result_is_rs_plus_two = 1;
    ft->retval_is_rs_plus_two = 0;
  } else {
    ft->retval_s = NULL;
    receive_special_result(ft, v, 1);
    result_is_rs_plus_two = 0;
  }

  FUTURE_ASSERT((ft->prim_protocol != SIG_ON_DEMAND) == !result_is_rs_plus_two);
  FUTURE_ASSERT(v || (ft->prim_protocol != SIG_ALLOC));

  v = scheme_apply_lightweight_continuation(lw, v, result_is_rs_plus_two,
                                            FUTURE_RUNSTACK_SIZE);

  if (SAME_OBJ(v, SCHEME_TAIL_CALL_WAITING)) {
    if (scheme_future_thread_state->is_runtime_thread)
      v = scheme_force_value_same_mark(v);
    else
      v = scheme_force_value_same_mark_as_lightweight_continuation(v);
  }

  return v;
}

static void *apply_future_lw_k(void)
{
  Scheme_Thread *p = scheme_current_thread;
  future_t *ft = (future_t *)p->ku.k.p1;

  p->ku.k.p1 = NULL;

  return _apply_future_lw(ft);
}

static Scheme_Object *apply_future_lw(future_t *ft)
{
  Scheme_Thread *p = scheme_current_thread;

  p->ku.k.p1 = ft;

  return (Scheme_Object *)scheme_top_level_do(apply_future_lw_k, 0);
}

static int capture_future_continuation(Scheme_Future_State *fs, future_t *ft, void **storage,
                                       int need_lock, int for_overflow)
  XFORM_SKIP_PROC
/* The lock is *not* held when calling this function.
   This function explicitly cooperates with the GC by storing the
   pointers it needs to save across a collection in `storage', so
   it can be used in a future thread. If future-thread-local
   allocation fails, the result is 0. */
{
  Scheme_Lightweight_Continuation *lw;
  Scheme_Object **arg_S;
  void **stack;

#ifndef MZ_PRECISE_GC
  if (scheme_use_rtcall)
    return 0;
#endif

  storage[2] = ft;

  if (for_overflow) {
    stack = MALLOC_N(void*, 6);
    if (!stack) return 0;
    storage[3] = stack;
    ft = (future_t *)storage[2];
  } else
    stack = NULL;

  lw = scheme_capture_lightweight_continuation(ft->fts->thread, ft->lwc, storage);
  if (!lw) return 0;

  ft = (future_t *)storage[2];
  stack = (void **)storage[3];

  if (need_lock) {
    mzrt_mutex_lock(fs->future_mutex);

    if (!ft->want_lw) {
      /* Future can already continue. This can only happen
         if ft was blocked on another future, and the other
         future decided that it could continue while we were
         trying to grab the continuation. Drop the captured
         continuation; as on normal success, we return 1 with
         the lock still held. */
      return 1;
    }

    ft->want_lw = 0;
  }

  ft->fts->thread->current_ft = NULL; /* tells the worker thread that it no longer
                                         needs to handle the future */
  GC_register_thread(ft->fts->thread, main_custodian);

  ft->suspended_lw = lw;
  ft->maybe_suspended_lw = 1;

  if (ft->arg_S0) {
    arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S0);
    ft->arg_S0 = arg_S;
  }
  if (ft->arg_S1) {
    arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S1);
    ft->arg_S1 = arg_S;
  }
  if (ft->arg_S2) {
    arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S2);
    ft->arg_S2 = arg_S;
  }

  if (for_overflow) {
    stack[0] = ft->suspended_lw_stack;
    stack[5] = ((for_overflow > 1) ? scheme_true : NULL);
    ft->suspended_lw_stack = stack;
  }

  return 1;
}

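/* A `suspended_lw_stack' (built in capture_future_continuation() above
   and push_suspended_lw() below) has six slots:
     [0] = enclosing suspended_lw_stack, or NULL
     [1] = the captured lightweight continuation
     [2] = rator to apply on resume
     [3] = argument count, as a fixnum
     [4] = argument vector
     [5] = non-NULL => multiple return values are allowed */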
static void push_suspended_lw(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread; needs lock */
{
  ft->suspended_lw_stack[1] = ft->suspended_lw;
  ft->suspended_lw = NULL;

  ft->suspended_lw_stack[2] = ft->arg_s0;
  ft->arg_s0 = NULL;
  ft->suspended_lw_stack[3] = scheme_make_integer(ft->arg_i0);
  ft->suspended_lw_stack[4] = ft->arg_S0;
  ft->arg_S0 = NULL;

  ft->status = PENDING;
  (void)enqueue_future(fs, ft);
}

static void pop_suspended_lw(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread; needs lock */
{
  ft->retval_s = ft->retval;
  ft->retval = NULL;

  ft->suspended_lw = (Scheme_Lightweight_Continuation *)ft->suspended_lw_stack[1];
  ft->maybe_suspended_lw = 1;

  ft->suspended_lw_stack = (void **)ft->suspended_lw_stack[0];

  ft->status = PENDING;
  (void)enqueue_future(fs, ft);
}

void scheme_check_future_work()
/* Called in the runtime thread by the scheduler */
{
  /* Check for work that future threads need from the runtime thread
     and that can be done in any Racket thread (e.g., get a new page
     for allocation). */
  future_t *ft, *other_ft;
  Scheme_Future_State *fs = scheme_future_state;
  mzrt_sema *can_continue_sema;
  int more;

  if (!fs) return;

  flush_future_logs(fs);

  check_future_thread_creation(fs);

  if (!fs->future_threads_created) return;

  more = 1;
  while (more) {
    /* Try to get a future waiting on an atomic operation */
    mzrt_mutex_lock(fs->future_mutex);
    ft = fs->future_waiting_atomic;
    if (ft) {
      fs->future_waiting_atomic = ft->next_waiting_atomic;
      ft->next_waiting_atomic = NULL;
      ft->in_atomic_queue = 0;
      if ((ft->status == WAITING_FOR_PRIM) && ft->rt_prim_is_atomic) {
        ft->status = HANDLING_PRIM;
        ft->want_lw = 0; /* we expect to handle it quickly,
                            so the future thread should just wait */
      } else
        ft = NULL;
      more = 1;
    } else
      more = 0;
    mzrt_mutex_unlock(fs->future_mutex);

    if (ft)
      invoke_rtcall(fs, ft, 1);
  }

  more = 1;
  while (more) {
    /* Try to get a future that's waiting to touch another future: */
    mzrt_mutex_lock(fs->future_mutex);
    ft = fs->future_waiting_touch;
    if (ft) {
      fs->future_waiting_touch = ft->next_waiting_touch;
      ft->next_waiting_touch = NULL;
      ft->in_touch_queue = 0;
      other_ft = get_future_for_touch(ft);
      more = 1;
    } else {
      other_ft = NULL;
      more = 0;
    }
    mzrt_mutex_unlock(fs->future_mutex);

    if (other_ft) {
      /* Chain to `ft' from `other_ft': */
      Scheme_Object *wb, *pr;

      wb = scheme_make_weak_box((Scheme_Object *)ft);
      pr = scheme_make_pair(wb, scheme_null);

      mzrt_mutex_lock(fs->future_mutex);
      if (other_ft->status == FINISHED) {
        /* Completed while we tried to allocate a chain link. */
        direct_future_to_future_touch(fs, other_ft, ft);
      } else {
        /* enqueue */
        if (other_ft->touching)
          SCHEME_CDR(pr) = other_ft->touching;
        other_ft->touching = pr;
      }
      mzrt_mutex_unlock(fs->future_mutex);
    }
  }

  while (1) {
    /* Try to get a future waiting to be suspended */
    mzrt_mutex_lock(fs->future_mutex);
    ft = fs->future_waiting_lwc;
    if (ft) {
      fs->future_waiting_lwc = ft->next_waiting_lwc;
      ft->next_waiting_lwc = NULL;
      ft->in_queue_waiting_for_lwc = 0;
      if (!ft->want_lw)
        ft = NULL;
      else {
        /* If ft is touching another future, then the other
           future may resume ft while we grab the continuation.
           Withhold ft->can_continue_sema for now, so that we can
           capture the continuation, and then double-check
           afterward whether the future still wants a lwc: */
        can_continue_sema = ft->can_continue_sema;
        ft->can_continue_sema = NULL;
        FUTURE_ASSERT(!ft->in_atomic_queue);
      }
    }
    mzrt_mutex_unlock(fs->future_mutex);

    if (ft) {
      void *storage[4];

      if (capture_future_continuation(fs, ft, storage, 1,
                                      ((ft->status == WAITING_FOR_OVERFLOW)
                                       ? ft->arg_i1
                                       : 0))) {
        /* capture performs mzrt_mutex_lock(fs->future_mutex) on success. */
        if (ft->suspended_lw) {
          FUTURE_ASSERT((ft->status == WAITING_FOR_PRIM)
                        || (ft->status == WAITING_FOR_FSEMA)
                        || (ft->status == WAITING_FOR_OVERFLOW));
          if (ft->status == WAITING_FOR_OVERFLOW) {
            push_suspended_lw(fs, ft);
          }
        } else
          FUTURE_ASSERT(ft->status != RUNNING);
        mzrt_mutex_unlock(fs->future_mutex);
      } else {
        /* Couldn't capture the continuation. */
        FUTURE_ASSERT(ft->status != RUNNING);
        if (can_continue_sema) {
          /* may need to reinstall the semaphore */
          mzrt_mutex_lock(fs->future_mutex);
          if ((ft->status == WAITING_FOR_PRIM)
              || (ft->status == WAITING_FOR_FSEMA)) {
            ft->can_continue_sema = can_continue_sema;
            can_continue_sema = NULL;
          }
          mzrt_mutex_unlock(fs->future_mutex);
        }
      }
      /* Signal the waiting worker thread that it can continue, since
         we either captured the continuation or the result became
         available meanwhile: */
      if (can_continue_sema)
        mzrt_sema_post(can_continue_sema);
    } else
      break;
  }

  /* If any future thread has its fuel revoked (must have been a custodian
     shutdown) but doesn't have a future (the shutdown future must have been
     handled), then we can restore the thread's fuel. Races are
     possible, but they should be rare, and they lead at worst to bad
     performance. */
  {
    int i;
    for (i = 0; i < fs->thread_pool_size; i++) {
      if (fs->pool_threads[i]) {
        if (!*(fs->pool_threads[i]->fuel_pointer)
            && !fs->pool_threads[i]->thread->current_ft) {
          *(fs->pool_threads[i]->fuel_pointer) = 1;
          *(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE;
        }
      }
    }
  }
}

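/* Runtime-call protocol, in outline: a future thread stashes the
   primitive's arguments and a `prim_protocol' tag in its future_t,
   marks the future WAITING_FOR_PRIM (or WAITING_FOR_OVERFLOW), and
   signals the runtime thread. The runtime thread then either invokes
   the primitive via invoke_rtcall() and posts can_continue_sema so
   the worker resumes in place, or captures the future's lightweight
   continuation so that the future can be requeued instead. */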
static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
                                  void *func,
                                  int is_atomic,
                                  int can_suspend,
                                  int for_overflow)
  XFORM_SKIP_PROC
/* Called in either a future or the runtime thread.  Can only be called in the
   runtime thread if we are in slow-path trace mode (i.e., we are running a
   future that is bound to the runtime thread so that we can log all of its
   primitive applications). */
{
  future_t *future;
  Scheme_Future_State *fs = scheme_future_state;
  void *storage[4];
  int insist_to_suspend, prefer_to_suspend, fid;

#ifdef MZ_PRECISE_GC
  if (for_overflow && (!GC_gen0_alloc_page_ptr || fts->local_capture_failed)) {
    /* To increase the chance that later overflows can be handled
       without blocking, get more memory for this future thread. The
       `local_capture_failed' flag is a heuristic that might be
       improved by checking the available memory against an estimate
       of the needed memory. */
    fts->local_capture_failed = 0;
    GC_gen0_alloc_page_ptr = scheme_rtcall_alloc();
  }
#endif

  /* Fetch the future descriptor for this thread */
  future = fts->thread->current_ft;

  FUTURE_ASSERT(!future->in_atomic_queue);
  FUTURE_ASSERT(!future->in_future_queue);

  if (!for_overflow) {
    /* Check whether this primitive in fact has a
       future-safe C version */
    if (func == scheme_even_p || func == scheme_odd_p) {
      prim_iS_s f = (prim_iS_s)func;
      Scheme_Object *ret;
      ret = f(future->arg_i0, future->arg_S1);
      future->retval_s = ret;
      return;
    }
  }

  /* Check whether we are in slow-path trace mode */
  if (fts->is_runtime_thread) {
    /* On the runtime thread - must be slow-path tracing */
    future->prim_func = func;
    future->rt_prim_is_atomic = 0;
    future->status = WAITING_FOR_PRIM;
    invoke_rtcall(scheme_future_state, future, 0);
    fts->worker_gc_counter = *fs->gc_counter_ptr;

    return;
  }

  scheme_fill_lwc_end();
  future->lwc = scheme_current_lwc;
  future->fts = fts;

  fid = future->id;

  /* If for_overflow, then a suspend is required. Otherwise...
     Policy question: When should the future thread suspend
     the current future? It costs something to suspend and
     resume a future.
     The current policy:
     Always suspend for a non-atomic (i.e., "unsafe") operation,
     because there's no guarantee that `touch' will allow progress
     anytime soon. For atomic operations, only suspend if there's
     more work available in the future queue, and only if we
     can suspend ourselves (because asking the runtime thread
     to suspend wouldn't accomplish anything). */
  insist_to_suspend = !is_atomic || for_overflow;
  prefer_to_suspend = (insist_to_suspend || fs->future_queue_count);

  if (!scheme_custodian_is_available(future->cust)) {
    insist_to_suspend = 1;
    prefer_to_suspend = 1;
  }

  if (!can_suspend) {
    insist_to_suspend = 0;
    prefer_to_suspend = 0;
  }

  if (prefer_to_suspend
      && GC_gen0_alloc_page_ptr
      && capture_future_continuation(fs, future, storage, 0, for_overflow)) {
    /* this future thread will suspend handling the future
       continuation until the result of the blocking call is ready;
       fts->thread->current_ft was set to NULL */
  }

  mzrt_mutex_lock(fs->future_mutex);

  if (for_overflow) {
    record_fevent(FEVENT_OVERFLOW, fid);
  } else if (func == touch) {
    record_fevent(FEVENT_RTCALL_TOUCH, fid);
  } else {
    record_fevent(is_atomic ? FEVENT_RTCALL_ATOMIC : FEVENT_RTCALL, fid);
  }

  if (for_overflow) {
    if (!fts->thread->current_ft) {
      /* capture complete; re-enqueue so that it continues on a fresh stack */
      push_suspended_lw(fs, future);
    } else {
      future->status = WAITING_FOR_OVERFLOW;
      future->arg_i1 = for_overflow;
      fts->local_capture_failed = 1;
    }
  } else {
    /* Set up the arguments for the runtime call
       to be picked up by the main rt thread */
    future->prim_func = func;
    future->rt_prim_is_atomic = is_atomic;
    future->status = WAITING_FOR_PRIM;
  }

  if (fts->thread->current_ft) {
    if (is_atomic && !insist_to_suspend) {
      FUTURE_ASSERT(!future->in_atomic_queue);
      FUTURE_ASSERT(!future->in_future_queue);
      FUTURE_ASSERT(func != touch);
      future->next_waiting_atomic = fs->future_waiting_atomic;
      fs->future_waiting_atomic = future;
      future->in_atomic_queue = 1;
    }

    if (insist_to_suspend) {
      /* couldn't capture the continuation locally, so ask
         the runtime thread to capture it: */
      if (!future->in_queue_waiting_for_lwc) {
        future->next_waiting_lwc = fs->future_waiting_lwc;
        fs->future_waiting_lwc = future;
        future->in_queue_waiting_for_lwc = 1;
      }
      future->want_lw = 1;
      /* In case of for_overflow, the runtime thread is responsible for
         enqueuing the future to continue. */
    }
  }

  if (func == touch) {
    if (!future->in_future_specific_touch_queue) {
      /* Ask the runtime thread to put this future on the queue
         of the future being touched: */
      if (!future->in_touch_queue) {
        future->next_waiting_touch = fs->future_waiting_touch;
        fs->future_waiting_touch = future;
        future->in_touch_queue = 1;
      }
    } else {
      future->in_future_specific_touch_queue = 0; /* done with back-door argument */
    }
  }

  scheme_signal_received_at(fs->signal_handle);

  if (fts->thread->current_ft) {
    /* Wait for the signal that the RT call is finished
       or that a lightweight continuation has been captured: */
    future->can_continue_sema = fts->worker_can_continue_sema;
    end_gc_not_ok(fts, fs, MZ_RUNSTACK); /* we rely on this putting MZ_CONT_MARK_STACK into the thread record */
    mzrt_mutex_unlock(fs->future_mutex);

    mzrt_sema_wait(fts->worker_can_continue_sema);

    mzrt_mutex_lock(fs->future_mutex);
    start_gc_not_ok(fs);
  }

  /* Fetch the future instance again, in case the GC has moved the pointer
     or the future has been requeued. */
  future = fts->thread->current_ft;

  FUTURE_ASSERT(!future || !future->can_continue_sema);
  FUTURE_ASSERT(!future || !for_overflow);
  FUTURE_ASSERT(!future || !future->in_atomic_queue);

  if (future) {
    future->want_lw = 0;
    FUTURE_ASSERT(future->status == HANDLING_PRIM);
    if (future->no_retval) {
      record_fevent(FEVENT_RTCALL_ABORT, fid);
      future->status = FINISHED;
      trigger_added_touches(fs, future);
    } else {
      record_fevent(FEVENT_RTCALL_RESULT, fid);
      future->status = RUNNING;
    }
  } else {
    if (!for_overflow)
      record_fevent(FEVENT_RTCALL_SUSPEND, fid);
  }

  mzrt_mutex_unlock(fs->future_mutex);

  if (!future) {
    /* future continuation was requeued */
    scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
  } else if (future->no_retval) {
    /* there was an error => abort the future */
    future->no_retval = -1;
    scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
  } else {
    FUTURE_ASSERT(future->status == RUNNING);
    record_fevent(FEVENT_START_WORK, fid);
  }
}

static Scheme_Object *bad_multi_result(int argc, Scheme_Object **argv)
{
  scheme_wrong_return_arity(NULL, 1, argc, argv, NULL);
  return NULL;
}

/**********************************************************************/
/* Functions for primitive invocation                                 */
/**********************************************************************/
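/* Each scheme_rtcall_* wrapper below marshals its arguments into the
   current future_t, tags the request with a SIG_* `prim_protocol',
   calls future_do_runtimecall(), and then re-fetches the future_t
   before reading out the result, since a GC may have moved it. */
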
void scheme_wrong_contract_from_ft(const char *who, const char *expected_type, int what, int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;

  future->prim_protocol = SIG_WRONG_TYPE_EXN;
  future->arg_str0 = who;
  future->arg_str1 = expected_type;
  future->arg_i2 = what;
  future->arg_i3 = argc;
  future->arg_S4 = argv;

  future->time_of_request = get_future_timestamp();
  future->source_of_request = who;
  future_do_runtimecall(fts, NULL, 0, 1, 0);

  /* If more: fetch the future again, in case moved by a GC */
  /* future = fts->thread->current_ft; */
}

Scheme_Object **scheme_rtcall_on_demand(Scheme_Object **argv)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;

  future->prim_protocol = SIG_ON_DEMAND;

  if (argv != (MZ_RUNSTACK + 2)) {
    if (future->in_tracing_mode) {
      return scheme_on_demand(argv);
    }

    FUTURE_ASSERT(0);
  }

  future->arg_S0 = MZ_RUNSTACK;

  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[jit_on_demand]";
  future->source_type = FSRC_OTHER;

  future_do_runtimecall(fts, NULL, 1, 1, 0);

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  future->arg_S0 = NULL;
  future->retval_is_rs_plus_two = 0;

  return MZ_RUNSTACK + 2;
}

Scheme_Object *scheme_rtcall_make_fsemaphore(Scheme_Object *ready)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Object *retval;
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;
  int is_atomic;

  future->prim_protocol = SIG_MAKE_FSEMAPHORE;
  future->arg_s1 = ready;
  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[make_fsemaphore]";
  future->source_type = FSRC_OTHER;

  /* conservative check for when creation can succeed atomically
     (because it won't raise an error): */
  if (SCHEME_INTP(ready)
      && (SCHEME_INT_VAL(ready) >= 0)
      && (SCHEME_INT_VAL(ready) < 1024))
    is_atomic = 1;
  else
    is_atomic = 0;

  future_do_runtimecall(fts, NULL, is_atomic, 1, 0);

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  retval = future->retval_s;
  future->retval_s = NULL;

  return retval;
}

Scheme_Object *scheme_rtcall_make_future(Scheme_Object *proc)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Object *retval;
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;
  int is_atomic = 0;

  if (SAME_TYPE(SCHEME_TYPE(proc), scheme_native_closure_type)
      && scheme_native_arity_check(proc, 0)) {
    is_atomic = 1;
  }

  future->prim_protocol = SIG_FUTURE;
  future->arg_s1 = proc;
  future->time_of_request = get_future_timestamp();
  future->source_of_request = "future";
  future->source_type = FSRC_OTHER;

  future_do_runtimecall(fts, NULL, is_atomic, 1, 0);

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  retval = future->retval_s;
  future->retval_s = NULL;

  return retval;
}

void scheme_rtcall_allocate_values(int count, Scheme_Thread *t)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;

  future->prim_protocol = SIG_ALLOC_VALUES;

  future->arg_i0 = count;
  future->arg_s0 = (Scheme_Object *)t;

  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[allocate_values]";
  future->source_type = FSRC_OTHER;

  future_do_runtimecall(fts, NULL, 1, 0, 0);

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  future->arg_s0 = NULL;
}

Scheme_Structure *scheme_rtcall_allocate_structure(int count, Scheme_Struct_Type *t)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;
  Scheme_Object *retval;

  future->prim_protocol = SIG_ALLOC_STRUCT;

  future->arg_i0 = count;
  future->arg_s0 = (Scheme_Object *)t;

  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[allocate_structure]";
  future->source_type = FSRC_OTHER;

  future_do_runtimecall(fts, NULL, 1, 0, 0);

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  future->arg_s0 = NULL;

  retval = future->retval_s;
  future->retval_s = NULL;

  return (Scheme_Structure *)retval;
}

Scheme_Object *scheme_rtcall_allocate_vector(int count)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;
  Scheme_Object *retval;

  future->prim_protocol = SIG_ALLOC_VECTOR;

  future->arg_i0 = count;

  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[allocate_vector]";
  future->source_type = FSRC_OTHER;

  future_do_runtimecall(fts, NULL, 1, 0, 0);

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  future->arg_s0 = NULL;

  retval = future->retval_s;
  future->retval_s = NULL;

  return retval;
}

Scheme_Object *scheme_rtcall_tail_apply(Scheme_Object *rator, int argc, Scheme_Object **argv)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;
  Scheme_Object *retval;

  future->prim_protocol = SIG_TAIL_APPLY;

  future->arg_s0 = rator;
  future->arg_i0 = argc;
  future->arg_S0 = argv;

  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[tail-call]";
  future->source_type = FSRC_OTHER;

  future_do_runtimecall(fts, NULL, 1, 0, 0);

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  future->arg_s0 = NULL;
  future->arg_S0 = NULL;

  retval = future->retval_s;
  future->retval_s = NULL;

  receive_special_result(future, retval, 1);

  return retval;
}

Scheme_Object *scheme_rtcall_apply_with_new_stack(Scheme_Object *rator, int argc, Scheme_Object **argv,
                                                  int multi)
  XFORM_SKIP_PROC
/* Called in future thread; rator is a native closure with a runstack limit that fits */
{
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  future_t *future = fts->thread->current_ft;
  Scheme_Object *retval;

  future->prim_protocol = SIG_APPLY_AFRESH;

  future->arg_s0 = rator;
  future->arg_i0 = argc;
  future->arg_S0 = argv;
  future->arg_i1 = multi;

  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[stack-overflow]";
  future->source_type = FSRC_OTHER;

  future_do_runtimecall(fts, NULL, 1, 1, (multi ? 2 : 1));

  /* Fetch the future again, in case moved by a GC */
  future = fts->thread->current_ft;

  future->arg_s0 = NULL;
  future->arg_S0 = NULL;

  retval = future->retval_s;
  future->retval_s = NULL;

  receive_special_result(future, retval, 1);

  return retval;
}

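/* Fast-path nursery allocation for future threads: scheme_rtcall_alloc()
   first tries to bump-allocate within the current nursery page, and it
   falls back to an rtcall (SIG_ALLOC) only when the page is exhausted.
   As an illustration of the rounding step below (not an additional code
   path): assuming align = 0x1000 and GC_gen0_alloc_page_ptr = 0x12345678,
   the pointer is rounded up to the next boundary 0x12346000 before the
   initial offset is added back. */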
#ifdef MZ_PRECISE_GC
uintptr_t scheme_rtcall_alloc()
  XFORM_SKIP_PROC
/* Called in future thread, possibly during future_do_runtimecall() */
{
  future_t *future;
  uintptr_t retval;
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;
  intptr_t align, sz;
  double time_of_request;
  const char *source_of_request;
  int source_type;
  int prim_protocol;
  int arg_i0;

  align = GC_alloc_alignment();

  /* Do we actually still have space? */
  if (fts->gen0_start) {
    intptr_t cur;
    cur = GC_gen0_alloc_page_ptr;
    if (cur < (GC_gen0_alloc_page_end - align)) {
      if (cur & (align - 1)) {
        /* round up to next page boundary */
        cur &= ~(align - 1);
        cur += align;
      }
      cur += fts->gen0_initial_offset;
      return cur;
    }
  }

  /* Grow nursery size as long as we don't trigger a GC */
  if (fts->gen0_size < 16)
    fts->gen0_size <<= 1;

  future = fts->thread->current_ft;
  time_of_request = future->time_of_request;
  source_of_request = future->source_of_request;
  source_type = future->source_type;
  prim_protocol = future->prim_protocol;
  arg_i0 = future->arg_i0;

  while (1) {
    future->time_of_request = get_future_timestamp();
    future->source_of_request = "[allocate memory]";
    future->source_type = FSRC_OTHER;

    future->prim_protocol = SIG_ALLOC;
    future->arg_i0 = fts->gen0_size;

    /* don't suspend, because this might be a nested call: */
    future_do_runtimecall(fts, NULL, 1, 0, 0);

    future = fts->thread->current_ft;
    retval = future->alloc_retval;
    sz = future->alloc_sz_retval;
    future->alloc_retval = 0;

    if (fts->worker_gc_counter == future->alloc_retval_counter) {
      fts->gen0_start = retval;
      fts->gen0_initial_offset = retval & (align - 1);
      break;
    }
  }

  future->time_of_request = time_of_request;
  future->source_of_request = source_of_request;
  future->source_type = source_type;
  future->prim_protocol = prim_protocol;
  future->arg_i0 = arg_i0;

  GC_gen0_alloc_page_end = retval + sz;

  return retval;
}
#endif

void scheme_rtcall_new_mark_segment(Scheme_Thread *p)
  XFORM_SKIP_PROC
/* Called in future thread */
{
  future_t *future;
  Scheme_Future_Thread_State *fts = scheme_future_thread_state;

  future = fts->thread->current_ft;
  future->time_of_request = get_future_timestamp();
  future->source_of_request = "[allocate_mark_segment]";
  future->source_type = FSRC_OTHER;

  future->prim_protocol = SIG_ALLOC_MARK_SEGMENT;
  future->arg_s0 = (Scheme_Object *)p;

  future_do_runtimecall(fts, NULL, 1, 0, 0);
}

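/* While the runtime thread runs a primitive on a future's behalf, the
   future's continuation marks should be visible (continuation marks
   implement parameters and exception handlers, among other things), so
   marks are pushed from either the suspended lightweight continuation
   or the worker thread's state, then popped when the rtcall completes. */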
static int push_marks(future_t *f, Scheme_Cont_Frame_Data *d)
{
  if (f->suspended_lw) {
    return scheme_push_marks_from_lightweight_continuation(f->suspended_lw, d);
  } else if (f->fts->thread) {
    return scheme_push_marks_from_thread(f->fts->thread, d);
  }

  return 0;
}

static void pop_marks(Scheme_Cont_Frame_Data *d)
{
  scheme_pop_continuation_frame(d);
}

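/* SCHEME_MULTIPLE_VALUES and SCHEME_TAIL_CALL_WAITING are sentinel
   results whose payload lives in the current thread's `ku' field:
   send_special_result() moves that payload into the future record on
   the sending side, and receive_special_result() installs it into the
   receiving thread, optionally clearing the future's references. */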
static void receive_special_result(future_t *f, Scheme_Object *retval, int clear)
  XFORM_SKIP_PROC
/* Called in future or runtime thread */
{
  if (SAME_OBJ(retval, SCHEME_MULTIPLE_VALUES)) {
    Scheme_Thread *p = scheme_current_thread;

    p->ku.multiple.array = f->multiple_array;
    p->ku.multiple.count = f->multiple_count;
    if (clear)
      f->multiple_array = NULL;
  } else if (SAME_OBJ(retval, SCHEME_TAIL_CALL_WAITING)) {
    Scheme_Thread *p = scheme_current_thread;

    p->ku.apply.tail_rator = f->tail_rator;
    p->ku.apply.tail_rands = f->tail_rands;
    p->ku.apply.tail_num_rands = f->num_tail_rands;
    if (clear) {
      f->tail_rator = NULL;
      f->tail_rands = NULL;
    }
  }
}

#include "jit_ts_future_glue.c"

static void send_special_result(future_t *f, Scheme_Object *retval)
  XFORM_SKIP_PROC
/* Called in future or runtime thread */
{
  if (SAME_OBJ(retval, SCHEME_MULTIPLE_VALUES)) {
    Scheme_Thread *p = scheme_current_thread;

    f->multiple_array = p->ku.multiple.array;
    f->multiple_count = p->ku.multiple.count;
    if (SAME_OBJ(p->ku.multiple.array, p->values_buffer))
      p->values_buffer = NULL;
    p->ku.multiple.array = NULL;
  } else if (SAME_OBJ(retval, SCHEME_TAIL_CALL_WAITING)) {
    Scheme_Thread *p = scheme_current_thread;

    f->tail_rator = p->ku.apply.tail_rator;
    f->tail_rands = p->ku.apply.tail_rands;
    f->num_tail_rands = p->ku.apply.tail_num_rands;
    p->ku.apply.tail_rator = NULL;
    p->ku.apply.tail_rands = NULL;

    if (f->tail_rands == p->tail_buffer) {
      /* This only happens in the runtime thread; we need to
         disconnect the tail buffer from `f->tail_rands' in
         case of a GC. Beware that XFORM is disabled here. */
      Scheme_Object **tb;
      p->tail_buffer = NULL; /* so args aren't zeroed */
      tb = MALLOC_N(Scheme_Object *, p->tail_buffer_size);
      p = scheme_current_thread; /* in case GC moves the thread */
      p->tail_buffer = tb;
    }
  }
}

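/* When a future has been captured as a lightweight continuation,
   runstack pointers recorded before the capture refer to the old
   runstack, so arguments passed via an `arg_Sx' field are re-based
   onto the captured continuation's copy before use: */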
#define ADJUST_RS_ARG(ft, arg_Sx) if (ft->suspended_lw) arg_Sx = scheme_adjust_runstack_argument(ft->suspended_lw, arg_Sx)

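/* For example (an illustrative trace, not an additional code path):
   when JIT-generated code in a future thread needs a fresh vector, it
   calls scheme_rtcall_allocate_vector(), which records SIG_ALLOC_VECTOR
   and blocks; the runtime thread then reaches the SIG_ALLOC_VECTOR case
   below, allocates the vector with memory accounting charged to the
   future's custodian, and stores it in `retval_s' for the future thread
   to pick up. */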
/* Does the work of actually invoking a primitive on behalf of a
   future.  This function is always invoked on the main (runtime)
   thread. */
static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future)
/* Called in runtime thread */
{
  Scheme_Cont_Frame_Data mark_d;
  int need_pop;

#ifdef DEBUG_FUTURES
  g_rtcall_count++;
#endif

  if (scheme_log_level_p(scheme_get_future_logger(), SCHEME_LOG_DEBUG)) {
    const char *src;
    Scheme_Object *userdata;

    src = future->source_of_request;
    if (future->source_type == FSRC_RATOR) {
      int len;
      if (SCHEME_PROCP(future->arg_s0)) {
        const char *src2;
        src2 = scheme_get_proc_name(future->arg_s0, &len, 1);
        if (src2) src = src2;
      }
    } else if (future->source_type == FSRC_PRIM) {
      const char *src2;
      src2 = scheme_look_for_primitive(future->prim_func);
      if (src2) src = src2;
    }

    flush_future_logs(fs);

    /* use log_future_event so we can include `src' in the message: */
    userdata = NULL;
    switch (future->prim_protocol)
      {
      case SIG_ALLOC:
        {
          userdata = scheme_make_integer(future->arg_i0);
          break;
        }
      case SIG_ON_DEMAND:
        {
          /* Closure is first in runstack */
          GC_CAN_IGNORE Scheme_Object **rs = future->arg_S0;
          ADJUST_RS_ARG(future, rs);
          userdata = scheme_object_name(rs[0]);
          if (!userdata)
            userdata = scheme_intern_symbol("[unknown]");

          break;
        }
      }

    log_future_event(fs,
                     "id %d, process %d: %s: %s; time: %f",
                     src,
                     -1,
                     (future->rt_prim_is_atomic ? FEVENT_HANDLE_RTCALL_ATOMIC : FEVENT_HANDLE_RTCALL),
                     get_future_timestamp(),
                     future->id,
                     userdata);
  }

  if (((future->source_type == FSRC_RATOR)
          || (future->source_type == FSRC_MARKS)
          || (future->source_type == FSRC_PRIM))
        && !future->in_tracing_mode)
    need_pop = push_marks(future, &mark_d);
  else
    need_pop = 0;

  switch (future->prim_protocol)
    {
    case SIG_ON_DEMAND:
      {
        GC_CAN_IGNORE Scheme_Object **arg_S0 = future->arg_S0;
        future->arg_S0 = NULL;

        ADJUST_RS_ARG(future, arg_S0);

        scheme_on_demand_with_args(arg_S0, arg_S0, 2);

        future->retval_is_rs_plus_two = 1;

        break;
      }
#ifdef MZ_PRECISE_GC
    case SIG_ALLOC:
      {
        uintptr_t ret, sz;
        int amt = future->arg_i0;
        ret = GC_make_jit_nursery_page(amt, &sz);
        future->alloc_retval = ret;
        future->alloc_sz_retval = sz;
        future->alloc_retval_counter = scheme_did_gc_count;
        break;
      }
#endif
    case SIG_ALLOC_MARK_SEGMENT:
      {
        GC_CAN_IGNORE Scheme_Thread *p_seg;
        p_seg = (Scheme_Thread *)future->arg_s0;
        future->arg_s0 = NULL;
        scheme_new_mark_segment(p_seg);
        break;
      }
    case SIG_MAKE_FSEMAPHORE:
      {
        Scheme_Object *s = future->arg_s1;
        future->arg_s1 = NULL;
        s = scheme_make_fsemaphore_inl(s);
        future->retval_s = s;
        break;
      }
    case SIG_FUTURE:
      {
        GC_CAN_IGNORE Scheme_Object *s = future->arg_s1;
        future->arg_s1 = NULL;
        s = make_future(s, 1, future);
        future->retval_s = s;
        break;
      }
    case SIG_ALLOC_VALUES:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;

        future->arg_s0 = NULL;

        scheme_jit_allocate_values(future->arg_i0, (Scheme_Thread *)arg_s0);

        break;
      }
    case SIG_ALLOC_STRUCT:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;
        GC_CAN_IGNORE Scheme_Structure *res;

        future->arg_s0 = NULL;

        res = scheme_jit_allocate_structure(future->arg_i0, (Scheme_Struct_Type *)arg_s0);

        future->retval_s = (Scheme_Object *)res;

        break;
      }
    case SIG_ALLOC_VECTOR:
      {
        GC_CAN_IGNORE Scheme_Object *res;
        intptr_t count = future->arg_i0;

        future->arg_s0 = NULL;

        GC_set_accounting_custodian(future->cust);

        res = scheme_malloc_tagged(sizeof(Scheme_Vector)
                                   + ((count - mzFLEX_DELTA) * sizeof(Scheme_Object *)));
        if (res) {
          res->type = scheme_vector_type;
          SCHEME_VEC_SIZE(res) = count;
        }

        GC_set_accounting_custodian(NULL);

        future->retval_s = res;

        break;
      }
    case SIG_TAIL_APPLY:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;
        GC_CAN_IGNORE Scheme_Object **arg_S0 = future->arg_S0;
        GC_CAN_IGNORE Scheme_Object *retval;

        future->arg_s0 = NULL;
        future->arg_S0 = NULL;

        retval = _scheme_tail_apply(arg_s0, future->arg_i0, arg_S0);

        future->retval_s = retval;
        send_special_result(future, retval);

        break;
      }
    case SIG_WRONG_TYPE_EXN:
      {
        const char *who;
        const char *expected_type;
        int what;
        int argc;
        Scheme_Object **argv;

        who = future->arg_str0;
        expected_type = future->arg_str1;
        what = future->arg_i2;
        argc = future->arg_i3;
        argv = future->arg_S4;

        future->arg_str0 = NULL;
        future->arg_str1 = NULL;
        future->arg_S4 = NULL;

        ADJUST_RS_ARG(future, argv);

        scheme_wrong_contract(who, expected_type, what, argc, argv);

        /* doesn't return */

        break;
      }
    case SIG_APPLY_AFRESH:
      {
        GC_CAN_IGNORE Scheme_Object *arg_s0 = future->arg_s0;
        GC_CAN_IGNORE Scheme_Object **arg_S0 = future->arg_S0;
        GC_CAN_IGNORE Scheme_Object *retval;

        /* This code is used only for would-be futures: */
        FUTURE_ASSERT(future->in_tracing_mode);

        future->arg_s0 = NULL;
        future->arg_S0 = NULL;

        if (future->arg_i1)
          retval = _scheme_apply_multi(arg_s0, future->arg_i0, arg_S0);
        else
          retval = _scheme_apply(arg_s0, future->arg_i0, arg_S0);

        future->retval_s = retval;
        send_special_result(future, retval);

        break;
      }
# define JIT_TS_LOCALIZE(t, f) GC_CAN_IGNORE t f = future->f
# include "jit_ts_runtime_glue.c"
    default:
      scheme_signal_error("unknown protocol %d", future->prim_protocol);
      break;
    }

  if (need_pop)
    pop_marks(&mark_d);

  record_fevent(FEVENT_HANDLE_RTCALL_RESULT, future->id);

  mzrt_mutex_lock(fs->future_mutex);
  complete_rtcall(fs, future);
  mzrt_mutex_unlock(fs->future_mutex);
}

typedef Scheme_Object *(*overflow_k_t)(void);

static void *do_invoke_rtcall_k(void)
{
  Scheme_Thread *p = scheme_current_thread;
  Scheme_Future_State *fs = (Scheme_Future_State *)p->ku.k.p1;
  future_t *future = (future_t *)p->ku.k.p2;

#ifdef DO_STACK_CHECK
  {
# include "mzstkchk.h"
    return scheme_handle_stack_overflow((overflow_k_t)do_invoke_rtcall_k);
  }
#endif

  p->ku.k.p1 = NULL;
  p->ku.k.p2 = NULL;

  do_invoke_rtcall(fs, future);

  return scheme_void;
}

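/* Wraps do_invoke_rtcall() with an error barrier: if the primitive
   raises a Racket exception, the scheme_setjmp() branch below marks
   the future as failed and either abandons it (for would-be futures)
   or releases the blocked worker thread, before re-raising in the
   runtime thread. */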
static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile future,
                          volatile int is_atomic)
{
  Scheme_Thread *p = scheme_current_thread;
  mz_jmp_buf newbuf, * volatile savebuf;

  FUTURE_ASSERT(!future->want_lw);
  FUTURE_ASSERT(!is_atomic || future->rt_prim_is_atomic);
  FUTURE_ASSERT(!future->in_atomic_queue);

  savebuf = p->error_buf;
  p->error_buf = &newbuf;
  if (scheme_setjmp(newbuf)) {
    record_fevent(FEVENT_HANDLE_RTCALL_ABORT, future->id);
    mzrt_mutex_lock(fs->future_mutex);
    future->no_retval = 1;

    /* If running on a would-be future, no extra work required here */
    if (future->suspended_lw || scheme_current_thread->futures_slow_path_tracing) {
      /* Abandon the future */
      future->status = FINISHED;
      future->retval = 0;
      future->suspended_lw = NULL;
      trigger_added_touches(fs, future);
      mzrt_mutex_unlock(fs->future_mutex);
    } else {
      /* Signal the waiting worker thread that it
         can continue running machine code */
      mzrt_sema *can_continue_sema = future->can_continue_sema;
      FUTURE_ASSERT(!future->in_atomic_queue);
      future->can_continue_sema = NULL;
      mzrt_sema_post(can_continue_sema);
      mzrt_mutex_unlock(fs->future_mutex);
    }
    if (is_atomic) {
      scheme_log_abort("internal error: failure during atomic");
      abort();
    }
    scheme_longjmp(*savebuf, 1);
  } else {
    if (future->rt_prim_is_atomic) {
      do_invoke_rtcall(fs, future);
    } else {
      /* call with continuation barrier: */
      p->ku.k.p1 = fs;
      p->ku.k.p2 = future;

      (void)scheme_top_level_do(do_invoke_rtcall_k, 1);
    }
  }
  p->error_buf = savebuf;
}

/**********************************************************************/
/* Helpers for manipulating the futures queue                         */
/**********************************************************************/

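/* The pending-futures queue is a doubly-linked list threaded through
   the future records themselves (via `prev' and `next'), guarded by
   fs->future_mutex; fs->future_pending_sema counts queued entries, so
   idle worker threads can block on it until work arrives. */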
future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft)
  XFORM_SKIP_PROC
/* called in any thread with lock held */
{
  FUTURE_ASSERT(!ft->in_atomic_queue);
  FUTURE_ASSERT(!ft->in_future_queue);

  if (fs->future_queue_end) {
    fs->future_queue_end->next = ft;
    ft->prev = fs->future_queue_end;
  }
  fs->future_queue_end = ft;
  if (!fs->future_queue)
    fs->future_queue = ft;
  fs->future_queue_count++;
  ft->in_future_queue = 1;

  /* Signal that a future is pending */
  mzrt_sema_post(fs->future_pending_sema);

  return ft;
}

future_t *get_pending_future(Scheme_Future_State *fs)
  XFORM_SKIP_PROC
/* Called in future thread with lock held */
{
  future_t *f;

  while (1) {
    f = fs->future_queue;
    if (f) {
      FUTURE_ASSERT(f->in_future_queue);
      dequeue_future(fs, f);
      if (!scheme_custodian_is_available(f->cust)) {
        f->status = SUSPENDED;
      } else {
        return f;
      }
    } else
      return NULL;
  }
}

#endif

/**********************************************************************/
/*                           Precise GC                               */
/**********************************************************************/

#ifdef MZ_PRECISE_GC

START_XFORM_SKIP;

#include "mzmark_future.inc"

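/* Register the traversal procedures (from "mzmark_future.inc") so that
   the precise GC can trace future and fsemaphore records; the stub
   (non-futures) build registers traversers for the sequential fallback
   structures instead. */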
static void register_traversers(void)
{
#ifdef MZ_USE_FUTURES
  GC_REG_TRAV(scheme_future_type, future);
  GC_REG_TRAV(scheme_fsemaphore_type, fsemaphore);
#else
  GC_REG_TRAV(scheme_future_type, sequential_future);
  GC_REG_TRAV(scheme_fsemaphore_type, sequential_fsemaphore);
#endif
}

END_XFORM_SKIP;

#endif